
PipelineTask

laktory.models.pipeline.PipelineTask

Bases: BaseModel

A pipeline task is a unit of execution within a pipeline, defined by a set of nodes to be executed together.
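Conceptually, a task holds an ordered list of node names and runs the corresponding nodes one after the other. The following laktory-free sketch illustrates that ordering contract; all class and variable names here are illustrative stand-ins, not the real laktory API:

```python
# Illustrative sketch of a pipeline task's execution order.
# These are stand-in classes, not laktory objects.

class Node:
    def __init__(self, name):
        self.name = name

    def execute(self, log):
        # Record that this node ran
        log.append(self.name)

class Task:
    def __init__(self, name, node_names, nodes_dict):
        self.name = name
        self.node_names = node_names  # sorted order of execution
        self.nodes_dict = nodes_dict

    def execute(self):
        log = []
        for node_name in self.node_names:
            self.nodes_dict[node_name].execute(log)
        return log

nodes = {n: Node(n) for n in ["bronze", "silver", "gold"]}
task = Task("daily-load", ["bronze", "silver", "gold"], nodes)
print(task.execute())  # ['bronze', 'silver', 'gold']
```

Because `node_names` is an ordered list, each node sees the outputs of the nodes that ran before it within the same task.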

PARAMETER DESCRIPTION
name

Pipeline task name

TYPE: str | VariableType

node_names

List of node names in sorted order of execution

TYPE: list[str | VariableType] | VariableType

pipeline

Pipeline

TYPE: Annotated[Pipeline, SkipValidation] | VariableType

METHOD DESCRIPTION
execute

Execute the pipeline task.

ATTRIBUTE DESCRIPTION
has_sinks

True if at least one sink is found in task nodes.

TYPE: bool

nodes

Task nodes

upstream_task_names

Get upstream task names

TYPE: list[str]

has_sinks property

True if at least one sink is found in task nodes.

nodes property

Task nodes

upstream_task_names property

Get upstream task names

execute(write_sinks=True, full_refresh=False, named_dfs=None, update_tables_metadata=True)

Execute the pipeline task.

PARAMETER DESCRIPTION
write_sinks

If `False`, writing of node sinks is skipped.

DEFAULT: True

full_refresh

If `True`, all nodes are completely re-processed: existing data and checkpoints are deleted before processing.

TYPE: bool DEFAULT: False

named_dfs

Named DataFrames passed to each pipeline node's transformer.

TYPE: dict[str, AnyFrame] DEFAULT: None

update_tables_metadata

Whether to update tables metadata.

TYPE: bool DEFAULT: True

Source code in laktory/models/pipeline/pipelinetask.py
def execute(
    self,
    write_sinks=True,
    full_refresh: bool = False,
    named_dfs: dict[str, AnyFrame] = None,
    update_tables_metadata: bool = True,
) -> None:
    """
    Execute the pipeline task.

    Parameters
    ----------
    write_sinks:
        If `False` writing of node sinks will be skipped
    full_refresh:
        If `True` all nodes will be completely re-processed by deleting
        existing data and checkpoints before processing.
    named_dfs:
        Named DataFrames to be passed to pipeline nodes transformer.
    update_tables_metadata:
        Update tables metadata
    """

    logger.info(f"Executing pipeline task '{self.name}'")

    # Execute nodes
    for node_name in self.node_names:
        node = self.pipeline.nodes_dict[node_name]
        if named_dfs is None:
            named_dfs = {}

        node.execute(
            write_sinks=write_sinks,
            full_refresh=full_refresh,
            named_dfs=named_dfs,
            update_tables_metadata=update_tables_metadata,
        )
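The loop above forwards the same keyword arguments to every node and substitutes an empty mapping when `named_dfs` is not provided. This behavior can be reproduced with stand-in objects (all names below are illustrative, not the laktory API):

```python
# Hedged sketch of execute()'s parameter forwarding, mirroring the
# source above with stand-in node/pipeline objects (not laktory).

class FakeNode:
    def __init__(self):
        self.calls = []

    def execute(self, write_sinks, full_refresh, named_dfs, update_tables_metadata):
        # Record the arguments the task forwarded to this node
        self.calls.append(
            dict(
                write_sinks=write_sinks,
                full_refresh=full_refresh,
                named_dfs=named_dfs,
                update_tables_metadata=update_tables_metadata,
            )
        )

class FakePipeline:
    def __init__(self, nodes_dict):
        self.nodes_dict = nodes_dict

class FakeTask:
    def __init__(self, name, node_names, pipeline):
        self.name = name
        self.node_names = node_names
        self.pipeline = pipeline

    def execute(
        self,
        write_sinks=True,
        full_refresh=False,
        named_dfs=None,
        update_tables_metadata=True,
    ):
        for node_name in self.node_names:
            node = self.pipeline.nodes_dict[node_name]
            if named_dfs is None:
                named_dfs = {}  # default to an empty mapping, as in the source
            node.execute(
                write_sinks=write_sinks,
                full_refresh=full_refresh,
                named_dfs=named_dfs,
                update_tables_metadata=update_tables_metadata,
            )

node = FakeNode()
task = FakeTask("t1", ["n1"], FakePipeline({"n1": node}))
task.execute(write_sinks=False, full_refresh=True)
print(node.calls[0]["named_dfs"])  # {}
```

Note that every node in the task receives the same `named_dfs` dictionary, so a DataFrame registered under a given name is visible to all of the task's nodes.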