
CustomWriter

laktory.models.datasinks.CustomWriter

Bases: BaseModel, PipelineChild

Definition of a custom write function to be called when writing a DataFrame to a sink. Gives the user full control over how data is written.

Laktory manages the streaming query lifecycle (foreachBatch wrapping, trigger, checkpoint, start/await) while the user-supplied function handles the actual write logic.

The function is called as:

```python
func(df, *func_args, **func_kwargs)
```

where df is the DataFrame (native or Narwhals, depending on the sink's dataframe_api setting). Laktory can also inject a laktory_context keyword argument containing pipeline runtime objects. To opt in, declare laktory_context in your function signature.
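The calling convention and the opt-in injection can be sketched in plain Python. This is a minimal sketch, not Laktory's implementation: build_context_kwargs and call_writer are hypothetical helpers, and treating "opt in" as "has a parameter literally named laktory_context" is an assumption. The merge order matches the execute() source listed further down this page: injected keyword arguments are applied after func_kwargs, so they take precedence over a same-named key. The demo writers return values only so the result is visible; real writers return None.

```python
import inspect
from typing import Any, Callable


def build_context_kwargs(func: Callable, context: Any) -> dict[str, Any]:
    """Hypothetical helper: inject laktory_context only if func declares it."""
    if "laktory_context" in inspect.signature(func).parameters:
        return {"laktory_context": context}
    return {}


def call_writer(
    func: Callable, df: Any, func_args: list, func_kwargs: dict, context: Any
) -> Any:
    # Injected kwargs are merged last, so they win over func_kwargs
    call_kwargs = {**func_kwargs, **build_context_kwargs(func, context)}
    return func(df, *func_args, **call_kwargs)


def plain_write(df, tag="default"):
    # Does not declare laktory_context, so nothing is injected
    return (df, tag)


def ctx_write(df, tag="default", laktory_context=None):
    # Declares laktory_context, so the context is injected
    return (df, tag, laktory_context)


print(call_writer(plain_write, "df", [], {"tag": "prod"}, "ctx"))  # ('df', 'prod')
print(call_writer(ctx_write, "df", ["prod"], {}, "ctx"))  # ('df', 'prod', 'ctx')
```

Inspecting the signature keeps existing writers working unchanged: a function that never mentions laktory_context is called with exactly the arguments configured on the sink.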

Examples:

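```python
from laktory import models

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    custom_writer={
        "func_name": "mypackage.etl.my_write",
        "func_kwargs": {"extra_tag": "production"},
    },
)
# sink.write(df)
```

```python
# mypackage/etl.py
from laktory.models import LaktoryContext


def my_write(
    df, extra_tag="default", laktory_context: LaktoryContext | None = None
) -> None:
    # extra_tag receives "production" from func_kwargs in the sink definition.
    # laktory_context is injected because it is declared in the signature.
    sink = laktory_context.sink  # access sink coordinates without re-hardcoding
    # df.write is the Spark DataFrameWriter API, so this assumes a native
    # Spark DataFrame (dataframe_api set to "NATIVE").
    df.write.format("delta").mode("append").save(sink.path)
```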
PARAMETER DESCRIPTION
func_args

Positional arguments passed to the function after the DataFrame.

TYPE: list[Any | VariableType] | VariableType DEFAULT: []

func_kwargs

Keyword arguments passed to the function.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

func_name

Fully qualified import path of the callable (e.g. 'mypackage.etl.my_write'). The function is imported at runtime via importlib and called with the DataFrame as its first argument.

TYPE: str | VariableType
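Resolving func_name at runtime can be sketched with importlib.import_module and getattr. resolve_func is a hypothetical helper, not Laktory's own resolution code: it assumes the last dotted segment is a module-level attribute, so a nested path such as 'pkg.mod.Class.method' would not resolve.

```python
import importlib
from typing import Callable


def resolve_func(func_name: str) -> Callable:
    """Hypothetical helper: import 'pkg.module.attr' and return the attribute."""
    module_path, sep, attr = func_name.rpartition(".")
    if not sep:
        raise ValueError(
            f"expected a dotted path such as 'pkg.mod.func', got {func_name!r}"
        )
    module = importlib.import_module(module_path)  # may raise ModuleNotFoundError
    func = getattr(module, attr)  # may raise AttributeError
    if not callable(func):
        raise TypeError(f"{func_name!r} does not resolve to a callable")
    return func


sqrt = resolve_func("math.sqrt")
print(sqrt(9.0))  # 3.0
```

Splitting on the last dot keeps the module import separate from the attribute lookup, so a missing module and a missing function surface as different errors.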

METHOD DESCRIPTION
execute

Invoke the user-supplied function with the DataFrame, configured args/kwargs, and optionally a laktory_context object.

execute(df, context=None)

Invoke the user-supplied function with the DataFrame, configured args/kwargs, and optionally a laktory_context object.

PARAMETER DESCRIPTION
df

Input DataFrame (native or Narwhals, as prepared by the caller).

context

Pre-built LaktoryContext to inject. When None (the default), the context is built from the _parent chain at call time. Callers that run inside a foreachBatch lambda should pre-build the context before the lambda and pass it here so that the _parent references are captured while they are still intact.

TYPE: LaktoryContext DEFAULT: None

Source code in laktory/models/datasinks/customwriter.py
```python
def execute(self, df, context: LaktoryContext = None) -> None:
    """
    Invoke the user-supplied function with the DataFrame, configured
    args/kwargs, and optionally a `laktory_context` object.

    Parameters
    ----------
    df:
        Input DataFrame (native or Narwhals, as prepared by the caller).
    context:
        Pre-built LaktoryContext to inject. When ``None`` (the default),
        the context is built from the ``_parent`` chain at call time.
        Callers that run inside a ``foreachBatch`` lambda should pre-build
        the context before the lambda and pass it here so that the
        ``_parent`` references are captured while they are still intact.
    """
    # Import the callable referenced by func_name
    func = self._resolve_func()

    # Normalize to Narwhals, then unwrap if the function expects a native DataFrame
    if not isinstance(df, (nw.DataFrame, nw.LazyFrame)):
        df = nw.from_native(df)

    if self.dataframe_api == "NATIVE":
        df = nw.to_native(df)

    # Log the call; args and kwargs are joined together so they stay separated
    args_log = [str(a) for a in self.func_args]
    kwargs_log = [f"{k}={v}" for k, v in self.func_kwargs.items()]
    func_log = f"{self.func_name}({', '.join(args_log + kwargs_log)})"
    func_log += f" with df type {type(df)}"
    logger.info(f"Writing df with {func_log}")

    # Fall back to building the context from the _parent chain at call time
    if context is None:
        context = LaktoryContext(
            node=self.parent_pipeline_node,
            pipeline=self.parent_pipeline,
            sink=self.parent,
        )
    # Injected context kwargs are merged last and override func_kwargs
    call_kwargs = {
        **self.func_kwargs,
        **_build_laktory_context_kwargs(func, context),
    }
    func(df, *self.func_args, **call_kwargs)
```