Skip to content

Custom Extension

API Documentation

laktory.api.register_dataframe_namespace
laktory.api.register_lazyframe_namespace
[laktory.api.register_anyframe_namespace][]
laktory.api.register_expr_namespace
[laktory.api.register_spark_dataframe_namespace][]
[laktory.api.register_spark_column_namespace][]

While Narwhals and Laktory provide a rich set of built-in DataFrame methods, Laktory also supports the creation of custom namespaces for registering your own methods and functions, callable from pipeline YAML.

Narwhals namespaces¤

Use Narwhals namespaces when you want your custom logic to work across all backends and plan on manipulating a Narwhals DataFrame. Methods are written against the Narwhals API and are backend-agnostic.

import narwhals as nw
import polars as pl

import laktory as lk


@lk.api.register_anyframe_namespace("custom")
class CustomNamespace:
    def __init__(self, _df):
        self._df = _df  # Narwhals DataFrame

    def with_x2(self):
        return self._df.with_columns(x2=nw.col("x") * 2)


df = nw.from_native(pl.DataFrame({"x": [0, 1]}))

df = df.custom.with_x2()

Use register_dataframe_namespace or register_lazyframe_namespace to restrict the registration to a specific frame type, and register_expr_namespace to extend column expressions.

In a pipeline¤

name: my_pipeline
nodes:
  - name: slv_prices
    source: ...
    transformer:
      nodes:
        - func_name: custom.with_x2
          dataframe_api: NARWHALS

Spark namespaces¤

Use Spark namespaces when your team works exclusively with PySpark and you want to write pure Spark code - no Narwhals imports required.

import pyspark.sql.functions as F
import laktory as lk


@lk.api.register_spark_dataframe_namespace("custom")
class CustomOps:
    def __init__(self, _df):
        self._df = _df  # native PySpark DataFrame

    def with_x2(self):
        return self._df.withColumn("x2", F.col("x1") * 2)

In a pipeline¤

name: my_pipeline
nodes:
  - name: slv_prices
    source: ...
    transformer:
      nodes:
        - func_name: custom.with_x2
          dataframe_api: NATIVE

Spark column namespaces¤

Use register_spark_column_namespace to extend PySpark Column objects with reusable expression helpers. These are accessible from func_args strings when DATAFRAME_API=NATIVE.

import laktory as lk


@lk.api.register_spark_column_namespace("custom")
class CustomColOps:
    def __init__(self, _col):
        self._col = _col

    def double(self):
        return self._col * 2
transformer:
  nodes:
    - func_name: withColumn
      func_args:
        - x2
        - "col('x1').custom.double()"
      dataframe_api: NATIVE

Packaging custom extensions¤

You can combine custom namespaces with a Python Package to bundle and distribute your extensions, making them available to any Laktory pipeline that lists the package as a dependency.