Skip to content

DataFrameMethod

laktory.models.dataframe.DataFrameMethod ¤

Bases: BaseModel, PipelineChild

Definition of a DataFrame method to be applied. Both the native and the Narwhals APIs are supported.

Examples:

import polars as pl

import laktory as lk

# Source DataFrame with a single float column "x"
df0 = pl.DataFrame(
    {
        "x": [1.1, 2.2, 3.3],
    }
)

# Narwhals API: call `with_columns` to add a column "xr" holding x rounded.
# The keyword argument value is given as a string expression.
m1 = lk.models.DataFrameMethod(
    func_name="with_columns",
    func_kwargs={"xr": "nw.col('x').round()"},
    dataframe_api="NARWHALS",
)
df = m1.execute(df0)

# Native (Polars) API: call `select` with a positional Polars expression.
# Note: this reassigns `df`, so the printed output below reflects m2 only.
m2 = lk.models.DataFrameMethod(
    func_name="select", func_args=["pl.col('x').sqrt()"], dataframe_api="NATIVE"
)
df = m2.execute(df0)

# execute() returns a Narwhals frame; to_native() unwraps the Polars DataFrame
print(df.to_native())
'''
| x        |
|----------|
| 1.048809 |
| 1.48324  |
| 1.81659  |
'''
PARAMETER DESCRIPTION
func_args

Positional arguments passed to the method. Use {df} for the primary source, {sources.name} for a named source, or {nodes.X} for an upstream pipeline node output.

TYPE: list[DataFrameMethodArg | VariableType] | VariableType DEFAULT: []

func_kwargs

Keyword arguments passed to the method. Use {df} for the primary source, {sources.name} for a named source, or {nodes.X} for an upstream pipeline node output.

TYPE: dict[str | VariableType, DataFrameMethodArg | VariableType] | VariableType DEFAULT: {}

func_name

DataFrame method or attribute name (e.g. 'select', 'filter', 'dt.strftime'). Resolved as an attribute of the DataFrame object.

TYPE: str | VariableType

METHOD DESCRIPTION
execute

Execute the method on the provided DataFrame df.

ATTRIBUTE DESCRIPTION
data_sources

Get PipelineNodeDataSource objects for each {nodes.X} reference in func_args/func_kwargs.

TYPE: list

upstream_node_names

Pipeline node names referenced via {nodes.X} in func_args / func_kwargs.

TYPE: list[str]

data_sources property ¤

Get PipelineNodeDataSource objects for each {nodes.X} reference in func_args/func_kwargs.

upstream_node_names property ¤

Pipeline node names referenced via {nodes.X} in func_args / func_kwargs.

execute(df, named_dfs=None) ¤

Execute the method on the provided DataFrame df.

PARAMETER DESCRIPTION
df

Input dataframe

TYPE: AnyFrame

named_dfs

Pre-loaded named DataFrames available for {nodes.X} / {key} arg references.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/dataframe/dataframemethod.py (starting at line 313)
def execute(self, df: AnyFrame, named_dfs: dict | None = None) -> AnyFrame:
    """
    Execute method on provided DataFrame `df`.

    The method named by ``func_name`` is looked up on ``df`` (after optional
    conversion to the native API). Its positional and keyword arguments are
    evaluated for the detected DataFrame backend, and the method is called.
    The result is returned as a Narwhals frame.

    Parameters
    ----------
    df:
        Input dataframe
    named_dfs:
        Pre-loaded named DataFrames available for ``{nodes.X}`` / ``{key}`` arg references.

    Returns
    -------
        Output dataframe

    Raises
    ------
    ValueError
        If ``func_name`` (or any of its dotted namespaces) is not available on
        ``df`` with the selected ``dataframe_api``.
    """

    # Get and set Backend (required to evaluate arguments)
    backend = DataFrameBackends.from_df(df)
    self.dataframe_backend_ = backend

    # Convert to Narwhals
    if not isinstance(df, AnyFrame):
        df = nw.from_native(df)
    if self.dataframe_api == "NATIVE":
        df = df.to_native()

    # Get Function. A dotted name (e.g. "dt.strftime") is resolved by walking
    # every namespace attribute before looking up the final method name, so
    # names with more than one dot no longer fail on tuple unpacking.
    func_full_name = self.func_name
    namespace, _, func_name = func_full_name.rpartition(".")

    # Get from built-in narwhals and narwhals extension (including Laktory) functions
    f = None
    if namespace:
        # Get function from namespace extension; a missing namespace falls
        # through to the ValueError below instead of a raw AttributeError.
        owner = df
        for part in namespace.split("."):
            owner = getattr(owner, part, None)
            if owner is None:
                break
        if owner is not None:
            f = getattr(owner, func_name, None)
    elif hasattr(type(df), func_name):
        # getattr requires schema analysis - which SDP blocks. hasattr is safe
        f = getattr(df, func_name)

    if f is None:
        df_type = type(df)
        raise ValueError(
            f"Function {func_full_name} is not available on dataframe of type {str(df_type)} with {self.dataframe_api} API"
        )

    _args = self.func_args
    _kwargs = self.func_kwargs

    # Build log. Positional and keyword signatures are joined as one list so
    # a separator is emitted between the last positional and first keyword arg.
    signatures = [a.signature() for a in _args]
    signatures += [f"{k}={a.signature()}" for k, a in _kwargs.items()]
    func_log = f"df.{func_full_name}({','.join(signatures)}) with df type {type(df)}"
    logger.info(f"Applying {func_log}")

    # Build args
    args = [_arg.eval(backend=backend, named_dfs=named_dfs) for _arg in _args]

    # Build kwargs
    kwargs = {
        k: _arg.eval(backend=backend, named_dfs=named_dfs)
        for k, _arg in _kwargs.items()
    }

    # Inject laktory_context if the function accepts it
    from laktory.models.laktorycontext import LaktoryContext
    from laktory.models.laktorycontext import _build_laktory_context_kwargs

    context = LaktoryContext(
        node=self.parent_pipeline_node,
        pipeline=self.parent_pipeline,
    )
    kwargs.update(_build_laktory_context_kwargs(f, context))

    # Call function
    df = f(*args, **kwargs)

    # Convert to narwhals when custom function don't return a Narwhals DataFrame
    if not isinstance(df, AnyFrame):
        df = nw.from_native(df)

    return df