Skip to content

Databricks Pipeline

laktory.models.pipeline.DatabricksPipelineOrchestrator ¤

Bases: Pipeline, PipelineChild

Databricks Pipeline used as an orchestrator to execute a Laktory pipeline.

DLT orchestrator does not support pipeline nodes with views (as opposed to materialized tables). Also, it does not support writing to multiple schemas within the same pipeline.

Selecting this orchestrator requires to add the supporting notebook to the stack.

References
PARAMETER DESCRIPTION
access_controls

Pipeline access controls

TYPE: list[AccessControl | VariableType] | VariableType DEFAULT: []

allow_duplicate_names

If False, deployment will fail if name conflicts with that of another pipeline.

TYPE: bool | VariableType DEFAULT: None

budget_policy_id

optional string specifying ID of the budget policy for this DLT pipeline.

TYPE: str | VariableType DEFAULT: None

catalog

Name of the unity catalog storing the pipeline tables

TYPE: str | None | VariableType DEFAULT: None

cause

TYPE: str | VariableType DEFAULT: None

channel

Name of the release channel for Spark version used by DLT pipeline.

TYPE: Literal['CURRENT', 'PREVIEW'] | VariableType DEFAULT: 'PREVIEW'

cluster_id

TYPE: str | VariableType DEFAULT: None

clusters

Clusters to run the pipeline. If none is specified, pipelines will automatically select a default cluster configuration for the pipeline.

TYPE: list[PipelineCluster | VariableType] | VariableType DEFAULT: []

config_file

Pipeline configuration (json) file deployed to the workspace and used by the job to read and execute the pipeline.

