Databricks Job

laktory.models.pipeline.orchestrators.databricksjoborchestrator.DatabricksJobOrchestrator

Bases: Job, PipelineChild

Databricks job used as an orchestrator to execute a Laktory pipeline.

The job orchestrator supports incremental workloads with Spark Structured Streaming, but it does not support continuous processing.
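Below is a minimal, hedged sketch of declaring this orchestrator on a pipeline. The pipeline name, job name, cron expression and retry count are illustrative placeholders, and the empty nodes list stands in for real pipeline nodes.

```py
from laktory import models

# Illustrative only: attach a Databricks job orchestrator to a pipeline
pipeline = models.Pipeline(
    name="pl-stocks",  # hypothetical pipeline name
    nodes=[],  # a real pipeline would declare its nodes here
    orchestrator={
        "type": "DATABRICKS_JOB",
        "name": "job-pl-stocks",  # hypothetical job name
        "node_max_retries": 2,
        "schedule": {
            "quartz_cron_expression": "0 0 5 * * ?",  # hypothetical
            "timezone_id": "UTC",
        },
    },
)
```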

PARAMETER DESCRIPTION
dataframe_backend_

Type of DataFrame backend

TYPE: DataFrameBackends | VariableType DEFAULT: None

dataframe_api_

DataFrame API to use in DataFrame Transformer nodes. Either 'NATIVE' (backend-specific) or 'NARWHALS' (backend-agnostic).

TYPE: Literal['NARWHALS', 'NATIVE'] | VariableType DEFAULT: None

resource_name_

Name of the resource in the context of infrastructure as code. If None, default_resource_name will be used instead.

TYPE: str | VariableType DEFAULT: None

options

Resources options specifications

TYPE: ResourceOptions | VariableType DEFAULT: ResourceOptions(variables={}, is_enabled=True, depends_on=[], provider=None, ignore_changes=None, aliases=None, delete_before_replace=True, import_=None, parent=None, replace_on_changes=None, moved_from=None)

lookup_existing

Specifications for looking up existing resource. Other attributes will be ignored.

TYPE: JobLookup | VariableType DEFAULT: None

variables

Dict of variables to be injected in the model at runtime

TYPE: dict[str, Any] DEFAULT: {}

access_controls

Access controls list

TYPE: list[Union[AccessControl, VariableType]] | VariableType DEFAULT: []

continuous

Continuous specifications

TYPE: JobContinuous | VariableType DEFAULT: None

control_run_state

If True, the Databricks provider will stop and start the job as needed to ensure that the active run for the job reflects the deployed configuration. For continuous jobs, the provider respects the pause_status by stopping the current active run. This flag cannot be set for non-continuous jobs.

TYPE: bool | VariableType DEFAULT: None

description

An optional description for the job. The maximum length is 1024 characters in UTF-8 encoding.

TYPE: str | VariableType DEFAULT: None

email_notifications

An optional set of email addresses notified when runs of this job begin, complete, or fail. The default behavior is to not send any emails.

TYPE: JobEmailNotifications | VariableType DEFAULT: None

environments

List of environments available for the tasks.

TYPE: list[Union[JobEnvironment, VariableType]] | VariableType DEFAULT: None

format

TYPE: str | VariableType DEFAULT: None

git_source

Specifies a Git repository for task source code.

TYPE: JobGitSource | VariableType DEFAULT: None

health

Health specifications

TYPE: JobHealth | VariableType DEFAULT: None

job_clusters

A list of job databricks.Cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings.

TYPE: list[Union[JobJobCluster, VariableType]] | VariableType DEFAULT: []

max_concurrent_runs

An optional maximum allowed number of concurrent runs of the job. Defaults to 1.

TYPE: int | VariableType DEFAULT: None

max_retries

An optional maximum number of times to retry an unsuccessful run. A run is considered to be unsuccessful if it completes with a FAILED or INTERNAL_ERROR lifecycle state. The value -1 means to retry indefinitely and the value 0 means to never retry. The default behavior is to never retry. A run can have the following lifecycle state: PENDING, RUNNING, TERMINATING, TERMINATED, SKIPPED or INTERNAL_ERROR.

TYPE: int | VariableType DEFAULT: None

min_retry_interval_millis

An optional minimal interval in milliseconds between the start of the failed run and the subsequent retry run. The default behavior is that unsuccessful runs are immediately retried.

TYPE: int | VariableType DEFAULT: None

name

Name of the job

TYPE: str | VariableType DEFAULT: None

name_prefix

Prefix added to the job name

TYPE: str | VariableType DEFAULT: None

name_suffix

Suffix added to the job name

TYPE: str | VariableType DEFAULT: None

notification_settings

Notifications specifications

TYPE: JobNotificationSettings | VariableType DEFAULT: None

parameters

Parameters specifications

TYPE: list[Union[JobParameter, VariableType]] | VariableType DEFAULT: []

queue

TYPE: JobQueue | VariableType DEFAULT: None

retry_on_timeout

An optional policy to specify whether to retry a job when it times out. The default behavior is to not retry on timeout.

TYPE: bool | VariableType DEFAULT: None

run_as

Run as specifications

TYPE: JobRunAs | VariableType DEFAULT: None

schedule

Schedule specifications

TYPE: JobSchedule | VariableType DEFAULT: None

tags

Tags as key-value pairs

TYPE: dict[Union[str, VariableType], Union[Any, VariableType]] | VariableType DEFAULT: {}

tasks

Tasks specifications

TYPE: list[Union[JobTask, VariableType]] | VariableType DEFAULT: []

timeout_seconds

An optional timeout applied to each run of this job. The default behavior is to have no timeout.

TYPE: int | VariableType DEFAULT: None

trigger

Trigger specifications

TYPE: JobTrigger | VariableType DEFAULT: None

webhook_notifications

Webhook notifications specifications

TYPE: JobWebhookNotifications | VariableType DEFAULT: None

type

Type of orchestrator

TYPE: Literal['DATABRICKS_JOB'] | VariableType DEFAULT: 'DATABRICKS_JOB'

config_file

Pipeline configuration (json) file deployed to the workspace and used by the job to read and execute the pipeline.

TYPE: PipelineConfigWorkspaceFile | VariableType DEFAULT: PipelineConfigWorkspaceFile(dataframe_backend_=None, dataframe_api_=None, resource_name_=None, options=ResourceOptions(variables={}, is_enabled=True, depends_on=[], provider=None, ignore_changes=None, aliases=None, delete_before_replace=True, import_=None, parent=None, replace_on_changes=None, moved_from=None), lookup_existing=None, variables={}, access_controls=[AccessControl(variables={}, group_name='users', permission_level='CAN_READ', service_principal_name=None, user_name=None)], dirpath=None, path=None, rootpath=None, source=None, content_base64=None, dataframe_backend=<DataFrameBackends.PYSPARK: 'PYSPARK'>, dataframe_api='NARWHALS')

node_max_retries

An optional maximum number of times to retry an unsuccessful run for each node.

TYPE: int | VariableType DEFAULT: None

METHOD DESCRIPTION
inject_vars

Inject model variable values into the model's attributes.

inject_vars_into_dump

Inject model variable values into a model dump.

model_validate_json_file

Load model from json file object

model_validate_yaml

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports

push_vars

Push variable values to all children recursively.

validate_assignment_disabled

Updating a model attribute inside a model validator when validate_assignment

ATTRIBUTE DESCRIPTION
additional_core_resources
  • configuration workspace file
  • configuration workspace file permissions

TYPE: list[PulumiResource]

core_resources

List of core resources to be deployed with this laktory model:

default_resource_name

Resource default name constructed as

TYPE: str

pulumi_renames

Map of fields to rename when dumping model to pulumi

TYPE: dict[str, str]

resource_key

Resource key used to build default resource name. Equivalent to

TYPE: str

self_as_core_resources

Flag set to True if self must be included in core resources

terraform_renames

Map of fields to rename when dumping model to terraform

TYPE: dict[str, str]

additional_core_resources property

  • configuration workspace file
  • configuration workspace file permissions

core_resources property

List of core resources to be deployed with this laktory model:
  • class instance (self)

default_resource_name property

Resource default name constructed as {self.resource_type_id}-{self.resource_key}, with:
  • ${resources....} tags removed
  • ${vars....} tags removed
  • special characters replaced with - to avoid conflicts with resource properties

pulumi_renames property

Map of fields to rename when dumping model to pulumi

resource_key property

Resource key used to build default resource name. Equivalent to name properties if available. Otherwise, empty string.

self_as_core_resources property

Flag set to True if self must be included in core resources

terraform_renames property

Map of fields to rename when dumping model to terraform

inject_vars(inplace=False, vars=None)

Inject model variable values into the model's attributes.

PARAMETER DESCRIPTION
inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict DEFAULT: None

RETURNS DESCRIPTION

Model instance.

Examples:

```py
from typing import Union

from laktory import models


class Cluster(models.BaseModel):
    name: str = None
    size: Union[int, str] = None


c = Cluster(
    name="cluster-${vars.my_cluster}",
    size="${{ 4 if vars.env == 'prod' else 2 }}",
    variables={
        "env": "dev",
    },
).inject_vars()
print(c)
# > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
```
References
  • variables (https://www.laktory.ai/concepts/variables/)
Source code in laktory/models/basemodel.py
def inject_vars(self, inplace: bool = False, vars: dict = None):
    """
    Inject model variables values into a model attributes.

    Parameters
    ----------
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model instance.

    Examples
    --------
    ```py
    from typing import Union

    from laktory import models


    class Cluster(models.BaseModel):
        name: str = None
        size: Union[int, str] = None


    c = Cluster(
        name="cluster-${vars.my_cluster}",
        size="${{ 4 if vars.env == 'prod' else 2 }}",
        variables={
            "env": "dev",
        },
    ).inject_vars()
    print(c)
    # > variables={'env': 'dev'} name='cluster-${vars.my_cluster}' size=2
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Fetching vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        self = self.model_copy(deep=True)

    # Inject into field values
    for k in list(self.model_fields_set):
        if k == "variables":
            continue
        o = getattr(self, k)

        if isinstance(o, BaseModel) or isinstance(o, dict) or isinstance(o, list):
            # Mutable objects will be updated in place
            _resolve_values(o, vars)
        else:
            # Simple objects must be updated explicitly
            setattr(self, k, _resolve_value(o, vars))

    # Inject into child resources
    if hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r == self:
                continue
            r.inject_vars(vars=vars, inplace=True)

    if not inplace:
        return self

inject_vars_into_dump(dump, inplace=False, vars=None)

Inject model variable values into a model dump.

PARAMETER DESCRIPTION
dump

Model dump (or any other general purpose mutable object)

TYPE: dict[str, Any]

inplace

If True model is modified in place. Otherwise, a new model instance is returned.

TYPE: bool DEFAULT: False

vars

A dictionary of variables to be injected in addition to the model internal variables.

TYPE: dict[str, Any] DEFAULT: None

RETURNS DESCRIPTION

Model dump with injected variables.

Examples:

```py
from laktory import models

m = models.BaseModel(
    variables={
        "env": "dev",
    },
)
data = {
    "name": "cluster-${vars.my_cluster}",
    "size": "${{ 4 if vars.env == 'prod' else 2 }}",
}
print(m.inject_vars_into_dump(data))
# > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
```
References
  • variables (https://www.laktory.ai/concepts/variables/)
Source code in laktory/models/basemodel.py
def inject_vars_into_dump(
    self, dump: dict[str, Any], inplace: bool = False, vars: dict[str, Any] = None
):
    """
    Inject model variables values into a model dump.

    Parameters
    ----------
    dump:
        Model dump (or any other general purpose mutable object)
    inplace:
        If `True` model is modified in place. Otherwise, a new model
        instance is returned.
    vars:
        A dictionary of variables to be injected in addition to the
        model internal variables.


    Returns
    -------
    :
        Model dump with injected variables.


    Examples
    --------
    ```py
    from laktory import models

    m = models.BaseModel(
        variables={
            "env": "dev",
        },
    )
    data = {
        "name": "cluster-${vars.my_cluster}",
        "size": "${{ 4 if vars.env == 'prod' else 2 }}",
    }
    print(m.inject_vars_into_dump(data))
    # > {'name': 'cluster-${vars.my_cluster}', 'size': 2}
    ```

    References
    ----------
    * [variables](https://www.laktory.ai/concepts/variables/)
    """

    # Setting vars
    if vars is None:
        vars = {}
    vars = deepcopy(vars)
    vars.update(self.variables)

    # Create copy
    if not inplace:
        dump = copy.deepcopy(dump)

    # Inject into field values
    _resolve_values(dump, vars)

    if not inplace:
        return dump

model_validate_json_file(fp) classmethod

Load model from json file object

PARAMETER DESCRIPTION
fp

file object structured as a json file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance

Source code in laktory/models/basemodel.py
@classmethod
def model_validate_json_file(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from json file object

    Parameters
    ----------
    fp:
        file object structured as a json file

    Returns
    -------
    :
        Model instance
    """
    data = json.load(fp)
    return cls.model_validate(data)
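For illustration, a minimal usage sketch (the Cluster model below is defined just for the example; any JSON-structured file object can stand in for fp):

```py
import io

from laktory import models


class Cluster(models.BaseModel):
    name: str = None


# io.StringIO stands in for an open json file object
fp = io.StringIO('{"name": "my-cluster"}')
c = Cluster.model_validate_json_file(fp)
print(c.name)
# > my-cluster
```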

model_validate_yaml(fp) classmethod

Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports references to external yaml and sql files using !use, !extend and !update tags. Paths to external files can be defined using model or environment variables.

Referenced paths should always be relative to the file they are referenced from.

Custom Tags
  • !use {filepath}: Directly inject the content of the file at filepath

  • - !extend {filepath}: Extend the current list with the elements found in the file at filepath. Similar to python list.extend method.

  • <<: !update {filepath}: Merge the current dictionary with the content of the dictionary defined at filepath. Similar to python dict.update method.

PARAMETER DESCRIPTION
fp

file object structured as a yaml file

TYPE: TextIO

RETURNS DESCRIPTION
Model

Model instance

Examples:

```yaml
businesses:
  apple:
    symbol: aapl
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - jane.doe@apple.com
      - !extend emails.yaml
  amazon:
    symbol: amzn
    address: !use addresses.yaml
    <<: !update common.yaml
    emails:
      - john.doe@amazon.com
      - !extend emails.yaml
```
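A hedged Python-side sketch of loading such a file (the Business model and inline YAML are illustrative; custom tags are omitted because their paths must resolve relative to a real file):

```py
import io

from laktory import models


class Business(models.BaseModel):
    symbol: str = None
    emails: list[str] = []


# io.StringIO stands in for an open yaml file object
fp = io.StringIO("symbol: aapl\nemails:\n  - jane.doe@apple.com\n")
b = Business.model_validate_yaml(fp)
print(b.symbol)
# > aapl
```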
Source code in laktory/models/basemodel.py
@classmethod
def model_validate_yaml(cls: Type[Model], fp: TextIO) -> Model:
    """
    Load model from yaml file object using laktory.yaml.RecursiveLoader. Supports
    reference to external yaml and sql files using `!use`, `!extend` and `!update` tags.
    Path to external files can be defined using model or environment variables.

    Referenced path should always be relative to the file they are referenced from.

    Custom Tags
    -----------
    - `!use {filepath}`:
        Directly inject the content of the file at `filepath`

    - `- !extend {filepath}`:
        Extend the current list with the elements found in the file at `filepath`.
        Similar to python list.extend method.

    - `<<: !update {filepath}`:
        Merge the current dictionary with the content of the dictionary defined at
        `filepath`. Similar to python dict.update method.

    Parameters
    ----------
    fp:
        file object structured as a yaml file

    Returns
    -------
    :
        Model instance

    Examples
    --------
    ```yaml
    businesses:
      apple:
        symbol: aapl
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - jane.doe@apple.com
          - !extend emails.yaml
      amazon:
        symbol: amzn
        address: !use addresses.yaml
        <<: !update common.yaml
        emails:
          - john.doe@amazon.com
          - !extend emails.yaml
    ```
    """

    data = RecursiveLoader.load(fp)
    return cls.model_validate(data)

push_vars(update_core_resources=False)

Push variable values to all children recursively.

Source code in laktory/models/basemodel.py
def push_vars(self, update_core_resources=False) -> Any:
    """Push variable values to all child recursively"""

    def _update_model(m):
        if not isinstance(m, BaseModel):
            return
        for k, v in self.variables.items():
            m.variables[k] = m.variables.get(k, v)
        m.push_vars()

    def _push_vars(o):
        if isinstance(o, list):
            for _o in o:
                _push_vars(_o)
        elif isinstance(o, dict):
            for _o in o.values():
                _push_vars(_o)
        else:
            _update_model(o)

    for k in self.model_fields.keys():
        _push_vars(getattr(self, k))

    if update_core_resources and hasattr(self, "core_resources"):
        for r in self.core_resources:
            if r != self:
                _push_vars(r)

    return None
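A hedged sketch of the behavior (Parent and Child are illustrative models defined for the example):

```py
from laktory import models


class Child(models.BaseModel):
    name: str = None


class Parent(models.BaseModel):
    child: Child = None


p = Parent(
    child=Child(name="cluster-${vars.env}"),
    variables={"env": "dev"},
)
p.push_vars()  # propagates parent variables to child models
print(p.child.variables)
# > {'env': 'dev'}
```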

validate_assignment_disabled()

Updating a model attribute inside a model validator when validate_assignment is True causes an infinite recursion by design, so assignment validation must be turned off temporarily.

Source code in laktory/models/basemodel.py
@contextmanager
def validate_assignment_disabled(self):
    """
    Updating a model attribute inside a model validator when `validate_assignment`
    is `True` causes an infinite recursion by design and must be turned off
    temporarily.
    """
    original_state = self.model_config["validate_assignment"]
    self.model_config["validate_assignment"] = False
    try:
        yield
    finally:
        self.model_config["validate_assignment"] = original_state
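A hedged usage sketch (the Cluster model is illustrative; in practice the context manager is used inside a model validator):

```py
from laktory import models


class Cluster(models.BaseModel):
    name: str = None


c = Cluster(name="c1")
# Assignments inside the context skip pydantic re-validation,
# avoiding the recursion described above.
with c.validate_assignment_disabled():
    c.name = "c2"
```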