FileDataSource

laktory.models.datasources.FileDataSource ¤

Bases: BaseDataSource

Data source using disk files, such as data events (JSON/CSV) or full dataframes.

Examples:

from laktory import models

source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    dataframe_backend="POLARS",
)
# df = source.read()

# With Explicit Schema
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    dataframe_backend="PYSPARK",
    schema={
        "columns": {
            "symbol": "String",
            "open": "Float64",
            "close": "Float64",
        }
    },
)
# df = source.read()
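A source can also be read as a stream by setting as_stream to True, which is currently only supported by the Spark DataFrame backend. A minimal sketch, reusing the path from the examples above:

```python
from laktory import models

# Streaming read sketch: as_stream is only supported by the PySpark backend.
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    dataframe_backend="PYSPARK",
    as_stream=True,
)
# df = source.read()  # streaming DataFrame
```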
PARAMETER DESCRIPTION
as_stream

If True, the source is read as a streaming DataFrame. Currently only supported by the Spark DataFrame backend.

TYPE: bool | VariableType DEFAULT: False

drop_duplicates

Remove duplicated rows from the source, using all columns if True or only the provided column names.

TYPE: bool | list[str] | VariableType DEFAULT: None

drops

List of columns to drop.

TYPE: list | VariableType DEFAULT: None

filter

SQL expression used to select specific rows from the source table.

TYPE: str | VariableType DEFAULT: None

format

Format of the data files.

TYPE: Literal['AVRO', 'BINARYFILE', 'CSV', 'DELTA', 'EXCEL', 'IPC', 'JSON', 'JSONL', 'NDJSON', 'ORC', 'PARQUET', 'PYARROW', 'TEXT', 'XML'] | VariableType

has_header

Indicates whether the first row of the dataset is a header. Only applicable to the 'CSV' format.

TYPE: bool | VariableType DEFAULT: True

infer_schema

When True, the schema is inferred from the data. When False, the schema is not inferred and columns default to string unless specified in schema_definition. Only applicable to some formats, such as 'CSV' and 'JSON'.

TYPE: bool | VariableType DEFAULT: False

path

File path on a local disk, remote storage or Databricks volume.

TYPE: str | VariableType

reader_kwargs

Keyword arguments passed directly to the dataframe backend reader. Passed to the .options() method when using PySpark.

TYPE: dict[str | VariableType, Any | VariableType] | VariableType DEFAULT: {}

reader_methods

DataFrame backend reader methods.

TYPE: list[ReaderWriterMethod | VariableType] | VariableType DEFAULT: []

renames

Mapping between the source column names and the desired column names.

TYPE: dict[str | VariableType, str | VariableType] | VariableType DEFAULT: None

schema_definition

Target schema specified as a list of columns, as a dict or as a JSON serialization. Only used when reading data from non-strongly typed files, such as JSON or CSV files.

TYPE: DataFrameSchema | VariableType DEFAULT: None

schema_location

Path used for schema inference when reading data as a stream. If None, the parent directory of path is used.

TYPE: str | Path | VariableType DEFAULT: None

selects

Columns to select from the source. Can be specified as a list, or as a dictionary to also rename the source columns.

TYPE: list[str] | dict[str, str] | VariableType DEFAULT: None

type

Source Type

TYPE: Literal['FILE'] | VariableType DEFAULT: 'FILE'
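Several of the parameters above post-process the dataframe after it is read. A sketch combining filter, drop_duplicates and renames (the filter expression and renamed column are illustrative, not part of the examples above):

```python
from laktory import models

# Sketch of post-read options; filter expression and rename are illustrative.
source = models.FileDataSource(
    path="/Volumes/sources/landing/events/yahoo-finance/stock_price",
    format="JSON",
    dataframe_backend="POLARS",
    filter="close > 0",              # keep rows matching a SQL expression
    drop_duplicates=["symbol"],      # de-duplicate on selected columns
    renames={"symbol": "ticker"},    # map source names to desired names
)
# df = source.read()
```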

METHOD DESCRIPTION
read

Read data with options specified in attributes.

read(**kwargs) ¤

Read data with options specified in attributes.

RETURNS DESCRIPTION
AnyFrame

Resulting dataframe

Source code in laktory/models/datasources/basedatasource.py
def read(self, **kwargs) -> AnyFrame:
    """
    Read data with options specified in attributes.

    Returns
    -------
    :
        Resulting dataframe
    """
    logger.info(
        f"Reading `{self.__class__.__name__}` {self._id} with {self.dataframe_backend}"
    )
    df = self._read(**kwargs)

    # Convert to Narwhals
    if not isinstance(df, (nw.LazyFrame, nw.DataFrame)):
        df = nw.from_native(df)

    # Post read
    df = self._post_read(df)

    logger.info("Read completed.")

    return df