Databricks Asset Bundles (DABs) Python entry point for building and
loading Laktory pipeline resources.

This function is called by the Databricks CLI during bundle resolution.
It discovers Laktory pipeline YAML files, writes their JSON config files to
disk (for DABs to sync to the workspace), and returns Job and DLT Pipeline
resources as a DABs Resources object.

Two global settings are configured automatically when not already set:

- build_root defaults to ./laktory/.build/ relative to the bundle root
  (the directory containing databricks.yml).
- The Laktory workspace root is derived from the dab_workspace_root bundle
  variable as {dab_workspace_root}/files/{build_root}/.
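A minimal sketch of that workspace-root derivation, using only the standard library; the bundle directory and workspace path below are hypothetical placeholders:

```python
import os
from pathlib import Path

# Hypothetical bundle layout
bundle_dir = Path("/repo/my_bundle")
build_root = str(bundle_dir / "laktory" / ".build")  # default build_root
dab_workspace_root = "/Workspace/Users/me@x.com/.bundle/my_bundle/dev"

# Build path relative to the bundle root, then join under {root}/files/
build_root_rel = os.path.relpath(build_root, bundle_dir)  # "laktory/.build"
workspace_root = f"{dab_workspace_root}/files/{build_root_rel}/"

# Laktory expects the workspace root without the "/Workspace/" prefix
workspace_root = workspace_root.replace("/Workspace/", "/")
print(workspace_root)
# → /Users/me@x.com/.bundle/my_bundle/dev/files/laktory/.build/
```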
Examples:

To use, declare in databricks.yml:

```yaml
variables:
  laktory_pipelines_dir:
    default: ./laktory/pipelines/  # comma-separated for multiple dirs
  dab_workspace_root:
    default: ${workspace.root_path}

sync:
  paths:
    - ./laktory/
  include:
    - ./laktory/.build/**  # needed if laktory/.build/ is in .gitignore

python:
  venv_path: .venv
  resources:
    - 'laktory.dab:build_resources'
```
| PARAMETER | DESCRIPTION |
| --- | --- |
| bundle | DABs Bundle object provided by the Databricks CLI. |

| RETURNS | DESCRIPTION |
| --- | --- |
| Resources | DABs Resources object containing all pipeline/job definitions. |
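The comma-separated `laktory_pipelines_dir` value is split into directories, and each existing directory is searched for `*.yaml` / `*.yml` files in sorted order. A self-contained sketch of that discovery step (directory and file names are illustrative):

```python
import tempfile
from pathlib import Path

bundle_dir = Path(tempfile.mkdtemp())

# Hypothetical variable value; comma-separated, as in the example above
dirs_raw = "laktory/pipelines, extra/pipelines"
pipelines_dirs = [d.strip() for d in dirs_raw.split(",")]

# Create one directory with two pipeline files; leave the other missing
(bundle_dir / "laktory" / "pipelines").mkdir(parents=True)
(bundle_dir / "laktory" / "pipelines" / "pl_b.yml").touch()
(bundle_dir / "laktory" / "pipelines" / "pl_a.yaml").touch()

found = []
for d in pipelines_dirs:
    dirpath = Path(d)
    if not dirpath.is_absolute():
        dirpath = bundle_dir / dirpath
    if not dirpath.exists():
        continue  # the real entry point logs a warning and skips
    found += sorted(dirpath.glob("*.yaml")) + sorted(dirpath.glob("*.yml"))

print([f.name for f in found])  # → ['pl_a.yaml', 'pl_b.yml']
```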
Source code in laktory/dab.py
def build_resources(bundle):
    """
    Databricks Asset Bundles (DABs) Python entry point for building and
    loading Laktory pipeline resources.

    This function is called by the Databricks CLI during bundle resolution.
    It discovers Laktory pipeline YAML files, writes their JSON config files to
    disk (for DABs to sync to the workspace), and returns Job and DLT Pipeline
    resources as a DABs ``Resources`` object.

    Two global settings are configured automatically when not already set:

    - `build_root` defaults to `./laktory/.build/` relative
      to the bundle root (the directory containing `databricks.yml`).
    - The Laktory workspace root is derived from the `dab_workspace_root` bundle
      variable as `{dab_workspace_root}/files/{build_root}/`.

    Examples
    --------
    To use, declare in `databricks.yml`:
    ```yaml
    variables:
      laktory_pipelines_dir:
        default: ./laktory/pipelines/  # comma-separated for multiple dirs
      dab_workspace_root:
        default: ${workspace.root_path}

    sync:
      paths:
        - ./laktory/
      include:
        - ./laktory/.build/**  # needed if laktory/.build/ is in .gitignore

    python:
      venv_path: .venv
      resources:
        - 'laktory.dab:build_resources'
    ```

    Parameters
    ----------
    bundle:
        DABs Bundle object provided by the Databricks CLI.

    Returns
    -------
    :
        DABs Resources object containing all pipeline/job definitions.
    """
    from databricks.bundles.core import Resources

    from laktory._settings import settings
    from laktory.models.pipeline.pipeline import Pipeline

    # Get Bundle (databricks.yml) directory. This only works if the CLI is
    # called from the same directory (i.e. --bundle-dir is not used).
    # TODO: Build a more reliable approach.
    bundle_dirpath = Path(os.getcwd())

    # Build Root
    if settings.build_root == DEFAULT_BUILD_ROOT:
        settings.build_root = str(bundle_dirpath / "laktory" / ".build")
        logger.info(
            f"Setting `build_root` to default '{settings.build_root}'. Make sure this path is added to Bundle sync paths."
        )

    # Workspace root
    # This is where Laktory files (pipeline config, queries, dashboards, etc.) are
    # deployed. When using Laktory only, the default is /Workspace/.laktory/. In
    # the context of DABs, we set it to {dab_workspace_root}/laktory/.build/.
    # Unfortunately, {dab_workspace_root} is not available unless the user
    # adds it to the variables.
    dab_workspace_root = bundle.variables.get("dab_workspace_root")
    if settings.workspace_root == DEFAULT_WORKSPACE_ROOT:
        if dab_workspace_root is None:
            raise ValueError(
                "Variable `dab_workspace_root` must be set to '${workspace.root_path}' in databricks.yml to use Laktory."
            )

        # Build path relative to Bundle root
        build_root_abs = settings.build_root
        build_root_rel = os.path.relpath(build_root_abs, bundle_dirpath)
        settings.workspace_root = f"{dab_workspace_root}/files/{build_root_rel}/"

        # Laktory expects the workspace root to exclude "/Workspace/"
        settings.workspace_root = settings.workspace_root.replace("/Workspace/", "/")

    # Clean the build directory to remove stale files from deleted pipelines
    build_dir = Path(settings.build_root)
    if build_dir.exists():
        shutil.rmtree(build_dir)
        logger.info(f"Cleaned stale build directory '{build_dir}'")
    build_dir.mkdir(parents=True, exist_ok=True)

    # --- Bundle variables ---
    # Expose all bundle variables for injection into pipeline models.
    bundle_vars = {k: v for k, v in bundle.variables.items() if v is not None}

    # --- Discover pipeline YAML files ---
    dirs_raw = bundle_vars.get("laktory_pipelines_dir", "laktory/pipelines")
    pipelines_dirs = [d.strip() for d in dirs_raw.split(",")]

    resources = Resources()
    for laktory_pipelines_dir in pipelines_dirs:
        dirpath = Path(laktory_pipelines_dir)
        if not dirpath.is_absolute():
            dirpath = bundle_dirpath / dirpath

        if not dirpath.exists():
            logger.warning(f"Pipelines directory '{dirpath}' does not exist. Skipping.")
            continue

        yaml_files = sorted(dirpath.glob("*.yaml")) + sorted(dirpath.glob("*.yml"))
        if not yaml_files:
            logger.warning(f"No pipeline YAML files found in '{dirpath}'.")
            continue

        for yaml_file in yaml_files:
            logger.info(f"Loading pipeline from '{yaml_file}'")
            with open(yaml_file, "r", encoding="utf-8") as fp:
                pl = Pipeline.model_validate_yaml(fp)

            # Inject bundle variables. Pipeline-level variables take priority
            # because inject_vars() applies them on top of the provided vars dict.
            pl = pl.inject_vars(vars=bundle_vars)

            orchestrator = pl.orchestrator
            if not orchestrator:
                logger.info(f"Pipeline '{pl.name}' has no orchestrator. Skipping.")
                continue

            # Write pipeline config JSON for DABs to sync to the workspace
            config_file = getattr(orchestrator, "config_file", None)
            if config_file:
                config_file.build()

            # to_dab_resource() returns the dab resource, and also copies supporting
            # files (e.g. DLT notebook) to build_root and sets notebook paths.
            dab_resource = orchestrator.to_dab_resource()
            resources.add_resource(orchestrator.resource_name, dab_resource)
            logger.info(f"Added DABs resource '{orchestrator.resource_name}'")

    return resources
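The clean-then-recreate handling of the build directory can be exercised in isolation; the paths below are throwaway temp directories, not Laktory defaults:

```python
import shutil
import tempfile
from pathlib import Path

# Simulate a build directory holding a stale config from a deleted pipeline
build_dir = Path(tempfile.mkdtemp()) / ".build"
build_dir.mkdir(parents=True)
(build_dir / "stale_pipeline.json").write_text("{}")

# Same pattern as in build_resources(): wipe everything, then recreate empty
if build_dir.exists():
    shutil.rmtree(build_dir)
build_dir.mkdir(parents=True, exist_ok=True)

print(sorted(build_dir.iterdir()))  # → []
```

Deleting and recreating (rather than overwriting in place) guarantees that configs for pipelines removed from YAML do not linger in the workspace sync.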