Skip to content

RecursiveLoader

laktory.yaml.RecursiveLoader(stream, parent_loader=None, vars=None) ¤

Bases: SafeLoader

METHOD DESCRIPTION
append_constructor

Append content of another YAML file to the current list.

custom_mapping_constructor

Custom handling for mappings to support !merge.

custom_sequence_constructor

Custom handling for sequences to support !extend.

inject_constructor

Inject content of another YAML file.

load

Load yaml file with support for reference to external yaml and sql files using !use, !extend and !update tags.

merge_constructor

Merge content of another YAML file into the current dictionary.

preprocess_stream

Reformat content to be YAML safe

Source code in laktory/yaml/recursiveloader.py
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, stream, parent_loader=None, vars=None):
    """Create a loader for `stream`, inheriting state from `parent_loader`.

    Parameters
    ----------
    stream:
        File-like object to parse. It is passed through `preprocess_stream`
        before being handed to the base loader.
    parent_loader:
        Loader that triggered this load, if any. Its `_loading_paths` are
        copied and its `variables` are appended after `vars`.
    vars:
        Optional variables placed first in this loader's `variables` list.
    """
    self.dirpath = Path("./")

    # Work on a copy so paths registered here never leak back to the parent.
    if parent_loader:
        self._loading_paths: set[str] = set(parent_loader._loading_paths)
    else:
        self._loading_paths = set()

    # Must run after dirpath/_loading_paths exist: preprocessing updates both.
    stream = self.preprocess_stream(stream)

    stack = [vars] if vars else []
    if parent_loader:
        stack.extend(parent_loader.variables)
    self.variables = stack

    super().__init__(stream)

append_constructor(loader, node) staticmethod ¤

Append content of another YAML file to the current list.

Source code in laktory/yaml/recursiveloader.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@staticmethod
def append_constructor(loader, node):
    """Append content of another YAML file to the current list."""

    filepath = loader.get_path(loader, node)
    abs_path = str(Path(filepath).resolve())
    if abs_path in loader._loading_paths:
        raise ValueError(
            f"Circular !extend reference: '{filepath}' is already being loaded"
        )
    try:
        with open(filepath, "r") as f:
            append_data = RecursiveLoader.load(f, parent_loader=loader)
    except FileNotFoundError:
        raise FileNotFoundError(
            f"!extend target not found: '{filepath}' (referenced from '{loader.dirpath}')"
        )
    if not isinstance(append_data, list):
        raise TypeError(
            f"Expected a list in {filepath}, but got {type(append_data).__name__}"
        )
    return append_data

custom_mapping_constructor(loader, node) staticmethod ¤

Custom handling for mappings to support !merge.

Source code in laktory/yaml/recursiveloader.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@staticmethod
def custom_mapping_constructor(loader, node):
    """Custom handling for mappings to support !merge."""

    # Unquoted {nodes.X} / {sources.X} in YAML are flow mappings that
    # PyYAML parses as {"nodes.X": None}. Reconstruct the intended string
    # reference before any further processing. Checking node.flow_style
    # ensures block-style mappings (and multi-line string content) are
    # never affected.
    if node.flow_style and len(node.value) == 1:
        key_node, value_node = node.value[0]
        key = loader.construct_object(key_node)
        value = loader.construct_object(value_node)
        if (
            value is None
            and isinstance(key, str)
            and (key.startswith("nodes.") or key.startswith("sources."))
        ):
            return "{" + key + "}"

    # read variables
    _vars = {}
    for key_node, value_node in node.value:
        key = loader.construct_object(key_node)
        if key == VARIABLES_KEY:
            _vars = loader.construct_object(value_node)
    loader.variables += [_vars]

    # read include and merge
    try:
        mapping = {}
        for key_node, value_node in node.value:
            key = loader.construct_object(key_node)
            value = loader.construct_object(value_node)
            if key == MERGE_KEY and isinstance(value, dict):
                # Merge the dictionary directly into the parent mapping
                mapping.update(value)
            else:
                mapping[key] = value
    finally:
        # Always restore the variables stack, even if construction fails
        del loader.variables[-1]

    return mapping

custom_sequence_constructor(loader, node) staticmethod ¤

Custom handling for sequences to support !extend.

Source code in laktory/yaml/recursiveloader.py
269
270
271
272
273
274
275
276
277
278
279
280
@staticmethod
def custom_sequence_constructor(loader, node):
    """Custom handling for sequences to support !append."""

    seq = []
    for child in node.value:
        if child.tag == "!extend":
            append_data = loader.construct_object(child)
            seq.extend(append_data)  # Flatten the appended list
        else:
            seq.append(loader.construct_object(child))
    return seq

inject_constructor(loader, node) staticmethod ¤

Inject content of another YAML file.

Source code in laktory/yaml/recursiveloader.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@staticmethod
def inject_constructor(loader, node):
    """Inject content of another YAML file."""

    filepath = Path(loader.get_path(loader, node))

    if filepath.as_posix().endswith(".sql"):
        try:
            with filepath.open("r", encoding="utf-8") as _fp:
                data = _fp.read()
        except FileNotFoundError:
            raise FileNotFoundError(
                f"!use target not found: '{filepath}' (referenced from '{loader.dirpath}')"
            )
        return data

    if filepath.is_dir():
        objs = []
        for _filepath in sorted(
            set(filepath.rglob("*.yaml")) | set(filepath.rglob("*.yml"))
        ):
            abs_path = str(_filepath.resolve())
            if abs_path in loader._loading_paths:
                raise ValueError(
                    f"Circular !use reference: '{_filepath}' is already being loaded"
                )
            with _filepath.open("r") as f:
                objs += [RecursiveLoader.load(f, parent_loader=loader)]
        return objs

    else:
        abs_path = str(filepath.resolve())
        if abs_path in loader._loading_paths:
            raise ValueError(
                f"Circular !use reference: '{filepath}' is already being loaded"
            )
        try:
            with filepath.open("r") as f:
                return RecursiveLoader.load(f, parent_loader=loader)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"!use target not found: '{filepath}' (referenced from '{loader.dirpath}')"
            )

