
DataFrameTransformer

laktory.models.dataframe.DataFrameTransformer

Bases: BaseModel, PipelineChild

A chain of transformations applied, in order, to a DataFrame. Each transformation is either SQL-based (DataFrameExpr) or DataFrame API-based (DataFrameMethod).

Examples:

import polars as pl

import laktory as lk

df0 = pl.DataFrame(
    {
        "id": ["a", "b", "c"],
        "x1": [1, 2, 3],
    }
)

node0 = lk.models.DataFrameMethod(
    func_name="with_columns",
    func_kwargs={
        "y1": "x1",
    },
)
node1 = lk.models.DataFrameExpr(expr="select id, x1, y1 from {df}")
transformer = lk.models.DataFrameTransformer(nodes=[node0, node1])

df = transformer.execute(df0).collect()

print(df)
'''
┌──────────────────┐
|Narwhals DataFrame|
|------------------|
| | id | x1 | y1 | |
| |----|----|----| |
| | a  | 1  | 1  | |
| | b  | 2  | 2  | |
| | c  | 3  | 3  | |
└──────────────────┘
'''
PARAMETER DESCRIPTION
nodes

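List of transformation nodes, applied in order. Each node is either a DataFrameMethod (DataFrame API call) or a DataFrameExpr (SQL expression).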

TYPE: list[DataFrameMethod | DataFrameExpr | VariableType] | VariableType

METHOD DESCRIPTION
execute

Execute the transformation nodes, in order, on the provided DataFrame df

ATTRIBUTE DESCRIPTION
data_sources

All data sources feeding the transformer

is_valid_view_definition

Whether the transformer can be used to create a SQL view.

upstream_node_names

Names of the pipeline nodes required to apply the transformer

TYPE: list[str]

data_sources property

All data sources feeding the transformer

is_valid_view_definition property

Whether the transformer can be used to create a SQL view.
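One plausible criterion, assumed here purely for illustration, is that only a transformer made of exactly one SQL expression node qualifies, since a view wraps a single query. The classes `MethodNode` and `ExprNode` and the function `is_valid_view_definition` below are hypothetical stand-ins, not laktory's API or its actual rule.

```python
from dataclasses import dataclass


@dataclass
class MethodNode:
    """Hypothetical stand-in for a DataFrame API node."""
    func_name: str


@dataclass
class ExprNode:
    """Hypothetical stand-in for a SQL expression node."""
    expr: str


def is_valid_view_definition(nodes: list) -> bool:
    # Assumed rule: a view wraps a single SQL query, so the chain must be
    # exactly one SQL expression node with no DataFrame API steps.
    return len(nodes) == 1 and isinstance(nodes[0], ExprNode)


print(is_valid_view_definition([ExprNode("select id, x1 from {df}")]))  # True
print(is_valid_view_definition([MethodNode("with_columns"), ExprNode("select * from {df}")]))  # False
```

Under this assumption, any DataFrame API step disqualifies the chain, because it cannot be expressed inside a `CREATE VIEW ... AS SELECT` statement.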

upstream_node_names property

Names of the pipeline nodes required to apply the transformer
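A minimal sketch of how such names could be collected. It assumes, hypothetically, that SQL expressions reference other pipeline nodes through placeholders of the form `{nodes.<name>}`, next to the `{df}` placeholder used in the example. The regex and the `upstream_node_names` function are illustrative only.

```python
import re

# Assumed placeholder syntax for referencing another pipeline node: {nodes.<node_name>}
NODE_REF = re.compile(r"\{nodes\.([A-Za-z_][A-Za-z0-9_]*)\}")


def upstream_node_names(sql_exprs: list[str]) -> list[str]:
    """Return referenced node names in first-seen order, without duplicates."""
    names: list[str] = []
    for expr in sql_exprs:
        for name in NODE_REF.findall(expr):
            if name not in names:
                names.append(name)
    return names


exprs = [
    "select * from {df} join {nodes.slv_stocks} using (id)",
    "select * from {df} union all select * from {nodes.slv_prices} "
    "union all select * from {nodes.slv_stocks}",
]
print(upstream_node_names(exprs))  # ['slv_stocks', 'slv_prices']
```

The `{df}` placeholder is not matched because it does not carry the assumed `nodes.` prefix: it refers to the transformer's own input rather than to another node.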

execute(df, named_dfs=None)

Execute the transformation nodes, in order, on the provided DataFrame df

PARAMETER DESCRIPTION
df

Input dataframe

named_dfs

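Dictionary of additional DataFrames, keyed by name, made available to SQL expression (DataFrameExpr) nodes alongside {df}. DataFrameMethod nodes do not receive them.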

DEFAULT: None

RETURNS DESCRIPTION
Output dataframe
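The node loop in `execute` can be sketched with plain Python stand-ins. A method node receives the current DataFrame. An expression node receives a dictionary that holds the current frame under the key `"df"`, merged with `named_dfs`. In this sketch, lists of row dicts stand in for DataFrames, and `MethodNode` and `ExprNode` are hypothetical simplifications of DataFrameMethod and DataFrameExpr, not laktory classes.

```python
from dataclasses import dataclass
from typing import Callable

Frame = list[dict]  # stand-in for a DataFrame: a list of row dicts


@dataclass
class MethodNode:
    """Hypothetical DataFrame API node: transforms the current frame."""
    func: Callable[[Frame], Frame]

    def execute(self, df: Frame) -> Frame:
        return self.func(df)


@dataclass
class ExprNode:
    """Hypothetical SQL node: builds a frame from named inputs."""
    func: Callable[[dict[str, Frame]], Frame]

    def to_df(self, dfs: dict[str, Frame]) -> Frame:
        return self.func(dfs)


def execute(nodes, df, named_dfs=None):
    if named_dfs is None:
        named_dfs = {}
    for node in nodes:
        if isinstance(node, MethodNode):
            df = node.execute(df)  # output feeds the next node
        elif isinstance(node, ExprNode):
            dfs = {"df": df} if df is not None else {}
            df = node.to_df(dfs | named_dfs)  # named frames are visible too
        else:
            raise NotImplementedError(type(node).__name__)
    return df


df0 = [{"id": "a", "x1": 1}, {"id": "b", "x1": 2}]
lookup = {"labels": [{"id": "a", "label": "first"}]}

nodes = [
    # Plays the role of with_columns(y1=x1)
    MethodNode(lambda df: [row | {"y1": row["x1"]} for row in df]),
    # Plays the role of "select id, y1, label from {df} join {labels} using (id)"
    ExprNode(
        lambda dfs: [
            {"id": r["id"], "y1": r["y1"], "label": l["label"]}
            for r in dfs["df"]
            for l in dfs["labels"]
            if r["id"] == l["id"]
        ]
    ),
]
print(execute(nodes, df0, named_dfs=lookup))
# [{'id': 'a', 'y1': 1, 'label': 'first'}]
```

Only the expression node sees `labels`. The method node works on the chained frame alone, which matches how `named_dfs` is used in the source listing.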
Source code in laktory/models/dataframe/dataframetransformer.py
def execute(self, df, named_dfs=None) -> AnyFrame:
    """
    Execute transformation nodes on provided DataFrame `df`

    Parameters
    ----------
    df:
        Input dataframe
    named_dfs:
        Other DataFrame(s) to be passed to the method.

    Returns
    -------
        Output dataframe
    """
    logger.info("Executing DataFrame Transformer")

    if named_dfs is None:
        named_dfs = {}

    # Apply nodes in order; each node's output becomes the next node's input
    for inode, node in enumerate(self.nodes):
        tnode = type(node)
        logger.info(
            f"Executing DataFrame transformer node {inode} ({tnode.__name__})."
        )

        if isinstance(node, DataFrameMethod):
            # DataFrame API node: operates directly on the current DataFrame
            df = node.execute(df)
        elif isinstance(node, DataFrameExpr):
            # SQL node: the current DataFrame is exposed as {df} alongside any
            # named DataFrames (named entries win on a key clash)
            dfs = {}
            if df is not None:
                dfs["df"] = df
            dfs = dfs | named_dfs
            df = node.to_df(dfs)
        else:
            raise NotImplementedError()

    return df
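Because the inputs of an expression node are built as `dfs | named_dfs`, the right-hand operand wins on duplicate keys. A `named_dfs` entry called `"df"` would therefore replace the output of the previous node. The plain-dictionary sketch below shows that precedence, using strings as placeholders for DataFrames.

```python
# Strings stand in for DataFrames.
current = "output of previous node"
named_dfs = {"df": "user-supplied frame", "labels": "lookup frame"}

dfs = {}
if current is not None:
    dfs["df"] = current
dfs = dfs | named_dfs  # right-hand operand wins on duplicate keys

print(dfs["df"])    # user-supplied frame
print(sorted(dfs))  # ['df', 'labels']
```

Keeping named DataFrames under keys other than `"df"` avoids silently discarding the chained result.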