
Streaming

Functions and classes for streaming data from iterators, generators, and paginated sources.


Functions

from_iter

from_iter

from_iter(source: Iterable | Callable[[], Iterable], *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None, source_label: str = 'Iterator') -> LazyFrame

Create a LazyFrame from any iterator or generator.

A generator is single-pass, so the resulting frame can be evaluated only once. A re-iterable source (such as a list) or a zero-argument callable factory can be replayed on each evaluation.

Parameters:

  • source (Iterable | Callable[[], Iterable]) –

    An iterable, generator, or zero-argument callable that returns an iterable. Items can be dicts, tuples, scalars, or objects with __dict__.

  • columns (list[str] | None, default: None ) –

    Override column names.

  • dtypes (dict[str, type] | None, default: None ) –

    Override column types as {name: type} mapping.

  • schema (LazySchema | None, default: None ) –

    Explicit LazySchema to use instead of inferring.

  • source_label (str, default: 'Iterator' ) –

    Label shown in explain() output.

Returns:

  • LazyFrame –

    A LazyFrame backed by the iterator source.

Examples:

From a generator:

>>> import pyfloe as pf
>>> def gen():
...     for i in range(5):
...         yield {"id": i, "value": i * 1.5}
>>> pf.from_iter(gen()).to_pylist()
[{'id': 0, 'value': 0.0}, {'id': 1, 'value': 1.5}, {'id': 2, 'value': 3.0}, {'id': 3, 'value': 4.5}, {'id': 4, 'value': 6.0}]

From a replayable callable factory:

>>> def make_data():
...     for i in range(3):
...         yield {"x": i}
>>> lf = pf.from_iter(make_data)  # pass the function, not the generator
>>> lf.to_pylist() == lf.to_pylist()  # can be evaluated multiple times
True
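
With tuple items and a columns override — a hedged sketch, assuming tuple items map positionally onto the supplied column names (as implied by the source parameter description):

>>> import pyfloe as pf
>>> rows = [(1, "a"), (2, "b")]
>>> pf.from_iter(rows, columns=["id", "tag"]).to_pylist()
[{'id': 1, 'tag': 'a'}, {'id': 2, 'tag': 'b'}]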

from_chunks

from_chunks

from_chunks(chunks: Iterable[list[dict]] | Callable[[], Iterable[list[dict]]], *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None, source_label: str = 'Chunked') -> LazyFrame

Create a LazyFrame from batched/paginated data.

Each chunk may be a list of dicts, a list of tuples, or a LazyFrame. Useful for paginated APIs and other batch-producing sources.

Parameters:

  • chunks (Iterable[list[dict]] | Callable[[], Iterable[list[dict]]]) –

    An iterable of chunks, or a callable that returns one.

  • columns (list[str] | None, default: None ) –

    Override column names.

  • dtypes (dict[str, type] | None, default: None ) –

    Override column types as {name: type} mapping.

  • schema (LazySchema | None, default: None ) –

    Explicit LazySchema to use instead of inferring.

  • source_label (str, default: 'Chunked' ) –

    Label shown in explain() output.

Returns:

  • LazyFrame –

    A LazyFrame backed by the chunked source.

Examples:

From a replayable chunk factory:

>>> import pyfloe as pf
>>> def make_chunks():
...     yield [{"n": 1}, {"n": 2}]
...     yield [{"n": 3}]
>>> lf = pf.from_chunks(make_chunks)
>>> lf.to_pylist()
[{'n': 1}, {'n': 2}, {'n': 3}]

From an iterator of chunks:

>>> chunks = [[{"id": 1, "v": "a"}], [{"id": 2, "v": "b"}]]
>>> pf.from_chunks(iter(chunks)).to_pylist()
[{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'b'}]

Stream

The Stream class provides a true single-pass streaming pipeline. Unlike LazyFrame, it compiles transforms into a flat loop for maximum throughput with no plan-tree overhead.

Stream

A true single-pass streaming pipeline.

Unlike LazyFrame, Stream compiles transforms into a flat loop for maximum throughput. Supports filter, with_column, select, and apply. Results are consumed via .to_csv(), .to_jsonl(), .to_pylist(), or .collect().

Examples:

>>> import pyfloe as pf
>>> def gen():
...     for i in range(100):
...         yield {"id": i, "value": i * 2.0}
>>> result = (
...     pf.Stream.from_iter(gen())
...     .filter(pf.col("value") > 190)
...     .with_column("label", pf.col("id").cast(str))
...     .select("id", "value", "label")
...     .to_pylist()
... )
>>> len(result)
4

Methods:

  • from_iter

    Create a Stream from an iterator, iterable, or factory callable.

  • from_csv

    Create a Stream from a CSV file.

  • filter

    Add a filter step to the pipeline.

  • with_column

    Add a computed column step to the pipeline.

  • select

    Add a column selection step to the pipeline.

  • apply

    Apply a function to column values in the stream.

  • collect

    Execute the pipeline and return a materialized LazyFrame.

  • to_pylist

    Execute the pipeline and return results as a list of dicts.

  • to_csv

    Execute the pipeline and stream results to a CSV file.

  • to_jsonl

    Execute the pipeline and stream results to a JSON Lines file.

  • foreach

    Execute the pipeline and call a function for each row.

  • count

    Execute the pipeline and return the total row count.

  • take

    Execute the pipeline and return the first n rows as dicts.

Attributes:

  • columns (list[str]) –

    List of output column names after all transforms.

  • schema (LazySchema) –

    Output schema of this Stream.

columns property

columns: list[str]

List of output column names after all transforms.
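
A short sketch of inspecting the output columns, assuming `columns` reflects pipeline steps such as with_column before execution:

>>> s = Stream.from_iter([{"x": 1}]).with_column("y", col("x") + 1)
>>> s.columns
['x', 'y']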

schema property

schema: LazySchema

Output schema of this Stream.

from_iter classmethod

from_iter(source: Any, *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None) -> Stream

Create a Stream from an iterator, iterable, or factory callable.

Parameters:

  • source (Any) –

    Data source — iterable, generator, or callable factory.

  • columns (list[str] | None, default: None ) –

    Override column names.

  • dtypes (dict[str, type] | None, default: None ) –

    Override column types.

  • schema (LazySchema | None, default: None ) –

    Explicit LazySchema.

Returns:

  • Stream

    A new Stream reading from the given source.

Examples:

>>> def gen():
...     for i in range(10):
...         yield {"x": i, "y": i * 10}
>>> Stream.from_iter(gen()).filter(col("y") > 50).to_pylist()
[{'x': 6, 'y': 60}, {'x': 7, 'y': 70}, {'x': 8, 'y': 80}, {'x': 9, 'y': 90}]

from_csv classmethod

from_csv(path: str, **kwargs: Any) -> Stream

Create a Stream from a CSV file.

Parameters:

  • path (str) –

    Path to the CSV file.

  • **kwargs (Any, default: {} ) –

    Arguments passed to the CSV reader.

Returns:

  • Stream

    A new Stream reading rows from the CSV file.

Examples:

>>> Stream.from_csv("orders.csv").filter(col("amount") > 200).select("order_id", "amount").to_pylist()
[{'order_id': 1, 'amount': 250.0}, {'order_id': 6, 'amount': 310.0}]

filter

filter(predicate: Expr) -> Stream

Add a filter step to the pipeline.

Parameters:

  • predicate (Expr) –

    Boolean expression to filter rows.

Returns:

  • Stream

    A new Stream with the filter applied.

Examples:

>>> stream = Stream.from_iter([{"amount": 50}, {"amount": 200}])
>>> stream.filter(col("amount") > 100).to_pylist()
[{'amount': 200}]

with_column

with_column(name_or_expr: str | Expr, expr: Expr | None = None) -> Stream

Add a computed column step to the pipeline.

Can be called with a name and expression, or with a single expression whose output name is derived via .alias() or from the underlying column reference.

Parameters:

  • name_or_expr (str | Expr) –

    Column name (str) or an expression with an inferrable output name.

  • expr (Expr | None, default: None ) –

    Expression to compute column values (required when name_or_expr is a string).

Returns:

  • Stream

    A new Stream with the additional column.

Examples:

>>> stream = Stream.from_iter([{"x": 1, "y": 2}, {"x": 3, "y": 4}])
>>> stream.with_column("total", col("x") + col("y")).to_pylist()
[{'x': 1, 'y': 2, 'total': 3}, {'x': 3, 'y': 4, 'total': 7}]
>>> stream = Stream.from_iter([{"x": 1, "y": 2}])
>>> stream.with_column((col("x") + col("y")).alias("total")).to_pylist()
[{'x': 1, 'y': 2, 'total': 3}]

select

select(*columns: str) -> Stream

Add a column selection step to the pipeline.

Parameters:

  • *columns (str, default: () ) –

    Column names to keep.

Returns:

  • Stream

    A new Stream with only the selected columns.

Examples:

>>> stream = Stream.from_iter([{"id": 1, "value": 10, "extra": "x"}])
>>> stream.select("id", "value").to_pylist()
[{'id': 1, 'value': 10}]

apply

apply(func: Callable, columns: list[str] | None = None) -> Stream

Apply a function to column values in the stream.

Parameters:

  • func (Callable) –

    Function to apply to each cell value.

  • columns (list[str] | None, default: None ) –

    Columns to apply to. If None, applies to all columns.

Returns:

  • Stream

    A new Stream with the function applied.
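
Examples:

A hedged sketch, assuming func receives each cell value of the targeted columns (per the parameter description above):

>>> stream = Stream.from_iter([{"name": "alice"}, {"name": "bob"}])
>>> stream.apply(str.upper, columns=["name"]).to_pylist()
[{'name': 'ALICE'}, {'name': 'BOB'}]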

collect

collect() -> LazyFrame

Execute the pipeline and return a materialized LazyFrame.

Examples:

>>> def gen():
...     for i in range(10):
...         yield {"x": i}
>>> lf = Stream.from_iter(gen()).filter(col("x") > 5).collect()
>>> isinstance(lf, LazyFrame)
True

to_pylist

to_pylist() -> list[dict]

Execute the pipeline and return results as a list of dicts.

Examples:

>>> def gen():
...     for i in range(8):
...         yield {"x": i}
>>> Stream.from_iter(gen()).filter(col("x") > 5).to_pylist()
[{'x': 6}, {'x': 7}]

to_csv

to_csv(path: str, *, delimiter: str = ',', header: bool = True, encoding: str = 'utf-8') -> None

Execute the pipeline and stream results to a CSV file.

Rows are written one-at-a-time with constant memory.

Parameters:

  • path (str) –

    Output file path.

  • delimiter (str, default: ',' ) –

    Field delimiter.

  • header (bool, default: True ) –

    Whether to write a header row.

  • encoding (str, default: 'utf-8' ) –

    File encoding.

Examples:

>>> Stream.from_iter([{"score": 40}, {"score": 80}]).filter(col("score") > 50).to_csv("out.csv")

to_jsonl

to_jsonl(path: str, *, encoding: str = 'utf-8') -> None

Execute the pipeline and stream results to a JSON Lines file.

Parameters:

  • path (str) –

    Output file path.

  • encoding (str, default: 'utf-8' ) –

    File encoding.

Examples:

>>> Stream.from_iter([{"ts": 5}, {"ts": 20}]).filter(col("ts") > 10).to_jsonl("out.jsonl")

foreach

foreach(func: Callable[[dict], None]) -> None

Execute the pipeline and call a function for each row.

Parameters:

  • func (Callable[[dict], None]) –

    Function that receives each row as a dict.

Examples:

>>> collected = []
>>> Stream.from_iter([{"x": 1}, {"x": 2}]).foreach(lambda row: collected.append(row))
>>> collected
[{'x': 1}, {'x': 2}]

count

count() -> int

Execute the pipeline and return the total row count.

Examples:

>>> def gen():
...     for i in range(1000):
...         yield {"x": i}
>>> Stream.from_iter(gen()).filter(col("x") > 500).count()
499

take

take(n: int) -> list[dict]

Execute the pipeline and return the first n rows as dicts.

Parameters:

  • n (int) –

    Number of rows to return.

Examples:

>>> def gen():
...     for i in range(10):
...         yield {"x": i}
>>> Stream.from_iter(gen()).take(3)
[{'x': 0}, {'x': 1}, {'x': 2}]