
PipelineNode

laktory.models.pipeline.PipelineNode

Bases: BaseModel, PipelineChild

Pipeline base component generating a DataFrame by reading a data source and applying a transformer (chain of DataFrame transformations), with optional output to one or more data sinks.

Examples:

A node reading stock price events from JSON files and writing a DataFrame as Parquet files.

import io

import laktory as lk

node_yaml = '''
    name: brz_stock_prices
    sources:
    - path: "./events/stock_prices/"
      format: JSON
    sinks:
    - path: ./tables/brz_stock_prices/
      format: PARQUET
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()

A node reading stock prices from an upstream node, transforming them and writing the resulting DataFrame to a table.

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices
    sources:
    - node_name: brz_stock_prices
    sinks:
    - schema_name: finance
      table_name: slv_stock_prices
    transformer:
      nodes:
      - expr: |
            SELECT
              data.created_at AS created_at,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
      - func_name: drop_duplicates
        func_kwargs:
          subset:
            - symbol
            - created_at
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()
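To make the second example's transformer chain concrete, the sketch below mimics its two steps on plain Python dictionaries: the SQL expression flattens the nested `data` record into top-level columns, and `drop_duplicates` keeps one row per `symbol` and `created_at` pair. The sample rows and the helpers `select_data_fields` and `drop_duplicates` are hypothetical; Laktory runs these steps on a real DataFrame backend.

```python
# Hypothetical bronze rows: each event nests its fields under a "data" key,
# as the `data.<field>` expressions in the YAML assume.
raw_rows = [
    {"data": {"created_at": "2024-01-02T14:00:00", "symbol": "AAPL", "open": 185.0,
              "close": 186.2, "high": 186.5, "low": 184.9, "volume": 1200}},
    # Exact duplicate of the first event (e.g. ingested twice)
    {"data": {"created_at": "2024-01-02T14:00:00", "symbol": "AAPL", "open": 185.0,
              "close": 186.2, "high": 186.5, "low": 184.9, "volume": 1200}},
    {"data": {"created_at": "2024-01-02T14:00:00", "symbol": "MSFT", "open": 370.1,
              "close": 371.0, "high": 371.4, "low": 369.8, "volume": 800}},
]

COLUMNS = ["created_at", "symbol", "open", "close", "high", "low", "volume"]


def select_data_fields(rows):
    """Mimic `SELECT data.<col> AS <col> ... FROM {df}`: flatten the nested record."""
    return [{col: row["data"][col] for col in COLUMNS} for row in rows]


def drop_duplicates(rows, subset):
    """Keep the first row seen for each distinct combination of `subset` values."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[col] for col in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


flat_rows = select_data_fields(raw_rows)
silver_rows = drop_duplicates(flat_rows, subset=["symbol", "created_at"])
print([row["symbol"] for row in silver_rows])  # ['AAPL', 'MSFT']
```

This sketch keeps the first occurrence of each key; a distributed engine such as Spark does not guarantee which duplicate survives.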
PARAMETER DESCRIPTION
comment

Comment for the associated table or view

TYPE: str | VariableType DEFAULT: None

depends_on

Pipeline node names that must complete before this node executes. Use for DAG ordering when no data flows between nodes.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

execution_task_name_

Execution task name used when the orchestrator (such as Databricks Jobs or Airflow) supports multi-task execution. Nodes with the same task name are executed together in a single task. If None, the node is executed under the task node-{node-name}.

TYPE: str | VariableType DEFAULT: None

expectations

List of data expectations. Can trigger warnings, drop invalid records or fail a pipeline.

TYPE: list[DataQualityExpectation | VariableType] | VariableType DEFAULT: []

expectations_checkpoint_path_

Checkpoint location for the streaming query that checks expectations on a streaming DataFrame. Required when expectations are defined on a node whose DataFrame is streaming.

TYPE: str | Path | VariableType DEFAULT: None

ldp_template

Template (notebook) to use when Lakeflow Declarative Pipelines is selected as the orchestrator.

TYPE: str | None | VariableType DEFAULT: 'DEFAULT'

name

Name given to the node.

TYPE: str | VariableType

primary_keys

A list of column names that uniquely identify each row in the DataFrame. These columns are used to:

  • Document the uniqueness constraints of the node's output data.
  • Define the default primary keys for sink CDC merge operations.
  • Serve as a reference in expectations and unit tests.

While optional, specifying primary_keys helps enforce data integrity and ensures that downstream operations, such as deduplication, are consistent and reliable.

TYPE: list[str | VariableType] | VariableType DEFAULT: None

root_path_

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str | Path | VariableType DEFAULT: None

sinks

Definition of the data sink(s). Set is_quarantine to True on a sink to write the node's quarantine DataFrame to it instead of the output DataFrame.

TYPE: list[PipelineViewDataSink | FileDataSink | UnityCatalogDataSink | HiveMetastoreDataSink | VariableType] | VariableType DEFAULT: []

sources

Data sources for this node. The first entry is the primary input fed into the transformer as {df}. Assign a name to each source to reference it as {sources.name} in transformer expressions.

TYPE: list[CustomDataSource | FileDataSource | UnityCatalogDataSource | HiveMetastoreDataSource | DataFrameDataSource | PipelineNodeDataSource | VariableType] | VariableType DEFAULT: []

tags

Node tags for selective execution.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

time_column

The name of the column that represents the timestamp or temporal dimension in the DataFrame. This column is used to:

  • Document the time-based ordering and filtering semantics of the node’s output data.
  • Enable time-aware operations such as point-in-time joins, incremental processing, and time-series analysis.
  • Serve as a reference in expectations, unit tests, and feature engineering workflows.

While optional, specifying time_column helps ensure consistency in time-based logic and improves the reliability of downstream operations that depend on temporal alignment.

TYPE: str | None | VariableType DEFAULT: None

transformer

Data transformations applied between the source and the sink(s).

TYPE: DataFrameTransformer | VariableType DEFAULT: None
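The `depends_on` and `sources` parameters both constrain execution order. As a minimal sketch, assuming that a `node_name` source and a `depends_on` entry each add an edge from the upstream node, a valid order can be computed with Python's standard `graphlib` module (Python 3.9+). The `nodes` dictionary below, including the `refresh_dashboard` node, is hypothetical and is not Laktory's internal representation.

```python
from graphlib import TopologicalSorter

# Hypothetical nodes. "source_nodes" stands for PipelineNodeDataSource.node_name
# entries in `sources`; "depends_on" mirrors the parameter of the same name.
nodes = {
    "brz_stock_prices": {"source_nodes": [], "depends_on": []},
    "slv_stock_prices": {"source_nodes": ["brz_stock_prices"], "depends_on": []},
    # No data flows from slv_stock_prices, but this node must run after it.
    "refresh_dashboard": {"source_nodes": [], "depends_on": ["slv_stock_prices"]},
}

# Map each node to the set of nodes that must complete before it.
graph = {
    name: set(spec["source_nodes"]) | set(spec["depends_on"])
    for name, spec in nodes.items()
}

# static_order() yields predecessors before their dependents and raises
# graphlib.CycleError if the dependencies form a cycle.
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['brz_stock_prices', 'slv_stock_prices', 'refresh_dashboard']
```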

METHOD DESCRIPTION
check_expectations

Check expectations, raise errors or warnings where required, and build filtered and quarantine DataFrames.

execute

Execute the pipeline node: read the sources, apply the transformer, check expectations and write the sinks.

ATTRIBUTE DESCRIPTION
all_sinks

List of all sinks (output and quarantine).

data_sources

Get all sources feeding the pipeline node: explicit sources and {nodes.X} transformer references.

TYPE: list[BaseDataSource]

has_output_sinks

True if node has at least one output sink.

TYPE: bool

has_sinks

True if node has at least one sink.

TYPE: bool

has_streaming_source

True if any declared source is read as a stream.

TYPE: bool

is_orchestrator_ldp

If True, pipeline node is used in the context of a Lakeflow Declarative Pipeline

TYPE: bool

is_orchestrator_sdp

If True, pipeline node is used in the context of a Spark Declarative Pipeline

TYPE: bool

is_sql_expressible

True if the node can be executed on a SQL warehouse.

TYPE: bool

output_df

DataFrame resulting from reading the source, applying the transformer and dropping rows not meeting data quality expectations.

TYPE: AnyFrame

output_sinks

List of sinks writing the output DataFrame

TYPE: list[DataSinksUnion]

primary_sink

Primary output sink used as a source for downstream nodes.

TYPE: DataSinksUnion | None

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

TYPE: AnyFrame

quarantine_sinks

List of sinks writing the quarantine DataFrame

TYPE: list[DataSinksUnion]

sinks_count

Total number of sinks.

TYPE: int

sql_statements

Generate one SQL statement per sink, suitable for execution on a SQL warehouse.

TYPE: list[str]

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

TYPE: AnyFrame

upstream_node_names

Pipeline node names required to execute current node.

TYPE: list[str]

view_definition

Transformer View Definition (when applicable)
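Several sink attributes above are derived from the `is_quarantine` flag of each sink. The sketch below reproduces that split with a hypothetical `Sink` dataclass, assuming `output_sinks` holds the sinks with `is_quarantine` set to False and `quarantine_sinks` the others. It does not model `primary_sink`, whose selection rule is not described here.

```python
from dataclasses import dataclass


@dataclass
class Sink:
    """Hypothetical stand-in for a data sink; only `is_quarantine` matters here."""
    table_name: str
    is_quarantine: bool = False


sinks = [
    Sink("slv_stock_prices"),
    Sink("slv_stock_prices_quarantine", is_quarantine=True),
]

# Partition sinks by the DataFrame they receive.
output_sinks = [s for s in sinks if not s.is_quarantine]
quarantine_sinks = [s for s in sinks if s.is_quarantine]

has_sinks = len(sinks) > 0
has_output_sinks = len(output_sinks) > 0
sinks_count = len(sinks)

print([s.table_name for s in output_sinks])      # ['slv_stock_prices']
print([s.table_name for s in quarantine_sinks])  # ['slv_stock_prices_quarantine']
```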

all_sinks property

List of all sinks (output and quarantine).

data_sources property

Get all sources feeding the pipeline node: explicit sources and {nodes.X} transformer references.
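Transformer expressions can pull in other nodes through `{nodes.X}` placeholders. A minimal sketch of how such references might be collected with a regular expression follows; the pattern, the helper `referenced_nodes` and the sample SQL are assumptions, and Laktory's own parsing may differ (for example, for node names containing characters other than letters, digits and underscores).

```python
import re

# Matches placeholders such as {nodes.slv_stock_metadata}. Assumes node names
# consist of word characters only (letters, digits, underscore).
NODE_REF = re.compile(r"\{nodes\.(\w+)\}")


def referenced_nodes(expr: str) -> list:
    """Return node names referenced in an expression, in first-occurrence order."""
    names = []
    for name in NODE_REF.findall(expr):
        if name not in names:
            names.append(name)
    return names


sql = """
SELECT p.*, m.sector
FROM {df} AS p
LEFT JOIN {nodes.slv_stock_metadata} AS m ON p.symbol = m.symbol
WHERE p.symbol IN (SELECT symbol FROM {nodes.slv_stock_metadata})
"""
print(referenced_nodes(sql))  # ['slv_stock_metadata']
```

The `{df}` placeholder is deliberately not matched: it refers to the node's primary source, not to another node.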

has_output_sinks property

True if node has at least one output sink.

has_sinks property

True if node has at least one sink.

has_streaming_source property

True if any declared source is read as a stream.

is_orchestrator_ldp property

If True, pipeline node is used in the context of a Lakeflow Declarative Pipeline

is_orchestrator_sdp property

If True, pipeline node is used in the context of a Spark Declarative Pipeline

is_sql_expressible property

True if the node can be executed on a SQL warehouse.

output_df property

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

output_sinks property

List of sinks writing the output DataFrame

primary_sink property

Primary output sink used as a source for downstream nodes.

quarantine_df property

DataFrame storing stage_df rows not meeting data quality expectations.

quarantine_sinks property

List of sinks writing the quarantine DataFrame

sinks_count property

Total number of sinks.

sql_statements property

Generate one SQL statement per sink, suitable for execution on a SQL warehouse.

Raises ValueError if the node is not SQL-expressible.

stage_df property

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

upstream_node_names property

Pipeline node names required to execute current node.

view_definition property

Transformer View Definition (when applicable)

check_expectations()

Check expectations, raise errors or warnings where required, and build filtered and quarantine DataFrames.

Some actions have to be disabled when the selected orchestrator is Databricks LDP:

  • Raising error on Failure when expectation is supported by LDP
  • Dropping rows when expectation is supported by LDP
Source code in laktory/models/pipeline/pipelinenode.py
def check_expectations(self):
    """
    Check expectations, raise errors, warnings where required and build
    filtered and quarantine DataFrames.

    Some actions have to be disabled when selected orchestrator is
    Databricks LDP:

    * Raising error on Failure when expectation is supported by LDP
    * Dropping rows when expectation is supported by LDP
    """

    # Data Quality Checks
    qfilter = None  # Quarantine filter
    kfilter = None  # Keep filter
    if self._stage_df is None:
        # Node without source or transformer
        return
    if not self.expectations:
        return
    if self.is_sdp_execute:
        # SDP blocks DataFrame analysis (AnalyzePlan RPCs) inside decorated
        # functions. SDP-compatible expectations are enforced via @dp.expect_*
        # decorators in laktory_sdp.py; Laktory cannot run checks in-process.
        names = [e.name for e in self.expectations if not e.is_sdp_compatible]
        if names:
            raise TypeError(
                f"Expectations {names} are not natively supported by SDP and "
                "cannot be enforced inside an SDP decorated function."
            )
        return
    is_streaming = getattr(nw.to_native(self._stage_df), "isStreaming", False)

    def _batch_check(df, node):
        for e in node.expectations:
            # Run check: this only warns or raises exceptions.
            if not e.is_sdp_managed:
                e.run_check(
                    df,
                    raise_or_warn=True,
                    node=node,
                )

    def _stream_check(batch_df, batch_id, node):
        _batch_check(
            batch_df,
            node,
        )

    logger.info("Checking Data Quality Expectations")

    if not is_streaming:
        _batch_check(
            self._stage_df,
            self,
        )

    else:
        skip = False

        if self.is_ldp_execute:
            names = []
            for e in self.expectations:
                if not e.is_ldp_compatible:
                    names += [e.name]
            if names:
                raise TypeError(
                    f"Expectations {names} are not natively supported by Lakeflow and can't be computed on a streaming DataFrame with Lakeflow executor."
                )

            skip = True

        backend = DataFrameBackends.from_df(self._stage_df)
        if backend not in STREAMING_BACKENDS:
            raise TypeError(
                f"DataFrame backend {backend} is not supported for streaming operations"
            )

        if self.expectations_checkpoint_path is None:
            raise ValueError(
                f"Expectations Checkpoint not specified for node '{self.name}'"
            )

        # TODO: Refactor for backend other than spark
        if not skip:
            query = (
                self._stage_df.to_native()
                .writeStream.foreachBatch(
                    lambda batch_df, batch_id: _stream_check(
                        nw.from_native(batch_df), batch_id, self
                    )
                )
                .trigger(availableNow=True)
                .options(
                    checkpointLocation=self.expectations_checkpoint_path,
                )
                .start()
            )
            query.awaitTermination()

    # Build Filters
    for e in self.expectations:
        # Update Keep Filter
        if not e.is_sdp_managed:
            _filter = e.keep_filter
            if _filter is not None:
                if kfilter is None:
                    kfilter = _filter
                else:
                    kfilter = kfilter & _filter

        # Update Quarantine Filter
        _filter = e.quarantine_filter
        if _filter is not None:
            if qfilter is None:
                qfilter = _filter
            else:
                qfilter = qfilter & _filter

    if qfilter is not None:
        logger.info("Building quarantine DataFrame")
        self._quarantine_df = self._stage_df.filter(qfilter)
    else:
        self._quarantine_df = self._stage_df  # .filter("False")

    if kfilter is not None:
        logger.info("Dropping invalid rows")
        self._output_df = self._stage_df.filter(kfilter)
    else:
        self._output_df = self._stage_df
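The keep/quarantine logic above can be illustrated on plain Python rows. In this sketch each hypothetical expectation is a `(name, action, predicate)` tuple, with action labels mirroring the warn, drop and quarantine behaviours described for `expectations`. Rows failing a DROP or QUARANTINE expectation are removed from the output, and a row is quarantined when it fails at least one QUARANTINE expectation; that quarantine rule is this sketch's own choice, while the method above builds its filters as DataFrame expressions combined with `&`. LDP/SDP-managed expectations and streaming checks are not modelled.

```python
# Hypothetical expectations: (name, action, predicate returning True when a row passes).
expectations = [
    ("positive_close", "DROP", lambda r: r["close"] > 0),
    ("has_symbol", "QUARANTINE", lambda r: r["symbol"] is not None),
    ("enough_volume", "WARN", lambda r: r["volume"] >= 10),
]

rows = [
    {"symbol": "AAPL", "close": 186.2, "volume": 1200},  # passes everything
    {"symbol": "AAPL", "close": -1.0, "volume": 1200},   # fails a DROP expectation
    {"symbol": None, "close": 50.0, "volume": 5},        # fails QUARANTINE and WARN
]


def split_rows(rows, expectations):
    """Return (kept, quarantined, warned) for a batch of rows."""
    kept, quarantined, warned = [], [], []
    for row in rows:
        failed = [(name, action) for name, action, check in expectations if not check(row)]
        actions = {action for _, action in failed}
        # WARN failures are only reported; the row itself is unaffected.
        warned += [name for name, action in failed if action == "WARN"]
        # Keep filter: drop rows failing any DROP or QUARANTINE expectation.
        if not actions & {"DROP", "QUARANTINE"}:
            kept.append(row)
        # Quarantine filter (this sketch's rule): any failed QUARANTINE expectation.
        if "QUARANTINE" in actions:
            quarantined.append(row)
    return kept, quarantined, warned


kept, quarantined, warned = split_rows(rows, expectations)
print(len(kept), len(quarantined), warned)  # 1 1 ['enough_volume']
```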

execute(apply_transformer=True, write_sinks=True, full_refresh=False, named_dfs=None, update_tables_metadata=True)

Execute the pipeline node by:

  • Reading the sources
  • Applying the user defined (and layer-specific if applicable) transformations
  • Checking expectations
  • Writing the sinks
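The four steps above can be sketched as a toy function over lists of dictionaries. Everything here is hypothetical (`execute_node`, `add_spread`, the sample rows): sources, transformer, expectations and sinks are reduced to plain callables and lists, only to show the order in which data flows through a node.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def execute_node(
    read_source: Callable[[], List[Row]],
    transforms: List[Callable[[List[Row]], List[Row]]],
    keep_row: Callable[[Row], bool],
    sinks: List[List[Row]],
) -> List[Row]:
    """Toy version of the four execution steps, on lists of dicts."""
    df = read_source()                             # 1. read the source
    for transform in transforms:                   # 2. apply transformations in order
        df = transform(df)
    output = [row for row in df if keep_row(row)]  # 3. check expectations (drop failures)
    for sink in sinks:                             # 4. write each sink
        sink.extend(output)
    return output


def add_spread(rows: List[Row]) -> List[Row]:
    # Hypothetical transformation: derive a column from two existing ones.
    return [{**r, "spread": r["high"] - r["low"]} for r in rows]


table: List[Row] = []
result = execute_node(
    read_source=lambda: [
        {"symbol": "AAPL", "high": 186.5, "low": 184.5},
        {"symbol": "MSFT", "high": 0.0, "low": 0.0},
    ],
    transforms=[add_spread],
    keep_row=lambda r: r["spread"] > 0,
    sinks=[table],
)
print(result)  # [{'symbol': 'AAPL', 'high': 186.5, 'low': 184.5, 'spread': 2.0}]
```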
PARAMETER DESCRIPTION
apply_transformer

If True, apply the transformer.

TYPE: bool DEFAULT: True

write_sinks

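If True, write the output and quarantine DataFrames to the sinks. Forced to False when a declarative pipeline orchestrator (LDP or SDP) is selected.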

TYPE: bool DEFAULT: True

full_refresh

If True, the DataFrame is completely reprocessed: existing data and checkpoints are deleted before processing. Forced to False when a declarative pipeline orchestrator (LDP or SDP) is selected.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrames made available to the transformer nodes.

TYPE: dict[str, AnyFrame] DEFAULT: None

update_tables_metadata

If True, update the metadata of the sink tables (skipped under LDP execution).

TYPE: bool DEFAULT: True
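How these flags interact can be summarised as an ordered list of steps. This sketch follows the source code of `execute`: a declarative orchestrator (LDP or SDP) forces `write_sinks` and `full_refresh` to False, and a full refresh purges data before anything is read. The function `plan_steps` and its `declarative_orchestrator` argument are hypothetical, and the sketch assumes the node has a transformer and at least one sink.

```python
def plan_steps(
    apply_transformer: bool = True,
    write_sinks: bool = True,
    full_refresh: bool = False,
    declarative_orchestrator: bool = False,
) -> list:
    """Return the ordered execution steps implied by the execute() flags."""
    if declarative_orchestrator:
        # LDP/SDP own the tables: no sink writes and no purge.
        write_sinks = False
        full_refresh = False
    steps = []
    if full_refresh:
        steps.append("purge")  # delete existing data and checkpoints first
    steps.append("read_sources")
    if apply_transformer:
        steps.append("apply_transformer")
    steps.append("check_expectations")
    if write_sinks:
        steps.append("write_sinks")
    return steps


print(plan_steps(full_refresh=True))
# ['purge', 'read_sources', 'apply_transformer', 'check_expectations', 'write_sinks']
print(plan_steps(full_refresh=True, declarative_orchestrator=True))
# ['read_sources', 'apply_transformer', 'check_expectations']
```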

RETURNS DESCRIPTION
AnyFrame

Output DataFrame.

Source code in laktory/models/pipeline/pipelinenode.py
def execute(
    self,
    apply_transformer: bool = True,
    write_sinks: bool = True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
    update_tables_metadata: bool = True,
) -> AnyFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Checking expectations
    - Writing the sinks

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    write_sinks:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.
    named_dfs:
        Named DataFrame passed to transformer nodes
    update_tables_metadata:
        Update tables metadata

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name}")

    # Install dependencies
    pl = self.parent_pipeline
    if pl and not pl._imports_imported:
        for package_name in pl._imports:
            try:
                logger.info(f"Importing {package_name}")
                importlib.import_module(package_name)
            except ModuleNotFoundError:
                logger.info(f"Importing {package_name} failed.")
        pl._imports_imported = True

    # Skip sink writes when a declarative pipeline framework owns them
    if self.is_orchestrator_ldp or self.is_orchestrator_sdp:
        logger.info(
            "Declarative pipeline orchestrator selected. Sinks writing will be skipped."
        )
        write_sinks = False
        full_refresh = False

    # Refresh
    if full_refresh:
        self.purge()

    # Read all declared sources into named_dfs with "sources." prefix
    if named_dfs is None:
        named_dfs = {}
    for src in self.sources:
        src_key = src.name if src.name else "df"
        named_dfs[f"sources.{src_key}"] = src.read()

    # Primary df: always the first declared source of THIS node
    if self.sources:
        first_src_key = self.sources[0].name if self.sources[0].name else "df"
        self._stage_df = named_dfs.get(f"sources.{first_src_key}")
    else:
        self._stage_df = None

    # Pre-load upstream nodes referenced in transformer ({nodes.X} in SQL/method args)
    if apply_transformer and self.transformer and self.parent_pipeline:
        for upstream_name in self.transformer.upstream_node_names:
            key = f"nodes.{upstream_name}"
            if key not in named_dfs:
                # Reuse if already loaded via a sources entry for the same node
                existing = next(
                    (
                        df
                        for src, df in (
                            (
                                src,
                                named_dfs.get(
                                    f"sources.{src.name if src.name else 'df'}"
                                ),
                            )
                            for src in self.sources
                            if isinstance(src, PipelineNodeDataSource)
                            and src.node_name == upstream_name
                        )
                        if df is not None
                    ),
                    None,
                )
                if existing is not None:
                    named_dfs[key] = existing
                else:
                    tmp = PipelineNodeDataSource(node_name=upstream_name)
                    tmp._parent = self
                    named_dfs[key] = tmp.read()

    # Apply transformer
    if apply_transformer and self.transformer:
        self._stage_df = self.transformer.execute(
            self._stage_df, named_dfs=named_dfs
        )

    # Check expectations
    self._output_df = self._stage_df
    self._quarantine_df = None
    self.check_expectations()

    # Output and Quarantine to Sinks
    if write_sinks and self.sinks:
        view_definition = self.view_definition

        for s in self.sinks:
            # Get DataFrame
            _df = self._output_df
            if s.is_quarantine:
                _df = self._quarantine_df

            # Create Sink
            s.create(df=_df)

            _is_update_metadata = (
                update_tables_metadata and s.metadata and not self.is_ldp_execute
            )

            if self.is_view:
                s.write(view_definition=view_definition)
                if _is_update_metadata:
                    s.metadata.execute()
                self._output_df = s.as_source().read()
            else:
                if _is_update_metadata:
                    s.metadata.execute()
                s.write(df=self._output_df)

                # Metadata update required because of schema overwrite
                if _is_update_metadata and s.metadata.update_required:
                    s.metadata.execute()

    return self._output_df
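The way `execute` keys source DataFrames can be reproduced on plain values. This sketch, with a hypothetical `build_named_dfs` helper and `str.upper` standing in for reading a source, mirrors the loop above: each source is stored under `sources.<name>`, or `sources.df` when unnamed, and the primary DataFrame is looked up with the first source's key.

```python
def build_named_dfs(sources, read):
    """
    sources: list of (name, spec) pairs, where name may be None.
    read: callable turning a spec into a DataFrame-like object.
    Returns (named_dfs, primary_df) following the keying rule used in execute().
    """
    named_dfs = {}
    for name, spec in sources:
        key = name if name else "df"
        named_dfs[f"sources.{key}"] = read(spec)

    primary_df = None
    if sources:
        first_key = sources[0][0] if sources[0][0] else "df"
        primary_df = named_dfs.get(f"sources.{first_key}")
    return named_dfs, primary_df


# Named secondary source: both entries stay addressable.
named, primary = build_named_dfs([(None, "prices"), ("meta", "metadata")], read=str.upper)
print(sorted(named), primary)  # ['sources.df', 'sources.meta'] PRICES

# Two unnamed sources share the key "sources.df": the second overwrites the first.
named, primary = build_named_dfs([(None, "prices"), (None, "metadata")], read=str.upper)
print(named, primary)  # {'sources.df': 'METADATA'} METADATA
```

Because unnamed sources share the `sources.df` key, a second unnamed source overwrites the first; when the first source is unnamed, the primary DataFrame then comes from the last unnamed source. Naming every source after the first avoids this.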