Pipeline

laktory.models.pipeline.Pipeline ¤

Bases: BaseModel, PulumiResource, TerraformResource, PipelineChild

Pipeline model to manage a data pipeline, including reading from data sources, applying data transformations, and writing to data sinks.

A pipeline is composed of collections of nodes, each one defining its own source, transformations and optional sink. A node may be the source of another node.

A pipeline may be run manually using Python or the CLI, but it may also be deployed and scheduled using one of the supported orchestrators, such as a Databricks Job or a Lakeflow Declarative Pipeline.

The DataFrame backend used to run the pipeline can be configured at the pipeline level or at the nodes level.
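As a sketch, a node-level override could look like the following (this assumes nodes expose the same dataframe_backend field as the pipeline; check the PipelineNode reference for the exact field name, and note that whether a given mix of backends is supported may depend on the sources and sinks involved):

```yaml
name: pl-mixed-backends
dataframe_backend: PYSPARK   # pipeline-level default

nodes:
- name: brz_stock_prices
  # no backend specified: inherits PYSPARK from the pipeline
  source:
    path: ./data/stock_prices/
    format: JSONL

- name: slv_stock_prices
  dataframe_backend: POLARS  # node-level override (assumed field name)
  source:
    node_name: brz_stock_prices
```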

Examples:

This first example shows how to configure a simple pipeline with two nodes. Upon execution, raw data is read from JSONL files and two DataFrames (bronze and silver) are created and saved as Parquet files. Notice how the first node is used as a data source for the second node. Polars is used as the DataFrame backend.

import io

import laktory as lk

pipeline_yaml = '''
    name: pl-stock-prices
    dataframe_backend: POLARS

    nodes:
    - name: brz_stock_prices
      source:
        path: ./data/stock_prices/
        format: JSONL
      sinks:
      - path: ./data/brz_stock_prices.parquet
        format: PARQUET

    - name: slv_stock_prices
      source:
        node_name: brz_stock_prices
        as_stream: false
      sinks:
      - path: ./data/slv_stock_prices.parquet
        format: PARQUET
      transformer:
        nodes:
        - expr: |
            SELECT
              CAST(data.created_at AS TIMESTAMP) AS created_at,
              data.symbol AS name,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
        - func_name: unique
          func_kwargs:
            subset:
              - symbol
              - created_at
            keep: any
'''

pl = lk.models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

# Execute pipeline
# pl.execute()

The next example also defines a two-node pipeline, but uses PySpark as the DataFrame backend. It includes the configuration required to deploy it as a Databricks Job. In this case, the sinks write to Unity Catalog tables.

import io

import laktory as lk

pipeline_yaml = '''
    name: pl-stocks-job
    dataframe_backend: PYSPARK
    orchestrator:
      type: DATABRICKS_JOB
      serverless_environment_version: "5"

    nodes:

    - name: brz_stock_prices
      source:
        path: dbfs:/laktory/data/stock_prices/
        as_stream: false
        format: JSONL
      sinks:
      - table_name: brz_stock_prices_job
        mode: OVERWRITE

    - name: slv_stock_prices
      expectations:
      - name: positive_price
        expr: open > 0
        action: DROP
      source:
        node_name: brz_stock_prices
        as_stream: false
      sinks:
      - table_name: slv_stock_prices_job
        mode: OVERWRITE

      transformer:
        nodes:
        - expr: |
            SELECT
                cast(data.created_at AS TIMESTAMP) AS created_at,
                data.symbol AS symbol,
                data.open AS open,
                data.close AS close
            FROM
                {df}
        - func_name: drop_duplicates
          func_kwargs:
            subset: ["created_at", "symbol"]
          dataframe_api: NATIVE
'''
pl = lk.models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
PARAMETER DESCRIPTION
databricks_quality_monitor_enabled

Enable Databricks Quality Monitor. When enabled, quality monitors are created for each sink configured with a quality monitor and deleted for sinks without one.

TYPE: bool | VariableType DEFAULT: False

dependencies

List of dependencies required to run the pipeline. If Laktory is not included, its current version is added to the list.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

imports

List of modules to import before execution. Generally used to load Narwhals extensions. Packages listed in dependencies are automatically included in the list of imports.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

name

Name of the pipeline

TYPE: str | VariableType

nodes

List of pipeline nodes. Each node defines a data source, a series of transformations and optionally a sink.

TYPE: list[PipelineNode | VariableType] | VariableType DEFAULT: []

orchestrator

Orchestrator used for scheduling and executing the pipeline. The selected option defines which resources are to be deployed. Supported options are instances of classes:

  • DatabricksJobOrchestrator: When deployed through a Databricks Job, a task is created for each pipeline node and all required dependencies are set automatically. If a given task (or pipeline node) uses a PipelineNodeDataSource as its source, the data is read from the upstream node's sink.
  • DatabricksPipelineOrchestrator: When orchestrated through Databricks DLT, each pipeline node creates a DLT table (or a view, if no sink is defined). Behind the scenes, PipelineNodeDataSource leverages the native dlt read and read_stream functions to define the interdependencies between the tables, as in a standard DLT pipeline.

TYPE: DatabricksJobOrchestrator | DatabricksPipelineOrchestrator | AirflowOrchestrator | VariableType DEFAULT: None

root_path_

Root location used by pipeline nodes to store logs, metrics, and checkpoints.

TYPE: str | Path | VariableType DEFAULT: None

METHOD DESCRIPTION
dag_figure

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

execute

Execute the pipeline (read sources and write sinks) by sequentially executing each node.

get_execution_plan

Build the execution plan for the pipeline based on the optional node selection.

ATTRIBUTE DESCRIPTION
additional_core_resources

if orchestrator is DLT:

TYPE: list[PulumiResource]

dag

Networkx Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

TYPE: DiGraph

is_orchestrator_dlt

If True, pipeline orchestrator is DLT

TYPE: bool

nodes_dict

Nodes dictionary whose keys are the node names.

TYPE: dict[str, PipelineNode]

resource_type_id

pl

TYPE: str

sorted_nodes

Topologically sorted nodes.

TYPE: list[PipelineNode]

additional_core_resources property ¤

if orchestrator is DLT:

  • DLT Pipeline

if orchestrator is DATABRICKS_JOB:

  • Databricks Job

dag property ¤

Networkx Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

RETURNS DESCRIPTION
DiGraph

Directed Acyclic Graph

is_orchestrator_dlt property ¤

If True, pipeline orchestrator is DLT

nodes_dict property ¤

Nodes dictionary whose keys are the node names.

RETURNS DESCRIPTION
dict[str, PipelineNode]

Nodes

resource_type_id property ¤

pl

sorted_nodes property ¤

Topologically sorted nodes.

RETURNS DESCRIPTION
list[PipelineNode]

List of topologically sorted nodes.
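For illustration, a topological ordering like the one returned by sorted_nodes can be produced with Kahn's algorithm. This is a minimal pure-Python sketch, independent of laktory's actual implementation; node names are illustrative:

```python
from collections import deque


def topological_sort(edges: dict[str, list[str]]) -> list[str]:
    """Return nodes in topological order using Kahn's algorithm.

    `edges` maps each node to the list of nodes that depend on it
    (i.e. its downstream nodes).
    """
    # Count incoming edges (upstream dependencies) for each node
    in_degree = {node: 0 for node in edges}
    for downstream in edges.values():
        for node in downstream:
            in_degree[node] += 1

    # Start with nodes that have no upstream dependency
    queue = deque(node for node, deg in in_degree.items() if deg == 0)
    ordered = []
    while queue:
        node = queue.popleft()
        ordered.append(node)
        for child in edges[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    if len(ordered) != len(edges):
        raise ValueError("Graph has a cycle")
    return ordered


# Silver depends on bronze, gold on silver
print(topological_sort({
    "brz_stock_prices": ["slv_stock_prices"],
    "slv_stock_prices": ["gld_stock_prices"],
    "gld_stock_prices": [],
}))
# → ['brz_stock_prices', 'slv_stock_prices', 'gld_stock_prices']
```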

dag_figure() ¤

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

RETURNS DESCRIPTION
Figure

Plotly figure representation of the pipeline.

Source code in laktory/models/pipeline/pipeline.py
def dag_figure(self) -> "Figure":
    """
    [UNDER DEVELOPMENT] Generate a figure representation of the pipeline
    DAG.

    Returns
    -------
    :
        Plotly figure representation of the pipeline.
    """
    import networkx as nx
    import plotly.graph_objs as go

    dag = self.dag
    pos = nx.spring_layout(dag)

    # ------------------------------------------------------------------- #
    # Edges                                                               #
    # ------------------------------------------------------------------- #

    edge_traces = []
    for e in dag.edges:
        edge_traces += [
            go.Scatter(
                x=[pos[e[0]][0], pos[e[1]][0]],
                y=[pos[e[0]][1], pos[e[1]][1]],
                line={
                    "color": "#a006b1",
                },
                marker=dict(
                    symbol="arrow-bar-up",
                    angleref="previous",
                    size=30,
                ),
                mode="lines+markers",
                hoverinfo="none",
                showlegend=False,
            )
        ]

    # ------------------------------------------------------------------- #
    # Nodes                                                               #
    # ------------------------------------------------------------------- #

    nodes_trace = go.Scatter(
        x=[_p[0] for _p in pos.values()],
        y=[_p[1] for _p in pos.values()],
        text=list(dag.nodes),
        name="pl nodes",
        mode="markers+text",
        marker=dict(
            size=50,
            color="#06d49e",
            line=dict(
                width=2,
                color="#dff2ed",
            ),
        ),
        textfont=dict(
            color="#a006b1",
        ),
    )

    return go.Figure(data=[nodes_trace] + edge_traces)

execute(write_sinks=True, full_refresh=False, named_dfs=None, update_tables_metadata=True, selects=None) ¤

Execute the pipeline (read sources and write sinks) by sequentially executing each node. The selected orchestrator might impact how data sources or sinks are processed.

PARAMETER DESCRIPTION
write_sinks

If False, writing of node sinks is skipped.

DEFAULT: True

full_refresh

If True, all nodes are completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrames to be passed to pipeline nodes transformer.

TYPE: dict[str, AnyFrame] DEFAULT: None

update_tables_metadata

Update tables metadata

TYPE: bool DEFAULT: True

selects

List of node names with optional dependency notation:

  • {node_name}: Execute the node only.
  • *{node_name}: Execute the node and its upstream dependencies.
  • {node_name}*: Execute the node and its downstream dependencies.
  • *{node_name}*: Execute the node, its upstream, and downstream dependencies.

TYPE: list[str] | None DEFAULT: None
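A hypothetical helper (not part of laktory's API) showing how this dependency notation could be parsed:

```python
def parse_select(select: str) -> tuple[str, bool, bool]:
    """Split a select string into (node_name, include_upstream, include_downstream).

    A leading `*` requests upstream dependencies; a trailing `*`
    requests downstream dependencies.
    """
    include_upstream = select.startswith("*")
    include_downstream = select.endswith("*")
    node_name = select.strip("*")
    return node_name, include_upstream, include_downstream


print(parse_select("slv_stock_prices"))    # node only
print(parse_select("*slv_stock_prices"))   # node + upstream
print(parse_select("slv_stock_prices*"))   # node + downstream
print(parse_select("*slv_stock_prices*"))  # node + both
```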

Source code in laktory/models/pipeline/pipeline.py
def execute(
    self,
    write_sinks: bool = True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
    update_tables_metadata: bool = True,
    selects: list[str] | None = None,
) -> None:
    """
    Execute the pipeline (read sources and write sinks) by sequentially
    executing each node. The selected orchestrator might impact how
    data sources or sinks are processed.

    Parameters
    ----------
    write_sinks:
        If `False`, writing of node sinks is skipped.
    full_refresh:
        If `True` all nodes will be completely re-processed by deleting
        existing data and checkpoints before processing.
    named_dfs:
        Named DataFrames to be passed to pipeline nodes transformer.
    update_tables_metadata:
        Update tables metadata
    selects:
        List of node names with optional dependency notation:

        - `{node_name}`: Execute the node only.
        - `*{node_name}`: Execute the node and its upstream dependencies.
        - `{node_name}*`: Execute the node and its downstream dependencies.
        - `*{node_name}*`: Execute the node, its upstream, and downstream dependencies.
    """

    logger.info(f"Executing pipeline '{self.name}'")

    plan = self.get_execution_plan(selects=selects)
    node_names = plan.node_names

    logger.info(f"Selected nodes: {node_names}")

    if named_dfs is None:
        named_dfs = {}

    for task in plan.tasks:
        task.execute(
            write_sinks=write_sinks,
            full_refresh=full_refresh,
            named_dfs=named_dfs,
            update_tables_metadata=update_tables_metadata,
        )

get_execution_plan(selects=None) ¤

Build the execution plan for the pipeline, resolving which nodes to execute, and in which order, based on the optional selects filters.

PARAMETER DESCRIPTION
selects

List of node names with optional dependency notation:

  • {node_name}: Execute the node only.
  • *{node_name}: Execute the node and its upstream dependencies.
  • {node_name}*: Execute the node and its downstream dependencies.
  • *{node_name}*: Execute the node, its upstream, and downstream dependencies.

TYPE: list[str] | None DEFAULT: None

Source code in laktory/models/pipeline/pipeline.py
def get_execution_plan(
    self,
    selects: list[str] | None = None,
) -> "PipelineExecutionPlan":
    """
    Build the execution plan for the pipeline, resolving which nodes to
    execute, and in which order, based on the optional `selects` filters.

    Parameters
    ----------
    selects:
        List of node names with optional dependency notation:

        - `{node_name}`: Execute the node only.
        - `*{node_name}`: Execute the node and its upstream dependencies.
        - `{node_name}*`: Execute the node and its downstream dependencies.
        - `*{node_name}*`: Execute the node, its upstream, and downstream dependencies.
    """

    from laktory.models.pipeline.pipelineexecutionplan import PipelineExecutionPlan

    plan = PipelineExecutionPlan(
        pipeline=self,
        selects=selects,
    )
    self._plan = plan
    return plan
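As an illustration of what building a plan involves, a selection such as *node or node* can be expanded into a closure over the pipeline DAG with a breadth-first traversal. This is a pure-Python sketch and does not reflect laktory's actual PipelineExecutionPlan internals:

```python
from collections import deque


def expand_selection(
    node: str,
    parents: dict[str, list[str]],
    children: dict[str, list[str]],
    upstream: bool,
    downstream: bool,
) -> set[str]:
    """Return the selected node plus its upstream and/or downstream closure."""
    selected = {node}

    def walk(start: str, neighbors: dict[str, list[str]]) -> None:
        # Breadth-first traversal following the given neighbor direction
        queue = deque(neighbors.get(start, []))
        while queue:
            n = queue.popleft()
            if n not in selected:
                selected.add(n)
                queue.extend(neighbors.get(n, []))

    if upstream:
        walk(node, parents)
    if downstream:
        walk(node, children)
    return selected


# Three-node chain: brz -> slv -> gld
children = {"brz": ["slv"], "slv": ["gld"], "gld": []}
parents = {"brz": [], "slv": ["brz"], "gld": ["slv"]}

# "*slv" selects slv and its upstream dependencies
print(sorted(expand_selection("slv", parents, children, upstream=True, downstream=False)))
# → ['brz', 'slv']
```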