
Dispatcher

laktory.dispatcher.Dispatcher

Dispatcher(stack=None, env=None)

The dispatcher is a manager used to run and monitor remote jobs and DLT pipelines defined in a stack. It is generally used through the Laktory CLI run command, but may also be used directly in scripts and Python programs.

PARAMETER DESCRIPTION
stack

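Stack object. Although it defaults to None, the constructor calls init_resources(), which reads stack.resources, so a stack must be provided in practice.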

TYPE: Stack DEFAULT: None

env

Selected environment

TYPE: str DEFAULT: None

Examples:

```py
from laktory import Dispatcher
from laktory import models

with open("./stack.yaml") as fp:
    stack = models.Stack.model_validate_yaml(fp)

dispatcher = Dispatcher(stack=stack)
dispatcher.get_resource_ids()
pl = dispatcher.resources["pl-stock-prices"]
job = dispatcher.resources["job-stock-prices"]

# Run pipeline
pl.run(current_run_action="WAIT", full_refresh=False)

# Run job
job.run(current_run_action="CANCEL")
```

METHOD DESCRIPTION
init_resources

Create a runner (DLTPipelineRunner or JobRunner) for each enabled job and DLT pipeline defined in the stack, keyed by resource name

get_resource_ids

Get resource ids for each of the resources defined in the stack in the provided environment env.

run_job

Run job with name job_name

run_dlt

Run DLT pipeline with name dlt_name

ATTRIBUTE DESCRIPTION
env

Selected environment

TYPE: str

wc

Databricks Workspace Client

TYPE: WorkspaceClient

Source code in laktory/dispatcher/dispatcher.py
def __init__(self, stack: Stack = None, env: str = None):
    self.stack = stack
    self._env = env
    self._wc = None
    self.resources = {}

    self.init_resources()

Attributes

env property writable

env

Selected environment

wc property

wc

Databricks Workspace Client

Functions

init_resources

init_resources()

Create a runner (DLTPipelineRunner or JobRunner) for each enabled job and DLT pipeline defined in the stack, keyed by resource name

Source code in laktory/dispatcher/dispatcher.py
def init_resources(self):
    """Set resource for each of the resources defined in the stack"""

    for k, pl in self.stack.resources.pipelines.items():
        if not pl.options.is_enabled:
            continue

        if pl.databricks_dlt is not None:
            self.resources[pl.databricks_dlt.name] = DLTPipelineRunner(
                dispatcher=self, name=pl.databricks_dlt.name
            )

        if pl.databricks_job is not None:
            self.resources[pl.databricks_job.name] = JobRunner(
                dispatcher=self, name=pl.databricks_job.name
            )

    for k, pl in self.stack.resources.databricks_dltpipelines.items():
        if not pl.options.is_enabled:
            continue
        self.resources[pl.name] = DLTPipelineRunner(dispatcher=self, name=pl.name)

    for k, job in self.stack.resources.databricks_jobs.items():
        if not job.options.is_enabled:
            continue
        self.resources[job.name] = JobRunner(dispatcher=self, name=job.name)
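The registration logic above can be traced without a Databricks workspace. The following is a minimal sketch under simplifying assumptions: Options, Named, Pipeline and register_runners are hypothetical stand-ins, not laktory classes, and the strings "dlt" and "job" replace the real DLTPipelineRunner and JobRunner objects. It shows that the keys of resources come from each resource's name (not from its key in the stack) and that disabled resources are skipped entirely.

```python
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical stand-ins for laktory stack models; only the fields read by
# init_resources() are modelled here.
@dataclass
class Options:
    is_enabled: bool = True


@dataclass
class Named:
    name: str
    options: Options = field(default_factory=Options)


@dataclass
class Pipeline:
    options: Options = field(default_factory=Options)
    databricks_dlt: Optional[Named] = None
    databricks_job: Optional[Named] = None


def register_runners(pipelines, dlt_pipelines, jobs):
    """Follow the same loop order as init_resources(), recording the runner
    kind ("dlt" or "job") instead of constructing real runner objects."""
    resources = {}
    for pl in pipelines.values():
        if not pl.options.is_enabled:
            continue  # disabled pipelines get no runner at all
        if pl.databricks_dlt is not None:
            resources[pl.databricks_dlt.name] = "dlt"
        if pl.databricks_job is not None:
            resources[pl.databricks_job.name] = "job"
    for pl in dlt_pipelines.values():
        if not pl.options.is_enabled:
            continue
        resources[pl.name] = "dlt"
    for job in jobs.values():
        if not job.options.is_enabled:
            continue
        resources[job.name] = "job"
    return resources


resources = register_runners(
    pipelines={
        "stock-prices": Pipeline(databricks_dlt=Named("pl-stock-prices")),
        "disabled": Pipeline(
            options=Options(is_enabled=False),
            databricks_job=Named("job-disabled"),
        ),
    },
    dlt_pipelines={},
    jobs={"stock-prices": Named("job-stock-prices")},
)
print(resources)  # {'pl-stock-prices': 'dlt', 'job-stock-prices': 'job'}
```

Because all three loops write into the same dictionary, a job and a DLT pipeline sharing a name would collide, and the one registered last replaces the other.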

get_resource_ids

get_resource_ids(env=None)

Get resource ids for each of the resources defined in the stack in the provided environment env.

Source code in laktory/dispatcher/dispatcher.py
def get_resource_ids(self, env=None):
    """
    Get resource ids for each of the resources defined in the stack in the
    provided environment `env`.
    """
    if env is not None:
        self.env = env

    for r in self.resources.values():
        r.get_id()
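A minimal sketch of the control flow of get_resource_ids(), using hypothetical StubDispatcher and StubRunner classes in place of the real Dispatcher and runners. In laktory, env is a writable property whose setter is not shown on this page, so any side effects it may have are not modelled here. The point illustrated is that an explicit env is stored on the dispatcher before any runner resolves its id, and therefore persists for later calls made without env.

```python
class StubRunner:
    """Hypothetical stand-in for a runner; records the env seen by get_id()."""

    def __init__(self, dispatcher, name):
        self.dispatcher = dispatcher
        self.name = name
        self.seen_env = None

    def get_id(self):
        # A real runner would look up its id in the workspace; this stub
        # only records which environment was active at lookup time.
        self.seen_env = self.dispatcher.env


class StubDispatcher:
    """Hypothetical stand-in; env is a plain attribute, not a property."""

    def __init__(self, env=None):
        self.env = env
        self.resources = {}

    def get_resource_ids(self, env=None):
        # Same branching as Dispatcher.get_resource_ids(): an explicit env
        # overrides the stored one, then every runner resolves its id.
        if env is not None:
            self.env = env
        for r in self.resources.values():
            r.get_id()


d = StubDispatcher(env="dev")
runner = StubRunner(d, "job-stock-prices")
d.resources[runner.name] = runner

d.get_resource_ids(env="prd")
d.get_resource_ids()  # no env given: the previously set "prd" is kept
print(d.env, runner.seen_env)  # prd prd
```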

run_job

run_job(job_name, *args, **kwargs)

Run job with name job_name

PARAMETER DESCRIPTION
job_name

Name of the job

TYPE: str

*args

Arguments passed to JobRunner.run()

DEFAULT: ()

**kwargs

Keyword arguments passed to JobRunner.run()

DEFAULT: {}

Source code in laktory/dispatcher/dispatcher.py
def run_job(self, job_name: str, *args, **kwargs):
    """
    Run job with name `job_name`

    Parameters
    ----------
    job_name:
        Name of the job
    *args:
        Arguments passed to `JobRunner.run()`
    **kwargs:
        Keyword arguments passed to `JobRunner.run()`
    """
    job = self.resources[job_name]
    job.run(*args, **kwargs)
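run_job() is a plain dictionary lookup followed by delegation, so extra arguments reach the runner unchanged and an unknown or disabled name surfaces as a KeyError. The sketch below reproduces that behavior with a hypothetical RecordingRunner and a module-level resources dict standing in for Dispatcher.resources; the keyword current_run_action is taken from the example at the top of this page.

```python
class RecordingRunner:
    """Hypothetical runner that records the arguments run() receives."""

    def __init__(self):
        self.calls = []

    def run(self, *args, **kwargs):
        self.calls.append((args, kwargs))


# Stand-in for Dispatcher.resources, keyed by resource name.
resources = {"job-stock-prices": RecordingRunner()}


def run_job(job_name, *args, **kwargs):
    # Same body as Dispatcher.run_job(): look up by name, then delegate.
    job = resources[job_name]
    job.run(*args, **kwargs)


run_job("job-stock-prices", current_run_action="CANCEL")
print(resources["job-stock-prices"].calls)
# [((), {'current_run_action': 'CANCEL'})]

try:
    run_job("job-unknown")  # not registered, e.g. disabled in the stack
except KeyError as exc:
    print("missing resource:", exc)  # missing resource: 'job-unknown'
```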

run_dlt

run_dlt(dlt_name, *args, **kwargs)

Run DLT pipeline with name dlt_name

PARAMETER DESCRIPTION
dlt_name

Name of the DLT pipeline

TYPE: str

*args

Arguments passed to DLTPipelineRunner.run()

DEFAULT: ()

**kwargs

Keyword arguments passed to DLTPipelineRunner.run()

DEFAULT: {}

Source code in laktory/dispatcher/dispatcher.py
def run_dlt(self, dlt_name: str, *args, **kwargs):
    """
    Run DLT pipeline with name `dlt_name`

    Parameters
    ----------
    dlt_name:
        Name of the DLT pipeline
    *args:
        Arguments passed to `DLTPipelineRunner.run()`
    **kwargs:
        Keyword arguments passed to `DLTPipelineRunner.run()`
    """
    pl = self.resources[dlt_name]
    pl.run(*args, **kwargs)
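run_dlt() reads from the same resources dictionary as run_job() and does not check the runner type, so passing a job name would run that job. The sketch below illustrates this with hypothetical FakeDLTPipelineRunner and FakeJobRunner classes; run_dlt_checked is a hypothetical stricter variant shown only for comparison and is not part of laktory. Unlike the real method, the sketched functions return the runner's result so the behavior is observable.

```python
class FakeDLTPipelineRunner:
    """Hypothetical stand-in for DLTPipelineRunner."""

    def run(self, *args, **kwargs):
        return ("dlt", kwargs)


class FakeJobRunner:
    """Hypothetical stand-in for JobRunner."""

    def run(self, *args, **kwargs):
        return ("job", kwargs)


# Stand-in for Dispatcher.resources: one shared dict for both runner kinds.
resources = {
    "pl-stock-prices": FakeDLTPipelineRunner(),
    "job-stock-prices": FakeJobRunner(),
}


def run_dlt(dlt_name, *args, **kwargs):
    # Mirrors Dispatcher.run_dlt(): lookup by name, no type check.
    # (Returns the result only for illustration; the real method returns None.)
    return resources[dlt_name].run(*args, **kwargs)


def run_dlt_checked(dlt_name, *args, **kwargs):
    # Hypothetical guarded variant, not laktory behavior.
    runner = resources[dlt_name]
    if not isinstance(runner, FakeDLTPipelineRunner):
        raise TypeError(f"{dlt_name!r} is not a DLT pipeline")
    return runner.run(*args, **kwargs)


print(run_dlt("pl-stock-prices", full_refresh=False))  # ('dlt', {'full_refresh': False})
print(run_dlt("job-stock-prices"))  # ('job', {})
```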