The Engine Room —
Plan Nodes & Batched Execution
In Module 1 we built a lazy pipeline with generators. In Module 2 we built an expression AST that captures computations as data. Now it's time to wire them together — and in doing so, build a real execution engine.
Right now, our two creations live in separate worlds. The plan tree
from Module 1 uses lambdas to filter rows. The expression AST from
Module 2 can describe col("age") > 28 as a tree — but
has nowhere to run. This module is the wedding: expressions will plug
into plan nodes, and together they'll form something that actually
looks like a real query engine.
Along the way, we'll confront a performance problem that every
pure-Python data engine faces, and solve it the same way pyfloe does:
by processing data in batches instead of one row at
a time. And we'll wrap everything in a LazyFrame class
that gives end users the fluent API they expect.
Lesson 3.1 From Lambdas to Expressions in Plan Nodes
Let's recall the FilterNode we built in Module 1. It took
a condition parameter — a lambda — and used it to decide
which rows to yield:
class FilterNode:
def __init__(self, child: ScanNode, condition: Callable) -> None:
self.child = child
self.condition = condition # a lambda — opaque!
def execute(self) -> Iterator[tuple]:
for row in self.child.execute():
if self.condition(row):
yield row
Our goal is to make that predicate an Expr — the inspectable tree from
Module 2 — instead of an opaque lambda. Let's plug it in.
Step 1: Replace the lambda with an Expr
The upgrade itself is straightforward: instead of accepting a lambda,
our FilterNode accepts an Expr and calls its
eval() method on each row.
But eval() needs a col_map — a dictionary
mapping column names to their positions in the tuple. Remember, our rows
are stored as tuples like ("Alice", 30, "NYC") for
efficiency, and the expression col("age") needs to know
that "age" lives at index 1. The col_map is
that lookup table:
# If columns are ["name", "age", "city"]
# then a row like ("Alice", 30, "NYC") needs:
col_map: dict[str, int] = {"name": 0, "age": 1, "city": 2}
# Now col("age").eval(row, col_map) → row[1] → 30
With that in mind, here's our first Expr-powered FilterNode:
class FilterNode:
def __init__(self, child: PlanNode, predicate: Expr) -> None:
self.child = child
self.predicate = predicate # an Expr — inspectable!
def execute(self) -> Iterator[tuple]:
# Build the name→index mapping from the child's columns
col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
for row in self.child.execute():
if self.predicate.eval(row, col_map):
yield row
Recall from Module 2 — how Expr.eval() works
class Col(Expr):
def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
return row[col_map[self.name]] # look up index, then value
class Lit(Expr):
def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
return self.value # always returns the constant
class BinaryExpr(Expr):
def eval(self, row: tuple, col_map: dict[str, int]) -> Any:
lv = self.left.eval(row, col_map) # recurse left
rv = self.right.eval(row, col_map) # recurse right
return self.op(lv, rv) # apply operator
So predicate.eval(row, col_map) in our new FilterNode
walks this tree recursively — BinaryExpr evaluates its Col and Lit
children to get their values, then applies the operator to the results.
Same structure. Same generator pattern. But the predicate
is now an Expr object — and that means our engine can call
predicate.required_columns() to learn it only needs
{"age"}, or pass the whole expression to an optimizer that
rewrites the plan tree. The black box is gone.
What happens if you use col("nonexistent_column") in a filter
and call .collect()? Trace the error: the expression
looks up the name in col_map, the key doesn't exist, and you get a
KeyError. In production pyfloe, this is caught earlier by
schema validation.
The compile() optimization is a performance technique, not an architectural requirement. Safe to skip on a first pass.
Step 2: The col_map problem
There's a subtle performance issue with our v2 FilterNode.
Look at the execute() method: for every row, we
call self.predicate.eval(row, col_map). That eval()
call walks the expression tree recursively. For a simple filter like
col("age") > 28, that's three method calls per row
(BinaryExpr.eval → Col.eval + Lit.eval). For a million rows, that's
three million Python function calls — just for the expression.
pyfloe has a clever solution: Expr.compile(). Instead of
walking the tree at runtime, compile() walks the tree
once and returns a single closure that does the same work.
The col_map is baked into the closure at compile time — no dictionary
lookups at runtime:
class Col(Expr):
def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
idx = col_map[self.name] # look up ONCE
return lambda row: row[idx] # fast index at runtime
And for BinaryExpr:
class BinaryExpr(Expr):
def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
left_fn = self.left.compile(col_map) # compile children
right_fn = self.right.compile(col_map) # once, up front
op = self.op
def _eval(row: tuple) -> Any:
lv = left_fn(row)
rv = right_fn(row)
if lv is None or rv is None:
return None
return op(lv, rv)
return _eval
The idea is elegant: compile() recursively compiles each
child into a closure, then wraps them all into one top-level closure.
The result is a single callable that takes a row tuple and returns the
answer — no tree walking, no dictionary lookups, no method dispatch.
Just nested function calls with pre-resolved indices.
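The whole pipeline — build tree, compile once, run the closure per row — fits in a short standalone sketch (condensed, compile-only versions of the Module 2 classes):

```python
import operator
from typing import Any, Callable

class Col:
    def __init__(self, name: str) -> None:
        self.name = name
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        idx = col_map[self.name]          # dictionary lookup happens ONCE
        return lambda row: row[idx]

class Lit:
    def __init__(self, value: Any) -> None:
        self.value = value
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        value = self.value
        return lambda row: value

class BinaryExpr:
    def __init__(self, left: Any, right: Any, op: Any) -> None:
        self.left, self.right, self.op = left, right, op
    def compile(self, col_map: dict[str, int]) -> Callable[[tuple], Any]:
        left_fn = self.left.compile(col_map)    # compile children once
        right_fn = self.right.compile(col_map)
        op = self.op
        def _eval(row: tuple) -> Any:
            lv, rv = left_fn(row), right_fn(row)
            if lv is None or rv is None:        # nulls propagate
                return None
            return op(lv, rv)
        return _eval

# col("age") > 28, compiled once, then reused per row
expr = BinaryExpr(Col("age"), Lit(28), operator.gt)
pred_fn = expr.compile({"name": 0, "age": 1, "city": 2})
print(pred_fn(("Alice", 30, "NYC")))   # True
print(pred_fn(("Bob", None, "LA")))    # None — null propagates
```

Note the None check in _eval: it's why a null in any operand yields None rather than a TypeError at runtime.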
Now let's see pyfloe's real FilterNode, which uses
compile() plus one more trick:
class FilterNode(PlanNode):
__slots__ = ("child", "predicate")
def __init__(self, child: PlanNode, predicate: Expr) -> None:
self.child = child
self.predicate = predicate
def execute_batched(self) -> Iterator[list[tuple]]:
col_map = {n: i for i, n in enumerate(
self.child.schema().column_names)}
pred_fn = self.predicate.compile(col_map)
for chunk in self.child.execute_batched():
filtered = list(compress(chunk, map(pred_fn, chunk)))
if filtered:
yield filtered
Three things to notice here beyond our simplified version:
__slots__ — Same pattern as Module 2's
expression nodes: lightweight objects, no per-instance
__dict__. You'll see __slots__ on every node
class from here on — it's the standard pattern.
compress() — Instead of a for-loop with
an if-statement, pyfloe uses itertools.compress(). This
stdlib function takes a data sequence and a selector sequence, and
yields only the data elements where the selector is truthy. Combined
with map(pred_fn, chunk), it's a concise way to apply the
predicate and filter in one shot — and it runs partly in C, which is
faster than a Python loop.
execute_batched() — The method isn't
execute(); it's execute_batched(). We'll
get to this in Lesson 3.3, but the short version is: pyfloe processes
data in chunks of 1,024 rows instead of one at a time. For now, just
notice that the predicate is compiled once, before the loop —
not per-row.
When you write df.filter(col("age") > 28) in Polars, the
same thing happens: the expression becomes an AST node, which Polars
compiles into vectorized Rust code operating on Arrow arrays. The
compilation is more sophisticated, but the architecture is the same:
build a tree, compile it, run the compiled version against the data.
Lesson 3.2 ProjectNode — Select and Compute
Filtering rows is only half the story. A real query engine also needs
to transform columns: selecting a subset, reordering them, or
computing new ones. In SQL, this is the SELECT clause.
In Polars, it's .select(). In our engine, it's the
ProjectNode.
A ProjectNode sits in the plan tree just like a
FilterNode, but instead of dropping rows, it reshapes them.
It has two modes, and both are important.
Mode 1: Column selection
The simplest case: the user wants to keep only some columns. If your
data has ["name", "age", "city", "salary"] and the user
calls .select("name", "salary"), the ProjectNode
needs to extract just those two columns from every row. Python's
operator.itemgetter is built for exactly this:
from operator import itemgetter
getter = itemgetter(0, 3) # extract indices 0 and 3
row = ("Alice", 30, "NYC", 85000)
getter(row) # → ("Alice", 85000)
itemgetter is implemented in C and returns a tuple directly —
no intermediate generator, no Python loop. It's 2–5× faster than the
generator expression approach for multi-column extraction.
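A rough micro-benchmark backs this up — absolute numbers and the exact ratio depend on your interpreter version and row width, so treat this as a sketch, not a guarantee:

```python
from operator import itemgetter
from timeit import timeit

rows = [("Alice", 30, "NYC", 85000)] * 100_000
getter = itemgetter(0, 3)

def with_itemgetter() -> list[tuple]:
    # C-level extraction: builds each output tuple directly
    return [getter(r) for r in rows]

def with_genexpr() -> list[tuple]:
    # pure-Python alternative: a generator expression per row
    return [tuple(r[i] for i in (0, 3)) for r in rows]

assert with_itemgetter() == with_genexpr()   # same result either way
print(f"itemgetter: {timeit(with_itemgetter, number=10):.3f}s")
print(f"genexpr:    {timeit(with_genexpr, number=10):.3f}s")
```

The real ProjectNode goes one step further with list(map(getter, chunk)), which also moves the outer loop into C.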
Mode 2: Expression evaluation
The second mode is more powerful: instead of selecting existing columns, we compute new columns from expressions. When the user writes:
df.select(
col("name"),
(col("price") * col("qty")).alias("total")
)
the ProjectNode needs to evaluate each expression per row
and assemble the results into a new tuple. This is where
compile() pays off again — we compile each expression once,
then run the compiled closures per row:
class ProjectNode:
def __init__(self, child: PlanNode,
columns: list[str] | None = None,
exprs: list[Expr] | None = None) -> None:
self.child = child
self._columns = columns # for column selection
self._exprs = exprs # for expression evaluation
def execute(self) -> Iterator[tuple]:
col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
if self._columns:
getter = itemgetter(*[col_map[c] for c in self._columns])
for row in self.child.execute():
yield getter(row)
elif self._exprs:
compiled = [e.compile(col_map) for e in self._exprs]
for row in self.child.execute():
yield tuple(fn(row) for fn in compiled)
Notice the two branches: _columns (names only, use
itemgetter) vs. _exprs (full expressions,
use compiled closures). This dual-mode design lets the same node handle
both .select("name", "age") and
.select(col("price") * col("qty")).
Now let's see pyfloe's real implementation, which adds batched processing and a special case for single-column selection:
class ProjectNode(PlanNode):
__slots__ = ("child", "_columns", "_exprs")
def execute_batched(self) -> Iterator[list[tuple]]:
col_map = {n: i for i, n in enumerate(
self.child.schema().column_names)}
if self._columns:
indices = [col_map[c] for c in self._columns]
n = len(indices)
if n == 1:
idx = indices[0]
for chunk in self.child.execute_batched():
yield [(row[idx],) for row in chunk]
else:
getter = itemgetter(*indices)
for chunk in self.child.execute_batched():
yield list(map(getter, chunk))
elif self._exprs:
compiled = [e.compile(col_map) for e in self._exprs]
for chunk in self.child.execute_batched():
yield [tuple(fn(row) for fn in compiled)
for row in chunk]
The single-column special case (n == 1) avoids
itemgetter entirely because itemgetter with
one argument returns the value directly, not a tuple — which would
break the rest of the engine that expects rows to always be tuples.
The multi-column path uses list(map(getter, chunk))
instead of a list comprehension — this keeps the tight C-level
map() loop for maximum speed.
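The single-argument gotcha is easy to verify in a REPL:

```python
from operator import itemgetter

row = ("Alice", 30, "NYC")
print(itemgetter(1)(row))      # 30 — a bare value, NOT a 1-tuple
print(itemgetter(0, 1)(row))   # ('Alice', 30) — a tuple, as expected
# The n == 1 branch builds the tuple by hand so rows stay tuples:
print((row[1],))               # (30,)
```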
FilterNode and ProjectNode
follow the same recipe: build the col_map from the child's
schema, compile or resolve the expressions once, then loop over chunks.
This pattern will repeat for every plan node we build. The compile step
happens outside the loop; the hot path inside the loop is as lean as
possible.
Lesson 3.3 Batched Execution — Why Not One Row at a Time?
In Module 1, we celebrated the elegance of generators: each plan node yields one row at a time, and data flows through the chain with minimal memory. But there's a cost we swept under the rug. Let's uncover it.
Recall from Module 1 — our generator-based plan nodes
class ScanNode:
def __init__(self, data: list[tuple]) -> None:
self.data = data
def execute(self) -> Iterator[tuple]:
for row in self.data:
yield row # one row per next() call
class FilterNode:
def __init__(self, child: ScanNode, condition: Callable) -> None:
self.child = child
self.condition = condition
def execute(self) -> Iterator[tuple]:
for row in self.child.execute():
if self.condition(row):
yield row # one row per next() call
Each yield suspends the generator and returns one row.
The parent calls next() to resume it. Elegant — but
every yield/next() pair is a Python
function call with overhead.
The problem: function call overhead
Every time a Python generator yields a value, there's overhead: the frame object is suspended, the caller resumes, the result is passed across. Every time a function is called, there's overhead: argument packing, the call stack, frame creation. In C or Rust, function call overhead is negligible — a few nanoseconds. In Python, it's significant.
Consider a simple pipeline: ScanNode → FilterNode → ProjectNode.
For each row, the execution looks like:
# For EVERY ROW in the dataset:
# 1. ProjectNode calls next() on FilterNode ← overhead
# 2. FilterNode calls next() on ScanNode ← overhead
# 3. ScanNode yields one row ← overhead
# 4. FilterNode calls pred_fn(row) ← overhead
# 5. FilterNode yields (maybe) ← overhead
# 6. ProjectNode calls getter(row) ← overhead
# 7. ProjectNode yields ← overhead
#
# That's ~7 Python function calls per row.
# For 1 million rows: 7 million function calls.
# At ~100ns each: ~0.7 seconds of PURE OVERHEAD.
The actual filtering and projection — the "useful" work — might take only 0.1 seconds. But the overhead of shuttling rows one at a time through the Python generator chain dominates.
Pause and predict: if each row incurs ~7 Python function calls for overhead, and you have 1 million rows, that's 7 million overhead calls. What if you could process 1,024 rows per generator transition instead of one? How many overhead calls would that be?
The solution: process in batches
The fix is conceptually simple: instead of yielding one row per
next() call, yield a list of rows. If we yield
1,024 rows at a time, we cut the number of generator transitions by a
factor of 1,024, so the cross-boundary overhead per row drops from
~7 function calls to ~7/1024 ≈ 0.007 — essentially free. The per-row
work (the predicate, the getter) still happens, but now inside tight
loops rather than across generator boundaries.
Here's the batched version of our FilterNode:
class FilterNode:
def execute_batched(self) -> Iterator[list[tuple]]:
col_map: dict[str, int] = {n: i for i, n in enumerate(self.child.columns)}
pred_fn = self.predicate.compile(col_map)
# Pull CHUNKS from child, not individual rows
for chunk in self.child.execute_batched():
filtered = [row for row in chunk if pred_fn(row)]
if filtered:
yield filtered
The inner loop is now a list comprehension — one of the fastest loop constructs in Python. And we only yield across the generator boundary once per chunk, not once per row.
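A toy benchmark makes the difference concrete. This is a sketch, not pyfloe's code: two tiny scan→filter pipelines over the same data, one yielding rows and one yielding 1,024-row chunks:

```python
from timeit import timeit
from typing import Iterator

data = list(range(100_000))

def row_at_a_time() -> int:
    def scan() -> Iterator[int]:
        yield from data                   # one next() per row
    def filt(rows: Iterator[int]) -> Iterator[int]:
        for r in rows:
            if r % 2 == 0:
                yield r                   # one next() per surviving row
    return sum(filt(scan()))

def batched(n: int = 1024) -> int:
    def scan() -> Iterator[list[int]]:
        for i in range(0, len(data), n):
            yield data[i:i + n]           # one next() per 1,024 rows
    def filt(chunks: Iterator[list[int]]) -> Iterator[list[int]]:
        for chunk in chunks:
            yield [r for r in chunk if r % 2 == 0]   # tight inner loop
    return sum(r for chunk in filt(scan()) for r in chunk)

assert row_at_a_time() == batched()       # identical results
print(f"row-at-a-time: {timeit(row_at_a_time, number=5):.3f}s")
print(f"batched:       {timeit(batched, number=5):.3f}s")
```

On a typical CPython build the batched version wins comfortably, and the gap widens as you add more nodes to the chain.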
The _batched() helper
But where do the chunks come from in the first place? The leaf node
(usually a ScanNode) has to slice its data into batches.
pyfloe uses a small helper function for this:
from itertools import islice
_BATCH_SIZE: int = 1024
def _batched(iterable: Iterable, n: int = _BATCH_SIZE) -> Iterator[list]:
it = iter(iterable)
while True:
chunk = list(islice(it, n))
if not chunk:
break
yield chunk
islice(it, n) pulls up to n items from the
iterator without loading everything into memory. The list()
call materializes just that chunk. The result: a generator that yields
lists of up to 1,024 rows each.
Bridging batched and unbatched: PlanNode
pyfloe's base class, PlanNode, defines both interfaces
and bridges them automatically:
from itertools import chain
class PlanNode:
__slots__ = ()
def execute(self) -> Iterator[tuple]:
"""Yield rows one at a time (flattens batched output)."""
return chain.from_iterable(self.execute_batched())
def execute_batched(self) -> Iterator[list[tuple]]:
"""Yield lists of rows. Subclasses override this."""
raise NotImplementedError
The key insight: subclasses implement execute_batched(),
and execute() comes for free.
chain.from_iterable takes an iterable of lists and flattens
them into a single stream of individual items — it's the bridge between
the batched world (where the engine operates internally) and the
unbatched world (where the user consumes results one at a time).
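You can see the bridge in isolation — chain.from_iterable is itself lazy, so flattening never materializes the whole stream:

```python
from itertools import chain

batches = [[("Alice", 30)], [("Bob", 25), ("Cara", 41)]]
rows = chain.from_iterable(batches)   # lazy: nothing is flattened yet
print(next(rows))                     # ('Alice', 30)
print(list(rows))                     # [('Bob', 25), ('Cara', 41)]
```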
Let's see how ScanNode produces its batches:
class ScanNode(PlanNode):
__slots__ = ("_data", "_columns", "_schema")
def __init__(self, data: list[tuple],
columns: list[str],
lazy_schema: LazySchema | None = None) -> None:
self._data = data
self._columns = list(columns)
self._schema = lazy_schema
def execute_batched(self) -> Iterator[list[tuple]]:
data = self._data
for i in range(0, len(data), _BATCH_SIZE):
yield data[i : i + _BATCH_SIZE]
Because the data is already an in-memory list, ScanNode
can just slice it directly — no need for the _batched()
helper. That helper is used by IteratorSourceNode, which
reads from generators (like a file reader) where you can't slice but
can islice.
Row-at-a-time — each next() crosses the generator boundary. For N rows across K nodes: N × K function calls.
Batched (1,024) — each next() yields up to 1,024 rows. Same pipeline: (N/1,024) × K function calls, and the inner loop runs as a fast list comprehension.
Lesson 3.4 The LazyFrame Wrapper
We now have a working engine: expressions compile into fast closures, plan nodes process data in batches, and generators chain everything together. But the user-facing API is still rough. To build a pipeline today, you'd write something like:
scan = ScanNode(data, ["name", "age", "city"])
filt = FilterNode(scan, col("age") > 28)
proj = ProjectNode(filt, ["name", "age"])
# Collect results manually
results = []
for chunk in proj.execute_batched():
results.extend(chunk)
This is the engine's internal wiring exposed to the user — and it's
terrible. Every operation creates a raw node. The user has to remember
which node takes which arguments. There's no method chaining, no
.collect(), no fluent API. Compare this to what a Polars
user writes:
result = (
LazyFrame(data)
.filter(col("age") > 28)
.select("name", "age")
.collect()
)
Recall from Module 1 — our first LazyFrame sketch
class LazyFrame:
def __init__(self, plan_node: PlanNode) -> None:
self._plan = plan_node
def filter(self, condition: Callable) -> LazyFrame:
new_plan = FilterNode(self._plan, condition)
return LazyFrame(new_plan)
def collect(self) -> list[tuple]:
return list(self._plan.execute())
Module 1's version accepted a raw plan node and used lambdas. Now
we'll accept user data directly, use expressions, handle batched
execution, and add _from_plan for internal construction.
The LazyFrame is the thin wrapper that bridges the gap. It
holds a plan tree internally, and each method call just wraps the
current plan in a new node:
class LazyFrame:
    def __init__(self, data: list[dict]) -> None:
        # Convert list-of-dicts to internal format
        columns, rows = dicts_to_tuples(data)
        self._plan = ScanNode(rows, columns)
        self._materialized: list[tuple] | None = None  # filled by .collect()
def filter(self, predicate: Expr) -> LazyFrame:
# Don't execute — just wrap in a new node!
return LazyFrame._from_plan(
FilterNode(self._plan, predicate))
def select(self, *columns: str) -> LazyFrame:
return LazyFrame._from_plan(
ProjectNode(self._plan, list(columns)))
def collect(self) -> LazyFrame:
# NOW we execute — pull all data through the plan
data = []
for chunk in self._plan.execute_batched():
data.extend(chunk)
self._materialized = data
return self
Look at .filter(): it doesn't touch any data. It creates a
FilterNode that wraps the current plan, then returns a
new LazyFrame pointing at this deeper plan tree.
Each method call adds one node on top. The tree grows downward.
.collect() is the only method that triggers execution — it
pulls data up through the entire chain.
_from_plan is an implementation detail of how LazyFrame constructs itself internally. Safe to skip.
The _from_plan classmethod
Notice that .filter() doesn't call
LazyFrame.__init__() — that would try to convert data
from dicts again. Instead, it calls _from_plan, a
classmethod that creates a LazyFrame directly from an
existing plan node:
class LazyFrame:
@classmethod
def _from_plan(cls, plan: PlanNode) -> LazyFrame:
lf = cls.__new__(cls) # create instance, skip __init__
lf._plan = plan
lf._materialized = None
return lf
cls.__new__(cls) creates a bare instance without calling
__init__. Then we set the attributes directly. This is a
common pattern in libraries that have two distinct construction paths:
one for end users (from raw data) and one for internal plumbing (from
an existing plan).
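Here's the pattern in miniature, with a hypothetical Frame class standing in for LazyFrame:

```python
from __future__ import annotations

class Frame:
    def __init__(self, data: list[dict]) -> None:
        # user-facing path: does real (potentially expensive) conversion
        print("__init__: converting dicts")
        self.plan = ("scan", data)

    @classmethod
    def _from_plan(cls, plan: tuple) -> Frame:
        f = cls.__new__(cls)    # bare instance — __init__ never runs
        f.plan = plan           # set attributes directly
        return f

f1 = Frame([{"age": 30}])                     # prints the __init__ message
f2 = Frame._from_plan(("filter", f1.plan))    # silent: internal path
print(f2.plan)    # ('filter', ('scan', [{'age': 30}]))
```

The print in __init__ is just to make the two paths visible: only the user-facing constructor ever converts data.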
LazyFrame is secretly a plan node constructor.
.filter() creates a FilterNode.
.select() creates a ProjectNode.
.with_column() creates a WithColumnNode.
The user thinks they're "doing" something to the data; in reality,
they're building a tree of instructions that will execute later.
Now let's look at pyfloe's actual LazyFrame:
class LazyFrame:
__slots__ = ("_plan", "_materialized", "_name", "_optimized")
def filter(self, predicate: Expr) -> LazyFrame:
return LazyFrame._from_plan(
FilterNode(self._plan, predicate))
def select(self, *args: str | Expr) -> LazyFrame:
if all(isinstance(a, str) for a in args):
return LazyFrame._from_plan(
ProjectNode(self._plan, list(args)))
exprs = [Col(a) if isinstance(a, str) else a
for a in args]
return LazyFrame._from_plan(
ProjectNode(self._plan, exprs=exprs))
def with_column(self, name: str, expr: Expr) -> LazyFrame:
return LazyFrame._from_plan(
WithColumnNode(self._plan, name, expr))
def collect(self) -> LazyFrame:
if self._materialized is None:
plan = self._exec_plan # optimized!
data = []
for chunk in plan.execute_batched():
data.extend(chunk)
self._materialized = data
return self
Beyond our simplified version, pyfloe adds:
__slots__ — Again, eliminating the
instance __dict__. A LazyFrame stores
just four things: the plan, cached data, a name, and an optimized
plan.
.select() with mixed arguments — If all
arguments are strings, it does column selection (fast path). If any
are expressions, it converts string arguments to Col
objects and does expression evaluation. This lets users mix both:
.select("name", (col("price") * col("qty")).alias("total")).
_exec_plan — When you call
.collect(), pyfloe doesn't just execute the raw plan — it
runs the optimizer first (which we'll build in Module 5). The
_exec_plan property caches the optimized plan so it's
only computed once.
Polars' LazyFrame works identically: each method returns
a new LazyFrame with a deeper plan tree.
.collect() triggers the optimizer and then the executor.
The only difference is implementation language: Polars' plan nodes are
Rust structs, and .collect() triggers a Rust-native
execution engine. But the Python-level pattern — the thin wrapper, the
plan tree, the deferred execution — is the same.
Lesson 3.5 Schema Propagation Without Data
Here's a question that might seem impossible to answer: if a
LazyFrame hasn't executed yet — no data has flowed, no
rows have been processed — how can it know what its output columns and
types are?
pipeline = (
LazyFrame(orders)
.filter(col("amount") > 100)
.with_column("tax", col("amount") * 0.2)
.select("order_id", "amount", "tax")
)
pipeline.schema # ← How does this work? No data has flowed!
And yet, Polars does exactly this. You can call .schema
on a lazy pipeline and get instant answers — column names, types,
nullability — without triggering execution. It's one of the most
useful features for interactive development: catch column typos and
type mismatches before waiting for a query to run.
The trick: every plan node knows how to compute its output
schema from its input schema. The schema propagates
bottom-up: leaf nodes like ScanNode know
their columns from the data source, and every node above derives its
schema from its children. The entire propagation happens instantly,
by walking the tree's structure alone — no data flows at all.
Building ColumnSchema
We need a way to describe a single column — its name, type, and
whether it can contain None. pyfloe uses a frozen
dataclass for this:
from dataclasses import dataclass
@dataclass(frozen=True)
class ColumnSchema:
name: str
dtype: type = str
nullable: bool = True
def with_name(self, name: str) -> ColumnSchema:
return ColumnSchema(name, self.dtype, self.nullable)
def with_dtype(self, dtype: type) -> ColumnSchema:
return ColumnSchema(self.name, dtype, self.nullable)
frozen=True means the dataclass is
immutable — once created, its fields can't be changed. This is
important because schemas are shared across the plan tree; you don't
want one node's schema modification to accidentally affect another
node. The with_name() and with_dtype()
methods follow the immutable pattern: they return new objects instead
of modifying the existing one.
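Both properties are easy to demonstrate — mutation raises, while the with_* methods return fresh objects:

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class ColumnSchema:
    name: str
    dtype: type = str
    nullable: bool = True

    def with_name(self, name: str) -> "ColumnSchema":
        return ColumnSchema(name, self.dtype, self.nullable)

age = ColumnSchema("age", int, nullable=False)
try:
    age.name = "years"                  # blocked: frozen dataclass
except FrozenInstanceError:
    print("cannot mutate a frozen schema")

renamed = age.with_name("years")        # new object instead
print(renamed.name, age.name)           # years age — original untouched
```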
Building LazySchema
A LazySchema is a collection of ColumnSchema
objects — one per column, in order. It's the schema for an entire plan
node's output:
class LazySchema:
__slots__ = ('_columns',)
def __init__(self, columns: dict[str, ColumnSchema] | None = None) -> None:
self._columns = columns or {}
@property
def column_names(self) -> list[str]:
return list(self._columns.keys())
@property
def dtypes(self) -> dict[str, type]:
return {n: c.dtype for n, c in self._columns.items()}
LazySchema uses __slots__ too — it stores
just one thing, the ordered dictionary of columns. Now we need methods
that let plan nodes derive new schemas from existing ones:
class LazySchema:
# ...continued
def select(self, columns: list[str]) -> LazySchema:
"""Keep only the named columns."""
return LazySchema({n: self._columns[n]
for n in columns})
def with_column(self, name: str, dtype: type, nullable: bool = True) -> LazySchema:
"""Return schema with one column added or replaced."""
cols = dict(self._columns)
cols[name] = ColumnSchema(name, dtype, nullable)
return LazySchema(cols)
def merge(self, other: LazySchema, suffix: str = 'right_') -> LazySchema:
"""Merge two schemas (for joins)."""
cols = dict(self._columns)
for name, col in other._columns.items():
if name in cols:
new_name = suffix + name
cols[new_name] = col.with_name(new_name)
else:
cols[name] = col
return LazySchema(cols)
Every method returns a new LazySchema — the
original is never modified. This immutability is crucial: a
FilterNode and the ProjectNode above it can
both reference the same child node's schema without stepping on each
other.
Wiring schema into plan nodes
Now the payoff. Each plan node implements a .schema()
method that computes its output schema by transforming its child's
schema. The rules are intuitive:
class FilterNode(PlanNode):
def schema(self) -> LazySchema:
return self.child.schema() # same columns, fewer rows
class ProjectNode(PlanNode):
def schema(self) -> LazySchema:
if self._columns:
return self.child.schema().select(self._columns)
# For expressions: infer type of each
parent = self.child.schema()
cols = {}
for expr in self._exprs:
name = expr.output_name()
dtype = expr.output_dtype(parent)
cols[name] = ColumnSchema(name, dtype)
return LazySchema(cols)
class WithColumnNode(PlanNode):
def schema(self) -> LazySchema:
parent = self.child.schema()
dtype = self._expr.output_dtype(parent)
return parent.with_column(self._name, dtype)
Look at what's happening: no data is involved.
FilterNode.schema() just passes through its child's
schema (filtering doesn't change columns). ProjectNode.schema()
calls .select() on the child's schema to keep only the
requested columns. WithColumnNode.schema() calls
.with_column() to append a new column with the type that
Expr.output_dtype() infers.
And here's where the expression AST from Module 2 closes the loop.
Remember how every Expr subclass implements
output_dtype()?
Recall from Module 2 — output_dtype() on each Expr node
class Col(Expr):
def output_dtype(self, schema: LazySchema) -> type:
return schema[self.name].dtype # look up from schema
class Lit(Expr):
def output_dtype(self, schema: LazySchema) -> type:
return type(self.value) # type of the constant
class BinaryExpr(Expr):
def output_dtype(self, schema: LazySchema) -> type:
if self.op_str in ('>', '<', '==', '!=', '&', '|'):
return bool # comparisons → bool
return self.left.output_dtype(schema) # arithmetic → left type
Each expression node knows its output type without seeing any data.
Col looks it up in the schema. Lit uses
type(). BinaryExpr returns bool
for comparisons and inherits from the left operand for arithmetic.
# Col looks up the type from the schema
col("amount").output_dtype(schema) # → float (from schema)
# BinaryExpr with comparison → always bool
(col("amount") > 100).output_dtype(schema) # → bool
# BinaryExpr with arithmetic → inherits from left operand
(col("amount") * 0.2).output_dtype(schema) # → float
The schema walks the plan tree from top to bottom to find the leaf's
schema (which is known from the data), then propagates it back up
through each node's .schema() method. It's recursive, but
the recursion depth equals the number of plan nodes — usually less
than 20 — so it's effectively instant.
This is exactly how Polars' .schema property works.
When you write pipeline.schema in Polars, no data
flows. Polars walks the plan tree, computing each node's output
schema from the node below it. That's how it can tell you
instantly that your join will produce a right_id column,
or that col("price") * 0.2 will be a float — before
scanning a single row.
Inferring schema from data
The leaf node — typically ScanNode — needs an initial
schema. If the user doesn't provide one, pyfloe infers it by sampling
the data:
@classmethod
def from_data(cls, columns: list[str], rows: list[tuple]) -> LazySchema:
if not rows:
return cls({n: ColumnSchema(n) for n in columns})
sample = rows[:min(1000, len(rows))]
cols = {}
for i, name in enumerate(columns):
dtype = str
nullable = False
for row in sample:
val = row[i]
if val is None:
nullable = True
elif dtype is str:
dtype = type(val)
cols[name] = ColumnSchema(name, dtype, nullable)
return cls(cols)
It examines up to 1,000 rows for in-memory data (100 for file readers),
checking each column for its Python type and whether None appears. The default type is
str (the safest assumption), which gets overridden the
moment a non-None value is encountered. pyfloe also has a separate
from_dicts() classmethod for when data arrives as
dictionaries — but the logic is essentially the same.
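A condensed, standalone version of the inference loop shows the two rules at work — a None flips nullable, and the first non-None value pins the dtype:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ColumnSchema:
    name: str
    dtype: type = str
    nullable: bool = True

def infer(columns: list[str], rows: list[tuple],
          sample_size: int = 1000) -> dict[str, ColumnSchema]:
    # simplified stand-in for LazySchema.from_data
    cols = {}
    for i, name in enumerate(columns):
        dtype, nullable = str, False
        for row in rows[:sample_size]:
            val = row[i]
            if val is None:
                nullable = True           # a None anywhere → nullable
            elif dtype is str:
                dtype = type(val)         # first non-None value wins
        cols[name] = ColumnSchema(name, dtype, nullable)
    return cols

rows = [("Alice", 30), ("Bob", None), ("Cara", 41)]
schema = infer(["name", "age"], rows)
print(schema["age"])    # int, nullable (saw a None)
print(schema["name"])   # str, not nullable
```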
Without LazySchema, the optimizer
would have no way to know which columns are safe to drop.
Exercises Test Your Understanding
Quiz: The Engine Room
1. Why does pyfloe's FilterNode call predicate.compile(col_map) instead of using predicate.eval(row, col_map) inside the loop?
2. What happens when you call LazyFrame.filter(col("age") > 28)?
3. Why does pyfloe process rows in batches of 1,024 instead of one at a time?
4. How does FilterNode.schema() compute its output schema?
5. What does PlanNode.execute() return, and how does it relate to execute_batched()?
Hands-On Challenge: Build a LimitNode
Implement a LimitNode that stops pulling after N rows —
the equivalent of SQL's LIMIT or Polars'
.head(). Think about:
1. It should work with execute_batched().
What happens when the limit falls in the middle of a batch?
2. What should .schema() return? (Hint:
limiting rows doesn't change columns.)
3. The key insight: once you've yielded enough rows,
you can return from the generator — which tells the
entire upstream chain to stop pulling. This is the early termination
property from Module 1, applied to batched execution.
Bonus question: if the pipeline is ScanNode → FilterNode → LimitNode(5),
and the filter's selectivity is 10%, roughly how many rows does ScanNode
yield before LimitNode stops pulling? (Answer: ~50, because on average you
need to scan 50 rows to find 5 that pass a 10% filter.)