CustomWriter
laktory.models.datasinks.CustomWriter
Bases: BaseModel, PipelineChild
Definition of a custom write function to be called when writing a DataFrame to a sink. Gives the user full control over how data is written.
Laktory manages the streaming query lifecycle (foreachBatch wrapping, trigger, checkpoint, start/await) while the user-supplied function handles the actual write logic.
The function is called as:
# func(df, *func_args, **func_kwargs)
where df is the DataFrame (native or Narwhals depending on the sink's
dataframe_api setting). Laktory optionally injects a laktory_context
keyword argument containing pipeline runtime objects — declare it in your
function signature to opt in.
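The opt-in rule above can be illustrated with a small dispatcher. This is a minimal sketch, not Laktory's actual implementation: `call_with_optional_context` and the two writer functions are hypothetical names, and it assumes that "declaring" the argument means naming a `laktory_context` parameter in the signature.

```python
import inspect


def call_with_optional_context(func, df, func_args=(), func_kwargs=None, context=None):
    """Call func(df, *func_args, **func_kwargs), adding laktory_context only if declared.

    Hypothetical helper for illustration; Laktory's own logic may differ.
    """
    kwargs = dict(func_kwargs or {})
    params = inspect.signature(func).parameters
    # Inject the context only when the function opts in by naming the parameter.
    if "laktory_context" in params:
        kwargs["laktory_context"] = context
    return func(df, *func_args, **kwargs)


def plain_writer(df, tag="default"):
    # Does not declare laktory_context, so it never receives one.
    return ("plain", df, tag)


def context_aware_writer(df, tag="default", laktory_context=None):
    # Declares laktory_context, so the dispatcher passes it in.
    return ("aware", df, tag, laktory_context)
```

Under this sketch, a function that accepts only `**kwargs` would not receive the context, because the check looks for the parameter name explicitly.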
Examples:
from laktory import models

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    custom_writer={
        "func_name": "mypackage.etl.my_write",
        "func_kwargs": {"extra_tag": "production"},
    },
)
# sink.write(df)

# mypackage/etl.py
from laktory.models import LaktoryContext


def my_write(
    df, extra_tag="default", laktory_context: LaktoryContext = None
) -> None:
    sink = laktory_context.sink  # access sink coordinates without re-hardcoding
    df.write.format("delta").mode("append").save(sink.path)
| PARAMETER | DESCRIPTION |
|---|---|
| func_args | Positional arguments passed to the function after the DataFrame. |
| func_kwargs | Keyword arguments passed to the function. |
| func_name | Fully qualified importable module path to the callable (e.g. 'mypackage.etl.my_write'). The function is imported at runtime. |
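To make the `func_name` format concrete, here is one way a dotted path could be resolved to a callable at runtime. It is a sketch under stated assumptions: `resolve_callable` is a hypothetical helper, and the use of `importlib` is an assumption, since this page does not show which mechanism Laktory uses.

```python
import importlib


def resolve_callable(func_name: str):
    """Resolve a dotted path such as 'mypackage.etl.my_write' to a callable.

    Hypothetical helper for illustration; not part of the Laktory API.
    """
    # Split 'package.module.function' into the module path and the attribute name.
    module_path, _, attr = func_name.rpartition(".")
    if not module_path:
        raise ValueError(f"'{func_name}' is not a fully qualified path")
    module = importlib.import_module(module_path)
    func = getattr(module, attr)
    if not callable(func):
        raise TypeError(f"'{func_name}' does not refer to a callable")
    return func
```

Whatever the actual mechanism, the practical consequence is the same: the module named in `func_name` must be importable in the environment where the pipeline runs.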
| METHOD | DESCRIPTION |
|---|---|
| execute | Invoke the user-supplied function with the DataFrame, configured args/kwargs, and optionally a laktory_context object. |
execute(df, context=None)
Invoke the user-supplied function with the DataFrame, configured
args/kwargs, and optionally a laktory_context object.
| PARAMETER | DESCRIPTION |
|---|---|
| df | Input DataFrame (native or Narwhals, as prepared by the caller). |
| context | Pre-built LaktoryContext to inject. |
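Because the write function receives both the DataFrame and the context as plain arguments, it can be exercised in isolation before it is wired into a sink. The sketch below does this with stand-ins: `RecordingWriter`, `FakeDataFrame`, and `fake_context` are hypothetical test doubles, not Laktory or Spark classes, and they mimic only the `.write` chain and the `.sink.path` attribute used by the `my_write` example above.

```python
from types import SimpleNamespace


class RecordingWriter:
    """Hypothetical stand-in for df.write that records the chained calls."""

    def __init__(self):
        self.calls = []

    def format(self, fmt):
        self.calls.append(("format", fmt))
        return self

    def mode(self, mode):
        self.calls.append(("mode", mode))
        return self

    def save(self, path):
        self.calls.append(("save", path))


class FakeDataFrame:
    """Hypothetical stand-in exposing only the .write attribute."""

    def __init__(self):
        self.write = RecordingWriter()


def my_write(df, extra_tag="default", laktory_context=None):
    # Same body as the example write function above.
    sink = laktory_context.sink
    df.write.format("delta").mode("append").save(sink.path)


fake_df = FakeDataFrame()
# Stand-in context: an object with a .sink attribute that has a .path.
fake_context = SimpleNamespace(sink=SimpleNamespace(path="./my_table/"))
my_write(fake_df, extra_tag="production", laktory_context=fake_context)
```

After the call, `fake_df.write.calls` holds the sequence of writer calls, which a unit test can compare against the expected format, mode, and path.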
Source code in laktory/models/datasinks/customwriter.py