Skip to content

Dispatcher

laktory.dispatcher.Dispatcher(stack=None, env=None) ¤

The dispatcher is a manager that can be used to run and monitor remote jobs and DLT pipelines defined in a stack. It is generally used through Laktory CLI run command, but may be used directly in scripts and python programs.

PARAMETER DESCRIPTION
stack

Stack object

TYPE: Stack DEFAULT: None

env

Selected environment

TYPE: str DEFAULT: None

Examples:

```py tag:skip-run
from laktory import Dispatcher
from laktory import models

with open("./stack.yaml") as fp:
    stack = models.Stack.model_validate_yaml(fp)

dispatcher = Dispatcher(stack=stack)
dispatcher.get_resource_ids()
pl = dispatcher.resources["pl-stock-prices"]
job = dispatcher.resources["job-stock-prices"]

# Run pipeline
pl.run(current_run_action="WAIT", full_refresh=False)

# Run job
job.run(current_run_action="CANCEL")
```

METHOD DESCRIPTION
get_resource_ids

Get resource ids for each of the resources defined in the stack in the provided environment.

init_resources

Set resource for each of the resources defined in the stack

run_databricks_dlt

Run Databricks pipeline with name dlt_name

run_databricks_job

Run job with name job_name

ATTRIBUTE DESCRIPTION
env

Selected environment

TYPE: str

wc

Databricks Workspace Client

TYPE: WorkspaceClient

Source code in laktory/dispatcher/dispatcher.py
49
50
51
52
53
54
55
def __init__(self, stack: Stack = None, env: str = None):
    """Store the stack and environment selection, then build the runner registry."""
    self.stack = stack
    self.resources = {}
    self._env = env
    self._wc = None  # Databricks WorkspaceClient, created lazily by the `wc` property

    self.init_resources()

env property writable ¤

Selected environment

wc property ¤

Databricks Workspace Client

get_resource_ids(env=None) ¤

Get resource ids for each of the resources defined in the stack in the provided environment env.

Source code in laktory/dispatcher/dispatcher.py
179
180
181
182
183
184
185
186
187
188
def get_resource_ids(self, env=None):
    """
    Fetch and store the id of each resource defined in the stack for the
    provided environment `env`.
    """
    # Switch the selected environment only when one is explicitly given.
    if env is not None:
        self.env = env

    for resource in self.resources.values():
        resource.get_id()

init_resources() ¤

Set resource for each of the resources defined in the stack

Source code in laktory/dispatcher/dispatcher.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def init_resources(self):
    """Build a runner for each enabled resource defined in the stack."""

    from laktory.models.pipeline.orchestrators.databricksjoborchestrator import (
        DatabricksJobOrchestrator,
    )
    from laktory.models.pipeline.orchestrators.databrickspipelineorchestrator import (
        DatabricksPipelineOrchestrator,
    )

    stack_resources = self.stack.resources

    # Laktory pipelines: register a runner matching the orchestrator type.
    for pl in stack_resources.pipelines.values():
        if not pl.options.is_enabled:
            continue

        orchestrator = pl.orchestrator
        # Both checks kept as independent `if`s (not elif) to mirror the
        # original registration order.
        if isinstance(orchestrator, DatabricksPipelineOrchestrator):
            self.resources[orchestrator.name] = DatabricksPipelineRunner(
                dispatcher=self, name=orchestrator.name
            )
        if isinstance(orchestrator, DatabricksJobOrchestrator):
            self.resources[orchestrator.name] = DatabricksJobRunner(
                dispatcher=self, name=orchestrator.name
            )

    # Explicitly declared Databricks (DLT) pipelines
    for dlt in stack_resources.databricks_pipelines.values():
        if dlt.options.is_enabled:
            self.resources[dlt.name] = DatabricksPipelineRunner(
                dispatcher=self, name=dlt.name
            )

    # Explicitly declared Databricks jobs
    for job in stack_resources.databricks_jobs.values():
        if job.options.is_enabled:
            self.resources[job.name] = DatabricksJobRunner(
                dispatcher=self, name=job.name
            )

run_databricks_dlt(dlt_name, *args, **kwargs) ¤

Run Databricks pipeline with name dlt_name

PARAMETER DESCRIPTION
dlt_name

Name of the DLT pipeline

TYPE: str

*args

Arguments passed to JobRunner.run()

DEFAULT: ()

**kwargs

Keyword arguments passed to JobRunner.run()

DEFAULT: {}

Source code in laktory/dispatcher/dispatcher.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def run_databricks_dlt(self, dlt_name: str, *args, **kwargs):
    """
    Run Databricks pipeline with name `dlt_name`

    Parameters
    ----------
    dlt_name:
        Name of the DLT pipeline
    *args:
        Arguments passed to `JobRunner.run()`
    **kwargs:
        Keyword arguments passed to `JobRunner.run()`
    """
    # Look up the registered runner and delegate execution to it.
    self.resources[dlt_name].run(*args, **kwargs)

run_databricks_job(job_name, *args, **kwargs) ¤

Run job with name job_name

PARAMETER DESCRIPTION
job_name

Name of the job

TYPE: str

*args

Arguments passed to JobRunner.run()

DEFAULT: ()

**kwargs

Keyword arguments passed to JobRunner.run()

DEFAULT: {}

Source code in laktory/dispatcher/dispatcher.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def run_databricks_job(self, job_name: str, *args, **kwargs):
    """
    Run job with name `job_name`

    Parameters
    ----------
    job_name:
        Name of the job
    *args:
        Arguments passed to `JobRunner.run()`
    **kwargs:
        Keyword arguments passed to `JobRunner.run()`
    """
    # Look up the registered runner and delegate execution to it.
    self.resources[job_name].run(*args, **kwargs)