
Pipeline

laktory.models.Pipeline ¤

Bases: BaseModel, PulumiResource, TerraformResource, PipelineChild

Pipeline model to manage a data pipeline including reading from data sources, applying data transformations and outputting to data sinks.

A pipeline is composed of a collection of nodes, each defining its own source, transformations and optional sink. A node may be used as the source of another node.

A pipeline may be run manually using Python or the CLI, but it may also be deployed and scheduled using one of the supported orchestrators, such as a Databricks Job or a Lakeflow Declarative Pipeline.

The DataFrame backend used to run the pipeline can be configured at the pipeline level or at the node level.
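For example, a single node could run on a different backend than the rest of the pipeline. The YAML below is a minimal sketch, assuming the node-level field is also named dataframe_backend:

    name: pl-mixed-backends
    dataframe_backend: POLARS

    nodes:
    - name: brz_stock_prices
      dataframe_backend: PYSPARK  # node-level override of the pipeline-level backend
      source:
        path: ./data/stock_prices/
        format: JSONL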

PARAMETER DESCRIPTION
dataframe_backend

Type of DataFrame backend

TYPE: DataFrameBackends DEFAULT: None

dataframe_api

DataFrame API to use in DataFrame Transformer nodes. Either 'NATIVE' (backend-specific) or 'NARWHALS' (backend-agnostic).

TYPE: Literal[NARWHALS, NATIVE] | VariableType DEFAULT: None

resource_name_

Name of the resource in the context of infrastructure as code. If None, default_resource_name will be used instead.

TYPE: str | VariableType DEFAULT: None

options

Resources options specifications

TYPE: ResourceOptions | VariableType DEFAULT: ResourceOptions(variables={}, is_enabled=True, depends_on=[], provider=None, ignore_changes=None, aliases=None, delete_before_replace=True, import_=None, parent=None, replace_on_changes=None, moved_from=None)

lookup_existing

Lookup resource instead of creating a new one.

TYPE: ResourceLookup | VariableType DEFAULT: None

variables

Dict of variables to be injected in the model at runtime

TYPE: dict[str, Any] DEFAULT: {}

dependencies

List of dependencies required to run the pipeline. If laktory is not provided, its current version is added to the list.

TYPE: list[Union[str, VariableType]] | VariableType DEFAULT: []

imports

List of modules to import before execution. Generally used to load Narwhals extensions. Packages listed in dependencies are automatically included in the list of imports.

TYPE: list[Union[str, VariableType]] | VariableType DEFAULT: []

name

Name of the pipeline

TYPE: str | VariableType

nodes

List of pipeline nodes. Each node defines a data source, a series of transformations and optionally a sink.

TYPE: list[Union[PipelineNode, VariableType]] | VariableType DEFAULT: []

orchestrator

Orchestrator used for scheduling and executing the pipeline. The selected option defines which resources are to be deployed. Supported options are instances of classes:

  • DatabricksJobOrchestrator: When deployed through a Databricks Job, a task is created for each pipeline node and all the required dependencies are set automatically. If a given task (or pipeline node) uses a PipelineNodeDataSource as the source, the data will be read from the upstream node sink.
  • DatabricksPipelineOrchestrator: When orchestrated through Databricks DLT, each pipeline node creates a DLT table (or view, if no sink is defined). Behind the scenes, PipelineNodeDataSource leverages native dlt read and read_stream functions to define the interdependencies between the tables as in a standard DLT pipeline.

TYPE: DatabricksJobOrchestrator | DatabricksPipelineOrchestrator | VariableType DEFAULT: None

root_path

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str | VariableType DEFAULT: None

Examples:

This first example shows how to configure a simple pipeline with two nodes. Upon execution, raw data will be read from JSON files and two DataFrames (bronze and silver) will be created and saved as Parquet files. Notice how the first node is used as a data source for the second node. Polars is used as the DataFrame backend.

import io

import laktory as lk

pipeline_yaml = '''
    name: pl-stock-prices
    dataframe_backend: POLARS

    nodes:
    - name: brz_stock_prices
      source:
        path: ./data/stock_prices/
        format: JSONL
      sinks:
      - path: ./data/brz_stock_prices.parquet
        format: PARQUET

    - name: slv_stock_prices
      source:
        node_name: brz_stock_prices
        as_stream: false
      sinks:
      - path: ./data/slv_stock_prices.parquet
        format: PARQUET
      transformer:
        nodes:
        - expr: |
            SELECT
              CAST(data.created_at AS TIMESTAMP) AS created_at,
              data.symbol AS name,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
        - func_name: unique
          func_kwargs:
            subset:
              - symbol
              - created_at
            keep:
              any
'''

pl = lk.models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

# Execute pipeline
# pl.execute()

The next example also defines a two-node pipeline, but uses PySpark as the DataFrame backend and includes the configuration required to deploy it as a Databricks Job. In this case, the sinks write to Unity Catalog tables.

import io

import laktory as lk

pipeline_yaml = '''
    name: pl-stocks-job
    dataframe_backend: PYSPARK
    orchestrator:
      type: DATABRICKS_JOB

    nodes:

    - name: brz_stock_prices
      source:
        path: dbfs:/laktory/data/stock_prices/
        as_stream: false
        format: JSONL
      sinks:
      - table_name: brz_stock_prices_job
        mode: OVERWRITE

    - name: slv_stock_prices
      expectations:
      - name: positive_price
        expr: open > 0
        action: DROP
      source:
        node_name: brz_stock_prices
        as_stream: false
      sinks:
      - table_name: slv_stock_prices_job
        mode: OVERWRITE

      transformer:
        nodes:
        - expr: |
            SELECT
                cast(data.created_at AS TIMESTAMP) AS created_at,
                data.symbol AS symbol,
                data.open AS open,
                data.close AS close
            FROM
                {df}
        - func_name: drop_duplicates
          func_kwargs:
            subset: ["created_at", "symbol"]
          dataframe_api: NATIVE
'''
pl = lk.models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
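With an orchestrator selected, the resources that would be deployed can be inspected through the core_resources property documented below. A minimal sketch (the exact resource list depends on the orchestrator):

# Print the class name of each resource to be deployed
# (the pipeline itself plus the orchestrator-specific resources)
for r in pl.core_resources:
    print(type(r).__name__)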
METHOD DESCRIPTION
dag_figure

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

execute

Execute the pipeline (read sources and write sinks) by sequentially executing each node.

inject_vars

Inject model variables values into a model attributes.

inject_vars_into_dump

Inject model variables values into a model dump.

model_validate_json_file

Load model from json file object

model_validate_yaml

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files.

push_vars

Push variable values to all children recursively

validate_assignment_disabled

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design and must be turned off temporarily.

ATTRIBUTE DESCRIPTION
additional_core_resources

Additional resources deployed with the pipeline: a DLT Pipeline if the orchestrator is DLT, or a Databricks Job if the orchestrator is DATABRICKS_JOB.

TYPE: list[PulumiResource]

core_resources

List of core resources to be deployed with this laktory model, including the class instance (self).

dag

Networkx Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

TYPE: DiGraph

default_resource_name

Resource default name constructed as {self.resource_type_id}-{self.resource_key}.

TYPE: str

is_orchestrator_dlt

If True, pipeline orchestrator is DLT

TYPE: bool

nodes_dict

Nodes dictionary whose keys are the node names.

TYPE: dict[str, PipelineNode]

pulumi_excludes

List of fields to exclude when dumping model to pulumi

TYPE: Union[list[str], dict[str, bool]]

pulumi_properties

Resources properties formatted for pulumi:

TYPE: dict

pulumi_renames

Map of fields to rename when dumping model to pulumi

TYPE: dict[str, str]

resource_key

Resource key used to build default resource name. Equivalent to the name property if available; otherwise, an empty string.

TYPE: str

resource_type_id

pl

TYPE: str

sorted_nodes

Topologically sorted nodes.

TYPE: list[PipelineNode]

terraform_excludes

List of fields to exclude when dumping model to terraform

TYPE: Union[list[str], dict[str, bool]]

terraform_properties

Resources properties formatted for terraform:

TYPE: dict

terraform_renames

Map of fields to rename when dumping model to terraform

TYPE: dict[str, str]

additional_core_resources property ¤

if orchestrator is DLT:

  • DLT Pipeline

if orchestrator is DATABRICKS_JOB:

  • Databricks Job

core_resources property ¤

List of core resources to be deployed with this laktory model:

  • class instance (self)

dag property ¤

Networkx Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

RETURNS DESCRIPTION
DiGraph

Directed Acyclic Graph
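A minimal usage sketch, assuming the two-node pipeline loaded as pl in the first example above:

import networkx as nx

dag = pl.dag

# Nodes can be processed in dependency order
print(list(nx.topological_sort(dag)))
# expected: ['brz_stock_prices', 'slv_stock_prices']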

default_resource_name property ¤

Resource default name constructed as:

  • {self.resource_type_id}-{self.resource_key}
  • removing ${resources....} tags
  • removing ${vars....} tags
  • replacing special characters with - to avoid conflicts with resource properties

is_orchestrator_dlt property ¤

If True, pipeline orchestrator is DLT

nodes_dict property ¤

Nodes dictionary whose keys are the node names.

RETURNS DESCRIPTION
dict[str, PipelineNode]

Nodes
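A minimal usage sketch, assuming the pipeline loaded as pl in the examples above:

# Retrieve a specific node by name
node = pl.nodes_dict["slv_stock_prices"]
print(node.name)
# > slv_stock_prices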

pulumi_excludes property ¤

List of fields to exclude when dumping model to pulumi

pulumi_properties property ¤

Resources properties formatted for pulumi:

  • Serialization (model dump)
  • Removal of excludes defined in self.pulumi_excludes
  • Renaming of keys according to self.pulumi_renames
  • Injection of variables
RETURNS DESCRIPTION
dict

Pulumi-safe model dump

pulumi_renames property ¤

Map of fields to rename when dumping model to pulumi

resource_key property ¤

Resource key used to build default resource name. Equivalent to the name property if available. Otherwise, an empty string.

resource_type_id property ¤

pl

sorted_nodes property ¤

Topologically sorted nodes.

RETURNS DESCRIPTION
list[PipelineNode]

List of Topologically sorted nodes.

terraform_excludes property ¤

List of fields to exclude when dumping model to terraform

terraform_properties property ¤

Resources properties formatted for terraform:

  • Serialization (model dump)
  • Removal of excludes defined in self.terraform_excludes
  • Renaming of keys according to self.terraform_renames
  • Injection of variables
RETURNS DESCRIPTION
dict

Terraform-safe model dump

terraform_renames property ¤

Map of fields to rename when dumping model to terraform

dag_figure() ¤

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

RETURNS DESCRIPTION
Figure

Plotly figure representation of the pipeline.
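A minimal usage sketch (requires networkx and plotly; as the method is under development, the rendered figure may change):

fig = pl.dag_figure()
fig.show()  # open an interactive view
# fig.write_html("pipeline_dag.html")  # or save to disk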

Source code in laktory/models/pipeline/pipeline.py
def dag_figure(self) -> "Figure":
    """
    [UNDER DEVELOPMENT] Generate a figure representation of the pipeline
    DAG.

    Returns
    -------
    :
        Plotly figure representation of the pipeline.
    """
    import networkx as nx
    import plotly.graph_objs as go

    dag = self.dag
    pos = nx.spring_layout(dag)

    # ------------------------------------------------------------------- #
    # Edges                                                               #
    # ------------------------------------------------------------------- #

    edge_traces = []
    for e in dag.edges:
        edge_traces += [
            go.Scatter(
                x=[pos[e[0]][0], pos[e[1]][0]],
                y=[pos[e[0]][1], pos[e[1]][1]],
                line={
                    "color": "#a006b1",
                },
                marker=dict(
                    symbol="arrow-bar-up",
                    angleref="previous",
                    size=30,
                ),
                mode="lines+markers",
                hoverinfo="none",
                showlegend=False,
            )
        ]

    # ------------------------------------------------------------------- #
    # Nodes                                                               #
    # ------------------------------------------------------------------- #

    nodes_trace = go.Scatter(
        x=[_p[0] for _p in pos.values()],
        y=[_p[1] for _p in pos.values()],
        text=list(dag.nodes),
        name="pl nodes",
        mode="markers+text",
        marker=dict(
            size=50,
            color="#06d49e",
            line=dict(
                width=2,
                color="#dff2ed",
            ),
        ),
        textfont=dict(
            color="#a006b1",
        ),
    )

    return go.Figure(data=[nodes_trace] + edge_traces)

execute(write_sinks=True, full_refresh=False, named_dfs=None) ¤

Execute the pipeline (read sources and write sinks) by sequentially executing each node. The selected orchestrator might impact how data sources or sinks are processed.

PARAMETER DESCRIPTION
write_sinks

If False, writing of node sinks is skipped.

DEFAULT: True

full_refresh

If True, all nodes will be completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrames to be passed to pipeline nodes transformer.

TYPE: dict[str, AnyFrame] DEFAULT: None
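A minimal usage sketch, assuming the Polars pipeline loaded as pl in the first example:

# Run all nodes without writing sinks (useful for dry runs)
pl.execute(write_sinks=False)

# Fully re-process all nodes, deleting existing data and checkpoints first
pl.execute(full_refresh=True)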

Source code in laktory/models/pipeline/pipeline.py
def execute(
    self,
    write_sinks=True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
) -> None:
    """
    Execute the pipeline (read sources and write sinks) by sequentially
    executing each node. The selected orchestrator might impact how
    data sources or sinks are processed.

    Parameters
    ----------
    write_sinks:
        If `False` writing of node sinks will be skipped
    full_refresh:
        If `True` all nodes will be completely re-processed by deleting
        existing data and checkpoints before processing.
    named_dfs:
        Named DataFrames to be passed to pipeline nodes transformer.
    """
    logger.info("Executing Pipeline")

    for inode, node in enumerate(self.sorted_nodes):
        if named_dfs is None:
            named_dfs = {}

        node.execute(
            write_sinks=write_sinks,
            full_refresh=full_refresh,
            named_dfs=named_dfs,
        )

inject_vars(inplace=False, vars=None) ¤

Inject model variables values into a model attributes.

PARAMETER DESCRIPTION
inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION

Model instance.

Examples:

from typing import Union

from laktory import models


class Cluster(models.BaseModel):
    name: str = None
    size: Union[int, str] = None


c = Cluster(
    name="cluster-${vars.my_cluster}",
    size="${{ 4 if vars.env == 'prod' else 2 }}",
    variables={
        "env": "dev",
    },
).inject_vars()
print(c)
# > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
References

  • variables: https://www.laktory.ai/concepts/variables/
Source code in laktory/models/basemodel.py
def inject_vars(self, inplace: bool = False, vars: dict = None):
    """
    Inject model variables values into a model attributes.

    Parameters
    ----------
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model instance.

    Examples
    --------
    ```py
    from typing import Union

    from laktory import models


    class Cluster(models.BaseModel):
        name: str = None
        size: Union[int, str] = None


    c = Cluster(
        name="cluster-${vars.my_cluster}",
        size="${{ 4 if vars.env == 'prod' else 2 }}",
        variables={
            "env": "dev",
        },
    ).inject_vars()
    print(c)
    # > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Fetching vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        self = self.model_copy(deep=True)

    # Inject into field values
    for k in list(self.model_fields_set):
        if k == "variables":
            continue
        o = getattr(self, k)

        if isinstance(o, BaseModel) or isinstance(o, dict) or isinstance(o, list):
            # Mutable objects will be updated in place
            _resolve_values(o, vars)
        else:
            # Simple objects must be updated explicitly
            setattr(self, k, _resolve_value(o, vars))

    # Inject into child resources
    if hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r == self:
                continue
            r.inject_vars(vars=vars, inplace=True)

    if not inplace:
        return self

inject_vars_into_dump(dump, inplace=False, vars=None) ¤

Inject model variables values into a model dump.

PARAMETER DESCRIPTION
dump

Model dump (or any other general purpose mutable object)

TYPE: dict[str, Any]

inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict[str, Any] DEFAULT: None

RETURNS DESCRIPTION

Model dump with injected variables.

Examples:

from laktory import models

m = models.BaseModel(
    variables={
        "env": "dev",
    },
)
data = {
    "name": "cluster-${vars.my_cluster}",
    "size": "${{ 4 if vars.env == 'prod' else 2 }}",
}
print(m.inject_vars_into_dump(data))
# > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
References

  • variables: https://www.laktory.ai/concepts/variables/
Source code in laktory/models/basemodel.py
def inject_vars_into_dump(
    self, dump: dict[str, Any], inplace: bool = False, vars: dict[str, Any] = None
):
    """
    Inject model variables values into a model dump.

    Parameters
    ----------
    dump:
        Model dump (or any other general purpose mutable object)
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model dump with injected variables.


    Examples
    --------
    ```py
    from laktory import models

    m = models.BaseModel(
        variables={
            "env": "dev",
        },
    )
    data = {
        "name": "cluster-${vars.my_cluster}",
        "size": "${{ 4 if vars.env == 'prod' else 2 }}",
    }
    print(m.inject_vars_into_dump(data))
    # > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Setting vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        dump = copy.deepcopy(dump)

    # Inject into field values
    _resolve_values(dump, vars)

    if not inplace:
        return dump

model_validate_json_file(fp) classmethod ¤

Load model from json file object

PARAMETER DESCRIPTION
fp

file object structured as a json file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance
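A minimal usage sketch (the pipeline.json file name is illustrative):

import laktory as lk

with open("pipeline.json") as fp:
    pl = lk.models.Pipeline.model_validate_json_file(fp)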

Source code in laktory/models/basemodel.py
@classmethod
def model_validate_json_file(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from json file object

    Parameters
    ----------
    fp:
        file object structured as a json file

    Returns
    -------
    :
        Model instance
    """
    data = json.load(fp)
    return cls.model_validate(data)

model_validate_yaml(fp) classmethod ¤

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files using !use, !extend and !update tags. Paths to external files can be defined using model or environment variables.

Referenced paths should always be relative to the file they are referenced from.

Custom Tags
  • !use {filepath}: Directly inject the content of the file at filepath

  • - !extend {filepath}: Extend the current list with the elements found in the file at filepath. Similar to python list.extend method.

  • <<: !update {filepath}: Merge the current dictionary with the content of the dictionary defined at filepath. Similar to python dict.update method.

PARAMETER DESCRIPTION
fp

file object structured as a yaml file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance

Examples:

businesses:
  apple:
    symbol: aapl
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - jane.doe@apple.com
      - !extend emails.yaml
  amazon:
    symbol: amzn
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - john.doe@amazon.com
      - !extend emails.yaml
Source code in laktory/models/basemodel.py
@classmethod
def model_validate_yaml(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports
    reference to external yaml and sql files using `!use`, `!extend` and `!update` tags.
    Path to external files can be defined using model or environment variables.

    Referenced path should always be relative to the file they are referenced from.

    Custom Tags
    -----------
    - `!use {filepath}`:
        Directly inject the content of the file at `filepath`

    - `- !extend {filepath}`:
        Extend the current list with the elements found in the file at `filepath`.
        Similar to python list.extend method.

    - `<<: !update {filepath}`:
        Merge the current dictionary with the content of the dictionary defined at
        `filepath`. Similar to python dict.update method.

    Parameters
    ----------
    fp:
        file object structured as a yaml file

    Returns
    -------
    :
        Model instance

    Examples
    --------
    ```yaml
    businesses:
      apple:
        symbol: aapl
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - jane.doe@apple.com
          - !extend emails.yaml
      amazon:
        symbol: amzn
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - john.doe@amazon.com
          - !extend emails.yaml
    ```
    """

    data = RecursiveLoader.load(fp)
    return cls.model_validate(data)

push_vars(update_core_resources=False) ¤

Push variable values to all children recursively
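A minimal usage sketch (the variable name is illustrative):

# Make a pipeline-level variable available to all child models (nodes, sources, sinks, ...)
pl.variables["env"] = "dev"
pl.push_vars()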

Source code in laktory/models/basemodel.py
def push_vars(self, update_core_resources=False) -> Any:
    """Push variable values to all child recursively"""

    def _update_model(m):
        if not isinstance(m, BaseModel):
            return
        for k, v in self.variables.items():
            m.variables[k] = m.variables.get(k, v)
        m.push_vars()

    def _push_vars(o):
        if isinstance(o, list):
            for _o in o:
                _push_vars(_o)
        elif isinstance(o, dict):
            for _o in o.values():
                _push_vars(_o)
        else:
            _update_model(o)

    for k in self.model_fields.keys():
        _push_vars(getattr(self, k))

    if update_core_resources and hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r != self:
                _push_vars(r)

    return None

validate_assignment_disabled() ¤

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design and must be turned off temporarily.
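A minimal usage sketch, assuming validate_assignment is enabled in the model configuration:

# Temporarily disable assignment validation while mutating the model
with pl.validate_assignment_disabled():
    pl.name = "pl-stock-prices-dev"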

Source code in laktory/models/basemodel.py
@contextmanager
def validate_assignment_disabled(self):
    """
    Updating a model attribute inside a model validator when `validate_assignment`
    is `True` causes an infinite recursion by design and must be turned off
    temporarily.
    """
    original_state = self.model_config["validate_assignment"]
    self.model_config["validate_assignment"] = False
    try:
        yield
    finally:
        self.model_config["validate_assignment"] = original_state