Pipeline

laktory.models.Pipeline

Bases: BaseModel, PulumiResource, TerraformResource, PipelineChild
Pipeline model to manage a full-fledged data pipeline, including reading from data sources, applying data transformations through Spark, and outputting to data sinks.

A pipeline is composed of a collection of nodes, each one defining its own source, transformations and optional sink. A node may be used as the source of another node.

A pipeline may be run manually using Python or the CLI, but it may also be deployed and scheduled using one of the supported orchestrators, such as a Databricks Delta Live Tables pipeline or a Databricks Job.
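For the manual route, here is a minimal sketch (assuming a local `pipeline.yaml` file, a hypothetical name, whose content follows the schema illustrated in the examples below):

```python
from laktory import models

# Hypothetical file name; any YAML matching the Pipeline schema works
with open("pipeline.yaml") as fp:
    pl = models.Pipeline.model_validate_yaml(fp)

# Read sources, run transformers and write sinks
# (pass a Spark session if the configured backend requires one)
pl.execute()
```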
ATTRIBUTE | DESCRIPTION
---|---
`databricks_job` | Defines the Databricks Job specifications when `DATABRICKS_JOB` is selected as the orchestrator. Requires adding the supporting notebook to the stack.
`databricks_dlt` | Defines the Databricks DLT specifications when `DATABRICKS_DLT` is selected as the orchestrator. Requires adding the supporting notebook to the stack.
`dependencies` | List of dependencies required to run the pipeline. If Laktory is not provided, its current version is added to the list.
`name` | Name of the pipeline.
`nodes` | List of pipeline nodes. Each node defines a data source, a series of transformations and, optionally, a sink.
`orchestrator` | Orchestrator used for scheduling and executing the pipeline. The selected option defines which resources are deployed. Supported options include `DATABRICKS_JOB` and `DATABRICKS_DLT`.
`udfs` | List of user-defined functions provided to the transformer.
`root_path` | Location of the pipeline node root used to store logs, metrics and checkpoints.
Examples:
This first example shows how to configure a simple pipeline with 2 nodes. Upon execution, raw data will be read from a CSV file and two DataFrames (bronze and silver) will be created and saved as parquet files. Notice how the first node is used as a data source for the second node.
```python
import io

from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    format: CSV
    path: ./raw/brz_stock_prices.csv
  sinks:
  - format: PARQUET
    mode: OVERWRITE
    path: ./dataframes/brz_stock_prices

- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
  sinks:
  - format: PARQUET
    mode: OVERWRITE
    path: ./dataframes/slv_stock_prices
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        expr: data.created_at
    - with_column:
        name: symbol
        expr: data.symbol
    - with_column:
        name: close
        type: double
        expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))

# Execute pipeline
# pl.execute()
```
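If the execute call above is uncommented and run, the resulting parquet outputs can be inspected like any other files. A minimal sketch, assuming an active Spark session named `spark`:

```python
# Read back the silver output written by the pipeline (path from the YAML above)
df = spark.read.parquet("./dataframes/slv_stock_prices")
df.printSchema()
df.show(5)
```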
The next example defines a three-node pipeline (one bronze and two silver) orchestrated with a Databricks Job. Notice how nodes are used as data sources not only for other nodes, but also for the `other` keyword argument of the smart join function (slv_stock_prices). Because we are using the `DATABRICKS_JOB` orchestrator, the job configuration must be declared. The tasks will be automatically created by the Pipeline model. Each task will execute a single node using the notebook referenced in `databricks_job.notebook_path`; the content of this notebook should be similar to `laktory.resources.notebooks.job_laktory.pl`.
```python
import io

from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
orchestrator: DATABRICKS_JOB
databricks_job:
  name: job-pl-stock-prices
  notebook_path: /Workspace/.laktory/jobs/job_laktory_pl.py
  clusters:
  - name: node-cluster
    spark_version: 14.0.x-scala2.12
    node_type_id: Standard_DS3_v2

dependencies:
- laktory==0.3.0
- yfinance

nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
  sinks:
  - path: /Volumes/dev/sources/landing/tables/dev_stock_prices/
    mode: OVERWRITE

- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
  sinks:
  - path: /Volumes/dev/sources/landing/tables/slv_stock_prices/
    mode: OVERWRITE
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        expr: data.created_at
    - with_column:
        name: symbol
        expr: data.symbol
    - with_column:
        name: close
        type: double
        expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
    - func_name: laktory.smart_join
      func_kwargs:
        'on':
        - symbol
        other:
          node_name: slv_stock_meta

- name: slv_stock_meta
  layer: SILVER
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/
  sinks:
  - path: /Volumes/dev/sources/landing/tables/slv_stock_meta/
    mode: OVERWRITE
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
```
Finally, we re-implement the previous pipeline, but with a few key differences:

- The orchestrator is `DATABRICKS_DLT` instead of `DATABRICKS_JOB`.
- Sinks are Unity Catalog tables instead of storage locations.
- Data is read as a stream in most nodes.
- `slv_stock_meta` is simply a DLT view since it does not have an associated sink.

We also need to provide some basic configuration for the DLT pipeline.
```python
import io

from laktory import models

pipeline_yaml = '''
name: pl-stock-prices
orchestrator: DATABRICKS_DLT
databricks_dlt:
  catalog: dev
  target: sandbox
  access_controls:
  - group_name: users
    permission_level: CAN_VIEW

nodes:
- name: brz_stock_prices
  layer: BRONZE
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_price/
    as_stream: true
  sinks:
  - table_name: brz_stock_prices

- name: slv_stock_prices
  layer: SILVER
  source:
    node_name: brz_stock_prices
    as_stream: true
  sinks:
  - table_name: slv_stock_prices
  transformer:
    nodes:
    - with_column:
        name: created_at
        type: timestamp
        expr: data.created_at
    - with_column:
        name: symbol
        expr: data.symbol
    - with_column:
        name: close
        type: double
        expr: data.close
    - func_name: drop
      func_args:
      - value: data
      - value: producer
      - value: name
      - value: description
    - func_name: laktory.smart_join
      func_kwargs:
        'on':
        - symbol
        other:
          node_name: slv_stock_meta

- name: slv_stock_meta
  layer: SILVER
  source:
    path: /Volumes/dev/sources/landing/events/yahoo-finance/stock_meta/
'''

pl = models.Pipeline.model_validate_yaml(io.StringIO(pipeline_yaml))
```
METHOD | DESCRIPTION
---|---
`push_df_backend` | Pushes `dataframe_backend`, which is required to differentiate between the Spark and Polars transformers.
`execute` | Execute the pipeline (read sources and write sinks) by sequentially executing each node.
`dag_figure` | [UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.
Attributes
nodes_dict (property)

Nodes dictionary whose keys are the node names.

RETURNS | DESCRIPTION
---|---
`dict[str, PipelineNode]` | Nodes, keyed by name
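For example, `nodes_dict` makes it possible to retrieve a specific node by name after validation. A small sketch, re-using the `pl` object built in the examples above:

```python
# Look up a node by its name (assumes the node exists in the pipeline definition)
node = pl.nodes_dict["slv_stock_prices"]
print(node.name)
```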
dag (property)

NetworkX Directed Acyclic Graph representation of the pipeline. Useful to identify interdependencies between nodes.

RETURNS | DESCRIPTION
---|---
`DiGraph` | Directed Acyclic Graph
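Since `dag` is a standard NetworkX `DiGraph`, the usual graph utilities can be applied to inspect dependencies. A minimal sketch, assuming the graph nodes are the node names (as suggested by `nodes_dict`):

```python
import networkx as nx

# Upstream dependencies of a given node (assumes graph nodes are node names)
print(list(pl.dag.predecessors("slv_stock_prices")))

# Dependency-respecting order, equivalent in spirit to pl.sorted_nodes
print(list(nx.topological_sort(pl.dag)))
```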
sorted_nodes (property)

Topologically sorted nodes.

RETURNS | DESCRIPTION
---|---
`list[PipelineNode]` | List of topologically sorted nodes
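A small sketch showing how the sorted nodes could be walked, which mirrors what `execute` does sequentially:

```python
# Print nodes in execution order (upstream nodes first)
for node in pl.sorted_nodes:
    print(node.name)
```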
additional_core_resources (property)

- If the orchestrator is DLT: DLT Pipeline
- If the orchestrator is DATABRICKS_JOB: Databricks Job
Functions
push_df_backend (classmethod)

`push_df_backend(data)`

Pushes `dataframe_backend`, which is required to differentiate between the Spark and Polars transformers.
Source code in laktory/models/pipeline/pipeline.py
execute

`execute(spark=None, udfs=None, write_sinks=True, full_refresh=False)`

Execute the pipeline (read sources and write sinks) by sequentially executing each node. The selected orchestrator might impact how data sources or sinks are processed.

PARAMETER | DESCRIPTION
---|---
`spark` | Spark session. Default: `None`
`udfs` | List of user-defined functions used in transformation chains. Default: `None`
`write_sinks` | If `True`, node outputs are written to their sinks. Default: `True`
`full_refresh` | If `True`, the pipeline is fully re-processed, ignoring previously saved state and checkpoints. Default: `False`
Source code in laktory/models/pipeline/pipeline.py
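A minimal usage sketch, assuming an interactive environment with an active Spark session named `spark` and the `pl` object from the examples above:

```python
# Standard incremental run
pl.execute(spark=spark)

# Re-process everything from scratch, ignoring previous state
pl.execute(spark=spark, full_refresh=True)
```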
dag_figure

`dag_figure()`

[UNDER DEVELOPMENT] Generate a figure representation of the pipeline DAG.

RETURNS | DESCRIPTION
---|---
`Figure` | Plotly figure representation of the pipeline
Source code in laktory/models/pipeline/pipeline.py
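Since the method is still under development, a hedged usage sketch would simply render or save the returned Plotly figure:

```python
fig = pl.dag_figure()

# Standard Plotly Figure methods
fig.show()
fig.write_html("pipeline_dag.html")  # hypothetical output file name
```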
laktory.models.pipeline.pipeline.PipelineUDF

Bases: BaseModel

Pipeline user-defined function.

ATTRIBUTE | DESCRIPTION
---|---
`module_name` | Name of the module from which the function needs to be imported.
`function_name` | Name of the function.
`module_path` | Workspace filepath of the module, if not in the same directory as the pipeline notebook.
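A minimal, hypothetical sketch of how a UDF could be declared on a pipeline; the module, function and path names below are made up for illustration, and the fragment would be merged into a full pipeline definition such as the ones shown above:

```python
# Hypothetical fragment of a pipeline YAML definition
udfs_yaml = '''
udfs:
- module_name: stock_functions          # hypothetical module name
  function_name: standardize_symbols    # hypothetical function name
  module_path: /Workspace/.laktory/udfs/  # optional; only if the module is not next to the pipeline notebook
'''
```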