CustomDataSource

laktory.models.datasources.CustomDataSource

Bases: BaseDataSource

Data source backed by a fully user-supplied read function. Use this when no built-in source type fits — REST APIs, proprietary databases, custom formats, etc.

Laktory calls the function and wraps the returned DataFrame in Narwhals. Post-read operations (filter, selects, renames, drops) are still applied by Laktory after the custom read.

Examples:

from laktory import models

source = models.CustomDataSource(
    custom_reader={
        "func_name": "mypackage.etl.my_read",
        "func_kwargs": {"table": "catalog.schema.my_table"},
    },
)
# df = source.read()

# mypackage/etl.py
from laktory.models import LaktoryContext


def my_read(table=None, laktory_context: LaktoryContext = None):
    import polars as pl

    return pl.scan_delta(table)
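
Since REST APIs are mentioned above as a typical use case, here is a minimal sketch of what such a reader might look like. The names `read_rest` and `records_from_payload`, the `{"items": [...]}` response shape, and the `url` keyword are all hypothetical and chosen for illustration only. The sketch assumes a Polars backend.

```python
import json
from urllib.request import urlopen


def records_from_payload(payload: dict) -> list[dict]:
    """Flatten a hypothetical `{"items": [...]}` response into row dicts."""
    return [
        {"id": item["id"], "name": item.get("name")}
        for item in payload.get("items", [])
    ]


def read_rest(url: str, timeout: float = 10.0, laktory_context=None):
    """Hypothetical custom reader: fetch JSON from `url`, return a Polars DataFrame."""
    import polars as pl  # imported lazily, as in the example above

    # Download and decode the JSON body.
    with urlopen(url, timeout=timeout) as response:
        payload = json.load(response)

    # Laktory wraps the returned native DataFrame in Narwhals after the call.
    return pl.DataFrame(records_from_payload(payload))
```

Under these assumptions, the source would reference the function with `func_name` set to its importable path (for example `"mypackage.etl.read_rest"`) and pass the endpoint through `func_kwargs`, such as `{"url": "https://example.com/api/items"}`.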
PARAMETER DESCRIPTION
as_stream

If True, the source is read as a streaming DataFrame. Currently supported only by the Spark DataFrame backend.

TYPE: bool | VariableType DEFAULT: False

custom_reader

Custom reader definition. Can be set as a plain string (func_name only) or a full CustomReader object with func_name, func_args, and func_kwargs.

TYPE: CustomReader | VariableType

drop_duplicates

Remove duplicate rows from the source, comparing all columns if True or only the listed columns if a list of column names is provided.

TYPE: bool | list[str] | VariableType DEFAULT: None

drops

List of columns to drop

TYPE: list | VariableType DEFAULT: None

filter

SQL expression used to select specific rows from the source table

TYPE: str | VariableType DEFAULT: None

renames

Mapping between the source column names and desired column names

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

selects

Columns to select from the source. Can be specified as a list or as a dictionary to rename the source columns

TYPE: list[str] | dict[str, str] | VariableType DEFAULT: None

type

Source type

TYPE: Literal['CUSTOM'] | VariableType DEFAULT: 'CUSTOM'

METHOD DESCRIPTION
read

Read data with options specified in attributes.

read(**kwargs)

Read data with options specified in attributes.

RETURNS DESCRIPTION
AnyFrame

Resulting dataframe

Source code in laktory/models/datasources/basedatasource.py
def read(self, **kwargs) -> AnyFrame:
    """
    Read data with options specified in attributes.

    Returns
    -------
    :
        Resulting dataframe
    """
    logger.info(
        f"Reading `{self.__class__.__name__}` {self._id} with {self.dataframe_backend}"
    )
    df = self._read(**kwargs)

    # Convert to Narwhals
    if not isinstance(df, (nw.LazyFrame, nw.DataFrame)):
        df = nw.from_native(df)

    # Post read
    df = self._post_read(df)

    logger.info("Read completed.")

    return df

laktory.models.datasources.CustomReader

Bases: BaseModel, PipelineChild

Definition of a custom read function used by CustomDataSource. Gives the user full control over how data is read.

The function must return a DataFrame (native or Narwhals). Laktory optionally injects a laktory_context keyword argument — declare it in your function signature to opt in:

def my_read(laktory_context=None):
    print(laktory_context.source)
    print(laktory_context.node)
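
The opt-in behaviour described above can be pictured with signature inspection. The sketch below is an assumption about how such injection could work, not Laktory's actual implementation. `build_context_kwargs` and `call_with_optional_context` are hypothetical helpers, and the context is represented by an arbitrary object.

```python
import inspect
from typing import Any, Callable


def build_context_kwargs(func: Callable, context: Any) -> dict:
    """Return {'laktory_context': context} only if `func` declares that parameter."""
    params = inspect.signature(func).parameters
    if "laktory_context" in params:
        return {"laktory_context": context}
    return {}


def call_with_optional_context(func: Callable, context: Any, *args, **kwargs):
    """Call `func` with user args/kwargs, adding the context when it opted in."""
    call_kwargs = {**kwargs, **build_context_kwargs(func, context)}
    return func(*args, **call_kwargs)


def with_context(table=None, laktory_context=None):
    # Declares `laktory_context`, so it receives the context object.
    return table, laktory_context


def without_context(table=None):
    # Does not declare `laktory_context`, so it is called without it.
    return table
```

With these definitions, `call_with_optional_context(with_context, ctx, table="t")` passes `ctx` through, while `without_context` is called with `table` alone and never sees an unexpected keyword.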

Examples:

from laktory import models

source = models.CustomDataSource(
    custom_reader={
        "func_name": "mypackage.etl.my_read",
        "func_kwargs": {"table": "catalog.schema.my_table"},
    },
)
# df = source.read()

# mypackage/etl.py
from laktory.models import LaktoryContext


def my_read(table=None, laktory_context: LaktoryContext = None):
    import polars as pl

    return pl.scan_delta(table)
PARAMETER DESCRIPTION
func_args

Positional arguments passed to the function.

TYPE: list[Any | VariableType] | VariableType DEFAULT: <dynamic>

func_kwargs

Keyword arguments passed to the function.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

func_name

Fully qualified importable module path to the callable (e.g. 'mypackage.etl.my_read'). The function is imported at runtime via importlib and must return a DataFrame (native or Narwhals).

TYPE: str | VariableType
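
To illustrate what resolving a fully qualified path with importlib typically involves, here is a minimal sketch. `resolve_func` is a hypothetical helper written for this example. It is not Laktory's internal resolver, whose exact behaviour and error handling may differ.

```python
import importlib
from typing import Callable


def resolve_func(func_name: str) -> Callable:
    """Import and return the callable at a dotted path such as 'pkg.mod.func'."""
    # Split 'pkg.mod.func' into the module path and the attribute name.
    module_path, _, attr = func_name.rpartition(".")
    if not module_path:
        raise ValueError(f"'{func_name}' is not a fully qualified path")

    # Import the module at runtime and look up the attribute on it.
    module = importlib.import_module(module_path)
    func = getattr(module, attr)
    if not callable(func):
        raise TypeError(f"'{func_name}' does not resolve to a callable")
    return func
```

For instance, `resolve_func("json.dumps")` returns the standard library's `json.dumps`. A path such as `'mypackage.etl.my_read'` resolves the same way, provided `mypackage` is importable in the environment where the pipeline runs.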

METHOD DESCRIPTION
execute

Invoke the user-supplied function with configured args/kwargs and optionally a laktory_context object.

execute()

Invoke the user-supplied function with configured args/kwargs and optionally a laktory_context object.

RETURNS DESCRIPTION

DataFrame returned by the user function (native or Narwhals).

Source code in laktory/models/datasources/customreader.py
def execute(self):
    """
    Invoke the user-supplied function with configured args/kwargs and
    optionally a `laktory_context` object.

    Returns
    -------
    :
        DataFrame returned by the user function (native or Narwhals).
    """
    func = self._resolve_func()

    func_log = f"{self.func_name}("
    func_log += ",".join([str(a) for a in self.func_args])
    func_log += ",".join([f"{k}={v}" for k, v in self.func_kwargs.items()])
    func_log += ")"
    logger.info(f"Reading with {func_log}")

    context = LaktoryContext(
        node=self.parent_pipeline_node,
        pipeline=self.parent_pipeline,
        source=self.parent,
    )
    call_kwargs = {
        **self.func_kwargs,
        **_build_laktory_context_kwargs(func, context),
    }
    return func(*self.func_args, **call_kwargs)