CustomDataSource
laktory.models.datasources.CustomDataSource
¤
Bases: BaseDataSource
Data source backed by a fully user-supplied read function. Use this when no built-in source type fits — REST APIs, proprietary databases, custom formats, etc.
Laktory calls the function and wraps the returned DataFrame in Narwhals. Post-read operations (filter, selects, renames, drops) are still applied by Laktory after the custom read.
Examples:
from laktory import models
source = models.CustomDataSource(
custom_reader={
"func_name": "mypackage.etl.my_read",
"func_kwargs": {"table": "catalog.schema.my_table"},
},
)
# df = source.read()
# mypackage/etl.py
from laktory.models import LaktoryContext
def my_read(table=None, laktory_context: LaktoryContext = None):
import polars as pl
return pl.scan_delta(table)
References
| PARAMETER | DESCRIPTION |
|---|---|
as_stream
|
If
TYPE:
|
custom_reader
|
Custom reader definition. Can be set as a plain string (func_name only) or a full CustomReader object with func_name, func_args, and func_kwargs.
TYPE:
|
drop_duplicates
|
Remove duplicated rows from source using all columns if
TYPE:
|
drops
|
List of columns to drop
TYPE:
|
filter
|
SQL expression used to select specific rows from the source table
TYPE:
|
renames
|
Mapping between the source column names and desired column names
TYPE:
|
selects
|
Columns to select from the source. Can be specified as a list or as a dictionary to rename the source columns
TYPE:
|
type
|
Source type
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
read |
Read data with options specified in attributes. |
read(**kwargs)
¤
Read data with options specified in attributes.
| RETURNS | DESCRIPTION |
|---|---|
AnyFrame
|
Resulting dataframe |
Source code in laktory/models/datasources/basedatasource.py
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | |
laktory.models.datasources.CustomReader
¤
Bases: BaseModel, PipelineChild
Definition of a custom read function used by CustomDataSource. Gives the
user full control over how data is read.
The function must return a DataFrame (native or Narwhals). Laktory optionally injects
a laktory_context keyword argument — declare it in your function signature
to opt in:
def my_read(laktory_context=None):
print(laktory_context.source)
print(laktory_context.node)
Examples:
from laktory import models
source = models.CustomDataSource(
custom_reader={
"func_name": "mypackage.etl.my_read",
"func_kwargs": {"table": "catalog.schema.my_table"},
},
)
# df = source.read()
# mypackage/etl.py
from laktory.models import LaktoryContext
def my_read(table=None, laktory_context: LaktoryContext = None):
import polars as pl
return pl.scan_delta(table)
| PARAMETER | DESCRIPTION |
|---|---|
func_args
|
Positional arguments passed to the function.
TYPE:
|
func_kwargs
|
Keyword arguments passed to the function.
TYPE:
|
func_name
|
Fully qualified importable module path to the callable (e.g. 'mypackage.etl.my_read'). The function is imported at runtime via
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
execute |
Invoke the user-supplied function with configured args/kwargs and |
execute()
¤
Invoke the user-supplied function with configured args/kwargs and
optionally a laktory_context object.
| RETURNS | DESCRIPTION |
|---|---|
|
DataFrame returned by the user function (native or Narwhals). |
Source code in laktory/models/datasources/customreader.py
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | |