Streaming¶
Functions and classes for streaming data from iterators, generators, and paginated sources.
Functions¶
from_iter¶
from_iter(source: Iterable | Callable[[], Iterable], *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None, source_label: str = 'Iterator') -> LazyFrame
Create a LazyFrame from any iterator or generator.
If given a generator (single-pass), the data can only be evaluated once. If given a callable factory or an iterable (list, etc.), data can be replayed on each evaluation.
Parameters:
- source (Iterable | Callable[[], Iterable]) – An iterable, generator, or zero-argument callable that returns an iterable. Items can be dicts, tuples, scalars, or objects with __dict__.
- columns (list[str] | None, default: None) – Override column names.
- dtypes (dict[str, type] | None, default: None) – Override column types as a {name: type} mapping.
- schema (LazySchema | None, default: None) – Explicit LazySchema to use instead of inferring.
- source_label (str, default: 'Iterator') – Label shown in explain() output.
Returns:
- LazyFrame – A LazyFrame.
Examples:
From a generator:
>>> import pyfloe as pf
>>> def gen():
... for i in range(5):
... yield {"id": i, "value": i * 1.5}
>>> pf.from_iter(gen()).to_pylist()
[{'id': 0, 'value': 0.0}, {'id': 1, 'value': 1.5}, {'id': 2, 'value': 3.0}, {'id': 3, 'value': 4.5}, {'id': 4, 'value': 6.0}]
From a replayable callable factory:
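A sketch of the replayable form, assuming the replay semantics documented above (a zero-argument factory is re-invoked on each evaluation):
>>> import pyfloe as pf
>>> lf = pf.from_iter(lambda: ({"id": i, "value": i * 1.5} for i in range(3)))
>>> lf.to_pylist()
[{'id': 0, 'value': 0.0}, {'id': 1, 'value': 1.5}, {'id': 2, 'value': 3.0}]
>>> lf.to_pylist()  # the factory is called again, so evaluation can repeat
[{'id': 0, 'value': 0.0}, {'id': 1, 'value': 1.5}, {'id': 2, 'value': 3.0}]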
from_chunks¶
from_chunks(chunks: Iterable[list[dict]] | Callable[[], Iterable[list[dict]]], *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None, source_label: str = 'Chunked') -> LazyFrame
Create a LazyFrame from batched/paginated data.
Each chunk is a list of dicts, a list of tuples, or a LazyFrame. Useful for paginated APIs or other batch-producing sources.
Parameters:
- chunks (Iterable[list[dict]] | Callable[[], Iterable[list[dict]]]) – An iterable of chunks, or a callable that returns one.
- columns (list[str] | None, default: None) – Override column names.
- dtypes (dict[str, type] | None, default: None) – Override column types as a {name: type} mapping.
- schema (LazySchema | None, default: None) – Explicit LazySchema to use instead of inferring.
- source_label (str, default: 'Chunked') – Label shown in explain() output.
Returns:
- LazyFrame – A LazyFrame.
Examples:
From a replayable chunk factory:
>>> import pyfloe as pf
>>> def make_chunks():
... yield [{"n": 1}, {"n": 2}]
... yield [{"n": 3}]
>>> lf = pf.from_chunks(make_chunks)
>>> lf.to_pylist()
[{'n': 1}, {'n': 2}, {'n': 3}]
From an iterator of chunks:
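A sketch using a plain iterator of chunks; because an iterator is single-pass, this data can only be evaluated once:
>>> import pyfloe as pf
>>> pages = iter([[{"n": 1}, {"n": 2}], [{"n": 3}]])
>>> pf.from_chunks(pages).to_pylist()
[{'n': 1}, {'n': 2}, {'n': 3}]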
Stream¶
The Stream class provides a true single-pass streaming pipeline. Unlike LazyFrame, it compiles transforms into a flat loop for maximum throughput with no plan-tree overhead.
Stream¶
A true single-pass streaming pipeline.
Unlike LazyFrame, Stream compiles transforms into a flat loop for maximum throughput. It supports filter, with_column, select, and apply. Results are consumed via .to_csv(), .to_jsonl(), .to_pylist(), or .collect().
Examples:
>>> import pyfloe as pf
>>> def gen():
... for i in range(100):
... yield {"id": i, "value": i * 2.0}
>>> result = (
... pf.Stream.from_iter(gen())
... .filter(pf.col("value") > 190)
... .with_column("label", pf.col("id").cast(str))
... .select("id", "value", "label")
... .to_pylist()
... )
>>> len(result)
4
Methods:
- from_iter – Create a Stream from an iterator, iterable, or factory callable.
- from_csv – Create a Stream from a CSV file.
- filter – Add a filter step to the pipeline.
- with_column – Add a computed column step to the pipeline.
- select – Add a column selection step to the pipeline.
- apply – Apply a function to column values in the stream.
- collect – Execute the pipeline and return a materialized LazyFrame.
- to_pylist – Execute the pipeline and return results as a list of dicts.
- to_csv – Execute the pipeline and stream results to a CSV file.
- to_jsonl – Execute the pipeline and stream results to a JSON Lines file.
- foreach – Execute the pipeline and call a function for each row.
- count – Execute the pipeline and return the total row count.
- take – Execute the pipeline and return the first n rows as dicts.
Attributes:
- columns (list[str]) – List of output column names after all transforms.
- schema (LazySchema) – Output schema of this Stream.
from_iter (classmethod)¶
from_iter(source: Any, *, columns: list[str] | None = None, dtypes: dict[str, type] | None = None, schema: LazySchema | None = None) -> Stream
Create a Stream from an iterator, iterable, or factory callable.
Parameters:
- source (Any) – Data source: an iterable, generator, or callable factory.
- columns (list[str] | None, default: None) – Override column names.
- dtypes (dict[str, type] | None, default: None) – Override column types.
- schema (LazySchema | None, default: None) – Explicit LazySchema.
Returns:
- Stream – A new Stream.
Examples:
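A minimal sketch; a list source is replayable, as documented for from_iter:
>>> import pyfloe as pf
>>> pf.Stream.from_iter([{"a": 1}, {"a": 2}]).to_pylist()
[{'a': 1}, {'a': 2}]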
from_csv (classmethod)¶
from_csv(path: str, **kwargs: Any) -> Stream
Create a Stream from a CSV file.
Parameters:
- path (str) – Path to the CSV file.
- **kwargs (Any, default: {}) – Arguments passed to the CSV reader.
Returns:
- Stream – A new Stream.
Examples:
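A sketch only, since this reads from disk; "events.csv" is a hypothetical path:
>>> import pyfloe as pf
>>> stream = pf.Stream.from_csv("events.csv")
>>> n = stream.count()  # total number of data rows in the file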
filter¶
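Add a filter step to the pipeline.
Examples:
A sketch, assuming pf.col predicates as used elsewhere on this page:
>>> import pyfloe as pf
>>> stream = pf.Stream.from_iter([{"x": 1}, {"x": 5}, {"x": 3}])
>>> stream.filter(pf.col("x") > 2).to_pylist()
[{'x': 5}, {'x': 3}]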
with_column¶
Add a computed column step to the pipeline.
It can be called with a name and an expression, or with a single expression whose output name is derived via .alias() or from the underlying column reference.
Parameters:
- name_or_expr (str | Expr) – Column name (str) or an expression with an inferrable output name.
- expr (Expr | None, default: None) – Expression to compute column values (required when name_or_expr is a string).
Returns:
- Stream – A new Stream with the additional column.
Examples:
>>> stream = Stream.from_iter([{"x": 1, "y": 2}, {"x": 3, "y": 4}])
>>> stream.with_column("total", col("x") + col("y")).to_pylist()
[{'x': 1, 'y': 2, 'total': 3}, {'x': 3, 'y': 4, 'total': 7}]
>>> stream = Stream.from_iter([{"x": 1, "y": 2}])
>>> stream.with_column((col("x") + col("y")).alias("total")).to_pylist()
[{'x': 1, 'y': 2, 'total': 3}]
select¶
Add a column selection step to the pipeline.
Parameters:
- *columns (str, default: ()) – Column names to keep.
Returns:
- Stream – A new Stream with only the selected columns.
Examples:
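A sketch in the style of the with_column examples above:
>>> stream = Stream.from_iter([{"x": 1, "y": 2, "z": 3}])
>>> stream.select("x", "z").to_pylist()
[{'x': 1, 'z': 3}]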
apply¶
Apply a function to column values in the stream.
Parameters:
- func (Callable) – Function to apply to each cell value.
- columns (list[str] | None, default: None) – Columns to apply to. If None, applies to all columns.
Returns:
- Stream – A new Stream with the function applied.
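Examples:
A sketch showing per-cell application, first to every column and then restricted with the columns parameter:
>>> stream = Stream.from_iter([{"a": 1, "b": 2}])
>>> stream.apply(lambda v: v * 10).to_pylist()
[{'a': 10, 'b': 20}]
>>> stream = Stream.from_iter([{"a": 1, "b": 2}])
>>> stream.apply(lambda v: v * 10, columns=["a"]).to_pylist()
[{'a': 10, 'b': 2}]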
collect¶
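Execute the pipeline and return a materialized LazyFrame.
Examples:
A sketch; the collected result is a LazyFrame, so LazyFrame methods apply:
>>> stream = Stream.from_iter([{"n": 1}, {"n": 2}])
>>> stream.collect().to_pylist()
[{'n': 1}, {'n': 2}]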
to_pylist¶
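Execute the pipeline and return results as a list of dicts.
Examples:
A minimal sketch:
>>> Stream.from_iter([{"n": 1}]).to_pylist()
[{'n': 1}]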
to_csv¶
Execute the pipeline and stream results to a CSV file.
Rows are written one-at-a-time with constant memory.
Parameters:
- path (str) – Output file path.
- delimiter (str, default: ',') – Field delimiter.
- header (bool, default: True) – Whether to write a header row.
- encoding (str, default: 'utf-8') – File encoding.
Examples:
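A sketch; "out.csv" and "out.tsv" are hypothetical output paths:
>>> stream = Stream.from_iter([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
>>> stream.to_csv("out.csv")  # writes an "id,name" header, then two rows
>>> Stream.from_iter([{"id": 1}]).to_csv("out.tsv", delimiter="\t", header=False)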