PipelineViewDataSink

laktory.models.datasinks.PipelineViewDataSink ¤

Bases: BaseDataSink

Data sink writing to a Declarative Pipeline (formerly Delta Live Tables) view. This view is virtual and does not materialize data.

Examples:

```python tag:skip-run
from laktory import models

df = spark.createDataFrame([{"x": 1}, {"x": 2}, {"x": 3}])

sink = models.PipelineViewDataSink(
    pipeline_view_name="my_view",
)
sink.write(df)
```


PARAMETER DESCRIPTION
checkpoint_path_

Path to which the checkpoint file for a streaming dataframe should be written.

TYPE: str | Path | VariableType DEFAULT: None

custom_writer

Custom writer that fully replaces Laktory's built-in write logic. Laktory manages the streaming query lifecycle (foreachBatch, trigger, checkpoint, start/await). Can be set as a plain string (func_name only) or a full CustomWriter object with func_name, func_args, and func_kwargs. Mutually exclusive with mode and merge_cdc_options.

TYPE: CustomWriter | None | VariableType DEFAULT: None
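As a sketch, the two accepted forms might look like this in a YAML pipeline definition (the writer name `my_writer` and its arguments are illustrative, not from the source):

```yaml
# Hypothetical sink configurations; writer names and values are illustrative.
sinks:
  # Short form: a plain string interpreted as func_name
  - pipeline_view_name: my_view
    custom_writer: my_writer

  # Full form: a CustomWriter object
  - pipeline_view_name: my_other_view
    custom_writer:
      func_name: my_writer
      func_args: ["arg0"]
      func_kwargs:
        key: value
```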

databricks_quality_monitor

Databricks Quality Monitor

TYPE: Literal[None] | VariableType DEFAULT: None

is_quarantine

Sink used to store quarantined results from a pipeline node's expectations.

TYPE: bool | VariableType DEFAULT: False

merge_cdc_options

Merge options to handle input DataFrames that are Change Data Capture (CDC). Only used when MERGE mode is selected.

TYPE: DataSinkMergeCDCOptions | VariableType DEFAULT: None

metadata

Table and columns metadata.

TYPE: Literal[None] | VariableType DEFAULT: None

mode

Write mode.

Spark¤

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Spark Streaming¤

  • APPEND: Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • COMPLETE: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.
  • UPDATE: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.

Polars Delta¤

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Laktory¤

  • MERGE: Append, update and optionally delete records. Only supported for DELTA format. Requires cdc specification.

TYPE: Literal['APPEND', 'ERROR', 'MERGE', 'COMPLETE', 'IGNORE', 'UPDATE', 'ERRORIFEXISTS', 'OVERWRITE'] | None | VariableType DEFAULT: None
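The modes above could be selected in a declarative sink definition along these lines (a hedged sketch; view names are illustrative, and the fields of `merge_cdc_options` are documented on the `DataSinkMergeCDCOptions` reference page, not here):

```yaml
# Hypothetical sink configurations; names are illustrative.
sinks:
  # Batch overwrite of existing data
  - pipeline_view_name: my_view
    mode: OVERWRITE

  # Laktory-managed merge (DELTA format only); requires a cdc specification
  - pipeline_view_name: my_cdc_view
    mode: MERGE
    merge_cdc_options: {}  # see DataSinkMergeCDCOptions for available fields
```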

pipeline_view_name

Pipeline View name

TYPE: str | None | VariableType

schema_definition

Explicit table schema used when creating the table. If not set, schema is inferred from the transformer output DataFrame.

TYPE: DataFrameSchema | VariableType DEFAULT: None

type

Sink Type

TYPE: Literal['PIPELINE_VIEW'] | VariableType DEFAULT: 'PIPELINE_VIEW'

writer_kwargs

Keyword arguments passed directly to dataframe backend writer. Passed to .options() method when using PySpark.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

writer_methods

DataFrame backend writer methods.

TYPE: list[ReaderWriterMethod | VariableType] | VariableType DEFAULT: []
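For example, backend-specific writer options could be forwarded like this (a sketch; `mergeSchema` is a standard Delta/Spark writer option used here for illustration):

```yaml
# Hypothetical sink configuration; option values are illustrative.
sinks:
  - pipeline_view_name: my_view
    writer_kwargs:
      mergeSchema: "true"  # forwarded to the PySpark writer's .options()
```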

METHOD DESCRIPTION
create

Initialize the sink if required.

purge

Delete sink data and checkpoints

read

Read dataframe from sink.

ATTRIBUTE DESCRIPTION
dlt_apply_changes_kwargs

Keyword arguments for dlt.apply_changes function

TYPE: dict[str, str]

dlt_pre_merge_view_name

DLT view applying the node transformer prior to applying CDC changes.

dlt_apply_changes_kwargs property ¤

Keyword arguments for dlt.apply_changes function

dlt_pre_merge_view_name property ¤

DLT view applying the node transformer prior to applying CDC changes.

create(df=None) ¤

Initialize the sink if required.

Some sinks (e.g., Unity Catalog or Delta tables) must exist before metadata can be applied or data can be written in append mode.

PARAMETER DESCRIPTION
df

Input DataFrame that may be used during sink initialization.

TYPE: DataFrame DEFAULT: None

RETURNS DESCRIPTION
bool

True if the sink was created, False if it already existed or if creation is not required.

Notes

This method is intended to be overridden by subclasses to implement sink-specific initialization logic.

Source code in laktory/models/datasinks/basedatasink.py
```python
def create(self, df: "AnyFrame" = None) -> bool:
    """
    Initialize the sink if required.

    Some sinks (e.g., Unity Catalog or Delta tables) must exist before
    metadata can be applied or data can be written in append mode.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame that may be used during sink initialization.

    Returns
    -------
    bool
        True if the sink was created, False if it already existed or if
        creation is not required.

    Notes
    -----
    This method is intended to be overridden by subclasses to implement
    sink-specific initialization logic.
    """
    return False
```
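Independent of laktory, the override contract can be sketched with a toy subclass (the class names `BaseSink` and `TableSink` are illustrative stand-ins, not laktory's actual classes):

```python
class BaseSink:
    """Toy stand-in for the base sink: no initialization required."""

    def create(self, df=None) -> bool:
        # Default: nothing to create.
        return False


class TableSink(BaseSink):
    """Hypothetical sink that must exist before data can be appended."""

    def __init__(self):
        self._exists = False

    def create(self, df=None) -> bool:
        if self._exists:
            return False  # already created, nothing to do
        # A real implementation would issue e.g. a CREATE TABLE here.
        self._exists = True
        return True


sink = TableSink()
```

The first call to `sink.create()` performs the initialization and returns `True`; subsequent calls return `False`, matching the documented return contract.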

purge() ¤

Delete sink data and checkpoints

Source code in laktory/models/datasinks/pipelineviewdatasink.py
```python
def purge(self):
    """
    Delete sink data and checkpoints
    """
    return
```

read(as_stream=None, reader_kwargs=None, reader_methods=None) ¤

Read dataframe from sink.

PARAMETER DESCRIPTION
as_stream

If `True`, the dataframe is read as a stream.

DEFAULT: None

reader_kwargs

Keyword arguments passed to the dataframe backend reader.

DEFAULT: None

reader_methods

DataFrame backend reader methods.

DEFAULT: None

RETURNS DESCRIPTION
AnyFrame

DataFrame

Source code in laktory/models/datasinks/basedatasink.py
```python
def read(self, as_stream=None, reader_kwargs=None, reader_methods=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    as_stream:
        If `True`, dataframe read as stream.
    reader_kwargs:
        Keyword arguments passed to the dataframe backend reader.
    reader_methods:
        DataFrame backend reader methods.

    Returns
    -------
    AnyFrame
        DataFrame
    """
    return self.as_source(
        as_stream=as_stream,
        reader_kwargs=reader_kwargs,
        reader_methods=reader_methods,
    ).read()
```