
QualityMonitor

laktory.models.resources.databricks.QualityMonitor

Bases: BaseModel, PulumiResource, TerraformResource

Databricks Quality Monitor

Examples:

import laktory as lk

qm = lk.models.resources.databricks.QualityMonitor(
    assets_dir="/.laktory/qualitymonitors",
    output_schema_name="dev.monitoring",
    table_name="dev.slv_stock_prices",
    snapshot={},
)
PARAMETER DESCRIPTION
assets_dir

The directory in which to store the monitoring assets (e.g., dashboard and metric tables).

TYPE: str | VariableType

baseline_table_name

Name of the baseline table from which drift metrics are computed. Columns in the monitored table should also be present in the baseline table.

TYPE: str | VariableType DEFAULT: None

custom_metrics

Custom metrics to compute on the monitored table. These can be aggregate metrics, derived metrics (from already computed aggregate metrics), or drift metrics (comparing metrics across time windows).

TYPE: list[QualityMonitorCustomMetric | VariableType] | VariableType DEFAULT: None

data_classification_config

The data classification config for the monitor

TYPE: QualityMonitorDataClassificationConfig | VariableType DEFAULT: None

inference_log

Configuration for the inference log monitor

TYPE: QualityMonitorInferenceLog | VariableType DEFAULT: None

latest_monitor_failure_msg

TYPE: str | VariableType DEFAULT: None

monitor_id

ID of this monitor. It is the same as the full table name, in the format {catalog}.{schema_name}.{table_name}.

TYPE: str | VariableType DEFAULT: None

notifications

The notification settings for the monitor.

TYPE: QualityMonitorNotifications | VariableType DEFAULT: None

output_schema_name_

Schema where output metric tables are created. It is of the format {catalog}.{schema}.

TYPE: str | VariableType DEFAULT: None

schedule

The schedule for automatically updating and refreshing metric tables.

TYPE: QualityMonitorSchedule | None | VariableType DEFAULT: None

skip_builtin_dashboard

Whether to skip creating a default dashboard summarizing data quality metrics. (Can't be updated after creation).

TYPE: bool | VariableType DEFAULT: None

slicing_exprs

List of column expressions to slice data with for targeted analysis. The data is grouped by each expression independently, resulting in a separate slice for each predicate and its complements. For high-cardinality columns, only the top 100 unique values by frequency will generate slices.

TYPE: list[str | VariableType] | VariableType DEFAULT: None
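To make the slicing semantics concrete, here is a minimal plain-Python sketch (not Laktory or Databricks code) of how each expression independently splits the data into a slice and its complement. The sample `rows`, the `slice_by_predicates` helper, and the use of Python callables in place of SQL column expressions are all assumptions made for illustration.

```python
from typing import Callable

Row = dict[str, object]


def slice_by_predicates(
    rows: list[Row], predicates: dict[str, Callable[[Row], bool]]
) -> dict[str, dict[bool, list[Row]]]:
    """Group rows by each predicate independently.

    For every predicate, rows are split into the slice where it holds (True)
    and its complement (False). Predicates are never combined with each other.
    """
    slices: dict[str, dict[bool, list[Row]]] = {}
    for name, predicate in predicates.items():
        slices[name] = {
            True: [row for row in rows if predicate(row)],
            False: [row for row in rows if not predicate(row)],
        }
    return slices


# Hypothetical data standing in for the monitored table.
rows = [
    {"symbol": "AAPL", "close": 190.0},
    {"symbol": "MSFT", "close": 410.0},
    {"symbol": "AAPL", "close": 195.0},
]

# Hypothetical stand-ins for entries of slicing_exprs.
predicates = {
    "close > 200": lambda row: row["close"] > 200,
    "symbol = 'AAPL'": lambda row: row["symbol"] == "AAPL",
}

slices = slice_by_predicates(rows, predicates)
for name, parts in slices.items():
    print(name, "->", len(parts[True]), "matching,", len(parts[False]), "in complement")
```

Each expression yields its own pair of slices, so two expressions produce four slices rather than the four combinations of both conditions.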

snapshot

Configuration for monitoring snapshot tables.

TYPE: QualityMonitorSnapshot | VariableType DEFAULT: None

table_name_

The full name of the table to attach the monitor to. It is of the format {catalog}.{schema}.{tableName}.

TYPE: str | VariableType DEFAULT: None

time_series

Configuration for monitoring timeseries tables.

TYPE: QualityMonitorTimeSeries | VariableType DEFAULT: None

warehouse_id

Optional argument to specify the warehouse for dashboard creation. If not specified, the first running warehouse will be used. (Can't be updated after creation)

TYPE: str | VariableType DEFAULT: None

ATTRIBUTE DESCRIPTION
additional_core_resources

TYPE: list[PulumiResource]

table_name

Remove backticks from table name as they are not accepted by the API

TYPE: str | None

additional_core_resources property

table_name property

Remove backticks from table name as they are not accepted by the API
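The property's implementation is not shown on this page. As a rough sketch of the behavior described, assuming it amounts to deleting every backtick from the configured name, a hypothetical helper `strip_backticks` could look like this:

```python
def strip_backticks(full_name: str) -> str:
    """Return the table name with every backtick character removed."""
    return full_name.replace("`", "")


# Hypothetical quoted three-part name, as it might be written in SQL.
quoted = "`dev`.`finance`.`slv_stock_prices`"
clean = strip_backticks(quoted)
print(clean)
```

A name that contains no backticks is returned unchanged, so the helper is safe to apply unconditionally.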


laktory.models.resources.databricks.qualitymonitor.QualityMonitorCustomMetric

Bases: BaseModel

PARAMETER DESCRIPTION
definition

Create metric definition

TYPE: str | VariableType

input_columns

Columns on the monitored table to apply the custom metrics to.

TYPE: list[str | VariableType] | VariableType

name

Name of the custom metric.

TYPE: str | VariableType

output_data_type

The output type of the custom metric.

TYPE: str | VariableType

type

The type of the custom metric.

TYPE: str | VariableType


laktory.models.resources.databricks.qualitymonitor.QualityMonitorDataClassificationConfig

Bases: BaseModel

PARAMETER DESCRIPTION
enabled

TYPE: bool | VariableType


laktory.models.resources.databricks.qualitymonitor.QualityMonitorInferenceLog

Bases: BaseModel

PARAMETER DESCRIPTION
granularities

List of granularities to use when aggregating data into time windows based on their timestamp.

TYPE: list[str | VariableType] | VariableType

label_col

Column of the model label

TYPE: str | VariableType DEFAULT: None

model_id_col

Column of the model id or version

TYPE: str | VariableType

prediction_col

Column of the model prediction

TYPE: str | VariableType

prediction_proba_col

Column of the model prediction probabilities

TYPE: str | VariableType DEFAULT: None

problem_type

Problem type the model aims to solve. Either PROBLEM_TYPE_CLASSIFICATION or PROBLEM_TYPE_REGRESSION.

TYPE: str | VariableType

timestamp_col

Column of the timestamp of predictions

TYPE: str | VariableType
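As an illustration of the constraints listed above, the following plain-Python sketch checks an inference-log configuration expressed as a dictionary. It is not part of Laktory: `check_inference_log`, the column names, and the granularity value `"1 day"` are assumptions. The required keys are simply the parameters documented above without a default.

```python
ALLOWED_PROBLEM_TYPES = {
    "PROBLEM_TYPE_CLASSIFICATION",
    "PROBLEM_TYPE_REGRESSION",
}

# Parameters documented above without a default value.
REQUIRED_KEYS = {
    "granularities",
    "model_id_col",
    "prediction_col",
    "problem_type",
    "timestamp_col",
}


def check_inference_log(config: dict) -> list[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems = [
        f"missing required key: {key}"
        for key in sorted(REQUIRED_KEYS - set(config))
    ]
    problem_type = config.get("problem_type")
    if problem_type is not None and problem_type not in ALLOWED_PROBLEM_TYPES:
        problems.append(f"unsupported problem_type: {problem_type}")
    return problems


# Hypothetical column names and granularity value.
inference_log = {
    "granularities": ["1 day"],
    "model_id_col": "model_version",
    "prediction_col": "prediction",
    "problem_type": "PROBLEM_TYPE_REGRESSION",
    "timestamp_col": "predicted_at",
}

problems = check_inference_log(inference_log)
bad_problems = check_inference_log({**inference_log, "problem_type": "REGRESSION"})
print(problems, bad_problems)
```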


laktory.models.resources.databricks.qualitymonitor.QualityMonitorNotifications

Bases: BaseModel

PARAMETER DESCRIPTION
on_failure

Who to send notifications to on monitor failure.

TYPE: QualityMonitorNotificationsOnFailure | VariableType DEFAULT: None

on_new_classification_tag_detected

Who to send notifications to when new data classification tags are detected.

TYPE: QualityMonitorNotificationsOnNewClassificationTagDetected | VariableType DEFAULT: None


laktory.models.resources.databricks.qualitymonitor.QualityMonitorNotificationsOnFailure

Bases: BaseModel

PARAMETER DESCRIPTION
email_addresses

TYPE: list[str | VariableType] | VariableType


laktory.models.resources.databricks.qualitymonitor.QualityMonitorNotificationsOnNewClassificationTagDetected

Bases: BaseModel

PARAMETER DESCRIPTION
email_addresses

TYPE: list[str | VariableType] | VariableType


laktory.models.resources.databricks.qualitymonitor.QualityMonitorSDKClient(quality_monitor_resource, workspace_client)

METHOD DESCRIPTION
create

Bypass ws.quality_monitors.create to avoid having to instantiate the data classes.

update

Bypass ws.quality_monitors.update to avoid having to instantiate the data classes.

Source code in laktory/models/resources/databricks/qualitymonitor.py
def __init__(
    self, quality_monitor_resource, workspace_client: Union["WorkspaceClient", None]
):
    self.qmr = quality_monitor_resource
    self._ws = workspace_client

create()

Bypass ws.quality_monitors.create to avoid having to instantiate the data classes.

Source code in laktory/models/resources/databricks/qualitymonitor.py
def create(self):
    """
    Bypass ws.quality_monitors.create to avoid having to instantiate the data
    classes.
    """
    from databricks.sdk.service.catalog import MonitorInfo

    body = self.qmr.model_dump(exclude_unset=True)

    logger.info(f"Creating Quality Monitor for {self.table_name}")

    table_name = body.pop("table_name")
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    res = self.ws.quality_monitors._api.do(
        "POST",
        f"/api/2.1/unity-catalog/tables/{table_name}/monitor",
        body=body,
        headers=headers,
    )
    return MonitorInfo.from_dict(res)
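The request that `create` assembles can be sketched without a workspace connection. The snippet below is a simplified, standalone illustration rather than the Laktory implementation: `build_create_request` is a hypothetical helper, the configuration values are placeholders, and no HTTP call is made. It only mirrors how the table name is moved out of the body and into the endpoint path.

```python
def build_create_request(config: dict) -> tuple[str, str, dict]:
    """Return (method, path, body) for a create-monitor request.

    The table name is removed from the body and embedded in the path,
    mirroring the listing above. The input dictionary is left untouched.
    """
    body = dict(config)  # shallow copy so the caller's dict is not mutated
    table_name = body.pop("table_name")
    path = f"/api/2.1/unity-catalog/tables/{table_name}/monitor"
    return "POST", path, body


# Hypothetical configuration, shaped like a dumped QualityMonitor model.
config = {
    "table_name": "dev.finance.slv_stock_prices",
    "assets_dir": "/.laktory/qualitymonitors",
    "output_schema_name": "dev.monitoring",
    "snapshot": {},
}

method, path, body = build_create_request(config)
print(method, path)
print(body)
```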

update(_qm)

Bypass ws.quality_monitors.update to avoid having to instantiate the data classes.

Source code in laktory/models/resources/databricks/qualitymonitor.py
def update(self, _qm):
    """
    Bypass ws.quality_monitors.update to avoid having to instantiate the data
    classes.
    """
    from databricks.sdk.service.catalog import MonitorInfo
    from databricks.sdk.service.catalog import MonitorInfoStatus

    body = self.qmr.model_dump(exclude_unset=True, exclude_none=True)

    # Exit if update is not possible
    if self.qmr.assets_dir != _qm.assets_dir:
        return False
    if self.qmr.time_series is None and _qm.time_series is not None:
        return False
    if self.qmr.time_series is not None and _qm.time_series is None:
        return False
    table_name = body.pop("table_name")
    body.pop("warehouse_id", None)
    body.pop("assets_dir", None)

    # Wait for previous update or creation to be completed
    while _qm.status == MonitorInfoStatus.MONITOR_STATUS_PENDING:
        time.sleep(1.0)
        _qm = self.get()

    # Get current state
    body0 = _qm.as_dict()

    update_required = False
    for k, v in body.items():
        v0 = body0.get(k, None)
        if v != v0:
            update_required = True
            break

    if not update_required:
        logger.info(f"Quality Monitor for {self.table_name} is already up-to-date.")
        return _qm

    logger.info(f"Updating Quality Monitor for {self.table_name} with body {body}.")

    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    res = self.ws.quality_monitors._api.do(
        "PUT",
        f"/api/2.1/unity-catalog/tables/{table_name}/monitor",
        body=body,
        headers=headers,
    )
    return MonitorInfo.from_dict(res)
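The change-detection step in `update` compares only the keys present in the desired body against the current state. A compact standalone sketch of that comparison is shown below, where `needs_update` is a hypothetical helper and both dictionaries are illustrative data:

```python
def needs_update(desired: dict, current: dict) -> bool:
    """Return True if any key in `desired` differs from `current`.

    Keys that exist only in `current` are ignored, as in the loop above.
    A key missing from `current` is treated as None.
    """
    return any(current.get(key) != value for key, value in desired.items())


# Hypothetical current state, as it might be returned by the workspace.
current = {
    "output_schema_name": "dev.monitoring",
    "snapshot": {},
    "dashboard_id": "abc123",  # server-side field, absent from the desired body
}

unchanged = needs_update({"output_schema_name": "dev.monitoring", "snapshot": {}}, current)
changed = needs_update({"output_schema_name": "prd.monitoring"}, current)
print(unchanged, changed)
```

Because only the desired keys are inspected, fields populated by the server do not trigger an update on their own.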

laktory.models.resources.databricks.qualitymonitor.QualityMonitorSchedule

Bases: BaseModel

PARAMETER DESCRIPTION
quartz_cron_expression

String expression that determines when to run the monitor. See the Quartz documentation for examples.

TYPE: list[str | VariableType] | VariableType

timezone_id

String with the timezone ID (e.g., PST) in which to evaluate the Quartz expression.

TYPE: str | VariableType
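For orientation, a Quartz cron expression has six space-separated fields (seconds, minutes, hours, day of month, month, day of week) plus an optional seventh field for the year. The sketch below shows a schedule written as a plain dictionary, with the expression as a single string as the description states. The `looks_like_quartz_cron` helper is hypothetical and only counts fields; it does not validate the expression, and the `"UTC"` timezone is an illustrative choice.

```python
def looks_like_quartz_cron(expression: str) -> bool:
    """Rough shape check: Quartz expressions have 6 or 7 space-separated fields."""
    return len(expression.split()) in (6, 7)


# "0 0 12 * * ?" fires at 12:00 every day in Quartz syntax.
schedule = {
    "quartz_cron_expression": "0 0 12 * * ?",
    "timezone_id": "UTC",  # illustrative timezone ID
}

is_quartz = looks_like_quartz_cron(schedule["quartz_cron_expression"])
# A five-field Unix-style cron expression does not pass the shape check.
is_unix_style = looks_like_quartz_cron("0 12 * * *")
print(is_quartz, is_unix_style)
```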


laktory.models.resources.databricks.qualitymonitor.QualityMonitorSnapshot

Bases: BaseModel


laktory.models.resources.databricks.qualitymonitor.QualityMonitorTimeSeries

Bases: BaseModel

PARAMETER DESCRIPTION
granularities

List of granularities to use when aggregating data into time windows based on their timestamp.

TYPE: list[str | VariableType] | VariableType

timestamp_col

Column of the timestamp of predictions.

TYPE: str | VariableType