Skip to content

DataFrameMethod

laktory.models.dataframe.DataFrameMethod ¤

Bases: BaseModel, PipelineChild

Definition of a DataFrame method to be applied. Both the native and the Narwhals APIs are supported.

Examples:

import polars as pl

import laktory as lk

df0 = pl.DataFrame(
    {
        "x": [1.1, 2.2, 3.3],
    }
)

m1 = lk.models.DataFrameMethod(
    func_name="with_columns",
    func_kwargs={"xr": "nw.col('x').round()"},
    dataframe_api="NARWHALS",
)
df = m1.execute(df0)

m2 = lk.models.DataFrameMethod(
    func_name="select", func_args=["pl.col('x').sqrt()"], dataframe_api="NATIVE"
)
df = m2.execute(df0)

print(df.to_native())
'''
| x        |
|----------|
| 1.048809 |
| 1.48324  |
| 1.81659  |
'''
PARAMETER DESCRIPTION
func_args

Arguments passed to method. A DataSource model can be passed instead of a DataFrame.

TYPE: list[DataFrameMethodArg | Any | VariableType] | VariableType DEFAULT: []

func_kwargs

Keyword arguments passed to method. A DataSource model can be passed instead of a DataFrame.

TYPE: dict[str | VariableType, DataFrameMethodArg | Any | VariableType] | VariableType DEFAULT: {}

func_name

DataFrame method or attribute name (e.g. 'select', 'filter', 'dt.strftime'). Resolved as an attribute of the DataFrame object.

TYPE: str | VariableType

METHOD DESCRIPTION
execute

Execute method on provided DataFrame df.

ATTRIBUTE DESCRIPTION
data_sources

Get all sources feeding the DataFrame Method

TYPE: list[DataSourcesUnion]

upstream_node_names

Pipeline node names required to apply transformer node.

TYPE: list[str]

data_sources property ¤

Get all sources feeding the DataFrame Method

upstream_node_names property ¤

Pipeline node names required to apply transformer node.

execute(df) ¤

Execute method on provided DataFrame df.

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: AnyFrame

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/dataframe/dataframemethod.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def execute(self, df: AnyFrame) -> AnyFrame:
    """
    Execute method on provided DataFrame `df`.

    The method named by `func_name` is looked up as an attribute of the
    DataFrame (following any dot-separated namespaces, e.g. `dt.strftime`),
    called with the evaluated `func_args` / `func_kwargs`, and its result is
    returned as a Narwhals frame.

    Parameters
    ----------
    df:
        Input dataframe

    Returns
    -------
        Output dataframe

    Raises
    ------
    ValueError
        If `func_name` (or one of its namespaces) cannot be resolved on the
        DataFrame for the selected DataFrame API.
    """

    # Get and set Backend (required to evaluate arguments)
    backend = DataFrameBackends.from_df(df)
    self.dataframe_backend_ = backend

    # Convert to Narwhals, then back to native when the native API is requested
    if not isinstance(df, AnyFrame):
        df = nw.from_native(df)
    if self.dataframe_api == "NATIVE":
        df = df.to_native()

    # Resolve the function by walking the dotted attribute path. A missing
    # namespace or method yields None so the descriptive error below is raised
    # instead of a bare AttributeError or tuple-unpacking error.
    func_full_name = self.func_name
    f = df
    for attr_name in func_full_name.split("."):
        f = getattr(f, attr_name, None)
        if f is None:
            break

    if f is None:
        df_type = type(df)
        raise ValueError(
            f"Function {func_full_name} is not available on dataframe of type {str(df_type)} with {self.dataframe_api} API"
        )

    _args = self.func_args
    _kwargs = self.func_kwargs

    # Build log: positional and keyword signatures share one comma-separated
    # list so that the rendered call is readable (e.g. `df.f(a,b,k=v)`).
    signatures = [a.signature() for a in _args]
    signatures += [f"{k}={a.signature()}" for k, a in _kwargs.items()]
    func_log = f"df.{func_full_name}({','.join(signatures)})"
    func_log += f" with df type {type(df)}"
    logger.info(f"Applying {func_log}")

    # Evaluate arguments against the detected backend
    args = [_arg.eval(backend=backend) for _arg in _args]
    kwargs = {k: _arg.eval(backend=backend) for k, _arg in _kwargs.items()}

    # Inject laktory_context if the function accepts it
    # (imported locally, as in the original implementation)
    from laktory.models.laktorycontext import LaktoryContext
    from laktory.models.laktorycontext import _build_laktory_context_kwargs

    context = LaktoryContext(
        node=self.parent_pipeline_node,
        pipeline=self.parent_pipeline,
    )
    kwargs.update(_build_laktory_context_kwargs(f, context))

    # Call function
    df = f(*args, **kwargs)

    # Convert to Narwhals when the function does not return a Narwhals DataFrame
    if not isinstance(df, AnyFrame):
        df = nw.from_native(df)

    return df