Build Your Own DataFrame
Module 3 of 5

The Engine Room —
Plan Nodes & Batched Execution

In Module 1 we built a lazy pipeline with generators. In Module 2 we built an expression AST that captures computations as data. Now it's time to wire them together — and in doing so, build a real execution engine.

Right now, our two creations live in separate worlds. The plan tree from Module 1 uses lambdas to filter rows. The expression AST from Module 2 can describe col("age") > 28 as a tree — but has nowhere to run. This module is the wedding: expressions will plug into plan nodes, and together they'll form something that actually looks like a real query engine.

Along the way, we'll confront a performance problem that every pure-Python data engine faces, and solve it the same way pyfloe does: by processing data in batches instead of one row at a time. And we'll wrap everything in a LazyFrame class that gives end users the fluent API they expect.

Lesson 3.1 From Lambdas to Expressions in Plan Nodes

Let's recall the FilterNode we built in Module 1. It took a condition parameter — a lambda — and used it to decide which rows to yield:

Module 1's FilterNode — lambda-based
class FilterNode:
    def __init__(self, child: ScanNode, condition: Callable) -> None:
        self.child = child
        self.condition = condition  # a lambda — opaque!

    def execute(self) -> Iterator[tuple]:
        for row in self.child.execute():
            if self.condition(row):
                yield row

The fix: accept an Expr — the inspectable tree from Module 2 — instead of a lambda. Let's plug it in.

Step 1: Replace the lambda with an Expr

The simplest upgrade is straightforward: instead of accepting a lambda, our FilterNode accepts an Expr and calls its eval() method on each row.

But eval() needs a col_map — a dictionary mapping column names to their positions in the tuple. Remember, our rows are stored as tuples like ("Alice", 30, "NYC") for efficiency, and the expression col("age") needs to know that "age" lives at index 1. The col_map is that lookup table:

The col_map bridges names to positions
# If columns are ["name", "age", "city"]
# then a row like ("Alice", 30, "NYC") needs:
col_map: dict[str, int] = {"name": 0, "age": 1, "city": 2}

# Now col("age").eval(row, col_map) → row[1] → 30

With that in mind, here's our first Expr-powered FilterNode:

FilterNode v2 — expression-based
class FilterNode:
    def __init__(self, child: PlanNode, predicate: Expr) -> None:
        self.child = child
        self.predicate = predicate  # an Expr — inspectable!

    def execute(self) -> Iterator[tuple]:
        # Build the name→index mapping from the child's columns
        col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
        for row in self.child.execute():
            if self.predicate.eval(row, col_map):
                yield row
Recall from Module 2 — how Expr.eval() works
class Col(Expr):
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        return row[col_map[self.name]]  # look up index, then value

class Lit(Expr):
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        return self.value              # always returns the constant

class BinaryExpr(Expr):
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        lv = self.left.eval(row, col_map)   # recurse left
        rv = self.right.eval(row, col_map)  # recurse right
        return self.op(lv, rv)              # apply operator

So predicate.eval(row, col_map) in our new FilterNode walks this tree recursively — BinaryExpr calls Col and Lit, which return values, then applies the operator.

Same structure. Same generator pattern. But the predicate is now an Expr object — and that means our engine can call predicate.required_columns() to learn it only needs {"age"}, or pass the whole expression to an optimizer that rewrites the plan tree. The black box is gone.
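To make the wiring concrete, here's a self-contained condensation of the Module 2 classes with constructors filled in, so an Expr predicate can actually run against rows. The details are illustrative, not pyfloe's exact code:

```python
import operator
from typing import Any, Callable

# Minimal sketch of the Module 2 expression nodes, just enough
# to evaluate col("age") > 28 against tuple rows.
class Col:
    def __init__(self, name: str) -> None:
        self.name = name
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        return row[col_map[self.name]]   # name → index → value

class Lit:
    def __init__(self, value: Any) -> None:
        self.value = value
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        return self.value                # constant, row ignored

class BinaryExpr:
    def __init__(self, left, op: Callable, right) -> None:
        self.left, self.op, self.right = left, op, right
    def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
        return self.op(self.left.eval(row, col_map),
                       self.right.eval(row, col_map))

rows = [("Alice", 30, "NYC"), ("Bob", 25, "LA"), ("Cara", 41, "SF")]
col_map = {"name": 0, "age": 1, "city": 2}
predicate = BinaryExpr(Col("age"), operator.gt, Lit(28))

kept = [r for r in rows if predicate.eval(r, col_map)]
print(kept)  # [('Alice', 30, 'NYC'), ('Cara', 41, 'SF')]
```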

The two trees meet
The expression tree from Module 2 now plugs into the plan tree from Module 1. From here on, expressions always live inside plan nodes.
Try it
What happens if you write col("nonexistent_column") in a filter and run the pipeline? Trace the error: eval() looks up the name in col_map, the key doesn't exist, you get a KeyError. In production pyfloe, this is caught earlier by schema validation.
Deep Cut
The compile() optimization is a performance technique, not an architectural requirement. Safe to skip on a first pass.

Step 2: The col_map problem

There's a subtle performance issue with our v2 FilterNode. Look at the execute() method: for every row, we call self.predicate.eval(row, col_map). That eval() call walks the expression tree recursively. For a simple filter like col("age") > 28, that's three method calls per row (BinaryExpr.eval → Col.eval + Lit.eval). For a million rows, that's three million Python function calls — just for the expression.

pyfloe has a clever solution: Expr.compile(). Instead of walking the tree at runtime, compile() walks the tree once and returns a single closure that does the same work. The col_map is baked into the closure at compile time — no dictionary lookups at runtime:

How compile() works for Col
class Col(Expr):
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        idx = col_map[self.name]   # look up ONCE
        return lambda row: row[idx]  # fast index at runtime

And for BinaryExpr:

How compile() works for BinaryExpr
class BinaryExpr(Expr):
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        left_fn = self.left.compile(col_map)    # compile children
        right_fn = self.right.compile(col_map)  # once, up front
        op = self.op
        def _eval(row: tuple) -> Any:
            lv = left_fn(row)
            rv = right_fn(row)
            if lv is None or rv is None:
                return None
            return op(lv, rv)
        return _eval

The idea is elegant: compile() recursively compiles each child into a closure, then wraps them all into one top-level closure. The result is a single callable that takes a row tuple and returns the answer — no tree walking, no dictionary lookups, no method dispatch. Just nested function calls with pre-resolved indices.
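Here's the compile-then-run flow as a condensed, runnable sketch. Gt stands in for BinaryExpr with a fixed operator, and the None-handling is omitted for brevity — illustrative code, not pyfloe's:

```python
from typing import Any, Callable

class Col:
    def __init__(self, name: str) -> None:
        self.name = name
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        idx = col_map[self.name]       # index resolved ONCE
        return lambda row: row[idx]    # pure tuple indexing at runtime

class Lit:
    def __init__(self, value: Any) -> None:
        self.value = value
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        return lambda row: self.value

class Gt:  # BinaryExpr with a fixed > operator
    def __init__(self, left, right) -> None:
        self.left, self.right = left, right
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        left_fn = self.left.compile(col_map)    # children compiled
        right_fn = self.right.compile(col_map)  # once, up front
        return lambda row: left_fn(row) > right_fn(row)

col_map = {"name": 0, "age": 1}
pred_fn = Gt(Col("age"), Lit(28)).compile(col_map)  # one closure

results = [pred_fn(r) for r in [("Alice", 30), ("Bob", 25)]]
print(results)  # [True, False]
```

Note that after compile(), no Expr object is touched in the hot loop — pred_fn is just nested closures over pre-resolved indices.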

Now let's see pyfloe's real FilterNode, which uses compile() plus one more trick:

pyfloe's FilterNode (plan.py)
from itertools import compress

class FilterNode(PlanNode):
    __slots__ = ("child", "predicate")

    def __init__(self, child: PlanNode, predicate: Expr) -> None:
        self.child = child
        self.predicate = predicate

    def execute_batched(self) -> Iterator[list[tuple]]:
        col_map = {n: i for i, n in enumerate(
            self.child.schema().column_names)}
        pred_fn = self.predicate.compile(col_map)
        for chunk in self.child.execute_batched():
            filtered = list(compress(chunk, map(pred_fn, chunk)))
            if filtered:
                yield filtered

Three things to notice here beyond our simplified version:

__slots__ — Same pattern as Module 2's expression nodes: lightweight objects, no per-instance __dict__. You'll see __slots__ on every node class from here on — it's the standard pattern.

compress() — Instead of a for-loop with an if-statement, pyfloe uses itertools.compress(). This stdlib function takes a data sequence and a selector sequence, and yields only the data elements where the selector is truthy. Combined with map(pred_fn, chunk), it's a concise way to apply the predicate and filter in one shot — and it runs partly in C, which is faster than a Python loop.

execute_batched() — The method isn't execute(); it's execute_batched(). We'll get to this in Lesson 3.3, but the short version is: pyfloe processes data in chunks of 1,024 rows instead of one at a time. For now, just notice that the predicate is compiled once, before the loop — not per-row.
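The compress()+map() idiom from the listing above is easy to try in isolation; pred here is a stand-in for a compiled predicate closure:

```python
from itertools import compress

# map() applies the predicate to every row in the chunk;
# compress() keeps only rows whose selector value is truthy.
# Both iterate largely at C speed.
pred = lambda row: row[1] > 28
chunk = [("Alice", 30), ("Bob", 25), ("Cara", 41)]

filtered = list(compress(chunk, map(pred, chunk)))
print(filtered)  # [('Alice', 30), ('Cara', 41)]
```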

How Polars does this
When you write df.filter(col("age") > 28) in Polars, the same thing happens: the expression becomes an AST node, which Polars compiles into vectorized Rust code operating on Arrow arrays. The compilation is more sophisticated, but the architecture is the same: build a tree, compile it, run the compiled version against the data.

Lesson 3.2 ProjectNode — Select and Compute

Filtering rows is only half the story. A real query engine also needs to transform columns: selecting a subset, reordering them, or computing new ones. In SQL, this is the SELECT clause. In Polars, it's .select(). In our engine, it's the ProjectNode.

A ProjectNode sits in the plan tree just like a FilterNode, but instead of dropping rows, it reshapes them. It has two modes, and both are important.

Mode 1: Column selection

The simplest case: the user wants to keep only some columns. If your data has ["name", "age", "city", "salary"] and the user calls .select("name", "salary"), the ProjectNode needs to extract just those two columns from every row. Python's operator.itemgetter is built for exactly this:

itemgetter — fast multi-index extraction
from operator import itemgetter

getter = itemgetter(0, 3)       # extract indices 0 and 3
row = ("Alice", 30, "NYC", 85000)
getter(row)                       # → ("Alice", 85000)

itemgetter is implemented in C and returns a tuple directly — no intermediate generator, no Python loop. For multi-column extraction it's typically 2–5× faster than the equivalent Python-level approach, tuple(row[i] for i in indices).

Mode 2: Expression evaluation

The second mode is more powerful: instead of selecting existing columns, we compute new columns from expressions. When the user writes:

Selecting with expressions
df.select(
    col("name"),
    (col("price") * col("qty")).alias("total")
)

the ProjectNode needs to evaluate each expression per row and assemble the results into a new tuple. This is where compile() pays off again — we compile each expression once, then run the compiled closures per row:

ProjectNode with expression evaluation
class ProjectNode:
    def __init__(self, child: PlanNode,
                 columns: list[str] | None = None,
                 exprs: list[Expr] | None = None) -> None:
        self.child = child
        self._columns = columns  # for column selection
        self._exprs = exprs      # for expression evaluation

    def execute(self) -> Iterator[tuple]:
        col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
        if self._columns:
            getter = itemgetter(*[col_map[c] for c in self._columns])
            for row in self.child.execute():
                yield getter(row)
        elif self._exprs:
            compiled = [e.compile(col_map) for e in self._exprs]
            for row in self.child.execute():
                yield tuple(fn(row) for fn in compiled)

Notice the two branches: _columns (names only, use itemgetter) vs. _exprs (full expressions, use compiled closures). This dual-mode design lets the same node handle both .select("name", "age") and .select(col("price") * col("qty")).
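To see the dual-mode dispatch in action, here are the two branches exercised against a tiny stand-in child. The _Child class and the hand-written closures (standing in for compiled expressions) are illustrative, not part of the engine:

```python
from operator import itemgetter

class _Child:
    """Minimal stand-in for a child plan node."""
    def __init__(self, rows, columns):
        self._rows, self.columns = rows, columns
    def execute(self):
        yield from self._rows

child = _Child([("Alice", 30, "NYC"), ("Bob", 25, "LA")],
               ["name", "age", "city"])
col_map = {n: i for i, n in enumerate(child.columns)}

# Mode 1: plain names → itemgetter
getter = itemgetter(*[col_map[c] for c in ("name", "age")])
selected = [getter(r) for r in child.execute()]
print(selected)  # [('Alice', 30), ('Bob', 25)]

# Mode 2: expressions → pre-compiled closures (hand-written here)
compiled = [lambda row: row[col_map["name"]],
            lambda row: row[col_map["age"]] * 2]
computed = [tuple(fn(r) for fn in compiled) for r in child.execute()]
print(computed)  # [('Alice', 60), ('Bob', 50)]
```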

Now let's see pyfloe's real implementation, which adds batched processing and a special case for single-column selection:

pyfloe's ProjectNode (plan.py) — execute_batched
class ProjectNode(PlanNode):
    __slots__ = ("child", "_columns", "_exprs")

    def execute_batched(self) -> Iterator[list[tuple]]:
        col_map = {n: i for i, n in enumerate(
            self.child.schema().column_names)}

        if self._columns:
            indices = [col_map[c] for c in self._columns]
            n = len(indices)
            if n == 1:
                idx = indices[0]
                for chunk in self.child.execute_batched():
                    yield [(row[idx],) for row in chunk]
            else:
                getter = itemgetter(*indices)
                for chunk in self.child.execute_batched():
                    yield list(map(getter, chunk))
        elif self._exprs:
            compiled = [e.compile(col_map) for e in self._exprs]
            for chunk in self.child.execute_batched():
                yield [tuple(fn(row) for fn in compiled)
                       for row in chunk]

The single-column special case (n == 1) avoids itemgetter entirely because itemgetter with one argument returns the value directly, not a tuple — which would break the rest of the engine that expects rows to always be tuples. The multi-column path uses list(map(getter, chunk)) instead of a list comprehension — this keeps the tight C-level map() loop for maximum speed.
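The pitfall behind the n == 1 branch is easy to verify directly:

```python
from operator import itemgetter

row = ("Alice", 30, "NYC")

# With two or more indices, itemgetter returns a tuple...
pair = itemgetter(0, 2)(row)
print(pair)    # ('Alice', 'NYC')

# ...but with exactly one index it returns the bare value,
# which would break downstream code expecting rows to be tuples.
single = itemgetter(0)(row)
print(single)  # Alice

# Hence the special case builds the 1-tuple explicitly:
wrapped = (row[0],)
print(wrapped)  # ('Alice',)
```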

The pattern repeats
Notice how both FilterNode and ProjectNode follow the same recipe: build the col_map from the child's schema, compile or resolve the expressions once, then loop over chunks. This pattern will repeat for every plan node we build. The compile step happens outside the loop; the hot path inside the loop is as lean as possible.

Lesson 3.3 Batched Execution — Why Not One Row at a Time?

In Module 1, we celebrated the elegance of generators: each plan node yields one row at a time, and data flows through the chain with minimal memory. But there's a cost we swept under the rug. Let's uncover it.

Recall from Module 1 — our generator-based plan nodes
class ScanNode:
    def __init__(self, data: list[tuple]) -> None:
        self.data = data
    def execute(self) -> Iterator[tuple]:
        for row in self.data:
            yield row       # one row per next() call

class FilterNode:
    def __init__(self, child: ScanNode, condition: Callable) -> None:
        self.child = child
        self.condition = condition
    def execute(self) -> Iterator[tuple]:
        for row in self.child.execute():
            if self.condition(row):
                yield row  # one row per next() call

Each yield suspends the generator and returns one row. The parent calls next() to resume it. Elegant — but every yield/next() pair is a Python function call with overhead.

The problem: function call overhead

Every time a Python generator yields a value, there's overhead: the frame object is suspended, the caller resumes, the result is passed across. Every time a function is called, there's overhead: argument packing, the call stack, frame creation. In C or Rust, function call overhead is negligible — a few nanoseconds. In Python, it's significant.

Consider a simple pipeline: ScanNode → FilterNode → ProjectNode. For each row, the execution looks like:

Row-at-a-time execution — hidden cost
# For EVERY ROW in the dataset:
#   1. ProjectNode calls next() on FilterNode     ← overhead
#   2. FilterNode calls next() on ScanNode         ← overhead
#   3. ScanNode yields one row                     ← overhead
#   4. FilterNode calls pred_fn(row)                ← overhead
#   5. FilterNode yields (maybe)                    ← overhead
#   6. ProjectNode calls getter(row)                ← overhead
#   7. ProjectNode yields                           ← overhead
#
# That's ~7 Python function calls per row.
# For 1 million rows: 7 million function calls.
# At ~100ns each: ~0.7 seconds of PURE OVERHEAD.

The actual filtering and projection — the "useful" work — might take only 0.1 seconds. But the overhead of shuttling rows one at a time through the Python generator chain dominates.

The overhead trap
This isn't a hypothetical problem. It's the reason every serious pure-Python data processing library uses some form of batching. If you're moving data through multiple stages in Python, the cost of moving it one piece at a time eventually overwhelms the cost of processing it.

Pause and predict: if each row incurs ~7 Python function calls for overhead, and you have 1 million rows, that's 7 million overhead calls. What if you could process 1,024 rows per generator transition instead of one? How many overhead calls would that be?
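One way to check the arithmetic — these numbers just restate the rough estimates above:

```python
rows = 1_000_000
calls_per_transition = 7     # rough overhead estimate from above
batch_size = 1024

row_at_a_time = rows * calls_per_transition
chunks = -(-rows // batch_size)            # ceil-divide into chunks
batched = chunks * calls_per_transition

print(row_at_a_time)  # 7000000
print(batched)        # 6839 — roughly a 1000x reduction
```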

The solution: process in batches

The fix is conceptually simple: instead of yielding one row per next() call, yield a list of rows. If we yield 1,024 rows at a time, we cut the number of generator transitions by a factor of 1,024, so the transition overhead per row drops from ~7 function calls to ~7/1024 ≈ 0.007 — essentially free. (The per-row work inside each node, like calling pred_fn, remains; it's the cost of shuttling rows between nodes that collapses.)

Here's the batched version of our FilterNode:

Batched FilterNode — 1024 rows at a time
class FilterNode:
    def execute_batched(self) -> Iterator[list[tuple]]:
        col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
        pred_fn = self.predicate.compile(col_map)
        # Pull CHUNKS from child, not individual rows
        for chunk in self.child.execute_batched():
            filtered = [row for row in chunk if pred_fn(row)]
            if filtered:
                yield filtered

The inner loop is now a list comprehension — one of the fastest loop constructs in Python. And we only yield across the generator boundary once per chunk, not once per row.

The _batched() helper

But where do the chunks come from in the first place? The leaf node (usually a ScanNode) has to slice its data into batches. pyfloe uses a small helper function for this:

pyfloe's _batched() helper (plan.py)
from itertools import islice

_BATCH_SIZE: int = 1024

def _batched(iterable: Iterable, n: int = _BATCH_SIZE) -> Iterator[list]:
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            break
        yield chunk

islice(it, n) pulls up to n items from the iterator without loading everything into memory. The list() call materializes just that chunk. The result: a generator that yields lists of up to 1,024 rows each.
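A quick check of the helper's behavior, using a small batch size so the chunking is visible:

```python
from itertools import islice
from typing import Iterable, Iterator

def _batched(iterable: Iterable, n: int) -> Iterator[list]:
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))  # pull up to n items
        if not chunk:                # source exhausted → stop
            break
        yield chunk

chunks = list(_batched(range(7), 3))
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Python 3.12 ships itertools.batched, which implements the same pattern (yielding tuples rather than lists).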

Why 1,024?
The batch size of 1,024 is a pragmatic choice. Too small (say, 10) and you barely reduce overhead. Too large (say, 1,000,000) and you lose the streaming property — you'd hold a huge chunk in memory between nodes. 1,024 is a sweet spot: large enough to amortize Python's per-call overhead, small enough to keep memory usage modest.

Bridging batched and unbatched: PlanNode

pyfloe's base class, PlanNode, defines both interfaces and bridges them automatically:

pyfloe's PlanNode base class (plan.py)
from itertools import chain

class PlanNode:
    __slots__ = ()

    def execute(self) -> Iterator[tuple]:
        """Yield rows one at a time (flattens batched output)."""
        return chain.from_iterable(self.execute_batched())

    def execute_batched(self) -> Iterator[list[tuple]]:
        """Yield lists of rows. Subclasses override this."""
        raise NotImplementedError

The key insight: subclasses implement execute_batched(), and execute() comes for free. chain.from_iterable takes an iterable of lists and flattens them into a single stream of individual items — it's the bridge between the batched world (where the engine operates internally) and the unbatched world (where the user consumes results one at a time).
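The bridge itself is a one-liner worth seeing in isolation — chain.from_iterable turns a stream of batches back into a stream of rows:

```python
from itertools import chain

# What the execute() bridge does: flatten batched output
# back into individual rows for the consumer.
batches = [[("Alice", 30), ("Bob", 25)], [("Cara", 41)]]
rows = list(chain.from_iterable(batches))
print(rows)  # [('Alice', 30), ('Bob', 25), ('Cara', 41)]
```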

Let's see how ScanNode produces its batches:

pyfloe's ScanNode (plan.py)
class ScanNode(PlanNode):
    __slots__ = ("_data", "_columns", "_schema")

    def __init__(self, data: list[tuple],
                 columns: list[str],
                 lazy_schema: LazySchema | None = None) -> None:
        self._data = data
        self._columns = list(columns)
        self._schema = lazy_schema

    def execute_batched(self) -> Iterator[list[tuple]]:
        data = self._data
        for i in range(0, len(data), _BATCH_SIZE):
            yield data[i : i + _BATCH_SIZE]

Because the data is already an in-memory list, ScanNode can just slice it directly — no need for the _batched() helper. That helper is used by IteratorSourceNode, which reads from generators (like a file reader) where you can't slice but can islice.

Row-at-a-time

Each next() crosses the generator boundary. For N rows across K nodes: N × K function calls.

Batched (1,024)

Each next() yields 1,024 rows. Same pipeline: N/1024 × K function calls. Inner loop runs as a fast list comprehension.

How Polars does this
In Polars, the equivalent of our batch is an Arrow "chunk" — a contiguous block of memory holding thousands of values in columnar format. Polars applies SIMD instructions to process entire chunks at once. Our Python batches don't get SIMD, but the architectural idea is identical: reduce the number of times you cross a function boundary, and do as much work as possible inside tight inner loops.

Lesson 3.4 The LazyFrame Wrapper

We now have a working engine: expressions compile into fast closures, plan nodes process data in batches, and generators chain everything together. But the user-facing API is still rough. To build a pipeline today, you'd write something like:

Wiring plan nodes by hand — painful
scan = ScanNode(data, ["name", "age", "city"])
filt = FilterNode(scan, col("age") > 28)
proj = ProjectNode(filt, ["name", "age"])

# Collect results manually
results = []
for chunk in proj.execute_batched():
    results.extend(chunk)

This is the engine's internal wiring exposed to the user — and it's terrible. Every operation creates a raw node. The user has to remember which node takes which arguments. There's no method chaining, no .collect(), no fluent API. Compare this to what a Polars user writes:

What we want — a fluent API
result = (
    LazyFrame(data)
    .filter(col("age") > 28)
    .select("name", "age")
    .collect()
)
Recall from Module 1 — our first LazyFrame sketch
class LazyFrame:
    def __init__(self, plan_node: PlanNode) -> None:
        self._plan = plan_node

    def filter(self, condition: Callable) -> LazyFrame:
        new_plan = FilterNode(self._plan, condition)
        return LazyFrame(new_plan)

    def collect(self) -> list[tuple]:
        return list(self._plan.execute())

Module 1's version accepted a raw plan node and used lambdas. Now we'll accept user data directly, use expressions, handle batched execution, and add _from_plan for internal construction.

The LazyFrame is the thin wrapper that bridges the gap. It holds a plan tree internally, and each method call just wraps the current plan in a new node:

Our LazyFrame — a thin wrapper over the plan tree
class LazyFrame:
    def __init__(self, data: list[dict]) -> None:
        # Convert list-of-dicts to internal format
        columns, rows = dicts_to_tuples(data)
        self._plan = ScanNode(rows, columns)
        self._materialized = None  # set by collect()

    def filter(self, predicate: Expr) -> LazyFrame:
        # Don't execute — just wrap in a new node!
        return LazyFrame._from_plan(
            FilterNode(self._plan, predicate))

    def select(self, *columns: str) -> LazyFrame:
        return LazyFrame._from_plan(
            ProjectNode(self._plan, list(columns)))

    def collect(self) -> LazyFrame:
        # NOW we execute — pull all data through the plan
        data = []
        for chunk in self._plan.execute_batched():
            data.extend(chunk)
        self._materialized = data
        return self

Look at .filter(): it doesn't touch any data. It creates a FilterNode that wraps the current plan, then returns a new LazyFrame pointing at this deeper plan tree. Each method call adds one node on top. The tree grows downward. .collect() is the only method that triggers execution — it pulls data up through the entire chain.

Deep Cut
_from_plan is an implementation detail of how LazyFrame constructs itself internally. Safe to skip.

The _from_plan classmethod

Notice that .filter() doesn't call LazyFrame.__init__() — that would try to convert data from dicts again. Instead, it calls _from_plan, a classmethod that creates a LazyFrame directly from an existing plan node:

_from_plan — construct without __init__
class LazyFrame:
    @classmethod
    def _from_plan(cls, plan: PlanNode) -> LazyFrame:
        lf = cls.__new__(cls)  # create instance, skip __init__
        lf._plan = plan
        lf._materialized = None
        return lf

cls.__new__(cls) creates a bare instance without calling __init__. Then we set the attributes directly. This is a common pattern in libraries that have two distinct construction paths: one for end users (from raw data) and one for internal plumbing (from an existing plan).
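Here's the two-construction-paths pattern on a toy class unrelated to the engine — the Temperature class and its parsing logic are purely hypothetical:

```python
class Temperature:
    __slots__ = ("celsius",)

    def __init__(self, raw: str) -> None:
        # User-facing path: parse a string like "21.5C"
        self.celsius = float(raw.rstrip("C"))

    @classmethod
    def _from_value(cls, celsius: float) -> "Temperature":
        t = cls.__new__(cls)   # bare instance, __init__ skipped
        t.celsius = celsius    # set the slot directly
        return t

a = Temperature("21.5C")        # external path: parsing
b = Temperature._from_value(21.5)  # internal path: no parsing
print(a.celsius, b.celsius)     # 21.5 21.5
```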

Every method is a node constructor
This is the key insight about lazy APIs: every method on LazyFrame is secretly a plan node constructor. .filter() creates a FilterNode. .select() creates a ProjectNode. .with_column() creates a WithColumnNode. The user thinks they're "doing" something to the data; in reality, they're building a tree of instructions that will execute later.

Now let's look at pyfloe's actual LazyFrame:

pyfloe's LazyFrame (core.py) — key methods
class LazyFrame:
    __slots__ = ("_plan", "_materialized", "_name", "_optimized")

    def filter(self, predicate: Expr) -> LazyFrame:
        return LazyFrame._from_plan(
            FilterNode(self._plan, predicate))

    def select(self, *args: str | Expr) -> LazyFrame:
        if all(isinstance(a, str) for a in args):
            return LazyFrame._from_plan(
                ProjectNode(self._plan, list(args)))
        exprs = [Col(a) if isinstance(a, str) else a
                 for a in args]
        return LazyFrame._from_plan(
            ProjectNode(self._plan, exprs=exprs))

    def with_column(self, name: str, expr: Expr) -> LazyFrame:
        return LazyFrame._from_plan(
            WithColumnNode(self._plan, name, expr))

    def collect(self) -> LazyFrame:
        if self._materialized is None:
            plan = self._exec_plan  # optimized!
            data = []
            for chunk in plan.execute_batched():
                data.extend(chunk)
            self._materialized = data
        return self

Beyond our simplified version, pyfloe adds:

__slots__ — Again, eliminating the instance __dict__. A LazyFrame stores just four things: the plan, cached data, a name, and an optimized plan.

.select() with mixed arguments — If all arguments are strings, it does column selection (fast path). If any are expressions, it converts string arguments to Col objects and does expression evaluation. This lets users mix both: .select("name", (col("price") * col("qty")).alias("total")).

_exec_plan — When you call .collect(), pyfloe doesn't just execute the raw plan — it runs the optimizer first (which we'll build in Module 5). The _exec_plan property caches the optimized plan so it's only computed once.
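The string-vs-expression dispatch in .select() boils down to a small normalization step, sketched here with a stand-in Col class (the normalize helper is illustrative, not pyfloe's API):

```python
class Col:
    def __init__(self, name: str) -> None:
        self.name = name

def normalize(args):
    # All strings → fast column-selection path
    if all(isinstance(a, str) for a in args):
        return ("columns", list(args))
    # Otherwise promote bare strings to Col and evaluate expressions
    return ("exprs", [Col(a) if isinstance(a, str) else a for a in args])

mode, _ = normalize(("name", "age"))
print(mode)  # columns
mode, exprs = normalize(("name", Col("price")))
print(mode, [type(e).__name__ for e in exprs])  # exprs ['Col', 'Col']
```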

In the wild
Polars' LazyFrame works identically: each method returns a new LazyFrame with a deeper plan tree. .collect() triggers the optimizer and then the executor. The only difference is implementation language: Polars' plan nodes are Rust structs, and .collect() triggers a Rust-native execution engine. But the Python-level pattern — the thin wrapper, the plan tree, the deferred execution — is the same.

Lesson 3.5 Schema Propagation Without Data

Here's a question that might seem impossible to answer: if a LazyFrame hasn't executed yet — no data has flowed, no rows have been processed — how can it know what its output columns and types are?

Schema without data — how?
pipeline = (
    LazyFrame(orders)
    .filter(col("amount") > 100)
    .with_column("tax", col("amount") * 0.2)
    .select("order_id", "amount", "tax")
)

pipeline.schema   # ← How does this work? No data has flowed!

And yet, Polars does exactly this. You can call .schema on a lazy pipeline and get instant answers — column names, types, nullability — without triggering execution. It's one of the most useful features for interactive development: catch column typos and type mismatches before waiting for a query to run.

The trick: every plan node knows how to compute its output schema from its input schema. The schema propagates bottom-up: leaf nodes like ScanNode know their columns from the data source, and every node above derives its schema from its children. The entire propagation happens instantly, by walking the tree's structure alone — no data flows at all.

Building ColumnSchema

We need a way to describe a single column — its name, type, and whether it can contain None. pyfloe uses a frozen dataclass for this:

pyfloe's ColumnSchema (schema.py)
from dataclasses import dataclass

@dataclass(frozen=True)
class ColumnSchema:
    name: str
    dtype: type = str
    nullable: bool = True

    def with_name(self, name: str) -> ColumnSchema:
        return ColumnSchema(name, self.dtype, self.nullable)

    def with_dtype(self, dtype: type) -> ColumnSchema:
        return ColumnSchema(self.name, dtype, self.nullable)

frozen=True means the dataclass is immutable — once created, its fields can't be changed. This is important because schemas are shared across the plan tree; you don't want one node's schema modification to accidentally affect another node. The with_name() and with_dtype() methods follow the immutable pattern: they return new objects instead of modifying the existing one.
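The frozen behavior is easy to verify; note that the stdlib's dataclasses.replace is the general-purpose analogue of the with_name/with_dtype helpers:

```python
from dataclasses import dataclass, replace, FrozenInstanceError

@dataclass(frozen=True)
class ColumnSchema:
    name: str
    dtype: type = str
    nullable: bool = True

c = ColumnSchema("age", int)

try:
    c.name = "years"           # mutation is rejected
except FrozenInstanceError:
    mutated = False

renamed = replace(c, name="years")  # new object instead
print(c.name, renamed.name)  # age years
```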

Building LazySchema

A LazySchema is a collection of ColumnSchema objects — one per column, in order. It's the schema for an entire plan node's output:

pyfloe's LazySchema (schema.py) — core structure
class LazySchema:
    __slots__ = ('_columns',)

    def __init__(self, columns: dict[str, ColumnSchema] | None = None) -> None:
        self._columns = columns or {}

    @property
    def column_names(self) -> list[str]:
        return list(self._columns.keys())

    @property
    def dtypes(self) -> dict[str, type]:
        return {n: c.dtype for n, c in self._columns.items()}

LazySchema uses __slots__ too — it stores just one thing, the ordered dictionary of columns. Now we need methods that let plan nodes derive new schemas from existing ones:

LazySchema — transformation methods
class LazySchema:
    # ...continued

    def select(self, columns: list[str]) -> LazySchema:
        """Keep only the named columns."""
        return LazySchema({n: self._columns[n]
                           for n in columns})

    def with_column(self, name: str, dtype: type, nullable: bool = True) -> LazySchema:
        """Return schema with one column added or replaced."""
        cols = dict(self._columns)
        cols[name] = ColumnSchema(name, dtype, nullable)
        return LazySchema(cols)

    def merge(self, other: LazySchema, suffix: str = 'right_') -> LazySchema:
        """Merge two schemas (for joins)."""
        cols = dict(self._columns)
        for name, col in other._columns.items():
            if name in cols:
                new_name = suffix + name
                cols[new_name] = col.with_name(new_name)
            else:
                cols[name] = col
        return LazySchema(cols)

Every method returns a new LazySchema — the original is never modified. This immutability is crucial: a FilterNode and the ProjectNode above it can both reference the same child node's schema without stepping on each other.

Wiring schema into plan nodes

Now the payoff. Each plan node implements a .schema() method that computes its output schema by transforming its child's schema. The rules are intuitive:

Schema propagation rules
class FilterNode(PlanNode):
    def schema(self) -> LazySchema:
        return self.child.schema()  # same columns, fewer rows

class ProjectNode(PlanNode):
    def schema(self) -> LazySchema:
        if self._columns:
            return self.child.schema().select(self._columns)
        # For expressions: infer type of each
        parent = self.child.schema()
        cols = {}
        for expr in self._exprs:
            name = expr.output_name()
            dtype = expr.output_dtype(parent)
            cols[name] = ColumnSchema(name, dtype)
        return LazySchema(cols)

class WithColumnNode(PlanNode):
    def schema(self) -> LazySchema:
        parent = self.child.schema()
        dtype = self._expr.output_dtype(parent)
        return parent.with_column(self._name, dtype)

Look at what's happening: no data is involved. FilterNode.schema() just passes through its child's schema (filtering doesn't change columns). ProjectNode.schema() calls .select() on the child's schema to keep only the requested columns. WithColumnNode.schema() calls .with_column() to append a new column with the type that Expr.output_dtype() infers.

And here's where the expression AST from Module 2 closes the loop. Remember how every Expr subclass implements output_dtype()?

Recall from Module 2 — output_dtype() on each Expr node
class Col(Expr):
    def output_dtype(self, schema: LazySchema) -> type:
        return schema[self.name].dtype  # look up from schema

class Lit(Expr):
    def output_dtype(self, schema: LazySchema) -> type:
        return type(self.value)        # type of the constant

class BinaryExpr(Expr):
    def output_dtype(self, schema: LazySchema) -> type:
        if self.op_str in ('>', '<', '==', '!=', '&', '|'):
            return bool          # comparisons → bool
        return self.left.output_dtype(schema)  # arithmetic → left type

Each expression node knows its output type without seeing any data. Col looks it up in the schema. Lit uses type(). BinaryExpr returns bool for comparisons and inherits from the left operand for arithmetic.

Expression type inference — no data needed
# Col looks up the type from the schema
col("amount").output_dtype(schema)  # → float (from schema)

# BinaryExpr with comparison → always bool
(col("amount") > 100).output_dtype(schema)  # → bool

# BinaryExpr with arithmetic → inherits from left operand
(col("amount") * 0.2).output_dtype(schema)  # → float

Calling .schema() on the top node recurses down the plan tree to the leaf, whose schema is known from the data; the result then propagates back up through each node's .schema() method. It's recursive, but the recursion depth equals the number of plan nodes (usually fewer than 20), so it's effectively instant.

How Polars does this
This is exactly how Polars' .schema property works. When you write pipeline.schema in Polars, no data flows. Polars walks the plan tree, computing each node's output schema from the node below it. That's how it can tell you instantly that your join will produce a right_id column, or that col("price") * 0.2 will be a float — before scanning a single row.
Deep Cut
Schema inference is useful but not on the critical path to Module 5. Safe to skip.

Inferring schema from data

The leaf node — typically ScanNode — needs an initial schema. If the user doesn't provide one, pyfloe infers it by sampling the data:

LazySchema.from_data() — infer types from a sample
@classmethod
def from_data(cls, columns: list[str], rows: list[tuple]) -> LazySchema:
    if not rows:
        return cls({n: ColumnSchema(n) for n in columns})
    sample = rows[:min(1000, len(rows))]
    cols = {}
    for i, name in enumerate(columns):
        dtype = str
        nullable = False
        for row in sample:
            val = row[i]
            if val is None:
                nullable = True
            elif dtype is str:
                dtype = type(val)
        cols[name] = ColumnSchema(name, dtype, nullable)
    return cls(cols)

It examines up to 1,000 rows for in-memory data (100 for file readers), checking each column for its Python type and whether None appears. The default type is str (the safest assumption), which gets overridden the moment a non-None value is encountered. pyfloe also has a separate from_dicts() classmethod for when data arrives as dictionaries — but the logic is essentially the same.
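The same sampling logic is easy to try standalone. This sketch packages it as a plain function (rather than pyfloe's classmethod, and returning (dtype, nullable) tuples instead of ColumnSchema objects) so you can run it on a list of tuples:

```python
def infer_types(columns: list[str], rows: list[tuple],
                sample_size: int = 1000) -> dict[str, tuple[type, bool]]:
    sample = rows[:sample_size]
    out = {}
    for i, name in enumerate(columns):
        dtype, nullable = str, False      # str is the safe default
        for row in sample:
            val = row[i]
            if val is None:
                nullable = True           # any None marks the column nullable
            elif dtype is str:
                dtype = type(val)         # first non-None value sets the type
    # note: like pyfloe's version, this doesn't detect mixed-type columns
        out[name] = (dtype, nullable)
    return out

rows = [("Alice", 30, None), ("Bob", 25, 12.5)]
print(infer_types(["name", "age", "score"], rows))
# {'name': (<class 'str'>, False), 'age': (<class 'int'>, False),
#  'score': (<class 'float'>, True)}
```

Note how the "score" column still gets typed as float even though its first value is None: the None only flips the nullable flag, and the type comes from the first non-None value seen.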

Coming in Module 5
Schema propagation isn't just a nice developer feature — it's essential for the optimizer. In Module 5, the column pruning pass will ask each plan node "what columns do you actually need?" by walking the schemas. Without LazySchema, the optimizer would have no way to know which columns are safe to drop.

Exercises Test Your Understanding

Quiz: The Engine Room

1. Why does pyfloe's FilterNode call predicate.compile(col_map) instead of using predicate.eval(row, col_map) inside the loop?

2. What happens when you call LazyFrame.filter(col("age") > 28)?

3. Why does pyfloe process rows in batches of 1,024 instead of one at a time?

4. How does FilterNode.schema() compute its output schema?

5. What does PlanNode.execute() return, and how does it relate to execute_batched()?

Hands-On Challenge: Build a LimitNode

Implement a LimitNode that stops pulling after N rows — the equivalent of SQL's LIMIT or Polars' .head(). Think about:

1. It should work with execute_batched(). What happens when the limit falls in the middle of a batch?
2. What should .schema() return? (Hint: limiting rows doesn't change columns.)
3. The key insight: once you've yielded enough rows, you can return from the generator — which tells the entire upstream chain to stop pulling. This is the early termination property from Module 1, applied to batched execution.

Thinking question
If you have ScanNode → FilterNode → LimitNode(5), and the filter's selectivity is 10%, roughly how many rows does ScanNode yield before LimitNode stops pulling? (Answer: ~50, because on average you need to scan 50 rows to find 5 that pass a 10% filter.)

Source References
core.py — LazyFrame API
expr.py — Expression AST
schema.py — Type System