
PipelineViewDataSink

laktory.models.datasinks.PipelineViewDataSink

Bases: BaseDataSink

Data sink writing to a Declarative Pipeline (formerly Delta Live Tables) view. This view is virtual and does not materialize data.

Examples:

```python
from laktory import models

# Assumes `spark` is an active SparkSession (e.g. provided by the Databricks runtime)
df = spark.createDataFrame([{"x": 1}, {"x": 2}, {"x": 3}])

sink = models.PipelineViewDataSink(
    pipeline_view_name="my_view",
)
sink.write(df)
```


PARAMETER DESCRIPTION
as_stream

If True, the output DataFrame is written as a Streaming DataFrame. If None, the write mode is derived from the DataFrame.

TYPE: bool | None | VariableType DEFAULT: None

checkpoint_path_

Path to which the checkpoint files of a streaming DataFrame should be written.

TYPE: str | Path | VariableType DEFAULT: None

custom_writer

Custom writer that fully replaces Laktory's built-in write logic. Laktory manages the streaming query lifecycle (foreachBatch, trigger, checkpoint, start/await). Can be set as a plain string (func_name only) or a full CustomWriter object with func_name, func_args, and func_kwargs. Mutually exclusive with mode and merge_cdc_options.

TYPE: CustomWriter | None | VariableType DEFAULT: None
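The two accepted forms of custom_writer, and its mutual exclusivity with mode and merge_cdc_options, can be pictured with the stdlib-only sketch below. CustomWriterSketch and normalize_custom_writer are hypothetical stand-ins, not Laktory's actual CustomWriter model or validator; only the field names func_name, func_args and func_kwargs come from the description above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Union


@dataclass
class CustomWriterSketch:
    # Hypothetical stand-in for Laktory's CustomWriter model
    func_name: str
    func_args: list = field(default_factory=list)
    func_kwargs: dict = field(default_factory=dict)


def normalize_custom_writer(
    custom_writer: Union[str, CustomWriterSketch, None],
    mode: Optional[str] = None,
    merge_cdc_options: Optional[dict] = None,
) -> Optional[CustomWriterSketch]:
    """Expand a plain string into a full writer spec and enforce exclusivity."""
    if custom_writer is None:
        return None
    if mode is not None or merge_cdc_options is not None:
        raise ValueError(
            "custom_writer is mutually exclusive with mode and merge_cdc_options"
        )
    if isinstance(custom_writer, str):
        # A plain string only provides the function name
        return CustomWriterSketch(func_name=custom_writer)
    return custom_writer
```

A plain string such as "my_writer" expands to a spec with empty func_args and func_kwargs, while a full object passes through unchanged.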

databricks_data_profiling_config

Databricks Data Quality Monitor data profiling configuration

TYPE: Literal[None] | VariableType DEFAULT: None

is_quarantine

Sink used to store quarantined results from a pipeline node's expectations.

TYPE: bool | VariableType DEFAULT: False

merge_cdc_options

Merge options to handle input DataFrames that are Change Data Capture (CDC). Only used when MERGE mode is selected.

TYPE: DataSinkMergeCDCOptions | VariableType DEFAULT: None

metadata

Table and column metadata.

TYPE: Literal[None] | VariableType DEFAULT: None

mode

Write mode.

Spark

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Spark Streaming

  • APPEND: Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • COMPLETE: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.
  • UPDATE: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.

Polars Delta

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Laktory

  • MERGE: Append, update and optionally delete records. Only supported for DELTA format. Requires a CDC specification (merge_cdc_options).

TYPE: Literal['COMPLETE', 'IGNORE', 'MERGE', 'ERRORIFEXISTS', 'OVERWRITE', 'UPDATE', 'ERROR', 'APPEND'] | None | VariableType DEFAULT: None
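The mode options listed above can be restated as a lookup table plus a validation step. The sketch below is hypothetical (check_mode, SUPPORTED_MODES and the context keys are illustrative names, not Laktory's API) and encodes only what this page states: a set of modes per execution context, and the requirement that MERGE use DELTA format with CDC options. ERRORIFEXISTS, which appears in the TYPE but is not described above, is deliberately left out.

```python
from typing import Optional

# Modes listed above, grouped by execution context (illustrative keys)
SUPPORTED_MODES = {
    "spark": {"OVERWRITE", "APPEND", "ERROR", "IGNORE"},
    "spark_streaming": {"APPEND", "COMPLETE", "UPDATE"},
    "polars_delta": {"OVERWRITE", "APPEND", "ERROR", "IGNORE"},
}


def check_mode(
    context: str,
    mode: Optional[str],
    fmt: str = "DELTA",
    has_merge_cdc_options: bool = False,
) -> None:
    """Raise ValueError if `mode` is not valid for `context` (hypothetical helper)."""
    if mode is None:
        return  # no explicit mode set; nothing to validate in this sketch
    if mode == "MERGE":
        # Laktory-specific mode: DELTA format only, and CDC options are required
        if fmt != "DELTA":
            raise ValueError("MERGE is only supported for DELTA format")
        if not has_merge_cdc_options:
            raise ValueError("MERGE requires merge_cdc_options")
        return
    if mode not in SUPPORTED_MODES[context]:
        raise ValueError(f"{mode} is not supported for {context}")
```

For example, COMPLETE passes for a streaming write but is rejected for a static Spark write, which mirrors the split between the Spark and Spark Streaming lists.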

pipeline_view_name

Pipeline view name.

TYPE: str | None | VariableType

schema_definition

Explicit table schema used when creating the table. If not set, schema is inferred from the transformer output DataFrame.

TYPE: DataFrameSchema | VariableType DEFAULT: None

type

Sink Type

TYPE: Literal['PIPELINE_VIEW'] | VariableType DEFAULT: 'PIPELINE_VIEW'

writer_kwargs

Keyword arguments passed directly to the DataFrame backend writer. With PySpark, they are passed to the .options() method.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

writer_methods

DataFrame backend writer methods.

TYPE: list[ReaderWriterMethod | VariableType] | VariableType DEFAULT: []

METHOD DESCRIPTION
create

Initialize the sink if required.

is_streaming

Return True if the write should use Spark Structured Streaming.

purge

Delete sink data and checkpoints

read

Read dataframe from sink.

ATTRIBUTE DESCRIPTION
ldp_auto_cdc_flow_kwargs

Keyword arguments for the dp.create_auto_cdc_flow function.

TYPE: dict[str, str]

sdp_pre_merge_view_name

SDP view applying the node transformer prior to applying CDC changes.

ldp_auto_cdc_flow_kwargs property

Keyword arguments for the dp.create_auto_cdc_flow function.

sdp_pre_merge_view_name property

SDP view applying the node transformer prior to applying CDC changes.

create(df=None)

Initialize the sink if required.

Some sinks (e.g., Unity Catalog or Delta tables) must exist before metadata can be applied or data can be written in append mode.

PARAMETER DESCRIPTION
df

Input DataFrame that may be used during sink initialization.

TYPE: DataFrame DEFAULT: None

RETURNS DESCRIPTION
bool

True if the sink was created, False if it already existed or if creation is not required.

Notes

This method is intended to be overridden by subclasses to implement sink-specific initialization logic.

Source code in laktory/models/datasinks/basedatasink.py
def create(self, df: "AnyFrame" = None) -> bool:
    """
    Initialize the sink if required.

    Some sinks (e.g., Unity Catalog or Delta tables) must exist before
    metadata can be applied or data can be written in append mode.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame that may be used during sink initialization.

    Returns
    -------
    bool
        True if the sink was created, False if it already existed or if
        creation is not required.

    Notes
    -----
    This method is intended to be overridden by subclasses to implement
    sink-specific initialization logic.
    """
    return False
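The contract described above (return True only when initialization actually happened, and let subclasses override the default False) can be sketched in plain Python. SketchBaseSink and InMemoryTableSink are hypothetical classes for illustration only; they are not part of Laktory and stand in for a sink, such as a table, that must exist before data can be appended.

```python
from typing import Optional


class SketchBaseSink:
    """Hypothetical base class mirroring the default create() behavior."""

    def create(self, df=None) -> bool:
        # Nothing to initialize by default
        return False


class InMemoryTableSink(SketchBaseSink):
    """Hypothetical sink whose storage must exist before rows are appended."""

    def __init__(self) -> None:
        self.table: Optional[list] = None

    def create(self, df=None) -> bool:
        if self.table is not None:
            return False  # already exists, nothing created
        self.table = []  # initialize storage
        return True

    def append(self, rows: list) -> None:
        if self.table is None:
            raise RuntimeError("Sink must be created before appending.")
        self.table.extend(rows)
```

Calling create() twice on InMemoryTableSink returns True and then False, matching the documented return values, while the base class always returns False.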

is_streaming(df=None)

Return True if the write should use Spark Structured Streaming.

Resolution order:

  1. If a Narwhals-wrapped PySpark DataFrame is provided, read its native isStreaming attribute.
  2. Fall back to self.as_stream (explicit sink configuration).
  3. Fall back to the parent node's source as_stream flag.
  4. Default to False (static write).

If both the DataFrame state and the configuration are set and they disagree, a TypeError is raised to surface the misconfiguration early.

PARAMETER DESCRIPTION
df

Optional Narwhals DataFrame or LazyFrame. Must be passed before calling .to_native() so that the Narwhals implementation attribute is still available.

DEFAULT: None

Source code in laktory/models/datasinks/basedatasink.py
def is_streaming(self, df=None) -> bool:
    """
    Return `True` if the write should use Spark Structured Streaming.

    Resolution order:
    1. If a Narwhals-wrapped PySpark DataFrame is provided, read its native
       ``isStreaming`` attribute.
    2. Fall back to ``self.as_stream`` (explicit sink configuration).
    3. Fall back to the parent node's source ``as_stream`` flag.
    4. Default to ``False`` (static write).

    If both the DataFrame state and the configuration are set and they
    disagree, a ``TypeError`` is raised to surface the misconfiguration
    early.

    Parameters
    ----------
    df:
        Optional Narwhals DataFrame or LazyFrame. Must be passed before
        calling ``.to_native()`` so that the Narwhals ``implementation``
        attribute is still available.
    """
    # Check if DataFrame is streaming
    df_is_streaming = None
    if df is not None:
        df = nw.from_native(df)
        dataframe_backend = DataFrameBackends(df.implementation)
        if dataframe_backend == DataFrameBackends.PYSPARK:
            df_is_streaming = df.to_native().isStreaming

    # Check if configured as stream from writer or source
    configured_as_stream = self.as_stream
    if configured_as_stream is None:
        node = self.parent_pipeline_node
        if node is not None and node.sources:
            configured_as_stream = node.has_streaming_source

    # Resolve conflict
    if df_is_streaming is not None and configured_as_stream is not None:
        if df_is_streaming != configured_as_stream:
            if df_is_streaming:
                raise TypeError(
                    "Sink configured as static, but received dataframe is streaming."
                )
            else:
                raise TypeError(
                    "Sink configured as stream, but received dataframe is not streaming."
                )

    is_streaming = df_is_streaming or configured_as_stream or False

    return is_streaming
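To reason about the rules in isolation, the resolution logic above can be restated as a pure function with no Narwhals or PySpark dependency. resolve_is_streaming is a hypothetical helper, not part of Laktory: df_is_streaming stands in for the native isStreaming flag (None when no PySpark DataFrame is given), sink_as_stream for self.as_stream, and source_as_stream for the parent node's has_streaming_source (None when there is no parent node or source).

```python
from typing import Optional


def resolve_is_streaming(
    df_is_streaming: Optional[bool],
    sink_as_stream: Optional[bool],
    source_as_stream: Optional[bool],
) -> bool:
    # Explicit sink configuration takes precedence over the source flag
    configured = sink_as_stream if sink_as_stream is not None else source_as_stream

    # Only a conflict between two known values is an error
    if df_is_streaming is not None and configured is not None:
        if df_is_streaming != configured:
            if df_is_streaming:
                raise TypeError(
                    "Sink configured as static, but received dataframe is streaming."
                )
            raise TypeError(
                "Sink configured as stream, but received dataframe is not streaming."
            )

    # Default to a static write when nothing says otherwise
    return bool(df_is_streaming or configured or False)
```

Note that a sink explicitly configured with as_stream=False overrides a streaming source, so a static DataFrame is accepted in that case, while a streaming DataFrame raises a TypeError.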

purge()

Delete sink data and checkpoints. For a pipeline view this is a no-op, since the view is virtual and does not materialize data.

Source code in laktory/models/datasinks/pipelineviewdatasink.py
def purge(self):
    """
    Delete sink data and checkpoints
    """
    return

read(as_stream=None, reader_kwargs=None, reader_methods=None)

Read dataframe from sink.

PARAMETER DESCRIPTION
as_stream

If True, the DataFrame is read as a stream.

DEFAULT: None

reader_kwargs

Keyword arguments passed to the dataframe backend reader.

DEFAULT: None

reader_methods

DataFrame backend reader methods.

DEFAULT: None

RETURNS DESCRIPTION
AnyFrame

DataFrame

Source code in laktory/models/datasinks/basedatasink.py
def read(self, as_stream=None, reader_kwargs=None, reader_methods=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    as_stream:
        If `True`, dataframe read as stream.
    reader_kwargs:
        Keyword arguments passed to the dataframe backend reader.
    reader_methods:
        DataFrame backend reader methods.

    Returns
    -------
    AnyFrame
        DataFrame
    """
    return self.as_source(
        as_stream=as_stream,
        reader_kwargs=reader_kwargs,
        reader_methods=reader_methods,
    ).read()