DataQualityExpectation

laktory.models.dataquality.DataQualityExpectation ¤

Bases: BaseModel, PipelineChild

Data Quality Expectation for a given DataFrame expressed as a row-specific condition (type="ROW") or as an aggregated metric (type="AGGREGATE").

The expression may be defined as a SQL statement or a DataFrame expression.

Examples:

from laktory import models

dqe = models.DataQualityExpectation(
    name="price higher than 10",
    action="WARN",
    expr="close > 127",
    tolerance={"rel": 0.05},
)
print(dqe)
'''
dataframe_backend_=None dataframe_api_=None variables={} action='WARN' type='ROW' name='price higher than 10' expr=DataFrameColumnExpr(dataframe_backend_=None, dataframe_api_=None, variables={}, expr='close > 127', type='SQL', dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS') tolerance=ExpectationTolerance(variables={}, abs=None, rel=0.05) dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'> dataframe_api='NARWHALS'
'''

dqe = models.DataQualityExpectation(
    name="rows count",
    expr="COUNT(*) > 50",
    type="AGGREGATE",
)
print(dqe)
'''
dataframe_backend_=None dataframe_api_=None variables={} action='WARN' type='AGGREGATE' name='rows count' expr=DataFrameColumnExpr(dataframe_backend_=None, dataframe_api_=None, variables={}, expr='COUNT(*) > 50', type='SQL', dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS') tolerance=ExpectationTolerance(variables={}, abs=0, rel=None) dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'> dataframe_api='NARWHALS'
'''
PARAMETER DESCRIPTION
action

Action to take when the expectation is not met.
- WARN: Keep invalid records in the output DataFrame, but log an exception.
- DROP: Drop invalid records from the output DataFrame and log an exception.
- QUARANTINE: Forward invalid records for quarantine.
- FAIL: Raise an exception when invalid records are found.

TYPE: Literal['WARN', 'DROP', 'QUARANTINE', 'FAIL'] | VariableType DEFAULT: 'WARN'

expr

SQL or DataFrame expression representing a row-specific condition or an aggregated metric.

TYPE: str | DataFrameColumnExpr | VariableType DEFAULT: None

name

Name of the expectation

TYPE: str | VariableType

tolerance

Number or fraction of non-matching rows tolerated before the expectation fails. Only available for 'ROW' type expectations.

TYPE: ExpectationTolerance | VariableType DEFAULT: ExpectationTolerance(variables={}, abs=0, rel=None)

type

Type of expectation:
- "ROW": Row-specific condition. Must be a boolean expression.
- "AGGREGATE": Global condition. Must be a boolean expression.

TYPE: Literal['AGGREGATE', 'ROW'] | VariableType DEFAULT: 'ROW'
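
The difference between the two expectation types can be sketched in plain Python. This is an illustration only: the `rows` list and the variable names are hypothetical stand-ins rather than part of the laktory API, and the two conditions simply mirror the `close > 127` and `COUNT(*) > 50` expressions from the examples above.

```python
# Illustrative only: plain-Python stand-ins, not the laktory API.
rows = [{"close": 120.0}, {"close": 130.0}, {"close": 128.5}]

# type="ROW": the boolean condition is evaluated once per row,
# mirroring expr="close > 127".
row_results = [row["close"] > 127 for row in rows]
failed_rows = row_results.count(False)

# type="AGGREGATE": the boolean condition is evaluated once for the
# whole dataset, mirroring expr="COUNT(*) > 50".
aggregate_result = len(rows) > 50

print(row_results)       # [False, True, True]
print(failed_rows)       # 1
print(aggregate_result)  # False
```

A ROW expectation yields one boolean per record, which is why tolerances and row filters only make sense for that type; an AGGREGATE expectation yields a single boolean for the whole DataFrame.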

METHOD DESCRIPTION
raise_or_warn

Raise exception or issue warning if expectation is not met.

run_check

Check if the expectation is met and save the result.

ATTRIBUTE DESCRIPTION
fail_filter

Expression representing all rows not meeting the expectation.

TYPE: Expr | None

is_dlt_compatible

Expectation is supported by DLT

TYPE: bool

is_dlt_managed

Expectation is DLT-compatible and pipeline node is executed by DLT

TYPE: bool

keep_filter

Expression representing all rows to keep, considering both the expectation and the selected action.

TYPE: Expr | None

pass_filter

Expression representing all rows meeting the expectation.

TYPE: Expr | None

quarantine_filter

Expression representing all rows to quarantine, considering both the expectation and the selected action.

TYPE: Expr | None

fail_filter property ¤

Expression representing all rows not meeting the expectation.

is_dlt_compatible property ¤

Expectation is supported by DLT

is_dlt_managed property ¤

Expectation is DLT-compatible and pipeline node is executed by DLT

keep_filter property ¤

Expression representing all rows to keep, considering both the expectation and the selected action.

pass_filter property ¤

Expression representing all rows meeting the expectation.

quarantine_filter property ¤

Expression representing all rows to quarantine, considering both the expectation and the selected action.
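
How the selected action might shape the kept and quarantined rows can be sketched as follows. This is a plain-Python illustration under stated assumptions, not the library's implementation: `split_rows` and `passes` are hypothetical names, and the sketch assumes that DROP and QUARANTINE both remove failing rows from the output, while WARN and FAIL leave the output unfiltered.

```python
def split_rows(rows, passes, action):
    """Return (kept, quarantined) rows for a row-level expectation.

    Hypothetical helper: `passes` is a per-row predicate standing in
    for the expectation's boolean expression.
    """
    passed = [r for r in rows if passes(r)]
    failed = [r for r in rows if not passes(r)]

    # Assumption: only DROP and QUARANTINE filter the output.
    kept = passed if action in ("DROP", "QUARANTINE") else list(rows)

    # Assumption: only QUARANTINE forwards failing rows elsewhere.
    quarantined = failed if action == "QUARANTINE" else []
    return kept, quarantined


rows = [{"close": 120.0}, {"close": 130.0}]
is_valid = lambda r: r["close"] > 127

warn_kept, warn_quarantined = split_rows(rows, is_valid, "WARN")
drop_kept, drop_quarantined = split_rows(rows, is_valid, "DROP")
quar_kept, quar_quarantined = split_rows(rows, is_valid, "QUARANTINE")
```

Under these assumptions, WARN keeps both rows, DROP keeps only the row with `close` of 130.0, and QUARANTINE keeps that same row while forwarding the other one for quarantine.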

raise_or_warn(node=None) ¤

Raise exception or issue warning if expectation is not met.

Source code in laktory/models/dataquality/expectation.py
def raise_or_warn(self, node=None) -> None:
    """
    Raise exception or issue warning if expectation is not met.
    """

    # Failure Message
    msg = f"Expectation '{self.name}'"
    if node:
        msg += f" for node '{node.name}'"
    msg += f" FAILED | {self.log_msg}"

    if self.check.status != "FAIL":
        return

    # Raise Exception
    if self.action == "FAIL":
        raise DataQualityCheckFailedError(self, node)
    else:
        # actions: WARN, DROP, QUARANTINE
        warnings.warn(msg)
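
From the caller's side, the two outcomes of this method need different handling: a warning for WARN, DROP and QUARANTINE, and an exception for FAIL. The sketch below mirrors the control flow shown above with simplified stand-ins so that it runs without laktory; `CheckFailed` and the free function `raise_or_warn` are hypothetical substitutes for `DataQualityCheckFailedError` and the method itself.

```python
import warnings


class CheckFailed(Exception):
    """Hypothetical stand-in for DataQualityCheckFailedError."""


def raise_or_warn(name, action, status):
    """Simplified stand-in mirroring the control flow shown above."""
    if status != "FAIL":
        return
    if action == "FAIL":
        raise CheckFailed(f"Expectation '{name}' FAILED")
    # actions: WARN, DROP, QUARANTINE
    warnings.warn(f"Expectation '{name}' FAILED")


# Non-FAIL actions surface as warnings, which can be captured.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    raise_or_warn("rows count", "WARN", "FAIL")
warned = [str(w.message) for w in caught]

# The FAIL action surfaces as an exception.
try:
    raise_or_warn("rows count", "FAIL", "FAIL")
    raised = False
except CheckFailed:
    raised = True
```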

run_check(df, raise_or_warn=False, node=None) ¤

Check if the expectation is met and save the result.

PARAMETER DESCRIPTION
df

Input DataFrame for checking the expectation.

TYPE: AnyFrame

raise_or_warn

Raise exception or issue warning if expectation is not met.

TYPE: bool DEFAULT: False

node

Pipeline Node

DEFAULT: None

RETURNS DESCRIPTION
output

Check result.

TYPE: DataQualityCheck

Source code in laktory/models/dataquality/expectation.py
def run_check(
    self,
    df: AnyFrame,
    raise_or_warn: bool = False,
    node=None,
) -> DataQualityCheck:
    """
    Check if expectation is met and save the result.

    Parameters
    ----------
    df:
        Input DataFrame for checking the expectation.
    raise_or_warn:
        Raise exception or issue warning if expectation is not met.
    node:
        Pipeline Node

    Returns
    -------
    output: DataQualityCheck
        Check result.
    """

    logger.info(
        f"Checking expectation '{self.name}' | {self.expr.expr} (type: {self.type})"
    )

    # Run Check
    self._check = self._check_df(df)

    if raise_or_warn:
        self.raise_or_warn(node)

    return self._check

laktory.models.dataquality.expectation.ExpectationTolerance ¤

Bases: BaseModel

Tolerance values for data quality expectations with support for either absolute or relative tolerances.

PARAMETER DESCRIPTION
abs

Maximum number of rows with failure for a PASS status

TYPE: int | VariableType DEFAULT: None

rel

Maximum fraction of rows with failure (relative to the total row count) for a PASS status

TYPE: float | VariableType DEFAULT: None
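
The way absolute and relative tolerances could translate into a PASS or FAIL status is sketched below. This is a minimal illustration, not the library's code: `tolerance_status` is a hypothetical helper, and both the inclusive comparison (`<=`) and the handling of an empty DataFrame are assumptions.

```python
def tolerance_status(failed_rows, total_rows, abs_tol=None, rel_tol=None):
    """Return "PASS" or "FAIL" for a row-level expectation.

    Hypothetical helper; comparisons are assumed to be inclusive.
    """
    if abs_tol is not None:
        # Absolute tolerance: cap on the number of failing rows.
        return "PASS" if failed_rows <= abs_tol else "FAIL"
    if rel_tol is not None:
        # Relative tolerance: cap on the fraction of failing rows.
        if total_rows == 0:
            return "PASS"  # assumption: nothing to fail on empty input
        return "PASS" if failed_rows / total_rows <= rel_tol else "FAIL"
    # No tolerance set: any failing row fails the expectation.
    return "PASS" if failed_rows == 0 else "FAIL"


# Default tolerance (abs=0): a single failing row is enough to fail.
strict = tolerance_status(failed_rows=1, total_rows=1000, abs_tol=0)

# rel=0.05 as in the first example: 40 of 1000 failing rows is 4%.
lenient = tolerance_status(failed_rows=40, total_rows=1000, rel_tol=0.05)

# 60 of 1000 failing rows is 6%, which exceeds the 5% tolerance.
exceeded = tolerance_status(failed_rows=60, total_rows=1000, rel_tol=0.05)
```

With these inputs, `strict` is "FAIL", `lenient` is "PASS" and `exceeded` is "FAIL".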