TYPE: PipelineConfigWorkspaceFile | VariableType DEFAULT: PipelineConfigWorkspaceFile(dataframe_backend_=None, dataframe_api_=None, resource_name_=None, options=ResourceOptions(variables={}, is_enabled=True, depends_on=[], provider=None, ignore_changes=None, aliases=None, delete_before_replace=True, import_=None, parent=None, replace_on_changes=None, moved_from=None), lookup_existing=None, variables={}, access_controls=[AccessControl(variables={}, group_name='users', permission_level='CAN_READ', service_principal_name=None, user_name=None)], dirpath=None, path_=None, source_=None, content_base64=None, dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS', source='/home/runner/.cache/laktory/pipelines/.json', path=None)

configuration

List of values to apply to the entire pipeline. Elements must be formatted as key:value pairs

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: {}

continuous

If True, the pipeline is run continuously.

TYPE: bool | VariableType DEFAULT: None

creator_user_name

TYPE: str | VariableType DEFAULT: None

deployment

Deployment type of this pipeline.

TYPE: PipelineDeployment | VariableType DEFAULT: None

development

If True the pipeline is run in development mode

TYPE: bool | VariableType DEFAULT: None

edition

Name of the product edition

TYPE: Literal['CORE', 'PRO', 'ADVANCED'] | VariableType DEFAULT: None

event_log

An optional block specifying a table where DLT Event Log will be stored.

TYPE: PipelineEventLog | VariableType DEFAULT: None

expected_last_modified

TYPE: int | VariableType DEFAULT: None

filters

Filters on which Pipeline packages to include in the deployed graph.

TYPE: PipelineFilters | VariableType DEFAULT: None

gateway_definition

The definition of a gateway pipeline to support CDC.

TYPE: PipelineGatewayDefinition | VariableType DEFAULT: None

health

TYPE: str | VariableType DEFAULT: None

ingestion_definition

Lakeflow Ingestion Pipeline definition

TYPE: PipelineIngestionDefinition | VariableType DEFAULT: None

last_modified

TYPE: int | VariableType DEFAULT: None

latest_updates

TYPE: list[PipelineLatestUpdate | VariableType] | VariableType DEFAULT: None

libraries

Specifies pipeline code (notebooks) and required artifacts.

TYPE: list[PipelineLibrary | VariableType] | VariableType DEFAULT: None

name

Pipeline name

TYPE: str | VariableType

name_prefix

Prefix added to the DLT pipeline name

TYPE: str | VariableType DEFAULT: None

name_suffix

Suffix added to the DLT pipeline name

TYPE: str | VariableType DEFAULT: None

notifications

Notifications specifications

TYPE: list[PipelineNotifications | VariableType] | VariableType DEFAULT: []

photon

If True, Photon engine enabled.

TYPE: bool | VariableType DEFAULT: None

restart_window

TYPE: PipelineRestartWindow | VariableType DEFAULT: None

root_path

An optional string specifying the root path for this pipeline. This is used as the root directory when editing the pipeline in the Databricks user interface and it is added to sys.path when executing Python sources during pipeline execution.

TYPE: str | VariableType DEFAULT: None

run_as

TYPE: PipelineRunAs | VariableType DEFAULT: None

run_as_user_name

TYPE: str | VariableType DEFAULT: None

schema_

The default schema (database) where tables are read from or published to. The presence of this attribute implies that the pipeline is in direct publishing mode.

TYPE: str | VariableType DEFAULT: None

serverless

If True, serverless is enabled

TYPE: bool | VariableType DEFAULT: None

state

TYPE: str | VariableType DEFAULT: None

storage

A location on DBFS or cloud storage where output data and metadata required for pipeline execution are stored. By default, tables are stored in a subdirectory of this location. Change of this parameter forces recreation of the pipeline. (Conflicts with catalog).

TYPE: str | VariableType DEFAULT: None

tags

A map of tags associated with the pipeline. These are forwarded to the cluster as cluster tags, and are therefore subject to the same limitations. A maximum of 25 tags can be added to the pipeline.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

target

The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI.

TYPE: str | VariableType DEFAULT: None

trigger

TYPE: PipelineTrigger | VariableType DEFAULT: None

type

Type of orchestrator

TYPE: Literal['DATABRICKS_PIPELINE'] | VariableType DEFAULT: 'DATABRICKS_PIPELINE'

url

URL of the DLT pipeline on the given workspace.

TYPE: str | VariableType DEFAULT: None

METHOD DESCRIPTION
to_dab_resource

Convert to a DABs Python Pipeline resource object for use with

ATTRIBUTE DESCRIPTION
additional_core_resources
  • configuration workspace file

TYPE: list[PulumiResource]

additional_core_resources property ¤

  • configuration workspace file
  • configuration workspace file permissions

to_dab_resource() ¤

Convert to a DABs Python Pipeline resource object for use with laktory.dab.build_resources.

RETURNS DESCRIPTION

databricks.bundles.pipelines.Pipeline instance.

Source code in laktory/models/pipeline/orchestrators/databrickspipelineorchestrator.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def to_dab_resource(self):
    """
    Convert to a DABs Python Pipeline resource object for use with
    ``laktory.dab.build_resources``.

    Returns
    -------
    :
        ``databricks.bundles.pipelines.Pipeline`` instance.
    """
    from databricks.bundles.pipelines import Pipeline as DabsPipeline

    d = self.model_dump(
        exclude=self.terraform_excludes, exclude_unset=True, by_alias=True
    )
    for k, v in self.terraform_renames.items():
        if k in d:
            d[v] = d.pop(k)

    # Pipeline notebook
    source_filepath = (
        Path(__file__).parent.parent.parent.parent
        / "resources"
        / "quickstart-stacks"
        / "workflows"
        / "workspacefiles"
        / "notebooks"
        / "dlt_laktory_pl.py"
    )
    target_filepath = Path(settings.build_root) / "pipelines" / "dlt_laktory_pl.py"
    shutil.copy(source_filepath, target_filepath)

    # Laktory pipelines use a common notebook (copied above). Its path is
    # hardcoded here, but should probably be hardcoded in the base resource as well.
    # TODO: Allow for other libraries?
    notebook_filepath = (
        Path("/Workspace" + settings.workspace_root)
        / "pipelines"
        / "dlt_laktory_pl"
    )
    d["libraries"] = [{"notebook": {"path": notebook_filepath.as_posix()}}]

    return DabsPipeline.from_dict(d)