Skip to content

Data Sources and Sinks

Data Sources¤

node source

The DataSource models facilitate loading data into a dataframe. It provides reusable mechanisms for reading data of various nature given different configuration and various execution contexts.

It is generally used as a component of a pipeline node. In this context, the sink may be used to store the output of the node or some quarantined data if expectations are set and not met.

Naming Sources¤

A pipeline node can declare one or more sources via its sources list. Each source accepts an optional name field. The name is used to reference that source inside transformer expressions using the {sources.name} placeholder:

nodes:
- name: slv_stocks
  sources:
  - name: prices        # referenced as {sources.prices}
    node_name: brz_stock_prices
  - name: metadata      # referenced as {sources.metadata}
    node_name: brz_stock_metadata
    selects: [symbol, currency]
  transformer:
    nodes:
    - func_name: join
      func_kwargs:
        other: "{sources.metadata}"
        on: symbol

When a node has only one source, name is optional and the source is accessible as {df} in transformer expressions (which always refers to the flowing DataFrame - the primary source for the first step, and the output of the previous step for subsequent ones).

nodes:
- name: slv_stock_prices
  sources:
  - node_name: brz_stock_prices   # no name needed for a single source
  transformer:
    nodes:
    - expr: SELECT symbol, open, close FROM {df}

File Data Source¤

API Documentation

laktory.models.FileDataSource

File data source supports reading files stored on disk.

import laktory as lk

source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
    format="JSON",
    as_stream=False,
    dataframe_backend="PYSPARK"
)
df = source.read()
Reading the same dataset, but as a spark streaming source, is as easy as changing as_stream to True.
from laktory import models

source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=True,
    dataframe_backend="PYSPARK"
)
df_stream = source.read()

You can also select a different DataFrame backend for reading your files

import laktory as lk

source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices.parquet",
    format="PARQUET",
    dataframe_backend="POLARS"
)
df = source.read()

Table Data Source¤

API Documentation

laktory.models.HiveMetastoreDataSource
laktory.models.UnityCatalogDataSource

When your data is already loaded into data table, you can use the UnityCatalogDataSource or HiveMetastoreDataSource models instead

import laktory as lk

source = lk.models.UnityCatalogDataSource(
    table_name="brz_stock_prices",
    selects=["symbol", "open", "close"],
    filter="symbol='AAPL'",
    as_stream=True,
)
df = source.read()
In this case
  • the selects argument is used to select only symbol, open and close columns
  • the filter argument is used to select only rows associated with Apple stock.

More data sources (like Kafka / Event Hub / Kinesis streams) will be supported in the future.

Pipeline Node Data Source¤

API Documentation

laktory.models.PipelineNodeDataSource

To establish a relationship between two nodes in a data pipeline, the PipelineNodeDataSource must be used. Assuming each node is a vertex in a directed acyclic graph (DAG), using a PipelineNodeDataSource creates an edge between two vertices. It also defines the execution order of the nodes.

import laktory as lk

source = lk.models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
    as_stream=True,
)
This type of data source adapts to its execution context.
  • Single Worker Execution: The source uses the in-memory output dataframe from the upstream node.
  • Multi-Workers Execution: The source uses the upstream node sink as a source for read the dataframe.
  • Declarative Pipeline Execution: The source uses spark.read() and spark.readStream() to read data from the upstream node.

Data Sinks¤

API Documentation

laktory.models.FileDataSink
laktory.models.UnityCatalogDataSink

node sink

Analogously to DataSource, DataSink models facilitate writing a dataframe into a target location. It provides re-usable mechanisms for writing data in various formats, adapting to different execution contexts.

It is generally used as a component of a pipeline node.

Data sinks also support the merge of a Change Data Capture (CDC).

File Data Sink¤

API Documentation

laktory.models.FileDataSink

File data sink supports writing a dataframe as files to a disk location using a variety of storage format. For streaming dataframes, you also need to specify a checkpoint location.

import narwhals as nw
import polars as pl

import laktory as lk

df = nw.from_native(
    pl.DataFrame({"symbol": ["AAPL", "GOOGL"]})
)

sink = lk.models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price.parquet",
    format="PARQUET",
    mode="OVERWRITE",
)
sink.write(df)

Table Data Sink¤

API Documentation

laktory.models.UnityCatalogDataSink
laktory.models.HiveMetastoreDataSink

The UnityCatlaogDataSink and HiveMetastoreDataSink classes provide a convenient way to write a DataFrame to a data table. It simplifies the process of persisting data in a structured format, supporting both physical tables and SQL views.

To write a DataFrame to a physical table:

import narwhals as nw

import laktory as lk

df = nw.from_native(
    spark.createDataFrame([("AAPL"), ("GOOGL")], ["symbol"])
)

sink = lk.models.UnityCatalogDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
)
sink.write(df)

UnityCatlaogDataSink also supports creating non-materialized SQL views instead of physical tables. To write a DataFrame as a SQL view:

import narwhas as nw

import laktory as lk

df = nw.from_native(
    spark.createDataFrame([("AAPL"), ("GOOGL")], ["symbol"])
)

sink = lk.models.TableDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
    table_type="VIEW",
    view_definition="SELECT * from {df}"
)
sink.write(df)

Pipeline View Data Sink¤

API Documentation

laktory.models.PipelineViewDataSink
laktory.models.PipelineViewDataSink

The PipelineViewDataSink can be used in the context of a Declarative Pipeline such as Databricks Lakeflow Declarative Pipeline. A virtual view is created in the context of the pipeline, but the data is not materialized. Views are useful for simplifying complex queries, encapsulating business logic, and providing a consistent interface to the underlying data without duplicating storage.

import laktory as lk

sink = lk.models.PipelineViewDataSink(
    pipeline_view_name="brz_stock_prices",
)