Streaming I/O
"Streaming" is the tech industry's favorite buzzword. Kafka. Flink. Event-driven architecture. Real-time pipelines. It sounds like a revolution — a fundamentally new way of thinking about data. But here's the thing: you already built it.
Remember Module 1? You wrote a FilterNode that used yield
to pull rows one at a time through the volcano model. That was streaming.
One row in memory, processed, discarded, next row. The entire architecture you've
spent five modules building — generators, plan nodes, lazy evaluation — is a
streaming architecture. You just didn't call it that.
This deep dive makes that connection explicit. We'll build a realistic
monitoring pipeline — a simulated web server emitting an infinite
stream of HTTP logs — and show how pyfloe reads it, enriches it, aggregates
it, and writes it to disk, all with constant memory. None of it requires
new concepts. It's all generators and factories and the same
execute() method you already know.
The point of this module is not to teach you something new. It's to show you that you already understand the thing — the core mechanism is simpler than the buzzwords suggest.
Companion demo: a runnable Python script that implements every example in
this deep dive — an infinite HTTP log generator, LazyFrame pipelines with
joins and aggregations, a flat-loop Stream pipeline, a live rolling
dashboard, and a timing comparison. Download it and run it alongside
this lesson.
Section D.1 The Iterator Factory Pattern
Let's start with a generator you can't load into Pandas.
An infinite stream
Our companion demo simulates a live web server. The function
http_log_stream() yields structured log records — one per HTTP
request — with timestamps, endpoints, status codes, and latencies.
With the default limit=None, it never stops.
def http_log_stream(*, limit=None, delay=0.0):
    """Simulate a live web server emitting log records.

    With limit=None this generator never terminates."""
    count = 0
    while limit is None or count < limit:
        ts = datetime.now()  # timestamp for this record
        status = random.choices(STATUS_CODES, weights=STATUS_WEIGHTS, k=1)[0]
        latency = (max(50, random.gauss(1500, 400)) if status >= 500
                   else max(5, random.gauss(120, 80)))
        yield {
            "ts": ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
            "method": random.choice(METHODS),
            "path": random.choice(ENDPOINTS),
            "status": status,
            "latency_ms": round(latency, 2),
            "deploy_id": random.choice(DEPLOY_IDS),
        }
        count += 1
        if delay:
            time.sleep(delay)  # optional pacing between records
Try calling list(http_log_stream()) and your program hangs
forever — there's no end to iterate toward. Try loading it into Pandas and
you'll watch your RAM climb until the OS kills the process. An infinite
generator is the purest forcing function for streaming: you cannot
materialize it. You must process it lazily, one record at a time.
list(http_log_stream()) hangs. pd.DataFrame(http_log_stream())
hangs. Any tool that tries to consume the entire iterator before processing
it will never return. This is why streaming architecture exists — not because
it's trendy, but because some data sources never end.
Of course, in our demo we'll use limit=10_000 to keep things
finite. But the design must handle the infinite case — and once it does,
finite files are just a special case.
The one-shot problem
Even with a finite limit, generators have a subtle trap. They're single-pass
iterators — once exhausted, they yield nothing forever. Watch what happens
when you pass a generator to from_iter():
gen = http_log_stream(limit=100)
lf = from_iter(gen)
len(lf.to_pylist()) # → 100 ✓
len(lf.to_pylist()) # → 0 ✗ generator is spent!
The first .to_pylist() drains the generator. The second call
gets nothing — the generator is exhausted, and next() just raises
StopIteration forever. This kills repeatability. You can't
call .explain() and then .collect(). You can't
test a pipeline and then run it for real.
The fix is embarrassingly simple. Don't store the generator. Store a function that creates a fresh generator every time you call it.
The factory: a function that returns a generator
# Pass a lambda, not the generator itself
lf = from_iter(lambda: http_log_stream(limit=100))
len(lf.to_pylist()) # → 100 ✓
len(lf.to_pylist()) # → 100 ✓ fresh generator every time!
len(lf.to_pylist()) # → 100 ✓ unlimited replays
Each call to the lambda creates a brand-new generator. The lambda itself is
tiny — a stateless function object. The underlying IteratorSourceNode
calls this factory every time execute_batched() runs, getting
a fresh stream of data.
# The one-shot trap: generators exhaust after one pass
def counting_gen(n):
    for i in range(n):
        yield {"id": i, "value": i * 10}

gen = counting_gen(5)
print("First pass:", list(gen))
print("Second pass:", list(gen))  # empty!
print()

# The fix: store a FACTORY (a function that creates generators)
factory = lambda: counting_gen(5)
print("First pass (factory):", list(factory()))
print("Second pass (factory):", list(factory()))  # works!
print("Third pass (factory):", list(factory()))   # unlimited replays
This is not a new concept — it's a closure. The factory closes over whatever
state it needs (a file path, a URL, a limit parameter) and uses
it each time it's called. You've been writing closures since early Python.
But here it solves a very real engineering problem: making a streaming data
source repeatable.
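The closure point can be made concrete. A factory that closes over its configuration produces an independent, replayable source each time it is called — `make_source` below is a hypothetical helper for illustration:

```python
def make_source(limit):
    """Return a factory whose generators close over `limit`."""
    def factory():
        for i in range(limit):
            yield {"id": i}
    return factory

source = make_source(3)          # the closure captures limit=3
assert len(list(source())) == 3  # first pass
assert len(list(source())) == 3  # replay: a fresh generator each call
```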
pyfloe's IteratorSourceNode
In Module 3, you saw ScanNode — it holds data in a list and
iterates over it. For streaming sources, pyfloe uses a different leaf node:
IteratorSourceNode. Its execute_batched() method
calls the factory to get a fresh generator on every execution.
class IteratorSourceNode(PlanNode):
    """Leaf node that reads from a lazily-evaluated iterator factory."""

    __slots__ = ("_columns", "_schema", "_factory", "_source_label")

    def __init__(self, columns: list[str], lazy_schema: LazySchema,
                 iterator_factory: Callable[[], Iterator],
                 source_label: str = "Iterator"):
        self._columns = columns
        self._schema = lazy_schema
        self._factory = iterator_factory  # ← a callable, not a generator
        self._source_label = source_label

    def execute_batched(self):
        return _batched(self._factory())  # ← calls the factory each time
Look at execute_batched. It calls self._factory() —
with parentheses — to get a new iterator, then wraps it in _batched()
to yield it in chunks of 1,024 rows (the same batching helper from Module 3).
Every call to execute_batched gets a fresh read.
Compare this to ScanNode, which holds self._data as
a plain list. ScanNode is for in-memory data that already exists.
IteratorSourceNode is for data that lives somewhere else — a file,
an API, a generator — and needs to be re-created on demand. Same interface,
same execute() / execute_batched() contract. The rest
of the plan tree can't tell the difference.
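That "same interface" claim is easy to demonstrate: any object exposing `execute_batched()` can sit at the leaf of the tree. A stripped-down sketch with simplified names, not the actual pyfloe classes:

```python
class ListSource:
    """In-memory leaf: holds rows in a plain list (like ScanNode)."""
    def __init__(self, rows):
        self._rows = rows
    def execute_batched(self):
        yield self._rows                      # a single batch

class FactorySource:
    """Streaming leaf: calls a factory on every execution
    (like IteratorSourceNode)."""
    def __init__(self, factory):
        self._factory = factory
    def execute_batched(self):
        yield list(self._factory())           # fresh iterator each time

def drain(node):
    """Consumers see the same execute_batched() contract either way."""
    return [row for batch in node.execute_batched() for row in batch]

assert drain(ListSource([1, 2])) == drain(FactorySource(lambda: iter([1, 2])))
```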
Section D.2 Constant-Memory File Reading
Our http_log_stream() yields Python dicts — pyfloe can infer
the types at runtime. But what about CSV files, where every value arrives
as a raw string? This is where the factory pattern meets real I/O, and where
we encounter the one genuinely tricky problem in the whole streaming story:
type inference.
Two-phase reading: sample, then stream
We want to sample a few rows to figure out types, then yield the rest one row at a time, never holding more than a single row in memory.
pyfloe's CSV reader does its work in two phases. Phase one happens at
read time (when you call read_csv()). Phase two happens
at execution time (when you call .collect() or
.to_csv()).
Phase 1 At read time
Open the file. Read the header. Sample 100 rows for type inference. Close the file. Build a schema. Build a factory function. No data is stored.
Phase 2 At execution time
The factory reopens the file. Yields rows one at a time as typed tuples. Each row is consumed by the plan tree and discarded. O(1) memory.
Phase 1 is the clever part. The file is opened, the header is read, and a small sample (100 rows by default) is consumed. From these 100 rows, pyfloe infers the type of every column — integers, floats, booleans, strings, and even datetimes. Then the file is closed. The only things kept in memory are the column names, the inferred types, and a factory function that knows how to reopen the file later.
Type inference: the type ladder
Each cell in the sample is tested against a hierarchy of types, from most specific to least:
If a cell parses as bool, that's its type. If not,
try int. Then float. If nothing works, it's a
str. Within a column, types are promoted using a lattice:
int + float → float, anything + str → str, and
T + None → T(nullable).
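The ladder and the promotion lattice fit in a few lines. This is an illustrative reimplementation of the idea, not pyfloe's actual `_infer_schema`:

```python
from functools import reduce

def infer_cell(s):
    """Walk the ladder from most to least specific type."""
    if s == "":
        return type(None)                  # missing value
    if s.lower() in ("true", "false"):
        return bool
    for candidate in (int, float):
        try:
            candidate(s)
            return candidate
        except ValueError:
            pass
    return str

def promote(a, b):
    """Lattice: int + float → float, anything + str → str, T + None → T."""
    if a is type(None): return b
    if b is type(None): return a
    if a is b: return a
    if {a, b} == {int, float}: return float
    return str

column = ["1", "2.5", "", "7"]   # int, float, null, int → float (nullable)
assert reduce(promote, map(infer_cell, column)) is float
```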
How datetime detection works
After the basic pass, there's a second pass specifically for datetimes.
Any column still typed as str gets checked: can the first
successfully-parsed value's format be used as a template? If 80% or more of the
sample values parse against that format, the column is retyped as
datetime and the detected format string is cached. This
avoids the cost of trying 14 different strptime patterns on every
cell during streaming — datetime detection runs once per column on the sample, not per value.
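The 80% rule can be sketched directly with `strptime`. Assumptions here: the candidate format comes from the first value that parses at all, and the pattern list is a hypothetical shortlist, not pyfloe's actual 14 patterns:

```python
from datetime import datetime

PATTERNS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y"]  # illustrative shortlist

def _parses(value, fmt):
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False

def detect_datetime_format(sample, threshold=0.8):
    """Take the first pattern that parses any sample value, then keep
    it only if >= threshold of the whole sample parses with it."""
    fmt = next((p for v in sample for p in PATTERNS if _parses(v, p)), None)
    if fmt is None:
        return None
    hits = sum(_parses(v, fmt) for v in sample)
    return fmt if hits / len(sample) >= threshold else None

sample = ["2024-01-01", "2024-02-15", "2024-03-30", "2024-04-01", "oops"]
assert detect_datetime_format(sample) == "%Y-%m-%d"   # 4/5 = 80% parse
```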
Sampling means the inference can be wrong: if a later row contains a
string in a column the sample inferred as int, the cast will fail at
execution time. This is the fundamental tradeoff — you gain O(1) memory
but trade away guaranteed type safety across the whole file. Polars makes
the same tradeoff with scan_csv().
The factory function
After type inference, _read_delimited() constructs the factory.
Here's the core of it:
def _read_delimited(path, delimiter, has_header, ...):
    # Phase 1: sample and infer types (at read time)
    with open(path) as f:
        reader = csv.reader(f, delimiter=delimiter)
        header = next(reader)
        col_names = [h.strip() for h in header]
        sample = list(itertools.islice(reader, 100))  # safe on short files
    schema, col_types, dt_formats = _infer_schema(col_names, sample)
    n_cols = len(col_names)

    # Phase 2: the factory (called at execution time)
    def make_rows():
        with open(path) as f:
            reader = csv.reader(f, delimiter=delimiter)
            next(reader)  # skip header
            for row in reader:
                yield tuple(
                    _cast_value(row[i], col_types[i], dt_formats[i])
                    for i in range(n_cols)
                )

    return _FileStreamNode(col_names, schema, make_rows)
Notice what the factory closes over: path, col_types,
and dt_formats. These are tiny — a string and two short lists.
The factory itself does zero I/O until it's called. When it is called,
it opens the file fresh, skips the header, and yields each row as a typed tuple.
One row in memory at a time.
The with statement inside the generator ensures the file handle
is closed when the generator finishes or is garbage-collected — which happens
as soon as the plan tree finishes consuming rows, or when .head(10)
stops pulling after 10 rows.
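That cleanup behavior is worth seeing in isolation. Closing a generator raises GeneratorExit at its paused yield, which unwinds the `with` block and closes the file. A standalone demonstration (not pyfloe code — the dict is just there so we can inspect the handle):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.csv")
with open(path, "w") as f:
    f.write("a\n1\n2\n3\n")

handle = {}

def rows():
    with open(path) as f:        # opened lazily, inside the generator
        handle["f"] = f          # keep a reference so we can inspect it
        for line in f:
            yield line

gen = rows()
next(gen)                        # generator started: file is open
assert not handle["f"].closed
gen.close()                      # GeneratorExit unwinds the with-block...
assert handle["f"].closed        # ...which closed the file
```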
Each row is yielded as a tuple, not a dict. You
saw this in Module 3 — tuples are roughly 40% smaller (no per-row key storage),
faster to create, and hashable (needed for joins and group-by). The mapping
from column name to position lives in the schema, not in every row.
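The size claim is easy to spot-check with `sys.getsizeof`. These are shallow sizes only and the exact numbers vary by Python version, so treat the comparison as illustrative:

```python
import sys

as_tuple = ("2024-01-01", "GET", "/api", 200, 12.5)
as_dict = {"ts": "2024-01-01", "method": "GET",
           "path": "/api", "status": 200, "latency_ms": 12.5}

# The tuple stores five pointers; the dict also carries a hash table
# and a key reference per entry — pure per-row overhead.
print(sys.getsizeof(as_tuple), sys.getsizeof(as_dict))
assert sys.getsizeof(as_tuple) < sys.getsizeof(as_dict)
```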
The _FileStreamNode wraps this factory in a plan node (inheriting
from PlanNode) with the same execute() and
execute_batched() interface that every other node uses. From the
perspective of the plan tree, it's just another leaf node. Whether the data
comes from a list in memory, an infinite generator, or a file on disk is
invisible to the filter, join, and project nodes above it.
This is exactly how Polars' scan_csv() works. It reads a sample, infers
types, and builds a lazy plan — all without loading the file. When you call
.collect(), that's when the file streams through the plan.
You now know exactly how that works, because the pattern is identical: factory
function → plan node → volcano pull.
Section D.3 Constant-Memory Writing
Reading was the hard part. Writing is almost trivial — once you have a streaming plan tree, writing to disk is just consuming the iterator into a file instead of a list. Don't collect. Don't materialize. Just iterate the plan directly and write each batch as it arrives:
def to_csv_streaming(plan, columns, path):
    with open(path, "w") as f:
        writer = csv.writer(f)
        writer.writerow(columns)              # header
        for chunk in plan.execute_batched():
            writer.writerows(chunk)           # write batch, discard
That's it. Six lines. The plan tree yields batches of 1,024 rows (via the
execute_batched() method from Module 3). Each batch is written
to the file and then dropped. The next batch is pulled from the tree, which
pulls from its child, which pulls from the leaf node's factory.
The end-to-end memory profile: one batch of tuples (1,024 rows × however many columns). That's a constant. It doesn't matter if the source has 10 thousand records or 10 billion.
pyfloe's implementation
def _to_csv_impl(lf, path, delimiter=",",
                 header=True, encoding="utf-8"):
    with open(path, "w", encoding=encoding, newline="") as f:
        writer = csv.writer(f, delimiter=delimiter)
        if header:
            writer.writerow(lf.columns)
        for chunk in lf._plan.execute_batched():
            writer.writerows(chunk)
Nearly identical to our simplified version. The JSONL writer is equally simple — same structure, but each row is serialized as JSON and written as a line.
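That JSONL writer can be sketched in the same shape. This is an illustrative version, not pyfloe's actual implementation — the `plan`/`columns` interface is assumed from the CSV writer above:

```python
import json
import os
import tempfile

def to_jsonl_streaming(plan, columns, path):
    """Stream batches from a plan tree straight into a JSONL file."""
    with open(path, "w") as f:
        for chunk in plan.execute_batched():
            for row in chunk:
                # zip positional tuples back to names; one object per line
                f.write(json.dumps(dict(zip(columns, row))) + "\n")

# Usage with a stand-in plan node:
class FakePlan:
    def execute_batched(self):
        yield [(1, "a"), (2, "b")]

path = os.path.join(tempfile.mkdtemp(), "out.jsonl")
to_jsonl_streaming(FakePlan(), ["id", "name"], path)
with open(path) as f:
    assert f.readline().strip() == '{"id": 1, "name": "a"}'
```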
The demo: stream → enrich → write
Here's the demo's run_stream_to_csv() in action. It reads
10,000 records from the infinite log generator, filters to 5xx errors, adds
a computed column, selects the columns it needs, and writes to a CSV — all
with constant memory:
(
    Stream.from_iter(
        http_log_stream(limit=10_000),
        columns=["ts", "method", "path", "status",
                 "latency_ms", "deploy_id"],
    )
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .select("ts", "path", "status", "latency_s", "deploy_id")
    .to_csv("/tmp/errors.csv")
)
From generator to disk. The generator yields one record, the filter checks
the status code, the with_column computes a new value, the select trims the
columns, and to_csv writes the row. Then the next record.
At no point does the full dataset exist in memory.
Polars calls this .sink_csv(). Spark calls it writing to a
"streaming sink." Different names, same mechanism:
pull from the plan tree, write to the output, discard each batch.
There's no secret sauce — it's a for loop over an iterator
connected to a file writer. A tube, not a tank.
Section D.4 The Stream Pipeline — What You Give Up When You Skip the Tree
Everything we've covered so far uses the same plan tree you built in
Modules 3–5. from_iter() returns a LazyFrame backed
by IteratorSourceNode → FilterNode →
ProjectNode → etc. The volcano model pulls rows through the tree.
It works, it's elegant, and it streams with constant memory.
pyfloe's Stream
class offers a faster alternative for simple pipelines — but it sacrifices
everything you built in this course: no joins, no aggregation, no schema
propagation, no optimizer. The things you built are exactly what Stream
trades away for speed.
But our demo has a natural place to see both models side by side. The error
summary needs joins and aggregation — it must use LazyFrame:
error_summary = (
    from_iter(lambda: http_log_stream(limit=10_000))  # factory, not generator
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .join(deploys, on="deploy_id")        # ← needs hash join
    .group_by("path", "deployer")
    .agg(
        col("latency_s").mean().alias("avg_latency"),
        col("status").count().alias("error_count"),
    )
)
But the simple filter-and-write-to-CSV part — no joins, no aggregation — doesn't need the tree at all. And the tree has a cost.
The overhead of trees
Every row that passes through a FilterNode triggers a method call:
execute_batched() on the child, then predicate.eval(row, col_map)
on the expression. For a ProjectNode, there's another method call
and another expression evaluation. Each plan node adds a layer of generator
orchestration per row.
For complex plans with joins and aggregations, this overhead is negligible compared to the actual work. But for simple, purely streaming pipelines — read, filter, add a column, write — the tree traversal overhead dominates.
A pipeline like filter → with_column → select → to_csv
passes each row through three plan nodes. Each node calls execute_batched()
on its child — that's three layers of Python generator frames per batch. When the
work per row is tiny, the overhead of the tree itself becomes the bottleneck.
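You can feel the difference in a toy benchmark: the same filter-and-transform expressed as chained generators (one frame per "node") versus one flat loop. Timings vary by machine, so the point is the structure, not the exact numbers:

```python
import time

data = list(range(200_000))

def chained(rows):
    # three stacked generator frames, like three plan nodes
    filtered = (r for r in rows if r % 2 == 0)   # "FilterNode"
    widened = (r * 10 for r in filtered)         # "WithColumnNode"
    return ((v,) for v in widened)               # "ProjectNode"

def flat(rows):
    # one generator frame: every step inlined
    for r in rows:
        if r % 2 == 0:
            yield (r * 10,)

t0 = time.perf_counter(); a = list(chained(data)); t1 = time.perf_counter()
b = list(flat(data)); t2 = time.perf_counter()
assert a == b                                    # identical results
print(f"chained: {t1 - t0:.3f}s  flat: {t2 - t1:.3f}s")
```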
The flat loop
pyfloe's Stream class offers an alternative. Instead of building
a tree of plan nodes, it records transforms as a flat list of steps
and executes them in a single tight loop. No tree traversal, no nested
generators.
# Each method appends to self._transforms — no nodes created
stream = Stream.from_iter(source, columns=[...])
stream = stream.filter(col("status") >= 500)
stream = stream.with_column("latency_s", col("latency_ms") / 1000)
stream = stream.select("ts", "path", "status", "latency_s")

# Internally, _transforms is:
# [
#     ("filter", <Expr: col("status") >= 500>),
#     ("with_column", "latency_s", <Expr: col("latency_ms") / 1000>),
#     ("select", ["ts", "path", "status", "latency_s"]),
# ]
No nodes. No tree. Just a list of what to do. When it's time to execute,
_build_processor() compiles this into a sequence of steps — each
one a tuple with the operation kind, the pre-compiled expression or index list,
and a frozen column map. Then _execute() runs a single loop:
def _execute(self):
    steps, _ = self._build_processor()
    for row in self._source_factory():      # one loop over the source
        skip = False
        current = row
        for step in steps:                  # flat step list
            kind = step[0]
            if kind == "filter":
                if not step[1].eval(current, step[2]):
                    skip = True
                    break
            elif kind == "with_column":
                current = current + (step[1].eval(current, step[2]),)
            elif kind == "select":
                current = tuple(current[i] for i in step[1])
        if not skip:
            yield current
One outer loop over the source. One inner loop over the steps. No method dispatch, no generator chaining, no tree traversal.
Visualizing the difference
LazyFrame (volcano tree)
  → FilterNode.execute_batched()
    → IteratorSourceNode.execute_batched()
      → factory()
  3 generator frames per batch

Stream (flat loop)
  step 1: filter
  step 2: with_column
  step 3: select
  → yield row
  1 generator frame total
Measuring the difference
The demo includes a timing comparison — same filter + transform pipeline,
50,000 records, measured with time.perf_counter():
# LazyFrame (volcano tree)
t0 = time.perf_counter()
result_lf = (
    from_iter(lambda: http_log_stream(limit=50_000))
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .select("ts", "path", "status", "latency_s")
    .to_pylist()
)
t_lf = time.perf_counter() - t0

# Stream (flat loop)
t0 = time.perf_counter()
result_st = (
    Stream.from_iter(
        http_log_stream(limit=50_000),
        columns=["ts", "method", "path",
                 "status", "latency_ms", "deploy_id"],
    )
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .select("ts", "path", "status", "latency_s")
    .to_pylist()
)
t_st = time.perf_counter() - t0
Run it yourself — the Stream version is typically 20–40%
faster for this kind of simple pipeline. The gap widens with more
plan nodes (more generator frames to avoid) and narrows when per-row work
is heavier (the overhead becomes a smaller fraction of total time).
Every yield/next() pair across a node
boundary costs function-call overhead. The flat loop avoids all of that. The
tradeoff: Stream can't do joins, aggregations, sorts, or
optimization. It's only for filter → transform → write.
The foreach callback — a live dashboard
The demo's most Kafka-like feature is the rolling dashboard. It uses
Stream.foreach() to process each record with a callback,
maintaining manual accumulators just like the hash aggregation from Module 4:
stats = defaultdict(lambda: {"count": 0, "error_count": 0, ...})

def process(row):
    stats[row["path"]]["count"] += 1
    if row["status"] >= 500:
        stats[row["path"]]["error_count"] += 1
    # ... update accumulators ...
    if total["n"] % 1000 == 0:
        print_dashboard()  # rolling summary every 1,000 records

Stream.from_iter(source, columns=[...]).foreach(process)
The defaultdict accumulator is exactly the hash aggregation
pattern from Module 4 — running sum, running count, running max — done
manually because Stream has no group_by. You
should recognize it: same algorithm, different container.
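The running-statistics pattern generalizes: sums and counts update in O(1) per record, and derived values like the mean are computed only when displayed. A minimal standalone version of the idea:

```python
from collections import defaultdict

# One accumulator per path: running count, sum, and max — O(1) per record.
stats = defaultdict(lambda: {"n": 0, "sum": 0.0, "max": float("-inf")})

def update(path, latency_ms):
    s = stats[path]
    s["n"] += 1
    s["sum"] += latency_ms                # running sum, never a stored list
    s["max"] = max(s["max"], latency_ms)

for lat in (100, 300, 200):
    update("/api", lat)

s = stats["/api"]
assert (s["n"], s["sum"] / s["n"], s["max"]) == (3, 200.0, 300)
```

The mean is derived from `sum / n` at display time, so no record is ever retained.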
When to use which
Use LazyFrame when…
You need joins, aggregations, sorts, window functions, or query optimization. This is the general-purpose engine — it handles anything.
Use Stream when…
You have a simple pipeline — read, filter, transform, write — and want maximum throughput with constant memory. No joins, no group-by, no optimizer. Just speed.
The demo uses both: LazyFrame for the enriched error summary
(it needs a hash join against the deploys table and a
group_by), and Stream for the CSV export and the
live dashboard (pure filter → transform → write). That split is natural — it
falls directly out of what each execution model supports.
Exercises Test Your Understanding
Question 1: Why a factory?
Why does IteratorSourceNode store a callable factory instead
of a generator?
Question 2: Memory profile
For from_iter(http_log_stream(limit=10_000_000)).filter(...).to_csv("out.csv"),
how much memory does the pipeline use at peak?
Question 3: Stream vs. LazyFrame
The demo uses LazyFrame for the error summary but Stream for the CSV export. Why can't the error summary use Stream?
Question 4: Type inference tradeoff
pyfloe infers CSV column types from a sample of 100 rows. What happens if
row 101 contains a string in a column that was inferred as int?
Question 5: The live dashboard
The demo's run_live_dashboard() uses Stream.foreach()
with a defaultdict accumulator. Which pattern from an earlier
module is this most similar to?
Recap What You Now Know
Let's be honest about what just happened. This deep dive introduced one new pattern — the iterator factory — and spent the rest of its time showing you that the streaming I/O system is just your existing knowledge applied to files and infinite sources. The generator-based volcano model you built in Module 1? That is streaming. The batched execution from Module 3? That drives the writers. The hash aggregation from Module 4? That powers the live dashboard's accumulators.
The factory pattern — store a callable that creates a fresh
iterator, not the iterator itself. This makes streaming sources repeatable.
from_iter(lambda: source) instead of from_iter(source).
Constant-memory reading — sample 100 rows for type inference, then yield the rest one at a time through a factory. The schema is known instantly; the data streams on demand.
Constant-memory writing — iterate the plan tree directly into a file writer. No materialization, no intermediate storage. A tube, not a tank.
The Stream class — when the plan tree's overhead
is the bottleneck, compile transforms into a flat loop. Same semantics,
fewer function calls, ~30% faster for simple pipelines. Use it for filter →
transform → write. Use LazyFrame for everything else.
pyfloe ships several more sources built the same way: from_chunks() for paginated API sources
(each page yields a batch of rows), read_jsonl() for JSON Lines,
read_parquet() via PyArrow, and Stream.from_csv()
for direct CSV-to-Stream. They all follow the same factory pattern. If you
understand read_csv and Stream.from_iter, you
understand them all — the format parsing differs; the architecture is identical.
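The paginated-API case reduces to the same idea: a generator function that walks pages and flattens them into rows is itself a factory, since each call restarts pagination. A hedged sketch — `fetch_page` is a hypothetical stand-in for an HTTP call, not a pyfloe API:

```python
def fetch_page(page_num):
    """Stand-in for an API call; returns [] once pages run out."""
    pages = {0: [{"id": 1}, {"id": 2}], 1: [{"id": 3}]}
    return pages.get(page_num, [])

def paged_rows():
    """A generator function is itself a factory: each call
    restarts pagination from page 0."""
    page_num = 0
    while True:
        page = fetch_page(page_num)
        if not page:
            return
        yield from page                  # flatten the page into rows
        page_num += 1

assert [r["id"] for r in paged_rows()] == [1, 2, 3]
assert [r["id"] for r in paged_rows()] == [1, 2, 3]   # replayable
```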
The next time someone at a conference talks about "streaming architecture" like
it's astrophysics, you'll know the core: it's a generator, a factory, and a
for loop. Distributed streaming adds real complexity (fault
tolerance, backpressure, exactly-once delivery), but the fundamental mechanism
is what you've already built.
Source References
Browse the pyfloe source code for the classes and functions covered in this deep dive.