PipelineNode

laktory.models.pipeline.PipelineNode ¤

Bases: BaseModel, PipelineChild

Pipeline base component generating a DataFrame by reading a data source and applying a transformer (chain of dataframe transformations). Optional output to a data sink.

Examples:

A node reading stock prices data from JSON files and writing a DataFrame to a parquet file.

import io

import laktory as lk

node_yaml = '''
    name: brz_stock_prices
    source:
      path: "./events/stock_prices/"
      format: JSON
    sinks:
    - path: ./tables/brz_stock_prices/
      format: PARQUET
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()

A node reading stock prices from an upstream node and writing a DataFrame to a data table.

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    sinks:
    - schema_name: finance
      table_name: slv_stock_prices
    transformer:
      nodes:
      - expr: |
            SELECT
              data.created_at AS created_at,
              data.symbol AS symbol,
              data.open AS open,
              data.close AS close,
              data.high AS high,
              data.low AS low,
              data.volume AS volume
            FROM
              {df}
      - func_name: drop_duplicates
        func_kwargs:
          subset:
            - symbol
            - timestamp
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))

# node.execute()
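
The same node can also be constructed directly in Python by passing nested dictionaries to the model. A minimal sketch, assuming the nested dictionaries are validated into the corresponding source, sink and transformer sub-models just like the YAML above:

import laktory as lk

# Minimal sketch: nested dicts are assumed to be validated into the
# corresponding source, sink and transformer sub-models.
node = lk.models.PipelineNode(
    name="slv_stock_prices",
    source={"node_name": "brz_stock_prices"},
    sinks=[{"schema_name": "finance", "table_name": "slv_stock_prices"}],
    transformer={
        "nodes": [
            {
                "func_name": "drop_duplicates",
                "func_kwargs": {"subset": ["symbol", "timestamp"]},
            }
        ]
    },
)

# node.execute()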
PARAMETER DESCRIPTION
comment

Comment for the associated table or view

TYPE: str | VariableType DEFAULT: None

dlt_template

Specify which template (notebook) to use when Databricks pipeline is selected as the orchestrator.

TYPE: str | None | VariableType DEFAULT: 'DEFAULT'

execution_task_name_

Execution task name when the orchestrator (such as Databricks Jobs or Airflow) supports multi-task execution. Nodes with the same task name will be executed together in a single task. If None is provided, the node will be executed under node-{node-name}.

TYPE: str | VariableType DEFAULT: None

expectations

List of data expectations. Can trigger warnings, drop invalid records or fail a pipeline.

TYPE: list[DataQualityExpectation | VariableType] | VariableType DEFAULT: []
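
For illustration, a hedged YAML sketch of a node declaring an expectation; the expectation field names (name, expr, action) and the DROP action are assumptions about the DataQualityExpectation schema, not taken from this page:

import io

import laktory as lk

node_yaml = '''
    name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    expectations:
    - name: positive_price  # assumed field: expectation label
      expr: close > 0       # assumed field: SQL condition rows must meet
      action: DROP          # assumed field: warn, drop or fail behaviour
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))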

expectations_checkpoint_path_

Path where the checkpoint file for expectations on a streaming DataFrame should be written.

TYPE: str | Path | VariableType DEFAULT: None

name

Name given to the node.

TYPE: str | VariableType

primary_keys

A list of column names that uniquely identify each row in the DataFrame. These columns are used to:

  • Document the uniqueness constraints of the node's output data.
  • Define the default primary keys for sink CDC merge operations.
  • Serve as references in expectations and unit tests.

While optional, specifying primary_keys helps enforce data integrity and ensures that downstream operations, such as deduplication, are consistent and reliable.

TYPE: list[str | VariableType] | VariableType DEFAULT: None
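
As an example, the silver node above could document its uniqueness constraint directly in its YAML definition; a minimal sketch reusing the column names from the transformer example:

import io

import laktory as lk

# Minimal sketch: primary_keys documents uniqueness and provides the default
# keys for sink CDC merge operations, as described above.
node_yaml = '''
    name: slv_stock_prices
    source:
      node_name: brz_stock_prices
    primary_keys:
    - symbol
    - timestamp
'''

node = lk.models.PipelineNode.model_validate_yaml(io.StringIO(node_yaml))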

root_path_

Location of the pipeline node root used to store logs, metrics and checkpoints.

TYPE: str | Path | VariableType DEFAULT: None

sinks

Definition of the data sink(s). Set is_quarantine to True to store the node's quarantine DataFrame.

TYPE: list[PipelineViewDataSink | FileDataSink | UnityCatalogDataSink | HiveMetastoreDataSink | VariableType] | VariableType DEFAULT: None

source

Definition of the data source(s)

TYPE: CustomDataSource | FileDataSource | UnityCatalogDataSource | HiveMetastoreDataSource | DataFrameDataSource | PipelineNodeDataSource | None | VariableType DEFAULT: None

tags

Node tags for selective execution.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

time_column

The name of the column that represents the timestamp or temporal dimension in the DataFrame. This column is used to:

  • Document the time-based ordering and filtering semantics of the node's output data.
  • Enable time-aware operations such as point-in-time joins, incremental processing, and time-series analysis.
  • Serve as a reference in expectations, unit tests, and feature engineering workflows.

While optional, specifying time_column helps ensure consistency in time-based logic and improves the reliability of downstream operations that depend on temporal alignment.

TYPE: str | None | VariableType DEFAULT: None

transformer

Data transformations applied between the source and the sink(s).

TYPE: DataFrameTransformer | VariableType DEFAULT: None

METHOD DESCRIPTION
check_expectations

Check expectations, raise errors, warnings where required and build filtered and quarantine DataFrames.

execute

Execute pipeline node by reading the source, applying the transformer, checking expectations and writing the sinks.

ATTRIBUTE DESCRIPTION
all_sinks

List of all sinks (output and quarantine).

data_sources

Get all sources feeding the pipeline node

TYPE: list[BaseDataSource]

has_output_sinks

True if node has at least one output sink.

TYPE: bool

has_sinks

True if node has at least one sink.

TYPE: bool

is_orchestrator_dlt

If True, pipeline node is used in the context of a DLT pipeline

TYPE: bool

output_df

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

TYPE: AnyFrame

output_sinks

List of sinks writing the output DataFrame

TYPE: list[DataSinksUnion]

primary_sink

Primary output sink used as a source for downstream nodes.

TYPE: DataSinksUnion | None

quarantine_df

DataFrame storing stage_df rows not meeting data quality expectations.

TYPE: AnyFrame

quarantine_sinks

List of sinks writing the quarantine DataFrame

TYPE: list[DataSinksUnion]

sinks_count

Total number of sinks.

TYPE: int

stage_df

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

TYPE: AnyFrame

upstream_node_names

Pipeline node names required to execute current node.

TYPE: list[str]

view_definition

Transformer View Definition (when applicable)

all_sinks property ¤

List of all sinks (output and quarantine).

data_sources property ¤

Get all sources feeding the pipeline node

has_output_sinks property ¤

True if node has at least one output sink.

has_sinks property ¤

True if node has at least one sink.

is_orchestrator_dlt property ¤

If True, pipeline node is used in the context of a DLT pipeline

output_df property ¤

Dataframe resulting from reading source, applying transformer and dropping rows not meeting data quality expectations.

output_sinks property ¤

List of sinks writing the output DataFrame

primary_sink property ¤

Primary output sink used as a source for downstream nodes.

quarantine_df property ¤

DataFrame storing stage_df rows not meeting data quality expectations.

quarantine_sinks property ¤

List of sinks writing the quarantine DataFrame

sinks_count property ¤

Total number of sinks.

stage_df property ¤

Dataframe resulting from reading source and applying transformer, before data quality checks are applied.

upstream_node_names property ¤

Pipeline node names required to execute current node.

view_definition property ¤

Transformer View Definition (when applicable)

check_expectations() ¤

Check expectations, raise errors, warnings where required and build filtered and quarantine DataFrames.

Some actions have to be disabled when the selected orchestrator is Databricks DLT:

  • Raising error on Failure when expectation is supported by DLT
  • Dropping rows when expectation is supported by DLT
Source code in laktory/models/pipeline/pipelinenode.py, lines 655-773
def check_expectations(self):
    """
    Check expectations, raise errors, warnings where required and build
    filtered and quarantine DataFrames.

    Some actions have to be disabled when selected orchestrator is
    Databricks DLT:

    * Raising error on Failure when expectation is supported by DLT
    * Dropping rows when expectation is supported by DLT
    """

    # Data Quality Checks
    qfilter = None  # Quarantine filter
    kfilter = None  # Keep filter
    if self._stage_df is None:
        # Node without source or transformer
        return
    is_streaming = getattr(nw.to_native(self._stage_df), "isStreaming", False)
    if not self.expectations:
        return

    def _batch_check(df, node):
        for e in node.expectations:
            # Run Check: this only warn or raise exceptions.
            if not e.is_dlt_managed:
                e.run_check(
                    df,
                    raise_or_warn=True,
                    node=node,
                )

    def _stream_check(batch_df, batch_id, node):
        _batch_check(
            batch_df,
            node,
        )

    logger.info("Checking Data Quality Expectations")

    if not is_streaming:
        _batch_check(
            self._stage_df,
            self,
        )

    else:
        skip = False

        if self.is_dlt_execute:
            names = []
            for e in self.expectations:
                if not e.is_dlt_compatible:
                    names += [e.name]
            if names:
                raise TypeError(
                    f"Expectations {names} are not natively supported by DLT and can't be computed on a streaming DataFrame with DLT executor."
                )

            skip = True

        backend = DataFrameBackends.from_df(self._stage_df)
        if backend not in STREAMING_BACKENDS:
            raise TypeError(
                f"DataFrame backend {backend} is not supported for streaming operations"
            )

        if self.expectations_checkpoint_path is None:
            raise ValueError(
                f"Expectations Checkpoint not specified for node '{self.name}'"
            )

        # TODO: Refactor for backend other than spark
        if not skip:
            query = (
                self._stage_df.to_native()
                .writeStream.foreachBatch(
                    lambda batch_df, batch_id: _stream_check(
                        nw.from_native(batch_df), batch_id, self
                    )
                )
                .trigger(availableNow=True)
                .options(
                    checkpointLocation=self.expectations_checkpoint_path,
                )
                .start()
            )
            query.awaitTermination()

    # Build Filters
    for e in self.expectations:
        # Update Keep Filter
        if not e.is_dlt_managed:
            _filter = e.keep_filter
            if _filter is not None:
                if kfilter is None:
                    kfilter = _filter
                else:
                    kfilter = kfilter & _filter

        # Update Quarantine Filter
        _filter = e.quarantine_filter
        if _filter is not None:
            if qfilter is None:
                qfilter = _filter
            else:
                qfilter = qfilter & _filter

    if qfilter is not None:
        logger.info("Building quarantine DataFrame")
        self._quarantine_df = self._stage_df.filter(qfilter)
    else:
        self._quarantine_df = self._stage_df  # .filter("False")

    if kfilter is not None:
        logger.info("Dropping invalid rows")
        self._output_df = self._stage_df.filter(kfilter)
    else:
        self._output_df = self._stage_df

execute(apply_transformer=True, write_sinks=True, full_refresh=False, named_dfs=None, update_tables_metadata=True) ¤

Execute pipeline node by:

  • Reading the source
  • Applying the user defined (and layer-specific if applicable) transformations
  • Checking expectations
  • Writing the sinks
PARAMETER DESCRIPTION
apply_transformer

Flag to apply transformer in the execution

TYPE: bool DEFAULT: True

write_sinks

Flag to include writing sink in the execution

TYPE: bool DEFAULT: True

full_refresh

If True dataframe will be completely re-processed by deleting existing data and checkpoint before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrame passed to transformer nodes

TYPE: dict[str, AnyFrame] DEFAULT: None

update_tables_metadata

Update tables metadata

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
AnyFrame

output Spark DataFrame
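
For example, using the node built in the examples above, a dry run could skip the sinks; a minimal sketch, assuming the node's source data is available:

# Hedged sketch of a dry run: read the source, apply the transformer and
# check expectations, but skip writing the sinks.
df = node.execute(write_sinks=False)

# The same DataFrame is also exposed on the node after execution.
df = node.output_df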

Source code in laktory/models/pipeline/pipelinenode.py, lines 546-653
def execute(
    self,
    apply_transformer: bool = True,
    write_sinks: bool = True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
    update_tables_metadata: bool = True,
) -> AnyFrame:
    """
    Execute pipeline node by:

    - Reading the source
    - Applying the user defined (and layer-specific if applicable) transformations
    - Checking expectations
    - Writing the sinks

    Parameters
    ----------
    apply_transformer:
        Flag to apply transformer in the execution
    write_sinks:
        Flag to include writing sink in the execution
    full_refresh:
        If `True` dataframe will be completely re-processed by deleting
        existing data and checkpoint before processing.
    named_dfs:
        Named DataFrame passed to transformer nodes
    update_tables_metadata:
        Update tables metadata

    Returns
    -------
    :
        output Spark DataFrame
    """
    logger.info(f"Executing pipeline node {self.name}")

    # Install dependencies
    pl = self.parent_pipeline
    if pl and not pl._imports_imported:
        for package_name in pl._imports:
            try:
                logger.info(f"Importing {package_name}")
                importlib.import_module(package_name)
            except ModuleNotFoundError:
                logger.info(f"Importing {package_name} failed.")
        pl._imports_imported = True

    # Parse DLT
    if self.is_orchestrator_dlt:
        logger.info("DLT orchestrator selected. Sinks writing will be skipped.")
        write_sinks = False
        full_refresh = False

    # Refresh
    if full_refresh:
        self.purge()

    # Read Source
    self._stage_df = None
    if self.source:
        self._stage_df = self.source.read()

    # Apply transformer
    if named_dfs is None:
        named_dfs = {}
    if apply_transformer and self.transformer:
        self._stage_df = self.transformer.execute(
            self._stage_df, named_dfs=named_dfs
        )

    # Check expectations
    self._output_df = self._stage_df
    self._quarantine_df = None
    self.check_expectations()

    # Output and Quarantine to Sinks
    if write_sinks and self.sinks:
        view_definition = self.view_definition

        for s in self.sinks:
            # Get DataFrame
            _df = self._output_df
            if s.is_quarantine:
                _df = self._quarantine_df

            # Create Sink
            s.create(df=_df)

            _is_update_metadata = (
                update_tables_metadata and s.metadata and not self.is_dlt_execute
            )

            if self.is_view:
                s.write(view_definition=view_definition)
                if _is_update_metadata:
                    s.metadata.execute()
                self._output_df = s.as_source().read()
            else:
                if _is_update_metadata:
                    s.metadata.execute()
                s.write(df=self._output_df)

                # Metadata update required because of schema overwrite
                if _is_update_metadata and s.metadata.update_required:
                    s.metadata.execute()

    return self._output_df