Build Your Own DataFrame
Deep Dive

Streaming I/O

Bonus material
This deep dive is optional — it doesn't feed into the remaining modules. If you've just finished Module 5, the main course arc is complete. Come back to this when you're curious about how pyfloe handles file I/O with constant memory, or when you want to see the generator patterns from Module 1 applied to a realistic streaming scenario.

"Streaming" is the tech industry's favorite buzzword. Kafka. Flink. Event-driven architecture. Real-time pipelines. It sounds like a revolution — a fundamentally new way of thinking about data. But here's the thing: you already built it.

Remember Module 1? You wrote a FilterNode that used yield to pull rows one at a time through the volcano model. That was streaming. One row in memory, processed, discarded, next row. The entire architecture you've spent five modules building — generators, plan nodes, lazy evaluation — is a streaming architecture. You just didn't call it that.

This deep dive makes that connection explicit. We'll build a realistic monitoring pipeline — a simulated web server emitting an infinite stream of HTTP logs — and show how pyfloe reads it, enriches it, aggregates it, and writes it to disk, all with constant memory. None of it requires new concepts. It's all generators and factories and the same execute() method you already know.

The point of this module is not to teach you something new. It's to show you that you already understand the thing — the core mechanism is simpler than the buzzwords suggest.

📦
streaming_demo.py — Companion Script

A runnable Python script that implements every example in this deep dive. An infinite HTTP log generator, LazyFrame pipelines with joins and aggregations, a flat-loop Stream pipeline, a live rolling dashboard, and a timing comparison. Download it and run it alongside this lesson.

Section D.1 The Iterator Factory Pattern

Let's start with a generator you can't load into Pandas.

An infinite stream

Our companion demo simulates a live web server. The function http_log_stream() yields structured log records — one per HTTP request — with timestamps, endpoints, status codes, and latencies. With the default limit=None, it never stops.

The infinite log generator (streaming_demo.py)
def http_log_stream(*, limit=None, delay=0.0):
    """Simulate a live web server emitting log records.
    With limit=None this generator never terminates."""
    count = 0
    while limit is None or count < limit:
        ts = datetime.now()   # timestamp for this record (definition elided in the original excerpt)
        status = random.choices(STATUS_CODES, weights=STATUS_WEIGHTS, k=1)[0]
        latency = max(50, random.gauss(1500, 400)) if status >= 500 \
            else max(5, random.gauss(120, 80))

        yield {
            "ts":         ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
            "method":     random.choice(METHODS),
            "path":       random.choice(ENDPOINTS),
            "status":     status,
            "latency_ms": round(latency, 2),
            "deploy_id":  random.choice(DEPLOY_IDS),
        }
        count += 1
        if delay:
            time.sleep(delay)   # optional pacing between records

Try calling list(http_log_stream()) and your program hangs forever — there's no end to iterate toward. Try loading it into Pandas and you'll watch your RAM climb until the OS kills the process. An infinite generator is the purest forcing function for streaming: you cannot materialize it. You must process it lazily, one record at a time.

You cannot load this
list(http_log_stream()) hangs. pd.DataFrame(http_log_stream()) hangs. Any tool that tries to consume the entire iterator before processing it will never return. This is why streaming architecture exists — not because it's trendy, but because some data sources never end.

Of course, in our demo we'll use limit=10_000 to keep things finite. But the design must handle the infinite case — and once it does, finite files are just a special case.
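Bounded consumption is the only safe way to touch an infinite source, and the standard library already has the tool for it. A minimal sketch, using a hypothetical infinite counter standing in for http_log_stream():

```python
import itertools

def infinite_records():
    """Stand-in for http_log_stream(): yields records forever."""
    i = 0
    while True:
        yield {"id": i, "status": 500 if i % 10 == 0 else 200}
        i += 1

# islice pulls exactly 5 records and stops -- the source is never drained
first_five = list(itertools.islice(infinite_records(), 5))
```

This is the same move `limit=10_000` makes inside the generator itself: cap the pull, not the source.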

The one-shot problem

Even with a finite limit, generators have a subtle trap. They're single-pass iterators — once exhausted, they yield nothing forever. Watch what happens when you pass a generator to from_iter():

Passing a generator directly — the one-shot trap
gen = http_log_stream(limit=100)
lf = from_iter(gen)

len(lf.to_pylist())   # → 100  ✓
len(lf.to_pylist())   # → 0    ✗  generator is spent!

The first .to_pylist() drains the generator. The second call gets nothing — the generator is exhausted, and next() just raises StopIteration forever. This kills repeatability. You can't call .explain() and then .collect(). You can't test a pipeline and then run it for real.

The fix is embarrassingly simple. Don't store the generator. Store a function that creates a fresh generator every time you call it.

The factory: a function that returns a generator

Passing a factory — replayable forever
# Pass a lambda, not the generator itself
lf = from_iter(lambda: http_log_stream(limit=100))

len(lf.to_pylist())   # → 100  ✓
len(lf.to_pylist())   # → 100  ✓  fresh generator every time!
len(lf.to_pylist())   # → 100  ✓  unlimited replays

Each call to the lambda creates a brand-new generator. The lambda itself is tiny — a stateless function object. The underlying IteratorSourceNode calls this factory every time execute_batched() runs, getting a fresh stream of data.

The pattern in one sentence
A factory function is a callable that returns a fresh iterator. By storing the factory instead of the iterator, any plan node can re-read its source data as many times as needed — each call creates a new stream.

# The one-shot trap: generators exhaust after one pass
def counting_gen(n):
    for i in range(n):
        yield {"id": i, "value": i * 10}

gen = counting_gen(5)
print("First pass:", list(gen))
print("Second pass:", list(gen))  # empty!

print()

# The fix: store a FACTORY (a function that creates generators)
factory = lambda: counting_gen(5)
print("First pass (factory):", list(factory()))
print("Second pass (factory):", list(factory()))  # works!
print("Third pass (factory):", list(factory()))   # unlimited replays

This is not a new concept — it's a closure. The factory closes over whatever state it needs (a file path, a URL, a limit parameter) and uses it each time it's called. You've been writing closures since early Python. But here it solves a very real engineering problem: making a streaming data source repeatable.
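Here is the closure at work in miniature. The helper name make_row_factory is invented for illustration; the point is that the inner function captures only the path, and each call reopens the file:

```python
import csv
import os
import tempfile

def make_row_factory(path):
    """Return a factory: each call reopens the file and yields its rows."""
    def factory():
        with open(path, newline="") as f:
            yield from csv.reader(f)
    return factory   # the closure captures `path` -- nothing else is stored

# Demo with a throwaway file
with tempfile.NamedTemporaryFile("w", suffix=".csv",
                                 delete=False, newline="") as f:
    f.write("a,b\n1,2\n3,4\n")
    tmp = f.name

factory = make_row_factory(tmp)
first = list(factory())
second = list(factory())   # fresh generator: same rows again
os.remove(tmp)
```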

pyfloe's IteratorSourceNode

In Module 3, you saw ScanNode — it holds data in a list and iterates over it. For streaming sources, pyfloe uses a different leaf node: IteratorSourceNode. Its execute_batched() method calls the factory to get a fresh generator on every execution.

pyfloe's IteratorSourceNode (plan.py)
class IteratorSourceNode(PlanNode):
    """Leaf node that reads from a lazily-evaluated
    iterator factory."""

    __slots__ = ("_columns", "_schema", "_factory", "_source_label")

    def __init__(self, columns: list[str], lazy_schema: LazySchema,
                 iterator_factory: Callable[[], Iterator], source_label: str = "Iterator"):
        self._columns = columns
        self._schema = lazy_schema
        self._factory = iterator_factory   # ← a callable, not a generator
        self._source_label = source_label

    def execute_batched(self):
        return _batched(self._factory())  # ← calls the factory each time

Look at execute_batched. It calls self._factory() — with parentheses — to get a new iterator, then wraps it in _batched() to yield it in chunks of 1,024 rows (the same batching helper from Module 3). Every call to execute_batched gets a fresh read.
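The _batched helper isn't shown in the excerpt. A minimal sketch of what such a chunking helper might look like — the 1,024 default matches the text, but this implementation is an assumption, not pyfloe's actual code:

```python
import itertools

def _batched(rows, size=1024):
    """Yield lists of up to `size` items from any iterator."""
    it = iter(rows)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:        # islice returned nothing: source exhausted
            return
        yield batch

batches = list(_batched(range(2500), size=1024))
sizes = [len(b) for b in batches]   # two full batches plus a remainder
```

Because islice stops cleanly at the end of the source, the final partial batch falls out for free.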

Compare this to ScanNode, which holds self._data as a plain list. ScanNode is for in-memory data that already exists. IteratorSourceNode is for data that lives somewhere else — a file, an API, a generator — and needs to be re-created on demand. Same interface, same execute() / execute_batched() contract. The rest of the plan tree can't tell the difference.


Section D.2 Constant-Memory File Reading

Our http_log_stream() yields Python dicts — pyfloe can infer the types at runtime. But what about CSV files, where every value arrives as a raw string? This is where the factory pattern meets real I/O, and where we encounter the one genuinely tricky problem in the whole streaming story: type inference.

Two-phase reading: sample, then stream

We want to sample a few rows to figure out types, then yield the rest one row at a time, never holding more than a single row in memory.

pyfloe's CSV reader does its work in two phases. Phase one happens at read time (when you call read_csv()). Phase two happens at execution time (when you call .collect() or .to_csv()).

Phase 1 At read time

Open the file. Read the header. Sample 100 rows for type inference. Close the file. Build a schema. Build a factory function. No data is stored.

Phase 2 At execution time

The factory reopens the file. Yields rows one at a time as typed tuples. Each row is consumed by the plan tree and discarded. O(1) memory.

Phase 1 is the clever part. The file is opened, the header is read, and a small sample (100 rows by default) is consumed. From these 100 rows, pyfloe infers the type of every column — integers, floats, booleans, strings, and even datetimes. Then the file is closed. The only things kept in memory are the column names, the inferred types, and a factory function that knows how to reopen the file later.

Type inference: the type ladder

Each cell in the sample is tested against a hierarchy of types, from most specific to least:

bool → int → float → str  (first match wins)

If a cell parses as bool, that's its type. If not, try int. Then float. If nothing works, it's a str. Within a column, types are promoted using a lattice: int + float → float, anything + str → str, and T + None → T (nullable).
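A sketch of the ladder and the promotion lattice for single cells — the helper names are hypothetical (pyfloe's real inference lives in io.py), and null handling is omitted for brevity:

```python
def infer_cell_type(text):
    """Climb the ladder: bool, then int, then float, else str."""
    if text.lower() in ("true", "false"):
        return "bool"
    try:
        int(text)
        return "int"
    except ValueError:
        pass
    try:
        float(text)
        return "float"
    except ValueError:
        return "str"

def promote(a, b):
    """Combine two inferred types seen in the same column."""
    if a == b:
        return a
    if {a, b} == {"int", "float"}:
        return "float"
    return "str"   # anything + str -> str
```

Running every sampled cell through infer_cell_type and folding the results with promote gives a column's final type.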

How datetime detection works

After the basic pass, there's a second pass specifically for datetimes. Any column still typed as str gets checked: can the first successfully-parsed value's format be used as a template? If 80% or more of the sample values parse against that format, the column is retyped as datetime and the detected format string is cached. This avoids the cost of trying 14 different strptime patterns on every cell during streaming — datetime detection runs once per column on the sample, not per value.
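The 80% rule can be sketched like this. The candidate format list and both helper names are assumptions for illustration, not pyfloe's actual ones:

```python
from datetime import datetime

CANDIDATE_FORMATS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y"]

def _parses(value, fmt):
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False

def detect_datetime_format(sample_values, threshold=0.8):
    """Lock onto the first value's format, then require that
    >= `threshold` of the sample parses against it."""
    fmt = next((c for v in sample_values
                for c in CANDIDATE_FORMATS if _parses(v, c)), None)
    if fmt is None:
        return None
    ok = sum(1 for v in sample_values if _parses(v, fmt))
    return fmt if ok / len(sample_values) >= threshold else None

sample = ["2024-01-01", "2024-02-03", "not a date",
          "2024-05-05", "2024-06-06"]
fmt = detect_datetime_format(sample)   # 4 of 5 parse: exactly 80%, accepted
```

The expensive multi-format search runs once, on the sample; streaming then uses the single cached format string.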

The tradeoff of sampling
Type inference from a 100-row sample is fast and memory-efficient, but it's a bet: you're assuming the sample is representative. If row 101 contains a string in a column inferred as int, the cast will fail at execution time. This is the fundamental tradeoff — you gain O(1) memory but trade away guaranteed type safety across the whole file. Polars makes the same tradeoff with scan_csv().
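You can reproduce the failure mode in miniature. The sample sees only digits, so the column looks like int; the first non-numeric value downstream blows up the cast (a sketch of the failure, not pyfloe's actual error handling):

```python
rows = [str(i) for i in range(100)] + ["N/A"]   # row 101 breaks the bet

# Phase 1: infer from the first 100 rows -> every cell parses as int
sample = rows[:100]
inferred_int = all(r.isdigit() for r in sample)

# Phase 2: stream and cast -- fails on the value the sample never saw
failure = None
try:
    typed = [int(r) for r in rows]
except ValueError as e:
    failure = str(e)
```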

The factory function

After type inference, _read_delimited() constructs the factory. Here's the core of it:

pyfloe's CSV factory (io.py, simplified)
def _read_delimited(path, delimiter, has_header, ...):
    # Phase 1: sample and infer types (at read time)
    with open(path) as f:
        reader = csv.reader(f, delimiter=delimiter)
        header = next(reader)
        col_names = [h.strip() for h in header]
        # islice stops cleanly if the file has fewer than 100 rows
        sample = list(itertools.islice(reader, 100))

    schema, col_types, dt_formats = _infer_schema(col_names, sample)
    n_cols = len(col_names)

    # Phase 2: the factory (called at execution time)
    def make_rows():
        with open(path) as f:
            reader = csv.reader(f, delimiter=delimiter)
            next(reader)                  # skip header
            for row in reader:
                yield tuple(
                    _cast_value(row[i], col_types[i], dt_formats[i])
                    for i in range(n_cols)
                )

    return _FileStreamNode(col_names, schema, make_rows)

Notice what the factory closes over: path, col_types, and dt_formats. These are tiny — a string and two short lists. The factory itself does zero I/O until it's called. When it is called, it opens the file fresh, skips the header, and yields each row as a typed tuple. One row in memory at a time.

The with statement inside the generator ensures the file handle is closed when the generator finishes or is garbage-collected — which happens as soon as the plan tree finishes consuming rows, or when .head(10) stops pulling after 10 rows.
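You can watch that cleanup happen with a stand-in "file" that records when its context manager exits. Calling gen.close() explicitly is what garbage collection does implicitly; it raises GeneratorExit at the suspended yield, which unwinds the with block:

```python
closed = []

class TrackingFile:
    """Stand-in for a real file: records when its context manager exits."""
    def __enter__(self):
        return iter(range(1000))
    def __exit__(self, *exc):
        closed.append(True)
        return False

def rows():
    with TrackingFile() as f:
        yield from f

gen = rows()
head = [next(gen) for _ in range(10)]   # pull only the first 10 rows
still_open = not closed                 # mid-stream: the "file" is open
gen.close()                             # GeneratorExit unwinds the `with`
```

This is exactly why .head(10) on a file-backed LazyFrame doesn't leak a file handle.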

Why tuples, not dicts
Every row is yielded as a tuple, not a dict. You saw this in Module 3 — tuples are ~40% smaller (no key storage per row), faster to create, and hashable (needed for joins and group-by). The mapping from column name to position lives in the schema, not in every row.
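Both claims are easy to check. Exact byte counts vary across Python versions, so the sketch compares rather than memorizes:

```python
import sys

row_dict = {"ts": "2024-01-01", "status": 500, "latency_ms": 120.5}
row_tuple = ("2024-01-01", 500, 120.5)

# The dict pays for a hash table plus per-row key references;
# the tuple stores only item pointers.
dict_size = sys.getsizeof(row_dict)
tuple_size = sys.getsizeof(row_tuple)

# Tuples are hashable, which joins and group-by rely on; dicts are not.
tuple_hash = hash(row_tuple)
```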

The _FileStreamNode wraps this factory in a plan node (inheriting from PlanNode) with the same execute() and execute_batched() interface that every other node uses. From the perspective of the plan tree, it's just another leaf node. Whether the data comes from a list in memory, an infinite generator, or a file on disk is invisible to the filter, join, and project nodes above it.

How Polars does this
This is how Polars' scan_csv() works. It reads a sample, infers types, and builds a lazy plan — all without loading the file. When you call .collect(), that's when the file streams through the plan. You now know exactly how that works, because the pattern is identical: factory function → plan node → volcano pull.

Section D.3 Constant-Memory Writing

Reading was the hard part. Writing is almost trivial: once you have a streaming plan tree, writing to disk is just consuming the iterator into a file instead of a list. Don't collect. Don't materialize. Just iterate the plan directly and write each batch as it arrives:

Stream from the plan to the file
def to_csv_streaming(plan, columns, path):
    with open(path, "w") as f:
        writer = csv.writer(f)
        writer.writerow(columns)           # header
        for chunk in plan.execute_batched():
            writer.writerows(chunk)         # write batch, discard

That's it. Six lines. The plan tree yields batches of 1,024 rows (via the execute_batched() method from Module 3). Each batch is written to the file and then dropped. The next batch is pulled from the tree, which pulls from its child, which pulls from the leaf node's factory.

The end-to-end memory profile: one batch of tuples (1,024 rows × however many columns). That's a constant. It doesn't matter if the source has 10 thousand records or 10 billion.

Generator → yield → FilterNode → yield → ProjectNode → yield → CSV file

Memory at any point: ≈ 1 batch (1,024 rows)
Total rows processed: unlimited

pyfloe's implementation

pyfloe's _to_csv_impl (io.py)
def _to_csv_impl(lf, path, delimiter=",",
                 header=True, encoding="utf-8"):
    with open(path, "w", encoding=encoding, newline="") as f:
        writer = csv.writer(f, delimiter=delimiter)
        if header:
            writer.writerow(lf.columns)
        for chunk in lf._plan.execute_batched():
            writer.writerows(chunk)

Nearly identical to our simplified version. The JSONL writer is equally simple — same structure, but each row is serialized as JSON and written as a line.
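A sketch of what that JSONL writer might look like under the same execute_batched() contract. The function name, signature, and the FakePlan stand-in are assumptions for illustration:

```python
import json
import os
import tempfile

def _to_jsonl_impl(plan, columns, path, encoding="utf-8"):
    """Stream each batch from the plan into a JSON Lines file."""
    with open(path, "w", encoding=encoding) as f:
        for chunk in plan.execute_batched():
            for row in chunk:
                # rows are tuples; zip them back with the column names
                f.write(json.dumps(dict(zip(columns, row))) + "\n")

class FakePlan:
    """Stand-in plan node yielding two small batches."""
    def execute_batched(self):
        yield [("a", 1), ("b", 2)]
        yield [("c", 3)]

path = os.path.join(tempfile.gettempdir(), "demo.jsonl")
_to_jsonl_impl(FakePlan(), ["name", "n"], path)
with open(path) as f:
    lines = [json.loads(line) for line in f]
os.remove(path)
```

Same tube, different serialization: one batch in memory, one line per row on disk.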

The demo: stream → enrich → write

Here's the demo's run_stream_to_csv() in action. It reads 10,000 records from the infinite log generator, filters to 5xx errors, adds a computed column, selects the columns it needs, and writes to a CSV — all with constant memory:

From streaming_demo.py
Stream.from_iter(
    http_log_stream(limit=10_000),
    columns=["ts", "method", "path", "status",
             "latency_ms", "deploy_id"],
) \
    .filter(col("status") >= 500) \
    .with_column("latency_s", col("latency_ms") / 1000) \
    .select("ts", "path", "status", "latency_s", "deploy_id") \
    .to_csv("/tmp/errors.csv")

From generator to disk. The generator yields one record, the filter checks the status code, the with_column computes a new value, the select trims the columns, and to_csv writes the row. Then the next record. At no point does the full dataset exist in memory.

🧮 Memory calculator: tank vs. tube

Pick a file size. See the memory difference between loading everything (eager) and streaming through the plan tree.

In the wild
Polars calls this .sink_csv(). Spark calls it writing to a "streaming sink." Different names, same mechanism: pull from the plan tree, write to the output, discard each batch. There's no secret sauce — it's a for loop over an iterator connected to a file writer. A tube, not a tank.

Section D.4 The Stream Pipeline — What You Give Up When You Skip the Tree

Everything we've covered so far uses the same plan tree you built in Modules 3–5. from_iter() returns a LazyFrame backed by IteratorSourceNode → FilterNode → ProjectNode → etc. The volcano model pulls rows through the tree. It works, it's elegant, and it streams with constant memory.

pyfloe's Stream class offers a faster alternative for simple pipelines, but at a cost: no joins, no aggregation, no schema propagation, no optimizer. The capabilities you spent Modules 3–5 building are exactly what Stream trades away for speed.

Our demo gives us a natural place to see both models side by side. The error summary needs joins and aggregation, so it must use LazyFrame:

LazyFrame pipeline — needs the full plan tree
error_summary = (
    from_iter(http_log_stream(limit=10_000))
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .join(deploys, on="deploy_id")       # ← needs hash join
    .group_by("path", "deployer")
    .agg(
        col("latency_s").mean().alias("avg_latency"),
        col("status").count().alias("error_count"),
    )
)

But the simple filter-and-write-to-CSV part — no joins, no aggregation — doesn't need the tree at all. And the tree has a cost.

The overhead of trees

Every row that passes through a FilterNode triggers a method call: execute_batched() on the child, then predicate.eval(row, col_map) on the expression. For a ProjectNode, there's another method call and another expression evaluation. Each plan node adds a layer of generator orchestration per row.

For complex plans with joins and aggregations, this overhead is negligible compared to the actual work. But for simple, purely streaming pipelines — read, filter, add a column, write — the tree traversal overhead dominates.

The cost of indirection
A pipeline like filter → with_column → select → to_csv passes each row through three plan nodes. Each node calls execute_batched() on its child — that's three layers of Python generator frames per batch. When your work per row is tiny, the overhead of the tree itself becomes the bottleneck.
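The two shapes produce identical rows; only the number of generator frames differs. A toy comparison of volcano-style chaining versus a flat loop:

```python
def source():
    for i in range(10):
        yield (i, i * 10)

# Volcano style: each stage is its own generator frame,
# and every row crosses two yield/next boundaries
def filter_stage(rows):
    for row in rows:
        if row[0] % 2 == 0:
            yield row

def project_stage(rows):
    for row in rows:
        yield (row[1],)

volcano = list(project_stage(filter_stage(source())))

# Flat style: one loop, the transforms inlined as plain statements
flat = []
for row in source():
    if row[0] % 2 != 0:
        continue
    flat.append((row[1],))
```

Same output, fewer suspensions and resumptions per row — which is the whole pitch for Stream.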

The flat loop

pyfloe's Stream class offers an alternative. Instead of building a tree of plan nodes, it records transforms as a flat list of steps and executes them in a single tight loop. No tree traversal, no nested generators.

How Stream records transforms
# Each method appends to self._transforms — no nodes created
stream = Stream.from_iter(source, columns=[...])
stream = stream.filter(col("status") >= 500)
stream = stream.with_column("latency_s", col("latency_ms") / 1000)
stream = stream.select("ts", "path", "status", "latency_s")

# Internally, _transforms is:
# [
#   ("filter",      <Expr: col("status") >= 500>),
#   ("with_column", "latency_s", <Expr: col("latency_ms") / 1000>),
#   ("select",      ["ts", "path", "status", "latency_s"]),
# ]

No nodes. No tree. Just a list of what to do. When it's time to execute, _build_processor() compiles this into a sequence of steps — each one a tuple with the operation kind, the pre-compiled expression or index list, and a frozen column map. Then _execute() runs a single loop:

pyfloe's Stream._execute() (stream.py, simplified)
def _execute(self):
    steps, _ = self._build_processor()

    for row in self._source_factory():    # one loop over the source
        skip = False
        current = row

        for step in steps:               # flat step list
            kind = step[0]
            if kind == "filter":
                if not step[1].eval(current, step[2]):
                    skip = True
                    break
            elif kind == "with_column":
                current = current + (step[1].eval(current, step[2]),)
            elif kind == "select":
                current = tuple(current[i] for i in step[1])

        if not skip:
            yield current

One outer loop over the source. One inner loop over the steps. No method dispatch, no generator chaining, no tree traversal.

Visualizing the difference

LazyFrame (volcano tree)

ProjectNode.execute_batched()
  → FilterNode.execute_batched()
    → IteratorSourceNode.execute_batched()
      → factory()

3 generator frames per batch

Stream (flat loop)

for row in factory():
  step 1: filter
  step 2: with_column
  step 3: select
  yield row

1 generator frame total

Measuring the difference

The demo includes a timing comparison — same filter + transform pipeline, 50,000 records, measured with time.perf_counter():

From streaming_demo.py — run_timing_comparison()
# LazyFrame (volcano tree)
t0 = time.perf_counter()
result_lf = (
    from_iter(lambda: http_log_stream(limit=50_000))
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .select("ts", "path", "status", "latency_s")
    .to_pylist()
)
t_lf = time.perf_counter() - t0

# Stream (flat loop)
t0 = time.perf_counter()
result_st = (
    Stream.from_iter(
        http_log_stream(limit=50_000),
        columns=["ts", "method", "path",
                 "status", "latency_ms", "deploy_id"],
    )
    .filter(col("status") >= 500)
    .with_column("latency_s", col("latency_ms") / 1000)
    .select("ts", "path", "status", "latency_s")
    .to_pylist()
)
t_st = time.perf_counter() - t0

Run it yourself — the Stream version is typically 20–40% faster for this kind of simple pipeline. The gap widens with more plan nodes (more generator frames to avoid) and narrows when per-row work is heavier (the overhead becomes a smaller fraction of total time).

Why this is faster
Python's generator protocol has real overhead: frame creation, suspension, resumption. Each yield/next() pair across a node boundary costs function-call overhead. The flat loop avoids all of that. The tradeoff: Stream can't do joins, aggregations, sorts, or optimization. It's only for filter → transform → write.

The foreach callback — a live dashboard

The demo's most Kafka-like feature is the rolling dashboard. It uses Stream.foreach() to process each record with a callback, maintaining manual accumulators just like the hash aggregation from Module 4:

From streaming_demo.py — run_live_dashboard()
stats = defaultdict(lambda: {"count": 0, "error_count": 0, ...})

def process(row):
    stats[row["path"]]["count"] += 1
    if row["status"] >= 500:
        stats[row["path"]]["error_count"] += 1
        # ... update accumulators ...
    if total["n"] % 1000 == 0:
        print_dashboard()  # rolling summary every 1,000 records

Stream.from_iter(source, columns=[...]).foreach(process)

The defaultdict accumulator is exactly the hash aggregation pattern from Module 4 — running sum, running count, running max — done manually because Stream doesn't have group_by. You should recognize it: same algorithm, different container.
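Stripped of the dashboard printing, the accumulator is a self-contained one-pass hash aggregation. A sketch with a tiny in-memory record list standing in for the stream:

```python
from collections import defaultdict

records = [
    {"path": "/api",  "status": 500, "latency_ms": 900.0},
    {"path": "/api",  "status": 200, "latency_ms": 110.0},
    {"path": "/home", "status": 503, "latency_ms": 1200.0},
]

stats = defaultdict(lambda: {"count": 0, "error_count": 0,
                             "max_latency": 0.0})

for row in records:   # one pass; memory is O(groups), not O(rows)
    s = stats[row["path"]]
    s["count"] += 1
    s["max_latency"] = max(s["max_latency"], row["latency_ms"])
    if row["status"] >= 500:
        s["error_count"] += 1
```

Module 4's group_by does the same thing with the group key as the dict key and the aggregate expressions as the update rules.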

When to use which

Use LazyFrame when…

You need joins, aggregations, sorts, window functions, or query optimization. This is the general-purpose engine — it handles anything.

Use Stream when…

You have a simple pipeline — read, filter, transform, write — and want maximum throughput with constant memory. No joins, no group-by, no optimizer. Just speed.

The demo uses both: LazyFrame for the enriched error summary (it needs a hash join against the deploys table and a group_by), and Stream for the CSV export and the live dashboard (pure filter → transform → write). That split is natural — it falls directly out of what each execution model supports.


Exercises Test Your Understanding

Question 1: Why a factory?

Why does IteratorSourceNode store a callable factory instead of a generator?

Question 2: Memory profile

For from_iter(http_log_stream(limit=10_000_000)).filter(...).to_csv("out.csv"), how much memory does the pipeline use at peak?

Question 3: Stream vs. LazyFrame

The demo uses LazyFrame for the error summary but Stream for the CSV export. Why can't the error summary use Stream?

Question 4: Type inference tradeoff

pyfloe infers CSV column types from a sample of 100 rows. What happens if row 101 contains a string in a column that was inferred as int?

Question 5: The live dashboard

The demo's run_live_dashboard() uses Stream.foreach() with a defaultdict accumulator. Which pattern from an earlier module is this most similar to?


Recap What You Now Know

Let's be honest about what just happened. This deep dive introduced one new pattern — the iterator factory — and spent the rest of its time showing you that the streaming I/O system is just your existing knowledge applied to files and infinite sources. The generator-based volcano model you built in Module 1? That is streaming. The batched execution from Module 3? That drives the writers. The hash aggregation from Module 4? That powers the live dashboard's accumulators.

The factory pattern — store a callable that creates a fresh iterator, not the iterator itself. This makes streaming sources repeatable. from_iter(lambda: source) instead of from_iter(source).

Constant-memory reading — sample 100 rows for type inference, then yield the rest one at a time through a factory. The schema is known instantly; the data streams on demand.

Constant-memory writing — iterate the plan tree directly into a file writer. No materialization, no intermediate storage. A tube, not a tank.

The Stream class — when the plan tree's overhead is the bottleneck, compile transforms into a flat loop. Same semantics, fewer function calls, ~30% faster for simple pipelines. Use it for filter → transform → write. Use LazyFrame for everything else.

What we didn't cover
pyfloe also supports from_chunks() for paginated API sources (each page yields a batch of rows), read_jsonl() for JSON Lines, read_parquet() via PyArrow, and Stream.from_csv() for direct CSV-to-Stream. They all follow the same factory pattern. If you understand read_csv and Stream.from_iter, you understand them all — the format parsing differs, the architecture is identical.

The next time someone at a conference talks about "streaming architecture" like it's astrophysics, you'll know the core: it's a generator, a factory, and a for loop. Distributed streaming adds real complexity (fault tolerance, backpressure, exactly-once delivery), but the fundamental mechanism is what you've already built.

Source References

Browse the pyfloe source code for the classes and functions covered in this deep dive.

core.py — LazyFrame API

stream.py — Streaming