PipelineNodeDataSource

laktory.models.datasources.PipelineNodeDataSource

Bases: BaseDataSource

Data source that reads from an upstream pipeline node. Declaring a pipeline node data source establishes the dependencies between the nodes of a pipeline. Depending on the selected pipeline orchestrator and the context, a pipeline node data source might read the data from:

  • memory
  • upstream node sink
  • DLT table

Examples:

from laktory import models

brz = models.PipelineNode(
    name="brz_stock_prices",
    source={
        "path": "/Volumes/sources/landing/events/yahoo-finance/stock_price",
        "format": "JSON",
    },
    sinks=[
        {
            "path": "/Volumes/sources/landing/tables/brz_stock_prices",
            "format": "PARQUET",
        }
    ],
)

slv = models.PipelineNode(
    name="slv_stock_prices",
    source={"node_name": "brz_stock_prices"},
    sinks=[
        {
            "path": "/Volumes/sources/landing/tables/slv_stock_prices",
            "format": "PARQUET",
        }
    ],
)

pl = models.Pipeline(name="pl-stock-prices", nodes=[brz, slv])

# pl.execute(spark=spark)
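
The dependency declared through node_name in the example above can be pictured as a small directed graph. The following is a minimal sketch, using only the standard-library graphlib module, of how an execution order could be derived from such declarations. It is an illustration and not laktory's implementation: the simplified node dictionaries and the execution_order helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical, simplified node specs: only the name and the source are kept.
# The nodes are deliberately listed out of order.
nodes = [
    {"name": "slv_stock_prices", "source": {"node_name": "brz_stock_prices"}},
    {
        "name": "brz_stock_prices",
        "source": {"path": "/Volumes/sources/landing/events/yahoo-finance/stock_price"},
    },
]


def execution_order(nodes):
    """Return node names ordered so that each node comes after its upstream node."""
    graph = {}
    for node in nodes:
        # A source with a node_name points at an upstream node; any other
        # source (for example a file path) adds no dependency.
        upstream = node["source"].get("node_name")
        graph[node["name"]] = {upstream} if upstream else set()
    # TopologicalSorter expects a mapping of node -> predecessors.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(nodes)
print(order)  # ['brz_stock_prices', 'slv_stock_prices']
```

Because slv_stock_prices names brz_stock_prices as its source, the bronze node is ordered first regardless of the order in which the nodes were listed.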
PARAMETER DESCRIPTION
as_stream

If True, source is read as a streaming DataFrame. Currently only supported by the Spark DataFrame backend.

TYPE: bool | VariableType DEFAULT: False

drop_duplicates

Remove duplicate rows from the source. If True, all columns are compared; if a list of column names is provided, only those columns are compared.

TYPE: bool | list[str] | VariableType DEFAULT: None

drops

List of columns to drop

TYPE: list | VariableType DEFAULT: None

filter

SQL expression used to select specific rows from the source table

TYPE: str | VariableType DEFAULT: None

node_name

Name of the upstream pipeline node

TYPE: str | VariableType

reader_kwargs

Keyword arguments passed directly to the DataFrame backend reader. With PySpark, they are passed to the .options() method.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

reader_methods

DataFrame backend reader methods.

TYPE: list[ReaderWriterMethod | VariableType] | VariableType DEFAULT: []

renames

Mapping between the source column names and desired column names

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

selects

Columns to select from the source. Can be specified as a list or as a dictionary to rename the source columns

TYPE: list[str] | dict[str, str] | VariableType DEFAULT: None

type

Name of the data source type

TYPE: str | VariableType DEFAULT: 'PIPELINE_NODE'
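
As a rough illustration of how the parameters above fit together, here is a source definition written as a plain dictionary. The field names come from the parameter list; the column names are hypothetical, and the direction of the renames and selects mappings (source name to new name) is an assumption based on the descriptions above. The order in which these options are applied is not stated on this page.

```python
# Field names are taken from the parameter list above.
# Column names (symbol, close, created_at, _corrupt_record) are hypothetical.
source = {
    "node_name": "brz_stock_prices",  # upstream node to read from
    "as_stream": False,  # default; streaming is Spark-only
    "filter": "symbol = 'AAPL'",  # SQL expression selecting rows
    "drops": ["_corrupt_record"],  # columns to drop
    "renames": {"close": "close_price"},  # assumed: source name -> desired name
    "drop_duplicates": ["symbol", "created_at"],  # or True to compare all columns
}

# selects accepts either a list of columns or a dictionary.
selects_as_list = ["symbol", "close"]
selects_as_dict = {"symbol": "ticker", "close": "close_price"}  # assumed: source -> new name
```

A dictionary like this would be passed as the source argument of models.PipelineNode, in the same way as {"node_name": "brz_stock_prices"} in the example above.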

METHOD DESCRIPTION
read

Read data with options specified in attributes.

read(**kwargs)

Read data with options specified in attributes.

RETURNS DESCRIPTION
AnyFrame

Resulting dataframe

Source code in laktory/models/datasources/basedatasource.py
def read(self, **kwargs) -> AnyFrame:
    """
    Read data with options specified in attributes.

    Returns
    -------
    :
        Resulting dataframe
    """
    logger.info(
        f"Reading `{self.__class__.__name__}` {self._id} with {self.dataframe_backend}"
    )
    df = self._read(**kwargs)

    # Convert to Narwhals
    if not isinstance(df, (nw.LazyFrame, nw.DataFrame)):
        df = nw.from_native(df)

    # Post read
    df = self._post_read(df)

    logger.info("Read completed.")

    return df
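
The listing above shows that read is a shared entry point: it delegates the backend-specific work to _read and then runs a common _post_read step. The sketch below mimics that structure with plain Python lists in place of DataFrames. The SketchSource and InMemorySource classes and the limit keyword are hypothetical, and what laktory's own _post_read does is not shown on this page.

```python
from typing import Any


class SketchSource:
    """Hypothetical stand-in for a data source base class; not part of laktory."""

    def read(self, **kwargs: Any) -> list:
        # Shared entry point: backend-specific read, then common post-processing.
        rows = self._read(**kwargs)
        return self._post_read(rows)

    def _read(self, **kwargs: Any) -> list:
        # Subclasses provide the actual read.
        raise NotImplementedError

    def _post_read(self, rows: list) -> list:
        # Placeholder for the shared post-read step; here it only copies the rows.
        return list(rows)


class InMemorySource(SketchSource):
    """Hypothetical subclass that serves rows held in memory."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    def _read(self, **kwargs: Any) -> list:
        # 'limit' is an illustrative keyword; None means no limit.
        limit = kwargs.get("limit")
        return self._rows[:limit]


src = InMemorySource([{"symbol": "AAPL"}, {"symbol": "MSFT"}])
first = src.read(limit=1)
everything = src.read()
```

With this layout a new kind of source only has to implement _read, and callers always go through read.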