Databricks Job

laktory.models.pipeline.DatabricksJobOrchestrator ¤

Bases: Job, PipelineChild

Databricks job used as an orchestrator to execute a Laktory pipeline.

Job orchestrator supports incremental workloads with Spark Structured Streaming, but it does not support continuous processing.
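For illustration, a minimal sketch of instantiating the orchestrator directly. Field names are taken from the attribute tables below; the values are placeholders, and in practice the orchestrator is normally declared as part of a Laktory pipeline definition.

from laktory.models.pipeline import DatabricksJobOrchestrator

# Hedged sketch: only simple scalar fields documented on this page are set.
# Whether a bare orchestrator validates outside a parent pipeline may vary.
job = DatabricksJobOrchestrator(
    name="pl-stock-prices-job",   # placeholder job name
    max_concurrent_runs=1,        # optional, defaults to 1
    timeout_seconds=3600,         # optional per-run timeout
    tags={"env": "dev"},          # optional job tags
)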

References
BASE DESCRIPTION
always_running

(Bool) Whether the job should always be running, like a Spark Streaming application. On every update, the current active run is restarted, or a new run is started if none is running. False by default. Job runs are started with the parameters specified in the spark_jar_task, spark_submit_task, spark_python_task or notebook_task blocks

TYPE: bool | None | VariableType DEFAULT: None

budget_policy_id

The ID of the user-specified budget policy to use for this job. If not specified, a default budget policy may be applied when creating or modifying the job

TYPE: str | None | VariableType DEFAULT: None

continuous

TYPE: JobContinuous | None | VariableType DEFAULT: None

control_run_state

(Bool) If true, the Databricks provider will stop and start the job as needed to ensure that the active run for the job reflects the deployed configuration. For continuous jobs, the provider respects the pause_status by stopping the current active run. This flag cannot be set for non-continuous jobs

TYPE: bool | None | VariableType DEFAULT: None

dbt_task

TYPE: JobDbtTask | None | VariableType DEFAULT: None

deployment

TYPE: JobDeployment | None | VariableType DEFAULT: None

description

An optional description for this job

TYPE: str | None | VariableType DEFAULT: None

edit_mode

If 'UI_LOCKED', the user interface for the job will be locked. If 'EDITABLE' (the default), the user interface will be editable

TYPE: str | None | VariableType DEFAULT: None

email_notifications

An optional block to specify a set of email addresses notified when this task begins, completes or fails. The default behavior is to not send any emails. This block is documented below

TYPE: JobEmailNotifications | None | VariableType DEFAULT: None

environment

TYPE: list[JobEnvironment] | None | VariableType DEFAULT: None

existing_cluster_id

Identifier of the interactive cluster to run job on. Note: running tasks on interactive clusters may lead to increased costs!

TYPE: str | None | VariableType DEFAULT: None

format

TYPE: str | None | VariableType DEFAULT: None

git_source

Specifies a Git repository for task source code. See git_source Configuration Block below

TYPE: JobGitSource | None | VariableType DEFAULT: None

health

An optional block, described below, that specifies health conditions for a given task

TYPE: JobHealth | None | VariableType DEFAULT: None

job_cluster

A list of job databricks_cluster specifications that can be shared and reused by tasks of this job (multi-task syntax). Libraries cannot be declared in a shared job cluster; you must declare dependent libraries in task settings

TYPE: list[JobJobCluster] | None | VariableType DEFAULT: None

library

(Set) An optional list of libraries to be installed on the cluster that will execute the job

TYPE: list[JobLibrary] | None | VariableType DEFAULT: None

max_concurrent_runs

(Integer) An optional maximum allowed number of concurrent runs of the job. Defaults to 1

TYPE: int | None | VariableType DEFAULT: None

max_retries

(Integer) An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with a FAILED or INTERNAL_ERROR lifecycle state. The value -1 means to retry indefinitely and the value 0 means to never retry. The default behavior is to never retry. A run can have the following lifecycle state: PENDING, RUNNING, TERMINATING, TERMINATED, SKIPPED or INTERNAL_ERROR

TYPE: int | None | VariableType DEFAULT: None

min_retry_interval_millis

(Integer) An optional minimal interval in milliseconds between the start of the failed run and the subsequent retry run. The default behavior is that unsuccessful runs are immediately retried

TYPE: int | None | VariableType DEFAULT: None

name

An optional name for the job

TYPE: str | None | VariableType DEFAULT: None

new_cluster

Block with almost the same set of parameters as for the databricks_cluster resource, with a few exceptions (check the REST API documentation for the full list of supported parameters)

TYPE: JobNewCluster | None | VariableType DEFAULT: None

notebook_task

TYPE: JobNotebookTask | None | VariableType DEFAULT: None

notification_settings

An optional block, documented below, controlling the notification settings at the job level

TYPE: JobNotificationSettings | None | VariableType DEFAULT: None

parameter

Specifies a job parameter for the job. See the parameter Configuration Block

TYPE: list[JobParameter] | None | VariableType DEFAULT: None

performance_target

The performance mode on a serverless job. The performance target determines the level of compute performance or cost-efficiency for the run. Supported values are:
  • PERFORMANCE_OPTIMIZED (default): prioritizes fast startup and execution times through rapid scaling and optimized cluster performance.
  • STANDARD: enables cost-efficient execution of serverless workloads.

TYPE: str | None | VariableType DEFAULT: None

pipeline_task

TYPE: JobPipelineTask | None | VariableType DEFAULT: None

python_wheel_task

TYPE: JobPythonWheelTask | None | VariableType DEFAULT: None

queue

The queue status for the job. See queue Configuration Block below

TYPE: JobQueue | None | VariableType DEFAULT: None

retry_on_timeout

(Bool) An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout

TYPE: bool | None | VariableType DEFAULT: None

run_as

The user or the service principal the job runs as. See run_as Configuration Block below

TYPE: JobRunAs | None | VariableType DEFAULT: None

run_job_task

TYPE: JobRunJobTask | None | VariableType DEFAULT: None

schedule

An optional periodic schedule for this job. The default behavior is that the job runs when triggered by clicking Run Now in the Jobs UI or sending an API request to runNow. See schedule Configuration Block below

TYPE: JobSchedule | None | VariableType DEFAULT: None

spark_jar_task

TYPE: JobSparkJarTask | None | VariableType DEFAULT: None

spark_python_task

TYPE: JobSparkPythonTask | None | VariableType DEFAULT: None

spark_submit_task

TYPE: JobSparkSubmitTask | None | VariableType DEFAULT: None

tags

An optional map of the tags associated with the job. See tags Configuration Map

TYPE: dict[str, str] | None | VariableType DEFAULT: None

task

Task to run against the inputs list

TYPE: list[JobTask] | None | VariableType DEFAULT: None

timeout_seconds

(Integer) An optional timeout applied to each run of this job. The default behavior is to have no timeout

TYPE: int | None | VariableType DEFAULT: None

timeouts

TYPE: JobTimeouts | None | VariableType DEFAULT: None

trigger

The conditions that trigger the job to start. See the trigger Configuration Block below.
  • continuous - (Optional) Configuration block to configure pause status. See the continuous Configuration Block

TYPE: JobTrigger | None | VariableType DEFAULT: None

usage_policy_id

TYPE: str | None | VariableType DEFAULT: None

webhook_notifications

(List) An optional set of system destinations (for example, webhook destinations or Slack) to be notified when runs of this task begin, complete or fail. The default behavior is to not send any notifications. This field is a block and is documented below

TYPE: JobWebhookNotifications | None | VariableType DEFAULT: None

LAKTORY DESCRIPTION
access_controls

Access controls list

TYPE: list[AccessControl | VariableType] | VariableType DEFAULT: []

config_file

Pipeline configuration (json) file deployed to the workspace and used by the job to read and execute the pipeline.

TYPE: PipelineConfigWorkspaceFile | VariableType DEFAULT: PipelineConfigWorkspaceFile() with access_controls=[AccessControl(group_name='users', permission_level='CAN_READ')] and all other fields left at their defaults

name_prefix

Prefix added to the job name

TYPE: str | VariableType DEFAULT: None

name_suffix

Suffix added to the job name

TYPE: str | VariableType DEFAULT: None

node_max_retries

An optional maximum number of times to retry an unsuccessful run for each node.

TYPE: int | VariableType DEFAULT: None

serverless_environment_version

Serverless environment version

TYPE: str | VariableType DEFAULT: None

type

Type of orchestrator

TYPE: Literal['DATABRICKS_JOB'] | VariableType DEFAULT: 'DATABRICKS_JOB'
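For the Laktory-specific options above, a hedged sketch. Field names come from the Laktory table; the permission level and other values are placeholders.

from laktory.models.pipeline import DatabricksJobOrchestrator

# Sketch only: name_prefix / name_suffix decorate the job name derived from the
# parent pipeline, and node_max_retries applies per pipeline node.
orchestrator = DatabricksJobOrchestrator(
    name_prefix="dev-",
    name_suffix="-job",
    node_max_retries=2,
    access_controls=[
        {"group_name": "users", "permission_level": "CAN_VIEW"},  # placeholder values
    ],
)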

METHOD DESCRIPTION
to_dab_resource

Convert to a DABs Python Job resource object for use with laktory.dab.build_resources.

ATTRIBUTE DESCRIPTION
additional_core_resources
  • configuration workspace file

TYPE: list

additional_core_resources property ¤

  • configuration workspace file
  • configuration workspace file permissions

to_dab_resource() ¤

Convert to a DABs Python Job resource object for use with laktory.dab.build_resources.

RETURNS DESCRIPTION

databricks.bundles.jobs.Job instance.

Source code in laktory/models/pipeline/orchestrators/databricksjoborchestrator.py
def to_dab_resource(self):
    """
    Convert to a DABs Python Job resource object for use with
    ``laktory.dab.build_resources``.

    Returns
    -------
    :
        ``databricks.bundles.jobs.Job`` instance.
    """
    from databricks.bundles.jobs import Job as DabsJob

    d = self.model_dump(
        exclude=self.terraform_excludes, exclude_unset=True, by_alias=False
    )
    for task in d.get("task", []):
        # schema_ is a Python workaround for the reserved name; DABs expects "schema"
        if "dbt_task" in task and "schema_" in task["dbt_task"]:
            task["dbt_task"]["schema"] = task["dbt_task"].pop("schema_")
        # DABs SDK uses plural names for list fields
        if "library" in task:
            task["libraries"] = task.pop("library")

    # DABs SDK uses plural names for top-level list fields
    for singular, plural in [
        ("task", "tasks"),
        ("job_cluster", "job_clusters"),
        ("environment", "environments"),
        ("parameter", "parameters"),
    ]:
        if singular in d:
            d[plural] = d.pop(singular)

    return DabsJob.from_dict(d)
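
A hedged usage sketch of the conversion above, assuming the databricks-bundles package providing databricks.bundles.jobs is installed (it is imported in the source code):

from laktory.models.pipeline import DatabricksJobOrchestrator

orchestrator = DatabricksJobOrchestrator(
    name="pl-stock-prices-job",  # placeholder
    max_concurrent_runs=1,
)

# Per the docstring above, the result is a databricks.bundles.jobs.Job instance
# intended to be passed to laktory.dab.build_resources.
dabs_job = orchestrator.to_dab_resource()
print(type(dabs_job))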