DataQualityExpectation

laktory.models.dataquality.DataQualityExpectation ¤

Bases: BaseModel, PipelineChild

Data Quality Expectation for a given DataFrame expressed as a row-specific condition (type="ROW") or as an aggregated metric (type="AGGREGATE").

The expression may be defined as a SQL statement or a DataFrame expression.

Examples:

from laktory import models

dqe = models.DataQualityExpectation(
    name="price higher than 127",
    action="WARN",
    expr="close > 127",
    tolerance={"rel": 0.05},
)
print(dqe)
'''
dataframe_backend_=None dataframe_api_=None variables={} action='WARN' type='ROW' name='price higher than 127' expr=DataFrameColumnExpr(dataframe_backend_=None, dataframe_api_=None, variables={}, expr='close > 127', type='SQL', dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS') tolerance=ExpectationTolerance(variables={}, abs=None, rel=0.05) dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'> dataframe_api='NARWHALS'
'''

dqe = models.DataQualityExpectation(
    name="rows count",
    expr="COUNT(*) > 50",
    type="AGGREGATE",
)
print(dqe)
'''
dataframe_backend_=None dataframe_api_=None variables={} action='WARN' type='AGGREGATE' name='rows count' expr=DataFrameColumnExpr(dataframe_backend_=None, dataframe_api_=None, variables={}, expr='COUNT(*) > 50', type='SQL', dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS') tolerance=ExpectationTolerance(variables={}, abs=0, rel=None) dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'> dataframe_api='NARWHALS'
'''
PARAMETER DESCRIPTION
action

Action to take when the expectation is not met:
- WARN: Keep invalid records in the output DataFrame, but issue a warning.
- DROP: Drop invalid records from the output DataFrame and issue a warning.
- QUARANTINE: Forward invalid records for quarantine.
- FAIL: Raise an exception when invalid records are found.

TYPE: Literal['WARN', 'DROP', 'QUARANTINE', 'FAIL'] | VariableType DEFAULT: 'WARN'

expr

SQL or DataFrame expression representing a row-specific condition or an aggregated metric.

TYPE: str | DataFrameColumnExpr | VariableType DEFAULT: None

name

Name of the expectation

TYPE: str | VariableType

tolerance

Tolerance for non-matching rows before resulting in failure. Only available for 'ROW' type expectation.

TYPE: ExpectationTolerance | VariableType DEFAULT: ExpectationTolerance(variables={}, abs=0, rel=None)

type

Type of expectation:
- "ROW": Row-specific condition. Must be a boolean expression.
- "AGGREGATE": Global condition. Must be a boolean expression.

TYPE: Literal['AGGREGATE', 'ROW'] | VariableType DEFAULT: 'ROW'
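To illustrate the difference between the two expectation types, here is a minimal plain-Python sketch that does not use Laktory. The `rows` data and the helper names `check_row` and `check_aggregate` are hypothetical; the library itself evaluates `expr` against a DataFrame.

```python
# Hypothetical stand-in data; Laktory itself operates on DataFrames.
rows = [
    {"symbol": "AAPL", "close": 130.0},
    {"symbol": "AAPL", "close": 120.0},
    {"symbol": "GOOG", "close": 140.0},
]


def check_row(rows, predicate):
    """ROW-style check: evaluate a boolean condition once per row."""
    return [predicate(row) for row in rows]


def check_aggregate(rows, predicate):
    """AGGREGATE-style check: evaluate one boolean condition on the whole set."""
    return predicate(rows)


# Analogous to expr="close > 127" with type="ROW"
row_results = check_row(rows, lambda r: r["close"] > 127)

# Analogous to expr="COUNT(*) > 50" with type="AGGREGATE"
agg_result = check_aggregate(rows, lambda rs: len(rs) > 50)

print(row_results)
print(agg_result)
```

A ROW check yields one boolean per row, so failing rows can be counted, dropped or quarantined. An AGGREGATE check yields a single boolean for the whole DataFrame, which is consistent with `tolerance` being available only for 'ROW' expectations.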

METHOD DESCRIPTION
raise_or_warn

Raise exception or issue warning if expectation is not met.

run_check

Check whether the expectation is met and save the result.

ATTRIBUTE DESCRIPTION
fail_filter

Expression representing all rows not meeting the expectation.

TYPE: Expr | None

is_ldp_compatible

Expectation is supported by LDP natively via @dp.expect_* decorators

TYPE: bool

is_ldp_managed

Expectation is LDP-compatible and pipeline node is executing inside LDP

TYPE: bool

is_sdp_compatible

Expectation is supported by SDP natively via @dp.expect_* decorators

TYPE: bool

is_sdp_managed

True when the expectation is delegated to the SDP runtime via @dp.expect_*.

TYPE: bool

keep_filter

Expression representing all rows to keep, considering both the expectation and the selected action.

TYPE: Expr | None

pass_filter

Expression representing all rows meeting the expectation.

TYPE: Expr | None

quarantine_filter

Expression representing all rows to quarantine, considering both the expectation and the selected action.

TYPE: Expr | None

fail_filter property ¤

Expression representing all rows not meeting the expectation.

is_ldp_compatible property ¤

Expectation is supported by LDP natively via @dp.expect_* decorators

is_ldp_managed property ¤

Expectation is LDP-compatible and pipeline node is executing inside LDP

is_sdp_compatible property ¤

Expectation is supported by SDP natively via @dp.expect_* decorators

is_sdp_managed property ¤

True when the expectation is delegated to the SDP runtime via @dp.expect_*.

keep_filter property ¤

Expression representing all rows to keep, considering both the expectation and the selected action.

pass_filter property ¤

Expression representing all rows meeting the expectation.

quarantine_filter property ¤

Expression representing all rows to quarantine, considering both the expectation and the selected action.
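The relationship between the four filters and the selected `action` can be sketched in plain Python. This is an illustration under stated assumptions, not Laktory's implementation: the function `split_rows` is hypothetical, tolerance is ignored, and the sketch assumes that only DROP and QUARANTINE remove failing rows from the output, as the `action` description suggests.

```python
def split_rows(rows, predicate, action="WARN"):
    """Hypothetical helper: partition rows the way the filters are described.

    Returns (kept, quarantined). Assumes WARN keeps every row, DROP and
    QUARANTINE keep only passing rows, and QUARANTINE additionally forwards
    failing rows to a separate output. FAIL raises on any failing row.
    """
    passing = [r for r in rows if predicate(r)]      # pass_filter analogue
    failing = [r for r in rows if not predicate(r)]  # fail_filter analogue

    if action == "FAIL" and failing:
        raise ValueError(f"{len(failing)} row(s) did not meet the expectation")

    # keep_filter analogue: only DROP and QUARANTINE narrow the output
    kept = passing if action in ("DROP", "QUARANTINE") else list(rows)
    # quarantine_filter analogue: only QUARANTINE forwards failing rows
    quarantined = failing if action == "QUARANTINE" else []
    return kept, quarantined


def is_valid(row):
    """Analogous to expr="close > 127"."""
    return row["close"] > 127


rows = [{"close": 130.0}, {"close": 120.0}, {"close": 140.0}]

for action in ("WARN", "DROP", "QUARANTINE"):
    kept, quarantined = split_rows(rows, is_valid, action)
    print(action, len(kept), len(quarantined))
```

With these three rows, WARN keeps all of them, DROP keeps the two passing rows, and QUARANTINE keeps the same two while forwarding the failing row separately.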

raise_or_warn(node=None) ¤

Raise exception or issue warning if expectation is not met.

Source code in laktory/models/dataquality/expectation.py
def raise_or_warn(self, node=None) -> None:
    """
    Raise exception or issue warning if expectation is not met.
    """

    # Failure Message
    msg = f"Expectation '{self.name}'"
    if node:
        msg += f" for node '{node.name}'"
    msg += f" FAILED | {self.log_msg}"

    if self.check.status != "FAIL":
        return

    # Raise Exception
    if self.action == "FAIL":
        raise DataQualityCheckFailedError(self, node)
    else:
        # actions: WARN, DROP, QUARANTINE
        warnings.warn(msg)
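Because every action other than FAIL reports through `warnings.warn`, callers can capture or escalate those messages with the standard `warnings` module. The sketch below uses a hypothetical `report_failure` function that mirrors only the branching shown above; `RuntimeError` is a stand-in for `DataQualityCheckFailedError`, and the message format is simplified.

```python
import warnings


def report_failure(name, status, action):
    """Hypothetical stand-in mirroring the branching of raise_or_warn."""
    if status != "FAIL":
        return
    if action == "FAIL":
        # Stand-in for DataQualityCheckFailedError
        raise RuntimeError(f"Expectation '{name}' FAILED")
    # actions: WARN, DROP, QUARANTINE
    warnings.warn(f"Expectation '{name}' FAILED")


# Capture the warning instead of letting it print to stderr
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    report_failure("rows count", status="FAIL", action="DROP")

messages = [str(w.message) for w in caught]
print(messages)

# Escalate warnings to exceptions, for example in a strict CI run
with warnings.catch_warnings():
    warnings.simplefilter("error")
    try:
        report_failure("rows count", status="FAIL", action="WARN")
        escalated = False
    except UserWarning:
        escalated = True
print(escalated)
```

`warnings.warn` called without a category emits a `UserWarning`, which is why the second block catches that class once the filter is set to `"error"`.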

run_check(df, raise_or_warn=False, node=None) ¤

Check whether the expectation is met and save the result.

PARAMETER DESCRIPTION
df

Input DataFrame for checking the expectation.

TYPE: AnyFrame

raise_or_warn

Raise exception or issue warning if expectation is not met.

TYPE: bool DEFAULT: False

node

Pipeline node; when provided, its name is included in the failure message.

DEFAULT: None

RETURNS DESCRIPTION
output

Check result.

TYPE: DataQualityCheck

Source code in laktory/models/dataquality/expectation.py
def run_check(
    self,
    df: AnyFrame,
    raise_or_warn: bool = False,
    node=None,
) -> DataQualityCheck:
    """
    Check whether the expectation is met and save the result.

    Parameters
    ----------
    df:
        Input DataFrame for checking the expectation.
    raise_or_warn:
        Raise exception or issue warning if expectation is not met.
    node:
        Pipeline Node

    Returns
    -------
    output: DataQualityCheck
        Check result.
    """

    logger.info(
        f"Checking expectation '{self.name}' | {self.expr.expr} (type: {self.type})"
    )

    # Run Check
    self._check = self._check_df(df)

    if raise_or_warn:
        self.raise_or_warn(node)

    return self._check

laktory.models.dataquality.expectation.ExpectationTolerance ¤

Bases: BaseModel

Tolerance values for data quality expectations with support for either absolute or relative tolerances.

PARAMETER DESCRIPTION
abs

Maximum number of rows with failure for a PASS status

TYPE: int | VariableType DEFAULT: None

rel

Maximum fraction of rows with failure, relative to the total number of rows, for a PASS status

TYPE: float | VariableType DEFAULT: None
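How the two tolerances might translate into a PASS or FAIL status can be sketched as follows. The function `status_with_tolerance` is hypothetical, and the inclusive comparisons (`<=`) are an assumption based on the wording "maximum number of rows"; consult the Laktory source for the exact rule.

```python
def status_with_tolerance(n_failed, n_total, abs_tol=None, rel_tol=None):
    """Hypothetical helper: decide PASS/FAIL from failed-row counts.

    Assumes a check passes when the failed count is within the absolute
    tolerance, or the failed fraction is within the relative tolerance.
    """
    if abs_tol is not None:
        return "PASS" if n_failed <= abs_tol else "FAIL"
    if rel_tol is not None:
        fraction = n_failed / n_total if n_total else 0.0
        return "PASS" if fraction <= rel_tol else "FAIL"
    # No tolerance given: any failed row fails the check
    return "PASS" if n_failed == 0 else "FAIL"


# 4 failed rows out of 100 with tolerance={"rel": 0.05}
print(status_with_tolerance(4, 100, rel_tol=0.05))
# 6 failed rows out of 100 exceed the 5% tolerance
print(status_with_tolerance(6, 100, rel_tol=0.05))
# Default tolerance abs=0: a single failed row fails the check
print(status_with_tolerance(1, 100, abs_tol=0))
```

Under these assumptions a relative tolerance scales with the size of the DataFrame, while an absolute tolerance caps the number of failing rows regardless of size.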