
DataSinkMergeCDCOptions

laktory.models.datasinks.DataSinkMergeCDCOptions

Bases: BaseModel

Options for merging change data capture (CDC) events into a target table.

These options are also used to build the target with the apply_changes method when using Databricks DLT.

Examples:

from laktory import models

df = spark.createDataFrame(
    [
        {"id": 1, "value": 3.0},
        {"id": 2, "value": 2.3},
        {"id": 3, "value": 7.7},
    ]
)

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["id"],
    },
)
# sink.write(df)
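
With scd_type set to 2, history is preserved instead of overwritten: the activity window of each row is tracked in the columns named by start_at_column_name and end_at_column_name (__start_at and __end_at by default). A minimal sketch extending the example above; updated_at is a hypothetical sequencing column in the source, passed as order_by:

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 2,
        "primary_keys": ["id"],
        "order_by": "updated_at",  # hypothetical sequencing column in the source
    },
)
# sink.write(df)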
PARAMETERS
delete_where

Specifies when a CDC event should be treated as a DELETE rather than an upsert; see the sketch after this parameters list.

TYPE: str | VariableType DEFAULT: None

end_at_column_name

When using SCD type 2, name of the column storing the end time (or sequencing index) of the period during which a row is active. This attribute is not used with Databricks DLT, which does not allow renaming columns.

TYPE: str | VariableType DEFAULT: '__end_at'

exclude_columns

A subset of columns to exclude from the target table.

TYPE: list[str | VariableType] | VariableType DEFAULT: None

ignore_null_updates

Allow ingesting updates that contain only a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null value retain their existing values in the target; this also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values.

TYPE: bool | VariableType DEFAULT: False

include_columns

A subset of columns to include in the target table. When provided, include_columns must specify the complete list of columns to include.

TYPE: list[str | VariableType] | VariableType DEFAULT: None

order_by

The column name specifying the logical order of CDC events in the source data. Used to handle change events that arrive out of order.

TYPE: str | VariableType DEFAULT: None

primary_keys

The column or combination of columns that uniquely identifies a row in the source data. These keys determine which CDC events apply to which records in the target table.

TYPE: list[str | VariableType] | VariableType DEFAULT: None

scd_type

Whether to store records as SCD type 1 or SCD type 2.

TYPE: Literal[1, 2] | VariableType DEFAULT: 1

start_at_column_name

When using SCD type 2, name of the column storing the start time (or sequencing index) of the period during which a row is active. This attribute is not used with Databricks DLT, which does not allow renaming columns.

TYPE: str | VariableType DEFAULT: '__start_at'
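
Several of these options compose naturally. A hedged sketch, assuming the source carries a hypothetical operation column flagging deletions and that delete_where accepts a SQL boolean expression over the source columns:

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    mode="MERGE",
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["id"],
        "delete_where": "operation = 'DELETE'",  # hypothetical flag column in the source
        "exclude_columns": ["operation"],  # keep the flag out of the target table
        "ignore_null_updates": True,  # partial updates leave untouched columns as-is
    },
)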

METHODS
execute

Merge the source DataFrame into the sink's target Delta table.

execute(source)

Merge the source DataFrame into the sink's target Delta table.

PARAMETERS
source

Source DataFrame to merge into target (sink).

TYPE: AnyFrame
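
As the source below shows, a streaming source is merged one micro-batch at a time through foreachBatch, and the sink must define a checkpoint location. A minimal sketch, assuming checkpoint_path can be set directly on the sink (the source references self.sink.checkpoint_path):

sink = models.FileDataSink(
    path="./my_table/",
    format="DELTA",
    mode="MERGE",
    checkpoint_path="./my_table/checkpoint/",  # assumed attribute; required for streaming merges
    merge_cdc_options={
        "scd_type": 1,
        "primary_keys": ["id"],
    },
)
# df_stream = spark.readStream.format("delta").load("./events/")  # hypothetical source path
# sink.write(df_stream)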

Source code in laktory/models/datasinks/mergecdcoptions.py
def execute(self, source: AnyFrame):
    """
    Merge the source DataFrame into the sink's target Delta table.

    Parameters
    ----------
    source:
        Source DataFrame to merge into target (sink).
    """

    # Reject DataFrame backends that do not support merge operations
    dataframe_backend = DataFrameBackends(source.implementation)
    if dataframe_backend not in SUPPORTED_BACKENDS:
        raise NotImplementedError(
            f"DataFrame provided is of {dataframe_backend} backend, which is not currently implemented for merge operations."
        )

    source = source.to_native()

    # Local import: Delta Lake is only needed when a merge actually runs
    from delta.tables import DeltaTable

    self._source_schema = source.schema
    spark = source.sparkSession

    # Initialize the target Delta table if it does not exist yet (lookup by path or by name)
    if self.target_path:
        if not DeltaTable.isDeltaTable(spark, self.target_path):
            self._init_target(source)
    else:
        try:
            spark.catalog.getTable(self.target_name)
        except Exception:
            self._init_target(source)

    # Streaming source: merge each micro-batch through foreachBatch, then wait for completion
    if source.isStreaming:
        if self.sink is None:
            raise ValueError("Sink value required to fetch checkpoint location.")

        if self.sink and self.sink.checkpoint_path is None:
            raise ValueError(
                f"Checkpoint location not specified for sink '{self.sink}'"
            )

        query = (
            source.writeStream.foreachBatch(
                lambda batch_df, batch_id: self._execute(source=batch_df)
            )
            .trigger(availableNow=True)
            .options(
                checkpointLocation=self.sink.checkpoint_path,
            )
            .start()
        )
        query.awaitTermination()

    else:
        # Batch source: run the merge directly
        self._execute(source=source)