Spark Declarative Pipeline

laktory.models.pipeline.SparkDeclarativePipelineOrchestrator

Bases: PipelineChild

Spark Declarative Pipeline (SDP) used as an orchestrator to execute a Laktory pipeline either locally (via spark-pipelines run) or as a plain Databricks Job task on DBR 16.x.

Generates three artifacts into build_root/pipelines/:

  • laktory_sdp.py - Python definition script with @dp.materialized_view / @dp.table decorators
  • {pipeline_name}.json - serialized pipeline configuration
  • {pipeline_name}-spec.yml - SDP YAML spec pointing to the script and config
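To make the three artifacts concrete, here is a minimal sketch of writing them side by side. The function name write_sdp_artifacts and its parameters are hypothetical, and the spec contents are supplied by the caller because the keys Laktory actually writes are not listed on this page. One deliberate deviation: the spec is written with json.dumps (JSON output for a simple dict like this is also valid YAML), which avoids a PyYAML dependency; Laktory itself uses yaml.dump.

```python
import json
import shutil
from pathlib import Path


def write_sdp_artifacts(
    root_dir: Path,
    pipeline_name: str,
    config: dict,
    spec: dict,
    source_script: Path,
) -> dict[str, Path]:
    """Hypothetical helper: write the three SDP artifacts and return their paths."""
    root_dir.mkdir(parents=True, exist_ok=True)

    # 1. Serialized pipeline configuration: {pipeline_name}.json
    config_path = root_dir / f"{pipeline_name}.json"
    config_path.write_text(json.dumps(config, indent=4))

    # 2. SDP spec: {pipeline_name}-spec.yml
    #    (JSON text is used as YAML here; Laktory uses yaml.dump instead)
    spec_path = root_dir / f"{pipeline_name}-spec.yml"
    spec_path.write_text(json.dumps(spec, indent=2))

    # 3. Python definition script, copied verbatim as laktory_sdp.py
    script_path = root_dir / "laktory_sdp.py"
    shutil.copy(source_script, script_path)

    return {"config": config_path, "spec": spec_path, "script": script_path}
```

The spec refers to the script and the config by path, so keeping all three in one directory lets that directory be moved or uploaded as a unit.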
PARAMETER DESCRIPTION
catalog

The default target catalog for pipeline outputs.

TYPE: str | None | VariableType DEFAULT: None

configuration

Map of configuration properties.

TYPE: dict[str, str] | None | VariableType DEFAULT: None

schema_

The default target schema for pipeline outputs. The alias database can be used instead.

TYPE: str | None | VariableType DEFAULT: None

storage_

A directory to store checkpoints and configuration files. Must include a scheme (file://, s3a://, hdfs://, etc.). Defaults to file:///{self.parent_pipeline.root_path}.

TYPE: str | None | VariableType DEFAULT: None

type

Type of orchestrator.

TYPE: Literal['SPARK_DECLARATIVE_PIPELINE'] | VariableType DEFAULT: 'SPARK_DECLARATIVE_PIPELINE'

METHOD DESCRIPTION
build

Generate SDP artifacts into the pipeline root directory.

execute

Build SDP artifacts, then run the pipeline via spark-pipelines run.

build()

Generate SDP artifacts into the pipeline root directory.

Source code in laktory/models/pipeline/orchestrators/sparkdeclarativepipelineorchestrator.py
def build(self) -> None:
    """
    Generate SDP artifacts into the pipeline root directory.
    """
    pl = self.parent_pipeline

    root_dir = pl.root_path
    root_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Writing SDP artifacts to {root_dir}")

    # Write laktory pipeline config JSON
    with self.config_filepath_abs.open("w") as fp:
        json.dump(self.config_dict, fp, indent=4)

    # Write Spark pipeline spec file
    with self.spec_filepath_abs.open("w") as fp:
        yaml.dump(self.spec_dict, fp, default_flow_style=False, sort_keys=False)

    # Copy python code
    source_script = (
        Path(__file__).parent.parent.parent.parent
        / "resources"
        / "scripts"
        / "laktory_sdp.py"
    )
    target_script = root_dir / "laktory_sdp.py"
    shutil.copy(source_script, target_script)
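The four .parent hops in build() climb from the orchestrator module up to the laktory package directory. A worked example using the relative path from the "Source code in" line (in an installed package __file__ is absolute, but the hops are the same):

```python
from pathlib import PurePosixPath

module_file = PurePosixPath(
    "laktory/models/pipeline/orchestrators/sparkdeclarativepipelineorchestrator.py"
)

# Four `.parent` hops, as in build(), are equivalent to parents[3]
package_root = module_file.parent.parent.parent.parent
assert package_root == module_file.parents[3]

# The definition script is therefore expected inside the package itself
source_script = package_root / "resources" / "scripts" / "laktory_sdp.py"
print(source_script)  # laktory/resources/scripts/laktory_sdp.py
```

This resolution assumes laktory_sdp.py ships as a file inside the installed package, which is what the copy in build() relies on.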

execute(full_refresh=False, selects=None, cwd=None, read_output=False)

Build SDP artifacts, then run the pipeline via spark-pipelines run.
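execute() launches the CLI with subprocess.run(cmd, check=True, cwd=cwd). The sketch below uses the current Python interpreter as a stand-in for spark-pipelines (an assumption made only so it runs anywhere) to show the two behaviors that matter: relative paths written by the child land under cwd, and check=True turns a non-zero exit code into CalledProcessError. The helper name run_in is hypothetical.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in(cmd: list[str], cwd: Path) -> str:
    # check=True and cwd are used as in execute(); capture_output/text are
    # added here only so the child's stdout can be inspected
    result = subprocess.run(cmd, check=True, cwd=cwd, capture_output=True, text=True)
    return result.stdout


with tempfile.TemporaryDirectory() as tmp:
    # A relative directory created by the child appears under cwd, the same
    # way spark-warehouse/ is created in the working directory of the CLI
    code = "import os; os.makedirs('spark-warehouse'); print(os.getcwd())"
    out = run_in([sys.executable, "-c", code], cwd=Path(tmp))
    created = (Path(tmp) / "spark-warehouse").is_dir()

    # A failing child process surfaces as CalledProcessError
    exit_code = None
    try:
        run_in([sys.executable, "-c", "raise SystemExit(3)"], cwd=Path(tmp))
    except subprocess.CalledProcessError as exc:
        exit_code = exc.returncode

print(created, exit_code)
```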

PARAMETER DESCRIPTION
full_refresh

If True without selects, passes --full-refresh-all to the CLI to reset and recompute all datasets. If combined with selects, passes --full-refresh DATASETS for the specified datasets only.

TYPE: bool DEFAULT: False

selects

Datasets to refresh. The names are comma-joined and passed as --refresh DATASETS (or --full-refresh DATASETS when full_refresh=True). Values must be dataset names as they appear in the SDP pipeline (i.e., table/view names), not Laktory node names.

TYPE: list[str] | None DEFAULT: None

cwd

Working directory for the subprocess. Controls where spark-warehouse/ is created. Defaults to the pipeline root path.

TYPE: str | None DEFAULT: None

read_output

If True, pipeline outputs are read back and assigned to their respective nodes (pipeline.nodes[i].output_df). Views (PipelineViewDataSink) are skipped because they are not persisted to disk.

TYPE: bool DEFAULT: False

Source code in laktory/models/pipeline/orchestrators/sparkdeclarativepipelineorchestrator.py
def execute(
    self,
    full_refresh: bool = False,
    selects: list[str] | None = None,
    cwd: str | None = None,
    read_output: bool = False,
):
    """
    Build SDP artifacts then run the pipeline via `spark-pipelines run`.

    Parameters
    ----------
    full_refresh:
        If `True` without `selects`, passes `--full-refresh-all` to the CLI
        to reset and recompute all datasets. If combined with `selects`,
        passes `--full-refresh DATASETS` for the specified datasets only.
    selects:
        Comma-joined and passed as `--refresh DATASETS` (or
        `--full-refresh DATASETS` when `full_refresh=True`). Values must be
        dataset names as they appear in the SDP pipeline (i.e. table/view
        names), not Laktory node names.
    cwd:
        Working directory for the subprocess. Controls where `spark-warehouse/`
        is created. Defaults to the pipeline root path.
    read_output:
        If `True` pipeline outputs are read and assigned to respective nodes (`pipeline.nodes[i].output_df`)
    """
    from laktory import get_spark_session

    self.build()
    cmd = ["spark-pipelines", "run", "--spec", str(self.spec_filepath_rel)]
    if cwd is None:
        cwd = self.parent_pipeline.root_path.absolute()

    if full_refresh and selects:
        cmd += ["--full-refresh", ",".join(selects)]
    elif full_refresh:
        cmd += ["--full-refresh-all"]
    elif selects:
        cmd += ["--refresh", ",".join(selects)]

    logger.info(f"Running SDP pipeline: {' '.join(cmd)}")
    subprocess.run(cmd, check=True, cwd=cwd)

    # Read back node outputs from the spark-warehouse written by SDP
    if read_output:
        from laktory.models.datasinks.pipelineviewdatasink import (
            PipelineViewDataSink,
        )

        spark = get_spark_session()
        for node in self.parent_pipeline.nodes:
            for sink in node.sinks:
                if isinstance(sink, PipelineViewDataSink):
                    continue  # views are not persisted to disk
                warehouse_root = Path(cwd) / "spark-warehouse"
                schema_name = sink.schema_name or "default"
                if schema_name != "default":
                    warehouse_root = warehouse_root / f"{schema_name}.db"

                if sink.catalog_name:
                    # Unity Catalog - table accessible via catalog directly
                    node._output_df = sink.as_source().read()
                else:
                    dataframe_path = warehouse_root / sink.table_name
                    if dataframe_path.exists():
                        # Hive tables can be saved as parquet or delta
                        # We use parquet for simplicity because it will work in
                        # both cases
                        node._output_df = spark.read.parquet(str(dataframe_path))