Skip to content

DataFrameExpr

laktory.models.dataframe.DataFrameExpr ¤

Bases: BaseModel, PipelineChild

A DataFrame expressed as a SQL statement.

Examples:

import polars as pl

import laktory as lk

df0 = pl.DataFrame(
    {
        "x": [1, 2, 3],
    }
)

expr = lk.models.DataFrameExpr(expr="SELECT x, 2*x AS y FROM {df}")
df = expr.to_df(dfs={"df": df0}).collect()

print(df)
'''
┌──────────────────┐
|Narwhals DataFrame|
|------------------|
|    | x | y |     |
|    |---|---|     |
|    | 1 | 2 |     |
|    | 2 | 4 |     |
|    | 3 | 6 |     |
└──────────────────┘
'''
PARAMETER DESCRIPTION
expr

SQL Expression

TYPE: str | VariableType

type

Expression type. Only SQL is currently supported; DataFrame (DF) expressions may be added in the future.

TYPE: Literal['SQL'] | VariableType DEFAULT: 'SQL'

METHOD DESCRIPTION
to_df

Execute the expression on the provided DataFrames dfs.

ATTRIBUTE DESCRIPTION
data_sources

Get PipelineNodeDataSource objects for each {nodes.X} reference in the SQL expression.

upstream_node_names

Get all upstream nodes referenced in the SQL expression.

TYPE: list[str]

data_sources property ¤

Get PipelineNodeDataSource objects for each {nodes.X} reference in the SQL expression.

upstream_node_names property ¤

Get all upstream nodes referenced in the SQL expression.

to_df(dfs) ¤

Execute the expression on the provided DataFrames dfs.

PARAMETER DESCRIPTION
dfs

Input dataframes

TYPE: dict[str, AnyFrame]

RETURNS DESCRIPTION
Output dataframe
Source code in laktory/models/dataframe/dataframeexpr.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def to_df(self, dfs: dict[str, AnyFrame]) -> AnyFrame:
    """
    Execute the SQL expression on the provided DataFrames `dfs`.

    Each key of `dfs` is referenced in the expression as a `{key}`
    placeholder (e.g. `{df}` or `{nodes.X}`). Placeholders are rewritten
    to backend-safe identifiers with `to_safe_expr` before execution.

    Parameters
    ----------
    dfs:
        Input dataframes, keyed by the placeholder name used in the
        expression. The execution backend is inferred from the first
        dataframe.

    Returns
    -------
        Output dataframe, wrapped with `narwhals.from_native`. For PySpark
        this is the result of the last non-empty `;`-separated statement.

    Raises
    ------
    ValueError
        If `dfs` is empty, or (PySpark) if the expression contains no
        non-empty statement.
    NotImplementedError
        If the backend of the first dataframe is neither Polars nor PySpark.
    """
    # Guard: the backend is inferred from the first frame, so at least one
    # frame is required (previously this surfaced as a bare IndexError).
    if not dfs:
        raise ValueError(
            f"No input DataFrames provided for SQL expression '{self.expr}'"
        )

    # From SQL expression
    logger.info(f"DataFrame as \n{self.expr.strip()}")

    # Convert to native so backend-specific APIs can be used below
    dfs = {k: nw.from_native(v).to_native() for k, v in dfs.items()}
    df0 = next(iter(dfs.values()))

    # Safe identifier for each "{name}" placeholder
    # (e.g. {df} -> __df__, {nodes.X} -> __nodes_X___).
    safe_names = {k: to_safe_expr("{" + k + "}", df_names=[k]) for k in dfs}

    def _statements(query: str) -> list[str]:
        # Split on ";" and drop blank statements (e.g. a trailing ";").
        # NOTE(review): a ";" inside a SQL string literal would also split.
        return [stmt for stmt in query.split(";") if stmt.strip()]

    backend = DataFrameBackends.from_df(df0)

    if backend == DataFrameBackends.POLARS:
        import polars as pl

        # Polars does not support {} in frame names, so frames are
        # registered under their safe names and the expression is rewritten.
        frames = {safe_names[k]: v for k, v in dfs.items()}
        expr = to_safe_expr(self.expr, df_names=list(dfs))
        return nw.from_native(pl.SQLContext(frames=frames).execute(expr))

    if backend == DataFrameBackends.PYSPARK:
        from laktory import is_sdp_execute

        _spark = df0.sparkSession
        _df = None

        if is_sdp_execute():
            # Spark Connect (SDP): createOrReplaceTempView is forbidden inside
            # @dp.* decorated functions. Use spark.sql(**kwargs) instead -
            # PySpark creates SubqueryAlias plans internally without
            # registering temp views.
            #
            # {nodes.X} contains a dot, which is not a valid Python kwarg key,
            # so safe names are used as kwarg keys. All braces are escaped
            # first so SQL patterns like {8,8} in regex literals are not
            # treated as format placeholders. Then only the known DataFrame
            # placeholders are restored as {safe_k}.
            query = self.expr.replace("{", "{{").replace("}", "}}")
            for k, safe_k in safe_names.items():
                query = query.replace("{{" + k + "}}", "{" + safe_k + "}")
            sql_kwargs = {safe_names[k]: v for k, v in dfs.items()}

            for stmt in _statements(query):
                _df = _spark.sql(stmt, **sql_kwargs)
        else:
            # Local / LDP: use createOrReplaceTempView.
            # LDP monkey-patches spark.sql() and does not support **kwargs.
            # createOrReplaceTempView is safe outside Spark Connect.
            query = self.expr
            for k, v in dfs.items():
                safe_k = safe_names[k]
                query = query.replace("{" + k + "}", safe_k)
                v.createOrReplaceTempView(safe_k)

            for stmt in _statements(query):
                _df = _spark.sql(stmt)

        if _df is None:
            raise ValueError(f"SQL Expression '{self.expr}' is invalid")
        return nw.from_native(_df)

    raise NotImplementedError(f"Backend '{backend}' is not supported.")