
HiveMetastoreDataSink

laktory.models.datasinks.HiveMetastoreDataSink ¤

Bases: TableDataSink

Data sink writing to a Hive Metastore data table.

PARAMETER DESCRIPTION
dataframe_backend

Type of DataFrame backend

TYPE: DataFrameBackends DEFAULT: None

dataframe_api

DataFrame API to use in DataFrame Transformer nodes. Either 'NATIVE' (backend-specific) or 'NARWHALS' (backend-agnostic).

TYPE: Literal['NARWHALS', 'NATIVE'] | VariableType DEFAULT: None

variables

Dict of variables to be injected in the model at runtime

TYPE: dict[str, Any] DEFAULT: {}

checkpoint_path

Path where the checkpoint file for a streaming dataframe should be written.

TYPE: str | VariableType DEFAULT: None

is_quarantine

Sink used to store quarantined results from pipeline node expectations.

TYPE: bool | VariableType DEFAULT: False

type

Sink Type

TYPE: Literal['HIVE_METASTORE'] | VariableType DEFAULT: 'HIVE_METASTORE'

merge_cdc_options

Merge options to handle input DataFrames that are Change Data Capture (CDC). Only used when MERGE mode is selected.

TYPE: DataSinkMergeCDCOptions | VariableType DEFAULT: None

mode

Write mode.

Spark¤

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Spark Streaming¤

  • APPEND: Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • COMPLETE: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.
  • UPDATE: Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.

Polars Delta¤

  • OVERWRITE: Overwrite existing data.
  • APPEND: Append contents of this DataFrame to existing data.
  • ERROR: Throw an exception if data already exists.
  • IGNORE: Silently ignore this operation if data already exists.

Laktory¤

  • MERGE: Append, update and optionally delete records. Only supported for DELTA format. Requires cdc specification.

TYPE: Literal['IGNORE', 'COMPLETE', 'ERROR', 'ERRORIFEXISTS', 'MERGE', 'UPDATE', 'OVERWRITE', 'APPEND'] | None | VariableType DEFAULT: None

writer_kwargs

Keyword arguments passed directly to dataframe backend writer. Passed to .options() method when using PySpark.

TYPE: dict[Union[str, VariableType], Union[Any, VariableType]] | VariableType DEFAULT: {}

writer_methods

DataFrame backend writer methods.

TYPE: list[Union[ReaderWriterMethod, VariableType]] | VariableType DEFAULT: []

catalog_name

Sink table catalog name

TYPE: str | None | VariableType DEFAULT: None

format

Storage format for data table.

TYPE: Literal['PARQUET', 'DELTA'] | VariableType DEFAULT: 'DELTA'

schema_name

Sink table schema name

TYPE: str | None | VariableType DEFAULT: None

table_name

Sink table name. Also supports fully qualified name ({catalog}.{schema}.{table}). In this case, catalog_name and schema_name arguments are ignored.

TYPE: str | VariableType

table_type

Type of table. 'TABLE' and 'VIEW' are currently supported.

TYPE: Literal['TABLE', 'VIEW'] | VariableType DEFAULT: 'TABLE'

view_definition

View definition if 'VIEW' table_type is selected.

TYPE: DataFrameExpr | str | VariableType DEFAULT: None

Examples:

import laktory as lk
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"x": 1}, {"x": 2}, {"x": 3}])

sink = lk.models.HiveMetastoreDataSink(
    schema_name="default",
    table_name="my_table",
    mode="APPEND",
)
# sink.write(df)
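
Other configurations follow the same pattern. Below is a minimal sketch (paths and names are assumed for illustration) of a streaming APPEND sink and of a VIEW sink:

stream_sink = lk.models.HiveMetastoreDataSink(
    schema_name="default",
    table_name="my_stream_table",
    mode="APPEND",
    checkpoint_path="/tmp/checkpoints/my_stream_table",  # hypothetical checkpoint location
)

view_sink = lk.models.HiveMetastoreDataSink(
    schema_name="default",
    table_name="my_view",
    table_type="VIEW",
    view_definition="SELECT * FROM default.my_table",
)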
References
METHOD DESCRIPTION
as_source

Generate a table data source with the same properties as the sink.

inject_vars

Inject model variable values into model attributes.

inject_vars_into_dump

Inject model variable values into a model dump.

model_validate_json_file

Load model from json file object

model_validate_yaml

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files.

purge

Delete sink data and checkpoints

push_vars

Push variable values to all children recursively

read

Read dataframe from sink.

validate_assignment_disabled

Updating a model attribute inside a model validator when validate_assignment is True causes infinite recursion and must be turned off temporarily.

write

Write dataframe into sink.

ATTRIBUTE DESCRIPTION
data_sources

Get all sources feeding the sink

dlt_apply_changes_kwargs

Keyword arguments for dlt.apply_changes function

TYPE: dict[str, str]

full_name

Table full name {catalog_name}.{schema_name}.{table_name}

TYPE: str

upstream_node_names

Pipeline node names required to write sink

TYPE: list[str]

data_sources property ¤

Get all sources feeding the sink

dlt_apply_changes_kwargs property ¤

Keyword arguments for dlt.apply_changes function

full_name property ¤

Table full name {catalog_name}.{schema_name}.{table_name}
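
A minimal sketch (using the sink from the class-level example above):

print(sink.full_name)
# expected: 'default.my_table' (no catalog_name was set)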

upstream_node_names property ¤

Pipeline node names required to write sink

as_source(as_stream=None) ¤

Generate a table data source with the same properties as the sink.

PARAMETER DESCRIPTION
as_stream

If True, sink will be read as a stream.

DEFAULT: None

RETURNS DESCRIPTION
TableDataSource

Table Data Source
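
Examples:

A minimal sketch (assuming the sink from the class-level example has already been written):

source = sink.as_source(as_stream=True)
df = source.read()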

Source code in laktory/models/datasinks/tabledatasink.py
def as_source(self, as_stream=None) -> TableDataSource:
    """
    Generate a table data source with the same properties as the sink.

    Parameters
    ----------
    as_stream:
        If `True`, sink will be read as stream.

    Returns
    -------
    :
        Table Data Source
    """
    source = TableDataSource(
        catalog_name=self.catalog_name,
        table_name=self.table_name,
        schema_name=self.schema_name,
        type=self.type,
        dataframe_backend=self.df_backend,
    )

    if as_stream:
        source.as_stream = as_stream

    if self.dataframe_backend:
        source.dataframe_backend = self.dataframe_backend
    source.parent = self.parent

    return source

inject_vars(inplace=False, vars=None) ¤

Inject model variable values into model attributes.

PARAMETER DESCRIPTION
inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION

Model instance.

Examples:

from typing import Union

from laktory import models


class Cluster(models.BaseModel):
    name: str = None
    size: Union[int, str] = None


c = Cluster(
    name="cluster-${vars.my_cluster}",
    size="${{ 4 if vars.env == 'prod' else 2 }}",
    variables={
        "env": "dev",
    },
).inject_vars()
print(c)
# > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
References
  • variables: https://www.laktory.ai/concepts/variables/
Source code in laktory/models/basemodel.py
def inject_vars(self, inplace: bool = False, vars: dict = None):
    """
    Inject model variables values into a model attributes.

    Parameters
    ----------
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model instance.

    Examples
    --------
    ```py
    from typing import Union

    from laktory import models


    class Cluster(models.BaseModel):
        name: str = None
        size: Union[int, str] = None


    c = Cluster(
        name="cluster-${vars.my_cluster}",
        size="${{ 4 if vars.env == 'prod' else 2 }}",
        variables={
            "env": "dev",
        },
    ).inject_vars()
    print(c)
    # > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Fetching vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        self = self.model_copy(deep=True)

    # Inject into field values
    for k in list(self.model_fields_set):
        if k == "variables":
            continue
        o = getattr(self, k)

        if isinstance(o, BaseModel) or isinstance(o, dict) or isinstance(o, list):
            # Mutable objects will be updated in place
            _resolve_values(o, vars)
        else:
            # Simple objects must be updated explicitly
            setattr(self, k, _resolve_value(o, vars))

    # Inject into child resources
    if hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r == self:
                continue
            r.inject_vars(vars=vars, inplace=True)

    if not inplace:
        return self

inject_vars_into_dump(dump, inplace=False, vars=None) ¤

Inject model variable values into a model dump.

PARAMETER DESCRIPTION
dump

Model dump (or any other general purpose mutable object)

TYPE: dict[str, Any]

inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict[str, Any] DEFAULT: None

RETURNS DESCRIPTION

Model dump with injected variables.

Examples:

from laktory import models

m = models.BaseModel(
    variables={
        "env": "dev",
    },
)
data = {
    "name": "cluster-${vars.my_cluster}",
    "size": "${{ 4 if vars.env == 'prod' else 2 }}",
}
print(m.inject_vars_into_dump(data))
# > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
References
  • variables: https://www.laktory.ai/concepts/variables/
Source code in laktory/models/basemodel.py
def inject_vars_into_dump(
    self, dump: dict[str, Any], inplace: bool = False, vars: dict[str, Any] = None
):
    """
    Inject model variables values into a model dump.

    Parameters
    ----------
    dump:
        Model dump (or any other general purpose mutable object)
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model dump with injected variables.


    Examples
    --------
    ```py
    from laktory import models

    m = models.BaseModel(
        variables={
            "env": "dev",
        },
    )
    data = {
        "name": "cluster-${vars.my_cluster}",
        "size": "${{ 4 if vars.env == 'prod' else 2 }}",
    }
    print(m.inject_vars_into_dump(data))
    # > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Setting vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        dump = copy.deepcopy(dump)

    # Inject into field values
    _resolve_values(dump, vars)

    if not inplace:
        return dump

model_validate_json_file(fp) classmethod ¤

Load model from json file object

PARAMETER DESCRIPTION
fp

file object structured as a json file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance
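
Examples:

A minimal sketch (assuming a hypothetical sink.json file defining schema_name, table_name and mode):

import laktory as lk

with open("sink.json") as fp:
    sink = lk.models.HiveMetastoreDataSink.model_validate_json_file(fp)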

Source code in laktory/models/basemodel.py
@classmethod
def model_validate_json_file(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from json file object

    Parameters
    ----------
    fp:
        file object structured as a json file

    Returns
    -------
    :
        Model instance
    """
    data = json.load(fp)
    return cls.model_validate(data)

model_validate_yaml(fp) classmethod ¤

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files using !use, !extend and !update tags. Paths to external files can be defined using model or environment variables.

Referenced paths should always be relative to the file they are referenced from.

Custom Tags
  • !use {filepath}: Directly inject the content of the file at filepath

  • - !extend {filepath}: Extend the current list with the elements found in the file at filepath. Similar to python list.extend method.

  • <<: !update {filepath}: Merge the current dictionary with the content of the dictionary defined at filepath. Similar to python dict.update method.

PARAMETER DESCRIPTION
fp

file object structured as a yaml file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance

Examples:

businesses:
  apple:
    symbol: aapl
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - jane.doe@apple.com
      - !extend emails.yaml
  amazon:
    symbol: amzn
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - john.doe@amazon.com
      - !extend emails.yaml
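
A minimal sketch of loading this sink model from yaml (assuming a hypothetical sink.yaml file defining schema_name, table_name and mode):

import laktory as lk

with open("sink.yaml") as fp:
    sink = lk.models.HiveMetastoreDataSink.model_validate_yaml(fp)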
Source code in laktory/models/basemodel.py
@classmethod
def model_validate_yaml(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports
    reference to external yaml and sql files using `!use`, `!extend` and `!update` tags.
    Path to external files can be defined using model or environment variables.

    Referenced path should always be relative to the file they are referenced from.

    Custom Tags
    -----------
    - `!use {filepath}`:
        Directly inject the content of the file at `filepath`

    - `- !extend {filepath}`:
        Extend the current list with the elements found in the file at `filepath`.
        Similar to python list.extend method.

    - `<<: !update {filepath}`:
        Merge the current dictionary with the content of the dictionary defined at
        `filepath`. Similar to python dict.update method.

    Parameters
    ----------
    fp:
        file object structured as a yaml file

    Returns
    -------
    :
        Model instance

    Examples
    --------
    ```yaml
    businesses:
      apple:
        symbol: aapl
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - jane.doe@apple.com
          - !extend emails.yaml
      amazon:
        symbol: amzn
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - john.doe@amazon.com
          - !extend emails.yaml
    ```
    """

    data = RecursiveLoader.load(fp)
    return cls.model_validate(data)

purge() ¤

Delete sink data and checkpoints
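
Examples:

A minimal sketch (assuming sink is the sink from the class-level example and the PySpark backend is used):

sink.purge()  # drops the table and deletes any checkpoint data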

Source code in laktory/models/datasinks/tabledatasink.py
def purge(self):
    """
    Delete sink data and checkpoints
    """

    if self.df_backend == DataFrameBackends.PYSPARK:
        from laktory import get_spark_session

        spark = get_spark_session()

        # Remove Data
        logger.info(
            f"Dropping {self.table_type} {self.full_name}",
        )
        spark.sql(f"DROP {self.table_type} IF EXISTS {self.full_name}")

        path = self.writer_kwargs.get("path", None)
        if path:
            path = Path(path)
            if path.exists():
                is_dir = path.is_dir()
                if is_dir:
                    logger.info(f"Deleting data dir {path}")
                    shutil.rmtree(path)
                else:
                    logger.info(f"Deleting data file {path}")
                    os.remove(path)

        # Remove Checkpoint
        self._purge_checkpoint()

    else:
        raise TypeError(f"DataFrame backend {self.df_backend} is not supported.")

push_vars(update_core_resources=False) ¤

Push variable values to all children recursively

Source code in laktory/models/basemodel.py
def push_vars(self, update_core_resources=False) -> Any:
    """Push variable values to all child recursively"""

    def _update_model(m):
        if not isinstance(m, BaseModel):
            return
        for k, v in self.variables.items():
            m.variables[k] = m.variables.get(k, v)
        m.push_vars()

    def _push_vars(o):
        if isinstance(o, list):
            for _o in o:
                _push_vars(_o)
        elif isinstance(o, dict):
            for _o in o.values():
                _push_vars(_o)
        else:
            _update_model(o)

    for k in self.model_fields.keys():
        _push_vars(getattr(self, k))

    if update_core_resources and hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r != self:
                _push_vars(r)

    return None

read(as_stream=None) ¤

Read dataframe from sink.

PARAMETER DESCRIPTION
as_stream

If True, dataframe is read as a stream.

DEFAULT: None

RETURNS DESCRIPTION
AnyFrame

DataFrame
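
Examples:

A minimal sketch (assuming the sink from the class-level example has already been written):

df = sink.read()
df_stream = sink.read(as_stream=True)  # read as a streaming dataframe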

Source code in laktory/models/datasinks/basedatasink.py
def read(self, as_stream=None):
    """
    Read dataframe from sink.

    Parameters
    ----------
    as_stream:
        If `True`, dataframe read as stream.

    Returns
    -------
    AnyFrame
        DataFrame
    """
    return self.as_source(as_stream=as_stream).read()

validate_assignment_disabled() ¤

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design and must be turned off temporarily.
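
A minimal sketch (hypothetical usage inside a model validator, where self is a model with validate_assignment enabled):

with self.validate_assignment_disabled():
    self.table_name = "my_other_table"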

Source code in laktory/models/basemodel.py
@contextmanager
def validate_assignment_disabled(self):
    """
    Updating a model attribute inside a model validator when `validate_assignment`
    is `True` causes an infinite recursion by design and must be turned off
    temporarily.
    """
    original_state = self.model_config["validate_assignment"]
    self.model_config["validate_assignment"] = False
    try:
        yield
    finally:
        self.model_config["validate_assignment"] = original_state

write(df=None, mode=None, full_refresh=False) ¤

Write dataframe into sink.

PARAMETER DESCRIPTION
df

Input dataframe.

TYPE: AnyFrame DEFAULT: None

full_refresh

If True, sink data is deleted/dropped (including checkpoint if applicable) before write.

TYPE: bool DEFAULT: False

mode

Write mode override of the sink's default mode.

TYPE: str DEFAULT: None
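
Examples:

A minimal sketch (assuming df is a PySpark DataFrame and sink is the sink from the class-level example):

sink.write(df)  # uses the sink's configured mode ("APPEND" in the example)
sink.write(df, mode="OVERWRITE", full_refresh=True)  # drop existing data, then rewrite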

Source code in laktory/models/datasinks/basedatasink.py
def write(
    self, df: AnyFrame = None, mode: str = None, full_refresh: bool = False
) -> None:
    """
    Write dataframe into sink.

    Parameters
    ----------
    df:
        Input dataframe.
    full_refresh
        If `True`, source is deleted/dropped (including checkpoint if applicable)
        before write.
    mode:
        Write mode overwrite of the sink default mode.
    """

    if getattr(self, "view_definition", None):
        if self.df_backend == DataFrameBackends.PYSPARK:
            self._write_spark_view()
        elif self.df_backend == DataFrameBackends.POLARS:
            self._write_polars_view()
        else:
            raise ValueError(
                f"DataFrame backend '{self.dataframe_backend}' is not supported"
            )
        return

    if mode is None:
        mode = self.mode

    if not isinstance(df, (nw.DataFrame, nw.LazyFrame)):
        df = nw.from_native(df)

    dataframe_backend = DataFrameBackends.from_nw_implementation(df.implementation)
    if dataframe_backend not in SUPPORTED_BACKENDS:
        raise ValueError(
            f"DataFrame provided is of {dataframe_backend} backend, which is not supported."
        )

    if self.dataframe_backend and self.dataframe_backend != dataframe_backend:
        raise ValueError(
            f"DataFrame provided is {dataframe_backend} and source has been configure with {self.dataframe_backend} backend."
        )
    self.dataframe_backend = dataframe_backend

    self._validate_mode(mode, df)
    self._validate_format()

    if mode and mode.lower() == "merge":
        self.merge_cdc_options.execute(source=df)
        logger.info("Write completed.")
        return

    if self.dataframe_backend == DataFrameBackends.PYSPARK:
        self._write_spark(df=df, mode=mode, full_refresh=full_refresh)
    elif self.dataframe_backend == DataFrameBackends.POLARS:
        self._write_polars(df=df, mode=mode, full_refresh=full_refresh)
    else:
        raise ValueError(
            f"DataFrame backend '{self.dataframe_backend}' is not supported"
        )

    logger.info("Write completed.")