Skip to content

DataFrameExpr

laktory.models.dataframe.DataFrameExpr ¤

Bases: BaseModel, PipelineChild

A DataFrame expressed as a SQL statement.

Examples:

import polars as pl

import laktory as lk

df0 = pl.DataFrame(
    {
        "x": [1, 2, 3],
    }
)

expr = lk.models.DataFrameExpr(expr="SELECT x, 2*x AS y FROM {df}")
df = expr.to_df(dfs={"df": df0}).collect()

print(df)
'''
┌──────────────────┐
|Narwhals DataFrame|
|------------------|
|    | x | y |     |
|    |---|---|     |
|    | 1 | 2 |     |
|    | 2 | 4 |     |
|    | 3 | 6 |     |
└──────────────────┘
'''
PARAMETER DESCRIPTION
expr

SQL Expression

TYPE: str | VariableType

type

Expression type. Only SQL is currently supported, but DF could be added in the future.

TYPE: Literal['SQL'] | VariableType DEFAULT: 'SQL'

METHOD DESCRIPTION
to_df

Execute expression on provided DataFrame dfs.

ATTRIBUTE DESCRIPTION
data_sources

Get all sources required by SQL Expression

upstream_node_names

Get all upstream nodes

TYPE: list[str]

data_sources property ¤

Get all sources required by SQL Expression

upstream_node_names property ¤

Get all upstream nodes

to_df(dfs) ¤

Execute expression on provided DataFrame dfs.

PARAMETER DESCRIPTION
dfs

Input dataframes

TYPE: dict[str, AnyFrame]

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/dataframe/dataframeexpr.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def to_df(self, dfs: dict[str, AnyFrame]) -> AnyFrame:
    """
    Execute expression on provided DataFrame `dfs`.

    Parameters
    ----------
    dfs:
        Input dataframes, keyed by the name used to reference them in the
        SQL expression (e.g. ``{df}``).

    Returns
    -------
        Output dataframe (Narwhals frame wrapping the backend-native result).

    Raises
    ------
    ValueError
        If no input dataframes are available, or if the SQL expression
        contains no executable statement (PySpark backend).
    NotImplementedError
        If the backend of the input dataframes is not supported.
    """

    # From SQL expression
    logger.info(f"DataFrame as \n{self.expr.strip()}")

    # Copy so the caller's dict is not mutated when node sources are
    # injected below.
    dfs = dict(dfs)

    # Read Data Sources required by the SQL expression
    for s in self.data_sources:
        dfs[f"nodes.{s.node.name}"] = s.read()

    if not dfs:
        raise ValueError(
            "At least one input dataframe or data source is required to "
            "execute the SQL expression."
        )

    # Convert to Native backend objects
    dfs = {k: nw.from_native(v).to_native() for k, v in dfs.items()}
    df0 = next(iter(dfs.values()))

    # Get Backend from the first dataframe; all inputs are assumed to
    # share the same backend.
    backend = DataFrameBackends.from_df(df0)

    if backend == DataFrameBackends.POLARS:
        import polars as pl

        # Because Polars doesn't support {} in frame names, we use
        # double underscores (__) instead
        _dfs = {}
        for k, v in dfs.items():
            _k = "{" + k + "}"
            _dfs[to_safe_expr(_k, df_names=[k])] = v

        expr = to_safe_expr(self.expr, df_names=list(dfs.keys()))

        df = pl.SQLContext(frames=_dfs).execute(expr)
        return nw.from_native(df)

    elif backend == DataFrameBackends.PYSPARK:
        _spark = df0.sparkSession

        # Create views
        # TODO: Using parametrized queries would be ideal, but it is not compatible
        #       with older versions of spark or Delta Live Tables.
        # Because PySpark does not support view names with {}, we replaced them
        # with double underscores (__)
        for k, _df in dfs.items():
            _k = "{" + k + "}"
            _df.createOrReplaceTempView(to_safe_expr(_k, df_names=[k]))

        # Run query; only the result of the last non-empty statement is
        # returned.
        _df = None
        for expr in self.expr.split(";"):
            if expr.replace("\n", " ").strip() == "":
                continue
            _df = _spark.sql(to_safe_expr(expr, df_names=list(dfs.keys())))
        if _df is None:
            raise ValueError(f"SQL Expression '{self.expr}' is invalid")
        return nw.from_native(_df)

    else:
        raise NotImplementedError(f"Backend '{backend}' is not supported.")