
Pipeline

laktory.models.resources.databricks.pipeline.Pipeline ¤

Bases: BaseModel, PulumiResource, TerraformResource

Databricks Lakeflow Declarative Pipeline (formerly Delta Live Tables)

Examples:

Assuming the following YAML configuration file:

import io

from laktory import models

# Define pipeline
pipeline_yaml = '''
name: pl-stock-prices

catalog: dev
target: finance

clusters:
  - name: default
    node_type_id: Standard_DS3_v2
    autoscale:
      min_workers: 1
      max_workers: 2

libraries:
  - notebook:
      path: /pipelines/dlt_brz_template.py
  - notebook:
      path: /pipelines/dlt_slv_template.py
  - notebook:
      path: /pipelines/dlt_gld_stock_performances.py

access_controls:
  - group_name: account users
    permission_level: CAN_VIEW
  - group_name: role-engineers
    permission_level: CAN_RUN

'''
pipeline = models.resources.databricks.Pipeline.model_validate_yaml(
    io.StringIO(pipeline_yaml)
)
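
For Unity Catalog pipelines in direct publishing mode, the schema_ field replaces target (see the parameter table below). A minimal serverless variant might look like the following sketch; names and paths are illustrative:

```yaml
name: pl-stock-prices-serverless

catalog: dev
schema_: finance

serverless: true
development: true

libraries:
  - notebook:
      path: /pipelines/dlt_brz_template.py
```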
PARAMETER DESCRIPTION
access_controls

Pipeline access controls

TYPE: list[AccessControl | VariableType] | VariableType DEFAULT: []

allow_duplicate_names

If False, deployment will fail if name conflicts with that of another pipeline.

TYPE: bool | VariableType DEFAULT: None

budget_policy_id

Optional string specifying the ID of the budget policy for this DLT pipeline.

TYPE: str | VariableType DEFAULT: None

catalog

Name of the Unity Catalog storing the pipeline tables.

TYPE: str | None | VariableType DEFAULT: None

cause

TYPE: str | VariableType DEFAULT: None

channel

Name of the release channel for the Spark version used by the DLT pipeline.

TYPE: Literal['CURRENT', 'PREVIEW'] | VariableType DEFAULT: 'PREVIEW'

cluster_id

TYPE: str | VariableType DEFAULT: None

clusters

Clusters to run the pipeline. If none is specified, a default cluster configuration is selected automatically for the pipeline.

TYPE: list[PipelineCluster | VariableType] | VariableType DEFAULT: []

configuration

Map of key-value configuration pairs applied to the entire pipeline.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: {}

continuous

If True, the pipeline is run continuously.

TYPE: bool | VariableType DEFAULT: None

creator_user_name

TYPE: str | VariableType DEFAULT: None

deployment

Deployment type of this pipeline.

TYPE: PipelineDeployment | VariableType DEFAULT: None

development

If True, the pipeline is run in development mode.

TYPE: bool | VariableType DEFAULT: None

edition

Name of the product edition

TYPE: Literal['CORE', 'PRO', 'ADVANCED'] | VariableType DEFAULT: None

event_log

An optional block specifying a table where DLT Event Log will be stored.

TYPE: PipelineEventLog | VariableType DEFAULT: None

expected_last_modified

TYPE: int | VariableType DEFAULT: None

filters

Filters on which Pipeline packages to include in the deployed graph.

TYPE: PipelineFilters | VariableType DEFAULT: None

gateway_definition

The definition of a gateway pipeline to support CDC.

TYPE: PipelineGatewayDefinition | VariableType DEFAULT: None

health

TYPE: str | VariableType DEFAULT: None

ingestion_definition

Lakeflow Ingestion Pipeline definition

TYPE: PipelineIngestionDefinition | VariableType DEFAULT: None

last_modified

TYPE: int | VariableType DEFAULT: None

latest_updates

TYPE: list[PipelineLatestUpdate | VariableType] | VariableType DEFAULT: None

libraries

Specifies pipeline code (notebooks) and required artifacts.

TYPE: list[PipelineLibrary | VariableType] | VariableType DEFAULT: None

name

Pipeline name

TYPE: str | VariableType

name_prefix

Prefix added to the DLT pipeline name

TYPE: str | VariableType DEFAULT: None

name_suffix

Suffix added to the DLT pipeline name

TYPE: str | VariableType DEFAULT: None

notifications

Notifications specifications

TYPE: list[PipelineNotifications | VariableType] | VariableType DEFAULT: []

photon

If True, the Photon engine is enabled.

TYPE: bool | VariableType DEFAULT: None

restart_window

TYPE: PipelineRestartWindow | VariableType DEFAULT: None

root_path

An optional string specifying the root path for this pipeline. This is used as the root directory when editing the pipeline in the Databricks user interface and it is added to sys.path when executing Python sources during pipeline execution.

TYPE: str | VariableType DEFAULT: None

run_as

TYPE: PipelineRunAs | VariableType DEFAULT: None

run_as_user_name

TYPE: str | VariableType DEFAULT: None

schema_

The default schema (database) where tables are read from or published to. The presence of this attribute implies that the pipeline is in direct publishing mode.

TYPE: str | VariableType DEFAULT: None

serverless

If True, serverless compute is enabled.

TYPE: bool | VariableType DEFAULT: None

state

TYPE: str | VariableType DEFAULT: None

storage

A location on DBFS or cloud storage where output data and metadata required for pipeline execution are stored. By default, tables are stored in a subdirectory of this location. Changing this parameter forces recreation of the pipeline. (Conflicts with catalog.)

TYPE: str | VariableType DEFAULT: None

tags

A map of tags associated with the pipeline. These are forwarded to the cluster as cluster tags, and are therefore subject to the same limitations. A maximum of 25 tags can be added to the pipeline.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

target

The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI.

TYPE: str | VariableType DEFAULT: None

trigger

TYPE: PipelineTrigger | VariableType DEFAULT: None

url

URL of the DLT pipeline on the given workspace.

TYPE: str | VariableType DEFAULT: None

ATTRIBUTE DESCRIPTION
additional_core_resources
  • permissions

TYPE: list[PulumiResource]

resource_type_id

dlt

TYPE: str

additional_core_resources property ¤

  • permissions

resource_type_id property ¤

dlt


laktory.models.resources.databricks.pipeline.PipelineCluster ¤

Bases: Cluster

Pipeline Cluster. Same attributes as laktory.models.Cluster, except for the following, which are not allowed:

  • autotermination_minutes
  • cluster_id
  • data_security_mode
  • enable_elastic_disk
  • idempotency_token
  • is_pinned
  • libraries
  • no_wait
  • node_type_id
  • runtime_engine
  • single_user_name
  • spark_version

PARAMETER DESCRIPTION
access_controls

List of access controls

TYPE: list[AccessControl | VariableType] | VariableType DEFAULT: []

apply_policy_default_values

Whether to use policy default values for missing cluster attributes.

TYPE: bool | VariableType DEFAULT: None

autoscale

Autoscale specifications

TYPE: ClusterAutoScale | VariableType DEFAULT: None

autotermination_minutes

Automatically terminate the cluster after being inactive for this time in minutes.

TYPE: int | VariableType DEFAULT: None

cluster_id

Cluster ID. Used when assigning a cluster to a job task.

TYPE: str | VariableType DEFAULT: None

cluster_name

Cluster name, which doesn’t have to be unique. If not specified at creation, the cluster name will be an empty string.

TYPE: str | VariableType DEFAULT: None

custom_tags

Additional tags for cluster resources. Databricks will tag all cluster resources (e.g., AWS EC2 instances and EBS volumes) with these tags in addition to default_tags. If a custom cluster tag has the same name as a default cluster tag, the custom tag is prefixed with an x_ when it is propagated.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

data_security_mode

Select the security features of the cluster (see API docs for the full list of values). Unity Catalog requires SINGLE_USER or USER_ISOLATION mode. Use LEGACY_PASSTHROUGH for passthrough clusters and LEGACY_TABLE_ACL for Table ACL clusters. If omitted, default security features are enabled. To disable security features, use NONE or the legacy mode NO_ISOLATION. If kind is specified, the following options are available:

  • DATA_SECURITY_MODE_AUTO: Databricks will choose the most appropriate access mode depending on your compute configuration.
  • DATA_SECURITY_MODE_STANDARD: Alias for USER_ISOLATION.
  • DATA_SECURITY_MODE_DEDICATED: Alias for SINGLE_USER.

TYPE: str | VariableType DEFAULT: None

driver_instance_pool_id

Similar to instance_pool_id, but for driver node. If omitted, and instance_pool_id is specified, then the driver will be allocated from that pool.

TYPE: str | VariableType DEFAULT: None

driver_node_type_id

The node type of the Spark driver. This field is optional; if unset, API will set the driver node type to the same value as node_type_id defined above.

TYPE: str | VariableType DEFAULT: None

enable_elastic_disk

If you don’t want to allocate a fixed number of EBS volumes at cluster creation time, use autoscaling local storage. With autoscaling local storage, Databricks monitors the amount of free disk space available on your cluster’s Spark workers. If a worker begins to run too low on disk, Databricks automatically attaches a new EBS volume to the worker before it runs out of disk space. EBS volumes are attached up to a limit of 5 TB of total disk space per instance (including the instance’s local storage). To scale down EBS usage, make sure you have autotermination_minutes and autoscale attributes set.

TYPE: bool | VariableType DEFAULT: None

enable_local_disk_encryption

Some instance types you use to run clusters may have locally attached disks. Databricks may store shuffle data or temporary data on these locally attached disks. To ensure that all data at rest is encrypted for all storage types, including shuffle data stored temporarily on your cluster’s local disks, you can enable local disk encryption. When local disk encryption is enabled, Databricks generates an encryption key locally unique to each cluster node and uses it to encrypt all data stored on local disks. The scope of the key is local to each cluster node and is destroyed along with the cluster node itself. During its lifetime, the key resides in memory for encryption and decryption and is stored encrypted on the disk. Your workloads may run more slowly because of the performance impact of reading and writing encrypted data to and from local volumes. This feature is not available for all Azure Databricks subscriptions. Contact your Microsoft or Databricks account representative to request access.

TYPE: bool | VariableType DEFAULT: None

idempotency_token

An optional token to guarantee the idempotency of cluster creation requests. If an active cluster with the provided token already exists, the request will not create a new cluster, but it will return the existing running cluster's ID instead. If you specify the idempotency token, upon failure, you can retry until the request succeeds. Databricks platform guarantees to launch exactly one cluster with that idempotency token. This token should have at most 64 characters.

TYPE: str | VariableType DEFAULT: None

init_scripts

List of init scripts specifications

TYPE: list[ClusterInitScript | VariableType] | VariableType DEFAULT: []

instance_pool_id

To reduce cluster start time, you can attach a cluster to a predefined pool of idle instances. When attached to a pool, a cluster allocates its driver and worker nodes from the pool. If the pool does not have sufficient idle resources to accommodate the cluster’s request, it expands by allocating new instances from the instance provider. When an attached cluster changes its state to TERMINATED, the instances it used are returned to the pool and reused by a different cluster.

TYPE: str | VariableType DEFAULT: None

is_pinned

Boolean value specifying whether the cluster is pinned (not pinned by default). You must be a Databricks administrator to use this. The maximum number of pinned clusters is limited to 100, so apply may fail if you have more than that (this number may change over time; check the Databricks documentation for the actual number).

TYPE: bool | VariableType DEFAULT: None

is_single_node

When set to true, Databricks will automatically set single node related custom_tags, spark_conf, and num_workers.

TYPE: bool | VariableType DEFAULT: None

kind

The kind of compute described by this compute specification. Possible values (see API docs for full list): CLASSIC_PREVIEW (if corresponding public preview is enabled).

TYPE: str | VariableType DEFAULT: None

libraries

List of libraries specifications

TYPE: list[Any | VariableType] | VariableType DEFAULT: None

no_wait

If true, the provider will not wait for the cluster to reach RUNNING state when creating the cluster, allowing cluster creation and library installation to continue asynchronously. Defaults to false (the provider will wait for cluster creation and library installation to succeed).

TYPE: bool | VariableType DEFAULT: None

node_type_id

Any supported databricks.getNodeType id. If instance_pool_id is specified, this field is not needed.

TYPE: str | VariableType DEFAULT: None

num_workers

Number of worker nodes that this cluster should have. A cluster has one Spark driver and num_workers executors for a total of num_workers + 1 Spark nodes.

TYPE: int | VariableType DEFAULT: None

policy_id

TYPE: str | VariableType DEFAULT: None

remote_disk_throughput

TYPE: int | VariableType DEFAULT: None

runtime_engine

The type of runtime engine to use. If not specified, the runtime engine type is inferred based on the spark_version value

TYPE: str | VariableType DEFAULT: None

single_user_name

The optional user name of the user to assign to an interactive cluster. This field is required when using data_security_mode set to SINGLE_USER or AAD Passthrough for Azure Data Lake Storage (ADLS) with a single-user cluster (i.e., not high-concurrency clusters).

TYPE: str | VariableType DEFAULT: None

spark_conf

Map with key-value pairs to fine-tune Spark clusters, where you can provide custom Spark configuration properties in a cluster configuration.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: {}

spark_env_vars

Map with environment variable key-value pairs to fine-tune Spark clusters. Key-value pairs of the form (X,Y) are exported (i.e., X='Y') while launching the driver and workers.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: {}

spark_version

Runtime version of the cluster. Any supported databricks.getSparkVersion id. We advise using Cluster Policies to restrict the list of versions for simplicity while maintaining enough control.

TYPE: str | VariableType DEFAULT: None

ssh_public_keys

SSH public key contents that will be added to each Spark node in this cluster. The corresponding private keys can be used to login with the user name ubuntu on port 2200. You can specify up to 10 keys.

TYPE: list[str | VariableType] | VariableType DEFAULT: []

total_initial_remote_disk_size

TYPE: int | VariableType DEFAULT: None

use_ml_runtime

Whether the ML runtime should be selected. The actual runtime is determined by spark_version (DBR release), this use_ml_runtime field, and whether node_type_id is a GPU node.

TYPE: bool | VariableType DEFAULT: None
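
Tying the parameters above together, a clusters entry in the pipeline YAML might look like the following sketch (restricted to attributes allowed on pipeline clusters; all values are illustrative):

```yaml
clusters:
  - name: default
    autoscale:
      min_workers: 1
      max_workers: 4
    spark_conf:
      spark.sql.shuffle.partitions: "8"
    custom_tags:
      env: dev
```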


laktory.models.resources.databricks.pipeline.PipelineDeployment ¤

Bases: BaseModel

PARAMETER DESCRIPTION
kind

The deployment method that manages the pipeline.

TYPE: str | VariableType

metadata_file_path

The path to the file containing metadata about the deployment.

TYPE: str | VariableType


laktory.models.resources.databricks.pipeline.PipelineEventLog ¤

Bases: BaseModel

PARAMETER DESCRIPTION
catalog

The UC catalog the event log is published under.

TYPE: str | VariableType

name

The table name the event log is published to in UC.

TYPE: str | VariableType

schema_

The UC schema the event log is published under.

TYPE: str | VariableType
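
The pipeline's event_log parameter takes one of these blocks; a minimal sketch, with illustrative catalog, schema, and table names (field names follow the parameter table above):

```yaml
event_log:
  catalog: dev
  schema_: finance
  name: pl_stock_prices_event_log
```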


laktory.models.resources.databricks.pipeline.PipelineFilters ¤

Bases: BaseModel

PARAMETER DESCRIPTION
excludes

Paths to exclude.

TYPE: str | VariableType

includes

Paths to include.

TYPE: str | VariableType


laktory.models.resources.databricks.pipeline.PipelineGatewayDefinition ¤

Bases: BaseModel

PARAMETER DESCRIPTION
connection_id

Immutable. The Unity Catalog connection this gateway pipeline uses to communicate with the source.

TYPE: str | VariableType DEFAULT: None

connection_name

TYPE: str | VariableType

gateway_storage_catalog

Required, Immutable. The name of the catalog for the gateway pipeline's storage location.

TYPE: str | VariableType

gateway_storage_name

Required. The Unity Catalog-compatible naming for the gateway storage location. This is the destination to use for the data that is extracted by the gateway. The Delta Live Tables system automatically creates the storage location under the catalog and schema.

TYPE: str | VariableType DEFAULT: None

gateway_storage_schema

Required, Immutable. The name of the schema for the gateway pipeline's storage location.

TYPE: str | VariableType


laktory.models.resources.databricks.pipeline.PipelineLatestUpdate ¤

Bases: BaseModel

PARAMETER DESCRIPTION
creation_time

TYPE: str | VariableType DEFAULT: None

state

TYPE: str | VariableType DEFAULT: None

update_id

TYPE: str | VariableType DEFAULT: None


laktory.models.resources.databricks.pipeline.PipelineLibrary ¤

Bases: BaseModel

PARAMETER DESCRIPTION
file

File specifications

TYPE: str | VariableType DEFAULT: None

notebook

Notebook specifications

TYPE: PipelineLibraryNotebook | VariableType DEFAULT: None
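
A libraries block on the pipeline is a list of these entries, as in the example at the top of this page. A sketch mixing a notebook and a file source (paths are illustrative; the file entry is assumed to follow the same path shape as PipelineLibraryFile below):

```yaml
libraries:
  - notebook:
      path: /pipelines/dlt_brz_template.py
  - file:
      path: /pipelines/helpers.py
```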


laktory.models.resources.databricks.pipeline.PipelineLibraryFile ¤

Bases: BaseModel

PARAMETER DESCRIPTION
path

TYPE: str | VariableType


laktory.models.resources.databricks.pipeline.PipelineLibraryNotebook ¤

Bases: BaseModel

PARAMETER DESCRIPTION
path

Workspace notebook filepath

TYPE: str | VariableType


laktory.models.resources.databricks.pipeline.PipelineNotifications ¤

Bases: BaseModel

PARAMETER DESCRIPTION
alerts

Alert types

TYPE: list[Literal['on-update-success', 'on-update-failure', 'on-update-fatal-failure', 'on-flow-failure'] | VariableType] | VariableType

recipients

List of user/group/service principal names

TYPE: list[str | VariableType] | VariableType
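
On the pipeline, notifications is a list of these blocks, pairing alert types from the literal values above with recipients. A sketch, with an illustrative recipient:

```yaml
notifications:
  - alerts:
      - on-update-failure
      - on-flow-failure
    recipients:
      - ops@example.com
```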


laktory.models.resources.databricks.pipeline.PipelineRestartWindow ¤

Bases: BaseModel

PARAMETER DESCRIPTION
days_of_weeks

TYPE: list[str | VariableType] | VariableType DEFAULT: None

start_hour

TYPE: int | VariableType

time_zone_id

TYPE: str | VariableType DEFAULT: None
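
A restart_window block constrains when a continuous pipeline may be restarted. A sketch, assuming day names and time zone IDs follow the Databricks API conventions (values illustrative):

```yaml
restart_window:
  start_hour: 2
  days_of_weeks:
    - SATURDAY
    - SUNDAY
  time_zone_id: America/New_York
```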


laktory.models.resources.databricks.pipeline.PipelineRunAs ¤

Bases: BaseModel

PARAMETER DESCRIPTION
service_principal_name

TYPE: str | VariableType DEFAULT: None

user_name

TYPE: str | VariableType DEFAULT: None


laktory.models.resources.databricks.pipeline.PipelineTrigger ¤

Bases: BaseModel

PARAMETER DESCRIPTION
cron

TYPE: PipelineTriggerCron | VariableType DEFAULT: None


laktory.models.resources.databricks.pipeline.PipelineTriggerCron ¤

Bases: BaseModel

PARAMETER DESCRIPTION
quartz_cron_schedule

TYPE: str | VariableType DEFAULT: None

timezone_id

TYPE: str | VariableType DEFAULT: None
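
A triggered (non-continuous) pipeline can be scheduled with a trigger block wrapping this cron model. A sketch, assuming a Quartz cron expression (the schedule and timezone are illustrative):

```yaml
trigger:
  cron:
    quartz_cron_schedule: "0 0 6 * * ?"
    timezone_id: UTC
```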