PipelineNode

laktory.models.pipeline.PipelineNode

Bases: BaseModel, PipelineChild

Base pipeline component that generates a DataFrame by reading a data source and applying a transformer (a chain of DataFrame transformations). The result can optionally be written to one or more data sinks.

Examples:

A node reading stock price data from JSON files and writing the resulting DataFrame as a Parquet file.

import io

import laktory as lk

node_yaml = '''
    name: brz_stock_prices
    source:
      path: "./events/stock_prices/"
      format: JSON
    sinks:
    - path: ./tables/brz_stock_prices/
      format: PARQUET
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()

A node reading stock prices from an upstream node and writing a DataFrame to a data table.

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    sinks:
    - schema_name: finance
      table_name: slv_stock_prices
    transformer:
      nodes:
      - expr: |
            SELECT
              data.created_at AS created_at,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
      - func_name: drop_duplicates
        func_kwargs:
          subset:
            - symbol
            - created_at
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()
PARAMETER DESCRIPTION
comment

Comment for the associated table or view

TYPE: str | VariableType DEFAULT: None

dlt_template

Specify which template (notebook) to use when Databricks pipeline is selected as the orchestrator.

TYPE: str | None | VariableType DEFAULT: 'DEFAULT'

execution_task_name_

Execution task name used when the orchestrator (such as Databricks Jobs or Airflow) supports multi-task execution. Nodes with the same task name are executed together in a single task. If None is provided, the node is executed under node-{node-name}.

TYPE: str | VariableType DEFAULT: None
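
The grouping rule described above can be sketched in plain Python. This is only an illustration, not Laktory's implementation: the `group_by_task` helper and the dictionary layout of the nodes are hypothetical, and only the `node-{node-name}` fallback comes from the description.

```python
from collections import defaultdict


def group_by_task(nodes):
    """Group node names by task name (hypothetical helper, not part of Laktory)."""
    tasks = defaultdict(list)
    for node in nodes:
        # Fall back to node-{node-name} when no task name is provided
        task_name = node.get("execution_task_name") or f"node-{node['name']}"
        tasks[task_name].append(node["name"])
    return dict(tasks)


nodes = [
    {"name": "brz_stock_prices", "execution_task_name": "ingest"},
    {"name": "brz_stock_meta", "execution_task_name": "ingest"},
    {"name": "slv_stock_prices", "execution_task_name": None},
]

tasks = group_by_task(nodes)
for task_name, node_names in tasks.items():
    print(task_name, node_names)
```

Under these assumptions, the two nodes sharing the `ingest` task name end up in one task, while the node without a task name gets its own task.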

expectations

List of data expectations. Can trigger warnings, drop invalid records or fail a pipeline.

TYPE: list[DataQualityExpectation | VariableType] | VariableType DEFAULT: []

expectations_checkpoint_path_

Path where the checkpoint for expectations evaluated on a streaming DataFrame is written.

TYPE: str | Path | VariableType DEFAULT: None

name

Name given to the node.

TYPE: str | VariableType

primary_keys

A list of column names that uniquely identify each row in the DataFrame. These columns are used to:

  • Document the uniqueness constraints of the node's output data.
  • Define the default primary keys for sink CDC merge operations.
  • Serve as a reference in expectations and unit tests.

While optional, specifying primary_keys helps enforce data integrity and ensures that downstream operations, such as deduplication, are consistent and reliable.

TYPE: list[str | VariableType] | VariableType DEFAULT: None
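
To illustrate why primary keys matter for merge operations, here is a minimal sketch of a key-based upsert on plain Python dictionaries. It is an assumption-laden illustration of the general idea, not how Laktory sinks perform CDC merges; `merge_on_keys` and the sample rows are hypothetical.

```python
def merge_on_keys(target, updates, primary_keys):
    """Upsert `updates` into `target`, matching rows on the primary keys.

    Hypothetical helper for illustration only.
    """
    # Index existing rows by their primary key values
    merged = {tuple(row[k] for k in primary_keys): row for row in target}
    for row in updates:
        # A matching key replaces the existing row; a new key is appended
        merged[tuple(row[k] for k in primary_keys)] = row
    return list(merged.values())


target = [
    {"symbol": "AAPL", "created_at": "2024-01-01", "close": 10.0},
    {"symbol": "MSFT", "created_at": "2024-01-01", "close": 20.0},
]
updates = [
    {"symbol": "AAPL", "created_at": "2024-01-01", "close": 11.0},  # update
    {"symbol": "AAPL", "created_at": "2024-01-02", "close": 12.0},  # insert
]

result = merge_on_keys(target, updates, primary_keys=["symbol", "created_at"])
```

Without a set of columns that uniquely identifies each row, the merge could not tell an update from an insert.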

root_path_

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str | Path | VariableType DEFAULT: None

sinks

Definition of the data sink(s). Set is_quarantine to True to store node quarantine DataFrame.

TYPE: list[PipelineViewDataSink | FileDataSink | UnityCatalogDataSink | HiveMetastoreDataSink | VariableType] | VariableType DEFAULT: None

source

Definition of the data source(s)

TYPE: CustomDataSource | FileDataSource | UnityCatalogDataSource | HiveMetastoreDataSource | DataFrameDataSource | PipelineNodeDataSource | None | VariableType DEFAULT: None

tags

Node tags for selective execution.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

time_column

The name of the column that represents the timestamp or temporal dimension in the DataFrame. This column is used to:

  • Document the time-based ordering and filtering semantics of the node’s output data.
  • Enable time-aware operations such as point-in-time joins, incremental processing, and time-series analysis.
  • Serve as a reference in expectations, unit tests, and feature engineering workflows.

While optional, specifying time_column helps ensure consistency in time-based logic and improves the reliability of downstream operations that depend on temporal alignment.

TYPE: str | None | VariableType DEFAULT: None

transformer

Data transformations applied between the source and the sink(s).

TYPE: DataFrameTransformer | VariableType DEFAULT: None

METHOD DESCRIPTION
check_expectations

Check expectations, raise errors and warnings where required, and build filtered and quarantine DataFrames.

execute

Execute pipeline node by reading the source, applying the transformations, checking expectations and writing the sinks.

ATTRIBUTE DESCRIPTION
all_sinks

List of all sinks (output and quarantine).

data_sources

Get all sources feeding the pipeline node

TYPE: list[BaseDataSource]

has_output_sinks

True if node has at least one output sink.

TYPE: bool

has_sinks

True if node has at least one sink.

TYPE: bool

is_orchestrator_dlt

If True, pipeline node is used in the context of a DLT pipeline

TYPE: bool

is_sql_expressible

True if the node can be executed on a SQL warehouse.

TYPE: bool

output_df

DataFrame resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

TYPE: AnyFrame

output_sinks

List of sinks writing the output DataFrame

TYPE: list[DataSinksUnion]

primary_sink

Primary output sink used as a source for downstream nodes.

TYPE: DataSinksUnion | None

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

TYPE: AnyFrame

quarantine_sinks

List of sinks writing the quarantine DataFrame

TYPE: list[DataSinksUnion]

sinks_count

Total number of sinks.

TYPE: int

sql_statements

Generate one SQL statement per sink, suitable for execution on a SQL warehouse.

TYPE: list[str]

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

TYPE: AnyFrame

upstream_node_names

Pipeline node names required to execute current node.

TYPE: list[str]

view_definition

Transformer View Definition (when applicable)

all_sinks property

List of all sinks (output and quarantine).

data_sources property

Get all sources feeding the pipeline node

has_output_sinks property

True if node has at least one output sink.

has_sinks property

True if node has at least one sink.

is_orchestrator_dlt property

If True, pipeline node is used in the context of a DLT pipeline

is_sql_expressible property

True if the node can be executed on a SQL warehouse.

output_df property

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

output_sinks property

List of sinks writing the output DataFrame

primary_sink property

Primary output sink used as a source for downstream nodes.

quarantine_df property

DataFrame storing stage_df rows not meeting data quality expectations.

quarantine_sinks property

List of sinks writing the quarantine DataFrame

sinks_count property

Total number of sinks.

sql_statements property

Generate one SQL statement per sink, suitable for execution on a SQL warehouse.

Raises ValueError if the node is not SQL-expressible.

stage_df property

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

upstream_node_names property

Pipeline node names required to execute current node.
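
As a rough sketch of how upstream node names can drive execution order, the standard library's `graphlib.TopologicalSorter` can order nodes so that each one runs after the nodes it depends on. The mapping below is hypothetical (`gld_stock_summary` is an invented node name), and this is not a description of how Laktory schedules nodes internally.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: node name -> upstream node names it requires
upstream = {
    "brz_stock_prices": [],
    "slv_stock_prices": ["brz_stock_prices"],
    "gld_stock_summary": ["slv_stock_prices"],
}

# static_order() yields each node only after all of its upstream nodes
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

For this linear chain, the bronze node comes first and the gold node last.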

view_definition property

Transformer View Definition (when applicable)

check_expectations()

Check expectations, raise errors, warnings where required and build filtered and quarantine DataFrames.

Some actions have to be disabled when selected orchestrator is Databricks DLT:

  • Raising error on Failure when expectation is supported by DLT
  • Dropping rows when expectation is supported by DLT
Source code in laktory/models/pipeline/pipelinenode.py
def check_expectations(self):
    """
    Check expectations, raise errors, warnings where required and build
    filtered and quarantine DataFrames.

    Some actions have to be disabled when selected orchestrator is
    Databricks DLT:

    * Raising error on Failure when expectation is supported by DLT
    * Dropping rows when expectation is supported by DLT
    """

    # Data Quality Checks
    qfilter = None  # Quarantine filter
    kfilter = None  # Keep filter
    if self._stage_df is None:
        # Node without source or transformer
        return
    is_streaming = getattr(nw.to_native(self._stage_df), "isStreaming", False)
    if not self.expectations:
        return

    def _batch_check(df, node):
        for e in node.expectations:
            # Run Check: this only warns or raises exceptions.
            if not e.is_dlt_managed:
                e.run_check(
                    df,
                    raise_or_warn=True,
                    node=node,
                )

    def _stream_check(batch_df, batch_id, node):
        _batch_check(
            batch_df,
            node,
        )

    logger.info("Checking Data Quality Expectations")

    if not is_streaming:
        _batch_check(
            self._stage_df,
            self,
        )

    else:
        skip = False

        if self.is_dlt_execute:
            names = []
            for e in self.expectations:
                if not e.is_dlt_compatible:
                    names += [e.name]
            if names:
                raise TypeError(
                    f"Expectations {names} are not natively supported by DLT and can't be computed on a streaming DataFrame with DLT executor."
                )

            skip = True

        backend = DataFrameBackends.from_df(self._stage_df)
        if backend not in STREAMING_BACKENDS:
            raise TypeError(
                f"DataFrame backend {backend} is not supported for streaming operations"
            )

        if self.expectations_checkpoint_path is None:
            raise ValueError(
                f"Expectations Checkpoint not specified for node '{self.name}'"
            )

        # TODO: Refactor for backend other than spark
        if not skip:
            query = (
                self._stage_df.to_native()
                .writeStream.foreachBatch(
                    lambda batch_df, batch_id: _stream_check(
                        nw.from_native(batch_df), batch_id, self
                    )
                )
                .trigger(availableNow=True)
                .options(
                    checkpointLocation=self.expectations_checkpoint_path,
                )
                .start()
            )
            query.awaitTermination()

    # Build Filters
    for e in self.expectations:
        # Update Keep Filter
        if not e.is_dlt_managed:
            _filter = e.keep_filter
            if _filter is not None:
                if kfilter is None:
                    kfilter = _filter
                else:
                    kfilter = kfilter & _filter

        # Update Quarantine Filter
        _filter = e.quarantine_filter
        if _filter is not None:
            if qfilter is None:
                qfilter = _filter
            else:
                qfilter = qfilter & _filter

    if qfilter is not None:
        logger.info("Building quarantine DataFrame")
        self._quarantine_df = self._stage_df.filter(qfilter)
    else:
        self._quarantine_df = self._stage_df  # .filter("False")

    if kfilter is not None:
        logger.info("Dropping invalid rows")
        self._output_df = self._stage_df.filter(kfilter)
    else:
        self._output_df = self._stage_df
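
The filter-building loop in the listing above combines both the keep filters and the quarantine filters with `&`. The sketch below mimics that combination on plain Python rows. It rests on two loudly stated assumptions that the listing does not confirm: each expectation's quarantine filter is the negation of its keep filter, and both hypothetical expectations produce filters. The predicates and sample rows are invented for illustration.

```python
rows = [
    {"symbol": "AAPL", "close": 10.0, "volume": 100},
    {"symbol": "MSFT", "close": -1.0, "volume": 50},
    {"symbol": "GOOG", "close": -2.0, "volume": -5},
]

# Hypothetical expectations: close > 0 and volume > 0
keep_predicates = [
    lambda r: r["close"] > 0,
    lambda r: r["volume"] > 0,
]
# Assumption: each quarantine filter is the negation of the keep filter
quarantine_predicates = [
    lambda r: not r["close"] > 0,
    lambda r: not r["volume"] > 0,
]


def all_of(predicates):
    """Combine predicates with a logical AND, like chaining `&` on filters."""
    return lambda r: all(p(r) for p in predicates)


kfilter = all_of(keep_predicates)
qfilter = all_of(quarantine_predicates)

output_rows = [r for r in rows if kfilter(r)]
quarantine_rows = [r for r in rows if qfilter(r)]
```

Under these assumptions, a row is kept only when it meets every expectation, and it is quarantined only when it matches every quarantine filter, so the MSFT row (which fails a single expectation) appears in neither result.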

execute(apply_transformer=True, write_sinks=True, full_refresh=False, named_dfs=None, update_tables_metadata=True)

Execute pipeline node by:

  • Reading the source
  • Applying the user defined (and layer-specific if applicable) transformations
  • Checking expectations
  • Writing the sinks
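
The four steps above can be sketched as a small, library-free function. Everything here is hypothetical (`execute_node`, the callables and the dictionary-based sinks); it only mirrors the order of operations and the `apply_transformer` and `write_sinks` flags, not Laktory's actual implementation.

```python
def execute_node(read_source, transform, check, sinks,
                 apply_transformer=True, write_sinks=True):
    """Run read -> transform -> check -> write (illustrative sketch only)."""
    # 1. Read the source
    stage = read_source()
    # 2. Apply the transformer, unless disabled
    if apply_transformer and transform is not None:
        stage = transform(stage)
    # 3. Check expectations and split valid from invalid rows
    output, quarantine = check(stage)
    # 4. Write the sinks; quarantine sinks receive the invalid rows
    if write_sinks:
        for sink in sinks:
            sink["written"] = quarantine if sink["is_quarantine"] else output
    return output


def read_source():
    return [{"symbol": "AAPL", "close": 10.0}, {"symbol": "MSFT", "close": -1.0}]


def transform(rows):
    return [{**r, "symbol": r["symbol"].lower()} for r in rows]


def check(rows):
    valid = [r for r in rows if r["close"] > 0]
    invalid = [r for r in rows if not r["close"] > 0]
    return valid, invalid


sinks = [{"is_quarantine": False}, {"is_quarantine": True}]
output = execute_node(read_source, transform, check, sinks)
```

Passing `write_sinks=False` in this sketch returns the same output without touching the sinks, which is comparable to running a node only to inspect its DataFrame.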
PARAMETER DESCRIPTION
apply_transformer

Flag to apply transformer in the execution

TYPE: bool DEFAULT: True

write_sinks

Flag to include writing sink in the execution

TYPE: bool DEFAULT: True

full_refresh

If True, the DataFrame is completely re-processed by deleting existing data and checkpoints before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrames passed to transformer nodes

TYPE: dict[str, AnyFrame] DEFAULT: None

update_tables_metadata

If True, update the metadata of the tables written by the sinks (skipped when the node is executed by DLT).

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
AnyFrame

Output DataFrame

Source code in laktory/models/pipeline/pipelinenode.py
def execute(
    self,
    apply_transformer: bool = True,
    write_sinks: bool = True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
    update_tables_metadata: bool = True,
) -> AnyFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Checking expectations
    - Writing the sinks

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    write_sinks:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.
    named_dfs:
        Named DataFrame passed to transformer nodes
    update_tables_metadata:
        Update tables metadata

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name}")

    # Install dependencies
    pl = self.parent_pipeline
    if pl and not pl._imports_imported:
        for package_name in pl._imports:
            try:
                logger.info(f"Importing {package_name}")
                importlib.import_module(package_name)
            except ModuleNotFoundError:
                logger.info(f"Importing {package_name} failed.")
        pl._imports_imported = True

    # Parse DLT
    if self.is_orchestrator_dlt:
        logger.info("DLT orchestrator selected. Sinks writing will be skipped.")
        write_sinks = False
        full_refresh = False

    # Refresh
    if full_refresh:
        self.purge()

    # Read Source
    self._stage_df = None
    if self.source:
        self._stage_df = self.source.read()

    # Apply transformer
    if named_dfs is None:
        named_dfs = {}
    if apply_transformer and self.transformer:
        self._stage_df = self.transformer.execute(
            self._stage_df, named_dfs=named_dfs
        )

    # Check expectations
    self._output_df = self._stage_df
    self._quarantine_df = None
    self.check_expectations()

    # Output and Quarantine to Sinks
    if write_sinks and self.sinks:
        view_definition = self.view_definition

        for s in self.sinks:
            # Get DataFrame
            _df = self._output_df
            if s.is_quarantine:
                _df = self._quarantine_df

            # Create Sink
            s.create(df=_df)

            _is_update_metadata = (
                update_tables_metadata and s.metadata and not self.is_dlt_execute
            )

            if self.is_view:
                s.write(view_definition=view_definition)
                if _is_update_metadata:
                    s.metadata.execute()
                self._output_df = s.as_source().read()
            else:
                if _is_update_metadata:
                    s.metadata.execute()
                # Write the DataFrame selected above (quarantine or output)
                s.write(df=_df)

                # Metadata update required because of schema overwrite
                if _is_update_metadata and s.metadata.update_required:
                    s.metadata.execute()

    return self._output_df