
Lakeflow Declarative Pipeline

laktory.models.pipeline.LakeflowDeclarativePipelineOrchestrator ¤

Bases: Pipeline, PipelineChild

Lakeflow Spark Declarative Pipeline used as an orchestrator to execute a Laktory pipeline.

LDP orchestrator does not support pipeline nodes with views (as opposed to materialized tables). Also, it does not support writing to multiple schemas within the same pipeline.

Selecting this orchestrator requires adding the supporting notebook to the stack.
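The two limitations above (no view nodes, a single target schema) can be checked before deployment. The following is a minimal sketch only: it assumes a simplified, hypothetical representation of pipeline nodes as plain dicts with `name`, `is_view` and `schema` keys, and the helper `check_ldp_limitations` is illustrative and not part of Laktory.

```python
def check_ldp_limitations(nodes):
    """Return a list of human-readable problems for the LDP orchestrator.

    `nodes` is a hypothetical, simplified stand-in for pipeline nodes:
    each item is a dict with `name`, `is_view` and `schema` keys.
    """
    problems = []

    # Limitation 1: nodes backed by views are not supported.
    for node in nodes:
        if node.get("is_view", False):
            problems.append(f"node '{node['name']}' is a view")

    # Limitation 2: all nodes must write to the same schema.
    schemas = sorted({n["schema"] for n in nodes if n.get("schema") is not None})
    if len(schemas) > 1:
        problems.append(f"multiple target schemas: {', '.join(schemas)}")

    return problems


# Example input (made-up node names and schemas)
nodes = [
    {"name": "brz_prices", "is_view": False, "schema": "finance"},
    {"name": "slv_prices", "is_view": True, "schema": "sandbox"},
]
problems = check_ldp_limitations(nodes)
```

An empty list means the simplified checks passed; the real orchestrator may enforce these limitations differently.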

References
BASE DESCRIPTION
allow_duplicate_names

Optional boolean flag. If false, deployment fails if the name conflicts with that of another pipeline. Default is false.

TYPE: bool | None | VariableType DEFAULT: None

budget_policy_id

Optional string specifying the ID of the budget policy for this Lakeflow Declarative Pipeline.

TYPE: str | None | VariableType DEFAULT: None

catalog

The UC catalog the event log is published under

TYPE: str | None | VariableType DEFAULT: None

cause

TYPE: str | None | VariableType DEFAULT: None

channel

Optional name of the release channel for the Spark version used by the Lakeflow Declarative Pipeline. Supported values are CURRENT (default) and PREVIEW.

TYPE: str | None | VariableType DEFAULT: None

cluster

TYPE: list[PipelineCluster] | None | VariableType DEFAULT: None

cluster_id

TYPE: str | None | VariableType DEFAULT: None

configuration

An optional map of values to apply to the entire pipeline. Elements must be formatted as key:value pairs. Pipeline code is specified separately, through library blocks.

TYPE: dict[str, str] | None | VariableType DEFAULT: None

continuous

A flag indicating whether to run the pipeline continuously. The default value is false

TYPE: bool | None | VariableType DEFAULT: None

creator_user_name

TYPE: str | None | VariableType DEFAULT: None

deployment

Deployment type of this pipeline. See PipelineDeployment for the supported attributes.

TYPE: PipelineDeployment | None | VariableType DEFAULT: None

development

A flag indicating whether to run the pipeline in development mode. The default value is false

TYPE: bool | None | VariableType DEFAULT: None

edition

Optional name of the product edition. Supported values are CORE, PRO and ADVANCED (default). Not required when serverless is set to true.

TYPE: str | None | VariableType DEFAULT: None

environment

TYPE: PipelineEnvironment | None | VariableType DEFAULT: None

event_log

An optional block specifying the table where the LDP event log is stored. See PipelineEventLog for the available fields.

TYPE: PipelineEventLog | None | VariableType DEFAULT: None

expected_last_modified

TYPE: int | None | VariableType DEFAULT: None

filters

Filters on which pipeline packages to include in the deployed graph. See PipelineFilters for the supported attributes.

TYPE: PipelineFilters | None | VariableType DEFAULT: None

gateway_definition

The definition of a gateway pipeline to support CDC. See PipelineGatewayDefinition for the supported attributes.

TYPE: PipelineGatewayDefinition | None | VariableType DEFAULT: None

health

TYPE: str | None | VariableType DEFAULT: None

ingestion_definition

TYPE: PipelineIngestionDefinition | None | VariableType DEFAULT: None

last_modified

TYPE: int | None | VariableType DEFAULT: None

latest_updates

TYPE: list[PipelineLatestUpdates] | None | VariableType DEFAULT: None

library

TYPE: list[PipelineLibrary] | None | VariableType DEFAULT: None

name

The table name the event log is published to in UC

TYPE: str | None | VariableType DEFAULT: None

notification

TYPE: list[PipelineNotification] | None | VariableType DEFAULT: None

photon

A flag indicating whether to use Photon engine. The default value is false

TYPE: bool | None | VariableType DEFAULT: None

restart_window

TYPE: PipelineRestartWindow | None | VariableType DEFAULT: None

root_path

An optional string specifying the root path for this pipeline. It is used as the root directory when editing the pipeline in the Databricks user interface, and it is added to sys.path when executing Python sources during pipeline execution.

Clusters that run the pipeline are defined through cluster blocks. If none is specified, the pipeline automatically selects a default cluster configuration. Note that Lakeflow Declarative Pipeline clusters support only a subset of cluster attributes, as described in the documentation. Also note that the autoscale block is extended with a mode parameter that controls the autoscaling algorithm (possible values are ENHANCED for the new, enhanced autoscaling algorithm, or LEGACY for the old algorithm).

TYPE: str | None | VariableType DEFAULT: None

run_as

The user or the service principal the pipeline runs as. See PipelineRunAs for the configuration block.

TYPE: PipelineRunAs | None | VariableType DEFAULT: None

run_as_user_name

TYPE: str | None | VariableType DEFAULT: None

schema_

The UC schema the event log is published under

TYPE: str | None | VariableType DEFAULT: None

serverless

An optional flag indicating whether serverless compute should be used for this Lakeflow Declarative Pipeline. Requires catalog to be set, because serverless compute can only be used with Unity Catalog.

TYPE: bool | None | VariableType DEFAULT: None

state

TYPE: str | None | VariableType DEFAULT: None

storage

A location on cloud storage where output data and metadata required for pipeline execution are stored. By default, tables are stored in a subdirectory of this location. Change of this parameter forces recreation of the pipeline. (Conflicts with catalog)

TYPE: str | None | VariableType DEFAULT: None
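The descriptions of serverless and storage state two constraints involving catalog: serverless requires catalog, and storage conflicts with catalog. A minimal sketch of how these could be checked on a plain dict of orchestrator fields follows. The helper `check_storage_settings` and the example values are hypothetical and not part of Laktory, which may validate these settings differently or leave validation to Databricks.

```python
def check_storage_settings(orchestrator):
    """Check the documented catalog-related constraints on a plain dict."""
    errors = []

    # serverless can only be used with Unity Catalog, so catalog must be set
    if orchestrator.get("serverless") and not orchestrator.get("catalog"):
        errors.append("serverless requires catalog to be set")

    # storage and catalog are mutually exclusive
    if orchestrator.get("storage") and orchestrator.get("catalog"):
        errors.append("storage conflicts with catalog")

    return errors


# Example settings (made-up catalog name and storage location)
valid = {
    "type": "LAKEFLOW_DECLARATIVE_PIPELINE",
    "serverless": True,
    "catalog": "dev",
}
invalid = {
    "type": "LAKEFLOW_DECLARATIVE_PIPELINE",
    "serverless": True,
    "storage": "/mnt/pipelines/stocks",
}

valid_errors = check_storage_settings(valid)
invalid_errors = check_storage_settings(invalid)
```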

tags

A map of tags associated with the pipeline. These are forwarded to the cluster as cluster tags, and are therefore subject to the same limitations. A maximum of 25 tags can be added to the pipeline

TYPE: dict[str, str] | None | VariableType DEFAULT: None

target

The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI

TYPE: str | None | VariableType DEFAULT: None

timeouts

TYPE: PipelineTimeouts | None | VariableType DEFAULT: None

trigger

TYPE: PipelineTrigger | None | VariableType DEFAULT: None

url

URL of the Lakeflow Declarative Pipeline on the given workspace

TYPE: str | None | VariableType DEFAULT: None

usage_policy_id

TYPE: str | None | VariableType DEFAULT: None

LAKTORY DESCRIPTION
access_controls

Pipeline access controls

TYPE: list[AccessControl | VariableType] | VariableType DEFAULT: []

config_file

Pipeline configuration (JSON) file deployed to the workspace and used by the job to read and execute the pipeline.

TYPE: PipelineConfigWorkspaceFile | VariableType DEFAULT: PipelineConfigWorkspaceFile(dataframe_backend_=None, dataframe_api_=None, resource_options=ResourceOptions(variables={}, name=None, is_enabled=True, depends_on=[], provider=None, ignore_changes=None, import_=None, moved_from=None), lookup_existing=None, variables={}, content_base64=None, md5=None, object_id=None, access_controls=[AccessControl(variables={}, group_name='users', permission_level='CAN_READ', service_principal_name=None, user_name=None)], dirpath=None, path_=None, source_=None, dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS', source='/home/runner/.cache/laktory/pipelines/.json', path=None)

name_prefix

Prefix added to the LDP name

TYPE: str | VariableType DEFAULT: None

name_suffix

Suffix added to the LDP name

TYPE: str | VariableType DEFAULT: None
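As an illustration of name_prefix and name_suffix, the sketch below assumes the deployed name is built by plain concatenation with no separator. That behavior is an assumption, not confirmed by this page, and `build_pipeline_name` is a hypothetical helper rather than a Laktory function.

```python
def build_pipeline_name(name, name_prefix=None, name_suffix=None):
    """Compose a deployed pipeline name (assumed: plain concatenation)."""
    # Missing prefix or suffix is treated as an empty string
    return f"{name_prefix or ''}{name}{name_suffix or ''}"


# Made-up names for illustration
full_name = build_pipeline_name("pl-stocks", name_prefix="ldp-", name_suffix="-dev")
plain_name = build_pipeline_name("pl-stocks")
```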

type

Type of orchestrator

TYPE: Literal['LAKEFLOW_DECLARATIVE_PIPELINE'] | VariableType DEFAULT: 'LAKEFLOW_DECLARATIVE_PIPELINE'

METHOD DESCRIPTION
to_dab_resource

Convert to a DABs Python Pipeline resource object for use with laktory.dab.build_resources.

ATTRIBUTE DESCRIPTION
additional_core_resources
  • configuration workspace file

TYPE: list

additional_core_resources property ¤

  • configuration workspace file
  • configuration workspace file permissions

to_dab_resource() ¤

Convert to a DABs Python Pipeline resource object for use with laktory.dab.build_resources.

RETURNS DESCRIPTION

databricks.bundles.pipelines.Pipeline instance.

Source code in laktory/models/pipeline/orchestrators/lakeflowdeclarativepipelineorchestrator.py
def to_dab_resource(self):
    """
    Convert to a DABs Python Pipeline resource object for use with
    ``laktory.dab.build_resources``.

    Returns
    -------
    :
        ``databricks.bundles.pipelines.Pipeline`` instance.
    """
    from databricks.bundles.pipelines import Pipeline as DabsPipeline

    d = self.model_dump(
        exclude=self.terraform_excludes, exclude_unset=True, by_alias=False
    )
    # schema_ is a Python workaround for the reserved name; DABs expects "schema"
    if "schema_" in d:
        d["schema"] = d.pop("schema_")

    # Pipeline notebook
    source_filepath = (
        Path(__file__).parent.parent.parent.parent
        / "resources"
        / "scripts"
        / "laktory_ldp.py"
    )
    target_filepath = Path(settings.build_root) / "pipelines" / "laktory_ldp.py"
    shutil.copy(source_filepath, target_filepath)

    # Laktory pipelines use a common notebook (copied above). Its path is
    # hardcoded here, but should probably be hardcoded in the base resource as well.
    # TODO: Allow for other libraries?
    notebook_filepath = (
        Path("/Workspace" + settings.workspace_root) / "pipelines" / "laktory_ldp"
    )
    d["libraries"] = [{"notebook": {"path": notebook_filepath.as_posix()}}]

    return DabsPipeline.from_dict(d)
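Two steps of the method above are easy to try in isolation: renaming the `schema_` key to `schema`, and building the workspace path of the shared notebook. The sketch below reproduces only those two steps on a plain dict. The function `to_dabs_dict`, the example field values and the `"/.laktory/"` workspace root are assumptions for illustration; the real method also copies the notebook file and builds a `databricks.bundles.pipelines.Pipeline` object.

```python
from pathlib import PurePosixPath


def to_dabs_dict(dumped, workspace_root):
    """Apply the key rename and notebook path steps to a dumped model dict."""
    d = dict(dumped)  # shallow copy so the input dict is left untouched

    # schema_ is a Python-side name; DABs expects "schema"
    if "schema_" in d:
        d["schema"] = d.pop("schema_")

    # Workspace path of the shared Laktory notebook (no file extension)
    notebook = (
        PurePosixPath("/Workspace" + workspace_root) / "pipelines" / "laktory_ldp"
    )
    d["libraries"] = [{"notebook": {"path": notebook.as_posix()}}]
    return d


# Made-up dumped fields and an assumed workspace root
result = to_dabs_dict({"name": "pl-stocks", "schema_": "finance"}, "/.laktory/")
```

`PurePosixPath` is used here so the path is built with forward slashes on any operating system.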