The Query Optimizer
Everything you've built — expressions, plan nodes, hash joins, aggregations — has been heading here. The optimizer is the moment your engine learns to think. It looks at the plan tree you gave it, spots the waste, and quietly rewrites the tree into a better one — before a single row of data flows.
In Module 2, you built expression trees — inspectable objects that know which columns they need. In Modules 3 and 4, you built plan trees — chains of nodes that pull data through the volcano model. In this module, those two trees converge. The optimizer walks the plan tree and inspects the expressions inside each node to decide what can be moved, what can be removed, and what can be made cheaper.
This is the approach that powers Polars's query planner, Spark's Catalyst optimizer, and every serious database engine. And by the end of this module, you will have built your own — in about 80 lines of Python.
Lesson 5.1 What Is a Query Optimizer?
Let's start with the problem. Here's a realistic pipeline — joining orders with customers, filtering for a region, and selecting three columns:
```python
result = (
    orders
    .join(customers, on="customer_id")
    .filter(col("region") == "EU")
    .select("order_id", "name", "amount")
    .collect()
)
```
This works correctly. But think about what the engine actually does. It joins every order with every customer — all rows, all columns. Then it filters out non-EU rows. Then it drops all columns except three. The join was the most expensive operation, and we forced it to process rows and columns that we knew we didn't need.
Here's what the unoptimized plan looks like:
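Rendered in the same style as pyfloe's `.explain()` output later in this module (a sketch, not verbatim engine output):

```
Project [order_id, name, amount]
  Filter [region == 'EU']
    Join [inner] on customer_id
      Scan(orders)
      Scan(customers)
```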
A query optimizer is a program that takes this plan tree as input and produces a better plan tree as output — one that computes the same result, but does less work to get there. It can't change what you asked for. It can only change how the engine computes it.
Two families of optimizers
There are two fundamentally different approaches to query optimization:
Rule-Based
Applies fixed rewrite rules: "always push filters down," "always prune unused columns." Simple, predictable, and fast. Used by pyfloe, early versions of Spark, and most teaching engines.
Cost-Based
Estimates the cost of different plans using statistics (row counts, cardinality). Picks the cheapest. Used by PostgreSQL, modern Spark Catalyst, and Polars for join ordering.
We're building a rule-based optimizer — pyfloe's approach. It applies two rewrite passes to the plan tree:
Pass 1: Filter pushdown — moves FilterNodes closer to the data source, so fewer rows flow through expensive operations like joins.
Pass 2: Column pruning — removes columns that no downstream node needs, so each row carries only the data that matters.
Here's what the optimized plan looks like:
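Sketched in the same `.explain()` style (illustrative, not verbatim output), the filter now sits below the join, and the scans are pruned to only the columns the plan needs:

```
Project [order_id, name, amount]
  Join [inner] on customer_id
    Filter [region == 'EU']
      Scan(orders)
    Scan(customers)
```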
Same result. Dramatically less work. The filter now runs before the join, reducing the left side from 100K rows to however many are in the EU. The scans now carry only the columns needed downstream. And the user's code didn't change at all — the optimizer rewrote the plan silently, behind the scenes.
Lesson 5.2 Walking the Tree: Recursive Plan Traversal
Before we can optimize a plan tree, we need to be able to walk it. Every optimization pass follows the same pattern: start at the root, examine each node, decide whether to transform it, then recurse into its children. This is recursive tree traversal — the same technique used by compilers, linters, and every AST-processing tool you've ever used.
Let's start with the structure of our optimizer. At its core, it's remarkably simple:
```python
class Optimizer:
    def optimize(self, plan: PlanNode) -> PlanNode:
        """Apply all optimization passes to a plan tree."""
        plan = self._push_filters(plan)
        needed = set(plan.schema().column_names)
        plan = self._prune_columns(plan, needed)
        return plan
```
Three lines of real logic. Two passes, applied in sequence. The order matters: filter pushdown first (to reduce row counts), then column pruning (to reduce row widths). Each pass receives a plan tree and returns a new, rewritten plan tree. The original tree is never mutated.
optimize() returns a new plan tree —
it doesn't modify the existing one. This is the same immutable pattern
you saw in Module 3, where .filter() returns a new
LazyFrame wrapping a new FilterNode. Immutable
trees are easier to reason about and safer to compose.
Pattern matching with isinstance
The core technique for walking the tree is isinstance
dispatch — checking what type of node we're looking at and
handling each type differently. This is Python's version of pattern matching
on an algebraic data type. Here's what the filter pushdown walk looks like
at a high level:
```python
def _push_filters(self, node: PlanNode) -> PlanNode:
    if isinstance(node, FilterNode):
        # This is where the magic happens —
        # try to push this filter closer to the data
        child = self._push_filters(node.child)
        return self._try_push_filter(node.predicate, child)
    if isinstance(node, ProjectNode):
        # Not a filter — just recurse into the child
        return ProjectNode(
            self._push_filters(node.child),
            node._columns, node._exprs,
        )
    if isinstance(node, JoinNode):
        # Recurse into both branches
        return JoinNode(
            self._push_filters(node.left),
            self._push_filters(node.right),
            node.left_on, node.right_on, node.how,
        )
    # Leaf nodes (ScanNode, etc.) — nothing to recurse into
    return node
```
Read this carefully. The pattern is always the same: check the node type,
recurse into children, and rebuild the node with the (potentially
transformed) children. For most node types, the logic is "just recurse
and rebuild" — only FilterNode gets special treatment,
because that's the node type we're trying to move.
There are two critical things happening here that are easy to miss:
First: the walk is top-down. We start at the root and
work toward the leaves. When we find a FilterNode, we
first recurse into its child (handling any filters below it),
then try to push this filter further down. This means filters
naturally cascade toward the data source.
Second: we rebuild the tree as we go. Every
isinstance branch constructs a new node with the
recursively-transformed children. This is what makes the rewrite immutable
— we're constructing a new tree, not mutating the old one.
In languages with algebraic data types, this same walk would be a single match expression; isinstance chains are Python's traditional equivalent. The pattern is universal because the problem is universal: you have a tree, you want to transform it, and each node type needs different handling.
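To see the recurse-and-rebuild walk in isolation, here is a minimal runnable sketch. Scan, Project, and Filter are simplified stand-ins for pyfloe's real plan nodes, and the walk skips the schema subset check for brevity, so it is a toy, not the real optimizer:

```python
from __future__ import annotations
from dataclasses import dataclass

# Simplified stand-ins for pyfloe's plan nodes (not the real classes).
@dataclass(frozen=True)
class Scan:
    columns: tuple[str, ...]

@dataclass(frozen=True)
class Project:
    child: object
    columns: tuple[str, ...]

@dataclass(frozen=True)
class Filter:
    child: object
    predicate: str  # a real engine stores an expression tree here

def push_filters(node):
    """Recurse-and-rebuild: push each Filter below a Project."""
    if isinstance(node, Filter):
        child = push_filters(node.child)
        if isinstance(child, Project):
            # Rebuild with the filter underneath the projection
            return Project(Filter(child.child, node.predicate), child.columns)
        return Filter(child, node.predicate)
    if isinstance(node, Project):
        return Project(push_filters(node.child), node.columns)
    return node  # leaf: nothing to recurse into

plan = Filter(Project(Scan(("id", "region")), ("id", "region")), "region == 'EU'")
optimized = push_filters(plan)
assert isinstance(optimized, Project)       # the project is now on top
assert isinstance(optimized.child, Filter)  # the filter moved below it
```

Note that `plan` itself is untouched: the frozen dataclasses make mutation impossible, mirroring the immutable-rewrite discipline described above.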
How LazyFrame triggers the optimizer
Where does the optimizer actually get called? In pyfloe, every
LazyFrame has a _exec_plan property that lazily
optimizes the plan the first time it's needed:
```python
class LazyFrame:
    __slots__ = ("_plan", "_materialized", "_name", "_optimized")

    @property
    def _exec_plan(self) -> PlanNode:
        if self._optimized is None:
            self._optimized = Optimizer().optimize(self._plan)
        return self._optimized

    def collect(self, optimize: bool = True) -> LazyFrame:
        if self._materialized is None:
            plan = self._exec_plan if optimize else self._plan
            data = []
            for chunk in plan.execute_batched():
                data.extend(chunk)
            self._materialized = data
        return self
```
When .collect() is called, it accesses
self._exec_plan, which triggers optimization exactly once.
The optimized plan is cached in _optimized, so subsequent
calls to .collect(), .head(), or
.count() reuse the same plan. The user can also pass
optimize=False to bypass the optimizer entirely — useful
for debugging or benchmarking.
And of course, the user can compare plans directly:
print(pipeline.explain()) # unoptimized plan
print(pipeline.explain(optimized=True)) # optimized plan
This is the same .explain() you'd find in Polars
(lf.explain(optimized=True)) or Spark
(df.explain(True)). Every lazy engine gives you a way to
inspect the plan before and after optimization. Now let's build
what happens between those two calls.
```python
import pyfloe as pf

orders = pf.LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250.0},
    {"order_id": 2, "customer_id": 102, "amount": 45.0},
    {"order_id": 3, "customer_id": 103, "amount": 180.0},
    {"order_id": 4, "customer_id": 101, "amount": 320.0},
])

customers = pf.LazyFrame([
    {"customer_id": 101, "name": "Alice", "region": "EU"},
    {"customer_id": 102, "name": "Bob", "region": "US"},
    {"customer_id": 103, "name": "Carol", "region": "EU"},
])

# Build the pipeline: join → filter → select
pipeline = (
    orders
    .join(customers, on="customer_id")
    .filter(pf.col("amount") > 100)
    .select("name", "amount")
)

# BEFORE: filter sits above the join
print("=== Unoptimized ===")
print(pipeline.explain())

# AFTER: the optimizer pushes the filter DOWN past the join
# (because amount only depends on orders, not customers)
# and prunes unused columns from the scan
print("=== Optimized ===")
print(pipeline.explain(optimized=True))

# You can also call the Optimizer directly on plan nodes
from pyfloe.plan import Optimizer

opt = Optimizer()
optimized_plan = opt.optimize(pipeline._plan)
print("=== Optimized plan node ===")
print(optimized_plan.explain())

# Execute
pipeline.collect()
print("\n=== Result ===")
for row in pipeline.to_pylist():
    print(row)
```
Try it: what happens if you add a second filter, .filter(pf.col("region") == "EU")? Where does the optimizer push each filter? Try changing which columns you .select() and compare the optimized plans. Can you predict what columns the optimizer will prune before running .explain(optimized=True)?
Lesson 5.3 Filter Pushdown
Filter pushdown is the single most impactful optimization in a query engine. The idea is deceptively simple: if a filter depends only on columns from one side of a join, move it to that side before the join happens. Fewer input rows means less work for the join, less memory for the hash table, and less data flowing through every node above.
But you can't just blindly shove filters downward. Each type of child node has different rules about what's safe. Let's build the logic piece by piece.
The key enabler: required_columns()
Remember required_columns() from Module 2? Every expression
node knows which columns it needs:
```python
# col("region") == "EU"
#   └── required_columns() → {"region"}

# (col("amount") > 100) & (col("region") == "EU")
#   └── required_columns() → {"amount", "region"}
```
This is the bridge between the two trees. The expression tree tells the optimizer which columns the filter needs. The plan tree tells the optimizer which columns each node provides. If the filter's required columns are a subset of a child node's input columns, the filter can be pushed past that node.
Here's how required_columns() propagates through the
expression AST — a quick refresher from Module 2:
```python
class Col:
    def required_columns(self):
        return {self.name}  # just this column

class Lit:
    def required_columns(self):
        return set()  # constants need nothing

class BinaryExpr:
    def required_columns(self):
        return self.left.required_columns() | self.right.required_columns()
```
The | operator is set union — the BinaryExpr
needs whatever its left side needs plus whatever its right side needs.
This propagation is what makes filter pushdown possible. Without it,
the optimizer would have no way to know which columns a filter touches.
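You can verify the propagation with a self-contained version of the refresher above. The minimal constructors here are added so the snippet runs on its own; pyfloe's real expression classes carry more machinery (operators, evaluation, schema checks):

```python
class Col:
    def __init__(self, name):
        self.name = name
    def required_columns(self):
        return {self.name}  # just this column

class Lit:
    def __init__(self, value):
        self.value = value
    def required_columns(self):
        return set()  # constants need nothing

class BinaryExpr:
    def __init__(self, left, right):
        self.left, self.right = left, right
    def required_columns(self):
        # union: everything the left side needs plus the right side
        return self.left.required_columns() | self.right.required_columns()

# (col("amount") > 100) & (col("region") == "EU"), built by hand
expr = BinaryExpr(
    BinaryExpr(Col("amount"), Lit(100)),
    BinaryExpr(Col("region"), Lit("EU")),
)
assert expr.required_columns() == {"amount", "region"}
```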
The pushdown logic: _try_push_filter
Now for the core algorithm. When the recursive walk finds a
FilterNode, it calls _try_push_filter with
the filter's predicate and the (already-recursed) child node. This method
asks: "Can I push this filter past this child? If so, do it. If not,
leave it in place."
Let's build the rules one child type at a time.
Rule 1: Pushing past ProjectNode
A ProjectNode selects or reorders columns. If the filter
only needs columns that exist below the project (in its child's
schema), we can safely push the filter underneath:
```python
if isinstance(child, ProjectNode) and child._columns:
    child_has = set(child.child.schema().column_names)
    if needed <= child_has:
        pushed = FilterNode(child.child, predicate)
        return ProjectNode(pushed, child._columns, child._exprs)
```
The key line is needed <= child_has. This is Python's
set subset operator — it checks whether every column the filter
needs exists in the project's input. If yes, we create a new
FilterNode underneath the ProjectNode, and
wrap it back up in a new ProjectNode. If not, the filter
stays where it is.
Before

```
Filter [region == 'EU']
  Project [order_id, region, amount]
    Scan [...]
```

After

```
Project [order_id, region, amount]
  Filter [region == 'EU']
    Scan [...]
```
Rule 2: Pushing into JoinNode
This is the high-value rule — the one that makes the biggest difference
in real pipelines. A JoinNode has two children (left and
right), and each has its own set of columns. The optimizer checks: does
the filter's required columns come entirely from the left side? Or
entirely from the right?
```python
if isinstance(child, JoinNode):
    left_cols = set(child.left.schema().column_names)
    right_cols = set(child.right.schema().column_names)
    if needed <= left_cols:
        return JoinNode(
            FilterNode(child.left, predicate),  # push into left
            child.right,
            child.left_on, child.right_on, child.how,
        )
    if needed <= right_cols:
        return JoinNode(
            child.left,
            FilterNode(child.right, predicate),  # push into right
            child.left_on, child.right_on, child.how,
        )
```
If the filter needs {"region"} and region
comes from the left table, we wrap the left child in a new
FilterNode and rebuild the JoinNode with
the filtered left branch. The right branch is untouched.
If the filter needs columns from both sides (e.g.,
col("left_price") > col("right_price")), neither subset
check passes, and the filter stays above the join. This is correct — you
can't evaluate a cross-table predicate on a single side.
Before

```
Filter [region == 'EU']
  Filter [segment == 'Ent']
    Join [inner]
      Scan(orders)
      Scan(customers)
```

After

```
Join [inner]
  Filter [region == 'EU']
    Scan(orders)
  Filter [segment == 'Ent']
    Scan(customers)
```
Look what happened — two filters that were stacked above the join got
pushed into different branches. region belongs
to the orders table (left), segment belongs to the
customers table (right). The optimizer figured this out using
required_columns() and the schema of each branch.
Rule 3: Aggregation and sort boundaries
Not every node is safe to push past. Consider AggNode: if
you push a filter below an aggregation, you change which rows get
aggregated, which changes the result. A filter on a group-by key is
safe though — filtering by region == 'EU' before grouping by
region doesn't change the groups, it just removes one:
```python
if isinstance(child, (AggNode, SortedAggNode)):
    group_keys = set(child.group_by)
    if needed <= group_keys:
        pushed = FilterNode(child.child, predicate)
        return type(child)(pushed, child.group_by, child.agg_exprs)
```
If the filter only references group-by keys (like filtering on
region after group_by("region")), it's safe
to push because the group boundaries don't change — you're just
removing entire groups. But a filter on an aggregated result like
total > 1000 can't be pushed down because total
doesn't exist in the child's output yet — the subset check fails and
the filter stays above. Notice the elegant use
of type(child)(...) — this works for both
AggNode and SortedAggNode since they share
the same constructor signature.
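A quick way to convince yourself the rule is safe, using plain Python dictionaries in place of plan nodes (the data here is made up for illustration):

```python
from collections import defaultdict

rows = [
    {"region": "EU", "amount": 250},
    {"region": "US", "amount": 45},
    {"region": "EU", "amount": 180},
]

def group_sum(rows):
    """Sum amount per region, like group_by('region').agg(sum('amount'))."""
    totals = defaultdict(float)
    for r in rows:
        totals[r["region"]] += r["amount"]
    return dict(totals)

# Filter on the GROUP KEY, applied after vs before aggregation:
after = {k: v for k, v in group_sum(rows).items() if k == "EU"}
before = group_sum([r for r in rows if r["region"] == "EU"])
assert after == before == {"EU": 430.0}

# A filter on the aggregated result (e.g. total > 1000) is NOT
# pushable: 'total' doesn't exist until the aggregation has run.
```

Filtering by the key only removes whole groups, so the result is identical either way; that is exactly what the subset check on group-by keys guarantees.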
The fallback
If none of the rules match — the child is a SortNode, a
ScanNode, or any node type the optimizer doesn't have a
special rule for — the filter stays where it is:
```python
# If no rule matched, keep the filter above the child
return FilterNode(child, predicate)
```
This is the conservative default. An optimizer should never change the semantics of a query — if it's not sure a rewrite is safe, it does nothing. The query still produces correct results; it just doesn't get this particular optimization.
The complete pushdown rules
| Child Node | Rule | Safety Check |
|---|---|---|
| ProjectNode | Push filter below the projection | Filter columns exist in project's input |
| JoinNode | Push into the branch that owns the columns | All filter columns come from one side |
| AggNode | Push below aggregation | Filter only references group-by keys |
| PivotNode | Push below pivot | Filter only references index columns |
| UnpivotNode | Push below unpivot | Filter only references id columns |
| Everything else | Leave in place | — |
Polars's own lf.explain(optimized=True) shows this pass in action. Polars calls it "predicate pushdown" — the concept is identical. Spark's Catalyst has the same pass, extended to handle UNION and subqueries.
Now let's look at pyfloe's complete implementation. It handles a few more node types than our simplified version, but the structure is identical:
```python
def _push_filters(self, node: PlanNode) -> PlanNode:
    if isinstance(node, FilterNode):
        child = self._push_filters(node.child)
        return self._try_push_filter(node.predicate, child)
    if isinstance(node, ProjectNode):
        return ProjectNode(self._push_filters(node.child), node._columns, node._exprs)
    if isinstance(node, JoinNode):
        return JoinNode(
            self._push_filters(node.left),
            self._push_filters(node.right),
            node.left_on, node.right_on, node.how,
        )
    if isinstance(node, SortNode):
        return SortNode(self._push_filters(node.child), node.by, node.ascending)
    if isinstance(node, WithColumnNode):
        return WithColumnNode(self._push_filters(node.child), node._name, node._expr)
    if isinstance(node, (AggNode, SortedAggNode)):
        return type(node)(self._push_filters(node.child), node.group_by, node.agg_exprs)
    # PivotNode, UnpivotNode — same recurse-and-rebuild pattern
    return node
```
And here's the full _try_push_filter:
```python
def _try_push_filter(self, predicate: Expr, child: PlanNode) -> PlanNode:
    needed = predicate.required_columns()
    if isinstance(child, ProjectNode) and child._columns:
        child_has = set(child.child.schema().column_names)
        if needed <= child_has:
            return ProjectNode(FilterNode(child.child, predicate),
                               child._columns, child._exprs)
    if isinstance(child, (AggNode, SortedAggNode)):
        if needed <= set(child.group_by):
            return type(child)(FilterNode(child.child, predicate),
                               child.group_by, child.agg_exprs)
    if isinstance(child, JoinNode):
        left_cols = set(child.left.schema().column_names)
        right_cols = set(child.right.schema().column_names)
        if needed <= left_cols:
            return JoinNode(
                FilterNode(child.left, predicate), child.right,
                child.left_on, child.right_on, child.how)
        if needed <= right_cols:
            return JoinNode(
                child.left, FilterNode(child.right, predicate),
                child.left_on, child.right_on, child.how)
    # PivotNode, UnpivotNode — same subset-check pattern
    return FilterNode(child, predicate)  # fallback
```
That's the entire filter pushdown implementation. Two methods, about
50 lines of code. The complexity isn't in the code — it's in understanding
which rewrites are safe and why. Every rule boils down
to one question: "Does the filter's required_columns()
fit within the columns available at the target location?"
Notice how often <= appears in this code. Every pushdown decision is a set-subset check. Python's set class gives us <= (subset), | (union), and & (intersection) — and those three operations are the entire mathematical vocabulary of filter pushdown. The expression AST provides required_columns() as a set. The schema provides column_names, which we wrap in a set. The optimizer is just comparing sets.
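For instance, with hypothetical schemas for the two join inputs, the subset checks read off directly:

```python
needed = {"region"}                    # the filter's required_columns()
left_cols = {"order_id", "customer_id", "amount", "region"}
right_cols = {"customer_id", "name", "segment"}

assert needed <= left_cols             # subset: safe to push into the left branch
assert not (needed <= right_cols)      # not a subset: can't push right

cross = {"amount", "segment"}          # a predicate touching both sides
assert not (cross <= left_cols) and not (cross <= right_cols)
# Neither check passes, so a cross-table filter stays above the join.
```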
Lesson 5.4 Column Pruning
Filter pushdown reduces the number of rows. Column pruning
reduces the width of each row — how many values it carries. If your
pipeline ends with .select("order_id", "name", "amount"),
why should the engine carry date, segment,
internal_notes, and 15 other columns through every node?
Column pruning walks the tree top-down, tracking which columns are
actually needed at each level. When it reaches a
ScanNode, it inserts a ProjectNode to
discard unused columns right at the source — before any data flows.
The approach: propagate "needed" sets downward
The algorithm starts at the root and asks: "What columns does the output need?" Then it walks down the tree, and at each node, it computes: "Given what my parent needs, what do I need from my child?" Each node type has its own rule for this computation.
```python
def optimize(self, plan: PlanNode) -> PlanNode:
    plan = self._push_filters(plan)           # pass 1
    needed = set(plan.schema().column_names)  # root's output columns
    plan = self._prune_columns(plan, needed)  # pass 2
    return plan
```
The initial needed set is the root's output columns — whatever
the final ProjectNode or ScanNode at the top
produces. This set flows downward through the tree, expanding at each node
to include whatever extra columns that node requires.
Pruning at ScanNode — the payoff
The simplest and most impactful rule: when we reach a
ScanNode, compare what it provides to what's actually needed.
If there are unused columns, insert a ProjectNode to strip
them:
```python
if isinstance(node, ScanNode):
    available = set(node._columns)
    prune_to = [c for c in node._columns if c in needed]
    if len(prune_to) < len(available) and prune_to:
        return ProjectNode(node, prune_to)
    return node
```
Two details are worth noting. First, the list comprehension preserves the
original column order — [c for c in node._columns if c in needed]
iterates over the scan's columns, not the needed set, so the output order
is deterministic. Second, the guard prune_to (truthy if
non-empty) prevents inserting an empty projection that would discard all
columns.
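Both details are easy to check in isolation (column names here are illustrative):

```python
scan_columns = ["order_id", "customer_id", "amount", "date", "notes"]
needed = {"amount", "order_id"}  # sets carry no useful order

# Iterating over the scan's column list keeps the original order
prune_to = [c for c in scan_columns if c in needed]
assert prune_to == ["order_id", "amount"]

# Iterating over `needed` instead would tie the output order to
# set iteration order, which is an implementation detail.

# The truthiness guard: an empty prune list means "keep the scan as-is"
empty = [c for c in scan_columns if c in set()]
assert not empty  # falsy, so no empty ProjectNode gets inserted
```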
Expanding the needed set at each node
As the algorithm walks down the tree, each node type may add
columns to the needed set. A FilterNode needs whatever
its parent needs plus the columns its predicate references:
```python
if isinstance(node, FilterNode):
    filter_needs = node.predicate.required_columns()
    return FilterNode(
        self._prune_columns(node.child, needed | filter_needs),
        node.predicate,
    )
```
The | is set union — the child of a FilterNode
must provide both the columns the parent needs and the columns
the filter evaluates. This is required_columns() doing its
job again — the same method that powered filter pushdown now drives
column pruning.
Splitting needs at JoinNode
Joins are the most interesting case. A join's output is the combination of left and right columns. The pruner needs to figure out which needed columns come from the left side and which from the right — then tell each child to keep only its portion, plus the join keys:
```python
if isinstance(node, JoinNode):
    left_cols = set(node.left.schema().column_names)
    right_cols = set(node.right.schema().column_names)
    left_needed = (needed & left_cols) | set(node.left_on)
    right_needed = (needed & right_cols) | set(node.right_on)
    return JoinNode(
        self._prune_columns(node.left, left_needed),
        self._prune_columns(node.right, right_needed),
        node.left_on, node.right_on, node.how,
    )
```
Three set operations in two lines tell the whole story.
needed & left_cols is set intersection — which of the
downstream-needed columns come from the left side? We union that with the
join keys (node.left_on) because the join always
needs its keys, even if no downstream node references them. Same logic
for the right side.
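Tracing the two lines with concrete, made-up schemas makes the split visible:

```python
needed = {"order_id", "name", "amount"}          # what the parent needs
left_cols = {"order_id", "customer_id", "amount", "date"}
right_cols = {"customer_id", "name", "tier"}
left_on, right_on = ["customer_id"], ["customer_id"]

# intersection picks each side's share; union adds the join keys back
left_needed = (needed & left_cols) | set(left_on)
right_needed = (needed & right_cols) | set(right_on)

assert left_needed == {"order_id", "amount", "customer_id"}
assert right_needed == {"name", "customer_id"}
# customer_id survives on both sides even though no downstream
# node selects it: the join itself needs its keys.
```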
The WithColumnNode case
WithColumnNode computes a new column from an expression.
Its child doesn't need the output column (it doesn't exist yet),
but it does need whatever columns the expression references:
```python
if isinstance(node, WithColumnNode):
    expr_needs = node._expr.required_columns()
    child_needed = (needed - {node._name}) | expr_needs
    return WithColumnNode(
        self._prune_columns(node.child, child_needed),
        node._name, node._expr,
    )
```
needed - {node._name} removes the computed column name (it
doesn't exist in the child's output). | expr_needs adds the
columns the expression reads. So if WithColumnNode computes
tax = amount * 0.2, the child needs amount but
not tax.
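The tax example works out like this as plain set arithmetic (values chosen to match the prose):

```python
needed = {"order_id", "tax"}   # the parent wants the computed column
expr_needs = {"amount"}        # (col("amount") * 0.2).required_columns()
name = "tax"

child_needed = (needed - {name}) | expr_needs
assert child_needed == {"order_id", "amount"}
# 'tax' is dropped (the child can't provide it yet); 'amount' is added.
```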
Aggregation — a natural boundary
AggNode is a natural pruning boundary. Its output is the
group-by keys plus the aggregation results. Its input needs
the group-by keys plus whatever columns the aggregate expressions read:
```python
if isinstance(node, AggNode):
    child_needed = set(node.group_by)
    for agg in node.agg_exprs:
        child_needed |= agg.required_columns()
    return AggNode(
        self._prune_columns(node.child, child_needed),
        node.group_by, node.agg_exprs,
    )
```
Notice that child_needed here is computed entirely from the
AggNode's own parameters — it ignores the needed
set from above. That's because aggregation completely reshapes the data:
the child's columns bear no direct relation to the parent's columns. The
aggregation itself determines what it needs.
Lesson 5.5 Seeing the Difference
Let's bring it all together. Here's a realistic pipeline — the kind a data engineer writes every day:
```python
orders = LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250,
     "date": "2024-01-15", "region": "EU", "notes": "..."},
    # ... thousands more rows, 6 columns each
])

customers = LazyFrame([
    {"customer_id": 101, "name": "Alice",
     "segment": "Enterprise", "tier": "Gold"},
    # ... hundreds of customers, 4 columns each
])

pipeline = (
    orders
    .join(customers, on="customer_id")
    .filter(col("region") == "EU")
    .filter(col("segment") == "Enterprise")
    .select("order_id", "name", "amount")
)
```
Now let's see what .explain() reveals — before and after:
pipeline.explain()

```
Project [order_id, name, amount]
  Filter [segment == 'Enterprise']
    Filter [region == 'EU']
      Join [inner] on customer_id
        Scan [6 cols] (N rows)
        Scan [4 cols] (M rows)
```
pipeline.explain(optimized=True)

```
Project [order_id, name, amount]
  Join [inner] on customer_id
    Filter [region == 'EU']
      Project [order_id, customer_id, amount]
        Scan (N rows)
    Filter [segment == 'Enterprise']
      Project [customer_id, name]
        Scan (M rows)
```
Count the changes:
Filter pushdown: region == 'EU' was pushed
into the left branch (orders), because region is an orders
column. segment == 'Enterprise' was pushed into the right
branch (customers), because segment is a customers column.
The join now operates on pre-filtered data from both sides.
Column pruning: The left scan originally carried 6
columns; now it carries 3 (order_id, customer_id,
amount). The right scan originally carried 4 columns; now it
carries 2 (customer_id, name). Everything the
pipeline doesn't need — date, notes,
tier — is stripped at the source.
Why this matters in practice
The impact is multiplicative. Suppose the orders table has 100,000 rows and the EU filter reduces that to 30,000. The customers table has 5,000 rows and the Enterprise filter reduces that to 800.
Without optimizer
The join builds a hash table of 5,000 customers (4 cols each), then probes it with 100,000 orders (6 cols each). After the join, the two filters run on 100,000+ joined rows carrying 10 columns. The project at the top finally strips columns.
With optimizer
The left filter reduces orders to 30,000 rows (3 cols). The right filter reduces customers to 800 rows (2 cols). The join builds a hash table of 800 rows — not 5,000. It probes with 30,000 rows — not 100,000. Less memory, less CPU, same result.
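A back-of-envelope way to compare the two, counting how many values (rows times columns) feed the join. This is a crude proxy for work, not a real cost model, but the ratio is telling:

```python
# Values fed to the join, using the row/column counts from the text
unopt = 100_000 * 6 + 5_000 * 4   # unoptimized: full scans on both sides
opt   = 30_000 * 3 + 800 * 2      # after filter pushdown + column pruning

assert unopt == 620_000
assert opt == 91_600
assert unopt / opt > 6            # roughly 6-7x fewer values into the join
```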
For large pipelines, these optimizations can be the difference between a query taking seconds and taking minutes. And the user's code didn't change at all — they wrote the pipeline in whatever order felt natural, and the optimizer silently rewrote it to be efficient.
This is exactly what Polars does on every call to .collect(). Polars runs predicate pushdown, projection pushdown (column pruning), and several other optimization passes before executing anything. You can see it with lf.explain(optimized=True). Spark does the same in its Catalyst optimizer — and now you understand the fundamental mechanism behind both. You built it.
The full optimizer, in perspective
Let's take stock of what the Optimizer class actually is.
It's about 80 lines of code — three methods. optimize()
coordinates the two passes. _push_filters() walks the tree
top-down, and for every FilterNode it finds, delegates to
_try_push_filter() which uses subset checks on
required_columns() to decide where the filter can go.
_prune_columns() walks the tree top-down with a
needed set, expanding it at each node, and inserts
ProjectNodes at ScanNodes to strip unused
columns.
Two recursive walks. A handful of isinstance checks.
Set operations — <=, |, &,
-. And required_columns(), the method you built
in Module 2, is the thread that ties everything together. That's all
it takes.
The full pyfloe optimizer also has rules for PivotNode, UnpivotNode, and WindowNode. The pattern is identical in every case — check whether the filter's columns fit, or compute the needed set for the child. pyfloe also handles right_-prefixed columns in join pruning (for disambiguating columns that appear on both sides of a join). These are refinements, not new concepts.
Quick check
1. Why can't an eager engine support query optimization?
2. A filter uses col("amount") > 100. The child is a JoinNode where amount exists in the left schema. What happens?
3. What does required_columns() return for col("price") * lit(0.2)?
4. In column pruning, what does the child of WithColumnNode(tax = amount * 0.2) need?
5. Why does pyfloe run filter pushdown before column pruning?
Source References
plan.py — Plan Nodes & Optimizer
core.py — LazyFrame API
expr.py — Expression AST
But this is a pure Python engine. The real world wants more. What if you could process batches across multiple CPU cores? What if you could stream results to disk without ever holding the full dataset in memory? What if you rewrote the hot loops in Rust or C? Those are the questions that separate a teaching engine from a production one — and they're exactly where you go next.