Data Sources and Sinks
Data Sources

The DataSource models facilitate loading data into a dataframe. They provide
reusable mechanisms for reading data of various natures given different
configurations and execution contexts.
A data source is generally used as a component of a pipeline node, in which case it defines the input dataframe of that node.
File Data Source
API Documentation
laktory.models.FileDataSource
File data source supports reading files stored on disk.
```py
import laktory as lk

source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
    format="JSON",
    as_stream=False,
    dataframe_backend="PYSPARK",
)
df = source.read()
```
To read the same data as a stream, simply set `as_stream` to `True`.
```py
import laktory as lk

source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    as_stream=True,
    dataframe_backend="PYSPARK",
)
df_stream = source.read()
```
You can also select a different DataFrame backend for reading your files:
```py
import laktory as lk

source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices.parquet",
    format="PARQUET",
    dataframe_backend="POLARS",
)
df = source.read()
```
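Note that `read()` returns a dataframe object. Assuming it is a Narwhals-backed frame (as the sink examples below, which build their inputs with `nw.from_native`, suggest), you can use the backend-agnostic Narwhals API or drop down to the native object; a minimal sketch under that assumption:

```py
# Sketch only: assumes `source.read()` returns a Narwhals DataFrame.
df = source.read()

print(df.columns)           # backend-agnostic Narwhals API
df_native = df.to_native()  # underlying Polars DataFrame, for Polars-specific operations
```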
Table Data Source
API Documentation
laktory.models.HiveMetastoreDataSource
laktory.models.UnityCatalogDataSource
When your data is already loaded into a data table, you can use the
UnityCatalogDataSource or HiveMetastoreDataSource models instead:
```py
import laktory as lk

source = lk.models.UnityCatalogDataSource(
    table_name="brz_stock_prices",
    selects=["symbol", "open", "close"],
    filter="symbol='AAPL'",
    as_stream=True,
)
df = source.read()
```
In the example above:
- the `selects` argument is used to select only the `symbol`, `open` and `close` columns
- the `filter` argument is used to select only the rows associated with Apple stock, as shown in the sketch below
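For comparison, the selection performed by these arguments is roughly equivalent to the following manual read; a minimal PySpark sketch, assuming an active `spark` session and that the table is reachable under that name:

```py
# Roughly equivalent manual selection, for illustration only
df = (
    spark.readStream.table("brz_stock_prices")
    .select("symbol", "open", "close")
    .filter("symbol = 'AAPL'")
)
```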
More data sources (like Kafka / Event Hub / Kinesis streams) will be supported in the future.
Pipeline Node Data Source
API Documentation
laktory.models.PipelineNodeDataSource
To establish a relationship between two nodes in a data pipeline, the
PipelineNodeDataSource must be used. Assuming each node is a vertex in a
directed acyclic graph (DAG), using a PipelineNodeDataSource creates an edge
between two vertices. It also defines the execution order of the nodes.
```py
import laktory as lk

source = lk.models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
    as_stream=True,
)
```
Depending on the execution context, the source reads the upstream data differently:
- Single Worker Execution: The source uses the in-memory output dataframe from the upstream node.
- Multi-Workers Execution: The source uses the upstream node sink as a source to read the dataframe.
- DLT Execution: The source uses `dlt.read()` and `dlt.read_stream()` to read data from the upstream node.
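For illustration, here is how the sources of two adjacent nodes could be declared (a sketch using only the source models introduced above; the surrounding pipeline and node definitions are omitted):

```py
import laktory as lk

# Source of the upstream node ("brz_stock_prices"): reads raw files
brz_source = lk.models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices/",
    format="JSON",
)

# Source of the downstream node: consumes the output of "brz_stock_prices",
# creating a DAG edge between the two nodes
slv_source = lk.models.PipelineNodeDataSource(
    node_name="brz_stock_prices",
)
```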
Data Sinks
API Documentation
laktory.models.FileDataSink
laktory.models.UnityCatalogDataSink

Analogously to DataSource, the DataSink models facilitate writing a dataframe
into a target location. They provide reusable mechanisms for writing data
in various formats, adapting to different execution contexts.
A data sink is generally used as a component of a pipeline node. In this context, the sink may be used to store the output of the node or some quarantined data if expectations are set and not met.
Data sinks also support merging Change Data Capture (CDC) data.
File Data Sink
API Documentation
laktory.models.FileDataSink
File data sink supports writing a dataframe as files to a disk location using a variety of storage formats. For streaming dataframes, you also need to specify a checkpoint location.
```py
import narwhals as nw
import polars as pl
import laktory as lk

df = nw.from_native(
    pl.DataFrame({"symbol": ["AAPL", "GOOGL"]})
)

sink = lk.models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price.parquet",
    format="PARQUET",
    mode="OVERWRITE",
)
sink.write(df)
```
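For a streaming dataframe (such as `df_stream` read above), a checkpoint location must also be provided. A minimal sketch, assuming the argument is named `checkpoint_path` (check the FileDataSink API reference for the exact field name):

```py
import laktory as lk

# Streaming write sketch; `checkpoint_path` and the append mode are assumptions,
# see the FileDataSink API reference for the exact options.
sink = lk.models.FileDataSink(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_prices_stream/",
    format="PARQUET",
    mode="APPEND",
    checkpoint_path="/Volumes/sources/checkpoints/stock_prices_stream/",
)
sink.write(df_stream)
```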
Table Data Sink
API Documentation
laktory.models.UnityCatalogDataSink
laktory.models.HiveMetastoreDataSink
The UnityCatalogDataSink and HiveMetastoreDataSink classes provide a convenient way
to write a DataFrame to a data table. They simplify the process of persisting data
in a structured format, supporting both physical tables and SQL views.
To write a DataFrame to a physical table:
```py
import narwhals as nw
import laktory as lk

# Assumes an active `spark` session (e.g. on Databricks)
df = nw.from_native(
    spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])
)

sink = lk.models.UnityCatalogDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
)
sink.write(df)
```
UnityCatalogDataSink also supports creating non-materialized SQL views instead of
physical tables. To write a DataFrame as a SQL view:
```py
import narwhals as nw
import laktory as lk

# Assumes an active `spark` session (e.g. on Databricks)
df = nw.from_native(
    spark.createDataFrame([("AAPL",), ("GOOGL",)], ["symbol"])
)

sink = lk.models.UnityCatalogDataSink(
    schema_name="finance",
    table_name="brz_stock_prices",
    table_type="VIEW",
    view_definition="SELECT * from {df}",
)
sink.write(df)
```
Pipeline View Data Sink
API Documentation
laktory.models.PipelineViewDataSink
The PipelineViewDataSink can be used in the context of a declarative pipeline,
such as a Databricks Lakeflow Declarative Pipeline. A virtual view is created within the pipeline, but the
data is not materialized. Views are useful for simplifying complex queries, encapsulating business logic, and providing
a consistent interface to the underlying data without duplicating storage.
```py
import laktory as lk

sink = lk.models.PipelineViewDataSink(
    pipeline_view_name="brz_stock_prices",
)
```