load(stream, parent_loader=None, vars=None) classmethod ¤

Load yaml file with support for reference to external yaml and sql files using !use, !extend and !update tags. Path to external files can be defined using model or environment variables.

Custom Tags

!use {filepath}: Directly inject the content of the file at filepath. A directory can also be provided. In this case, each yaml file found in the directory will be loaded as an element of a list.

  • !extend {filepath}: Extend the current list with the elements found in the file at filepath. Similar to python list.extend method.

<<: !update {filepath}: Merge the current dictionary with the content of the dictionary defined at filepath. Similar to python dict.update method.

PARAMETER DESCRIPTION
stream

file object structured as a yaml file

parent_loader

Parent loader, if the file is loaded from another loader.

TYPE: RecursiveLoader DEFAULT: None

vars

Dict of variables available when parsing filepaths references in yaml files i.e. !use catalog_${vars.env}.yaml

DEFAULT: None

RETURNS DESCRIPTION

Dict or list

Examples:

businesses:
  apple:
    symbol: aapl
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - jane.doe@apple.com
      - !extend emails.yaml
  amazon:
    symbol: amzn
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - john.doe@amazon.com
      - !extend emails.yaml
Source code in laktory/yaml/recursiveloader.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@classmethod
def load(cls, stream, parent_loader: "RecursiveLoader" = None, vars=None):
    """
    Load a yaml file, resolving references to external yaml and sql files
    declared with the `!use`, `!extend` and `!update` tags.
    Paths to external files can be defined using model or environment
    variables.

    Custom Tags
    -----------
    !use {filepath}:
        Directly inject the content of the file at `filepath`. A directory can
        also be provided. In this case, each yaml file found in the directory
        is loaded as an element of a list.

    - !extend {filepath}:
        Extend the current list with the elements found in the file at
        `filepath`. Similar to python list.extend method.

    <<: !update {filepath}:
        Merge the current dictionary with the content of the dictionary defined
        at `filepath`. Similar to python dict.update method.

    Parameters
    ----------
    stream:
        file object structured as a yaml file
    parent_loader:
        Parent loader, if the file is loaded from another loader.
    vars:
        Dict of variables available when parsing filepaths references in yaml
        files i.e. `!use catalog_${vars.env}.yaml`

    Returns
    -------
    :
        Dict or list

    Examples
    --------
    ```yaml
    businesses:
      apple:
        symbol: aapl
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - jane.doe@apple.com
          - !extend emails.yaml
      amazon:
        symbol: amzn
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - john.doe@amazon.com
          - !extend emails.yaml
    ```
    """
    reader = cls(stream, parent_loader, vars=vars)
    try:
        parsed = reader.get_single_data()
    finally:
        # Release the loader's parsing state whether or not parsing succeeded.
        reader.dispose()
    return parsed

merge_constructor(loader, node) staticmethod ¤

Merge content of another YAML file into the current dictionary.

Source code in laktory/yaml/recursiveloader.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@staticmethod
def merge_constructor(loader, node):
    """Merge content of another YAML file into the current dictionary."""

    filepath = loader.get_path(loader, node)
    abs_path = str(Path(filepath).resolve())
    if abs_path in loader._loading_paths:
        raise ValueError(
            f"Circular !update reference: '{filepath}' is already being loaded"
        )
    try:
        with open(filepath, "r") as f:
            merge_data = RecursiveLoader.load(f, parent_loader=loader)
    except FileNotFoundError:
        raise FileNotFoundError(
            f"!update target not found: '{filepath}' (referenced from '{loader.dirpath}')"
        )

    if not isinstance(merge_data, dict):
        raise TypeError(
            f"Expected a dictionary in {filepath}, but got {type(merge_data).__name__}"
        )
    return merge_data

preprocess_stream(stream) ¤

Reformat content to be YAML safe

Source code in laktory/yaml/recursiveloader.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def preprocess_stream(self, stream):
    """Return the content of `stream` rewritten so the base loader can parse it.

    When `stream` has a `name` attribute, its parent directory is stored in
    `self.dirpath` and its resolved path is added to `self._loading_paths`.
    A leading ``<<:`` on a line is replaced by ``MERGE_KEY + ":"``.

    Parameters
    ----------
    stream:
        File-like object providing `readlines()`.

    Returns
    -------
    :
        The rewritten content as a single string, with the original line
        breaks preserved.

    Raises
    ------
    ValueError
        If a line uses the deprecated ``${include.`` syntax.
    """

    if hasattr(stream, "name"):
        source = Path(stream.name)
        self.dirpath = source.parent
        self._loading_paths.add(str(source.resolve()))

    # Only replace <<: at the start of line content (after optional
    # whitespace) - the only valid YAML position for a merge key.
    merge_pattern = re.compile(r"^(\s*)<<:")
    replacement = r"\g<1>" + MERGE_KEY + ":"

    _lines = []
    for line in stream.readlines():
        if "${include." in line:
            raise ValueError(
                "The `${include.}` syntax has been deprecated in laktory 0.6.0. Please use `!use`, `!update` and `!extend` tags instead."
            )
        _lines.append(merge_pattern.sub(replacement, line))

    # readlines() keeps each line's own terminator, so the lines are joined
    # without a separator. Joining on "\n" would double every line break,
    # inserting blank lines into block scalars and shifting the line numbers
    # reported by the parser.
    return "".join(_lines)