Build Your Own DataFrame
Module 5 of 5

The Query Optimizer

Everything you've built — expressions, plan nodes, hash joins, aggregations — has been heading here. The optimizer is the moment your engine learns to think. It looks at the plan tree you gave it, spots the waste, and quietly rewrites the tree into a better one — before a single row of data flows.

In Module 2, you built expression trees — inspectable objects that know which columns they need. In Modules 3 and 4, you built plan trees — chains of nodes that pull data through the volcano model. In this module, those two trees converge. The optimizer walks the plan tree and inspects the expressions inside each node to decide what can be moved, what can be removed, and what can be made cheaper.

This is the approach that powers Polars's query planner, Spark's Catalyst optimizer, and every serious database engine. And by the end of this module, you will have built your own — in about 80 lines of Python.

Expression Tree
Module 2
+
Plan Tree
Modules 3 & 4
=
Optimizer
Module 5
The two trees finally converge.

Lesson 5.1 What Is a Query Optimizer?

Let's start with the problem. Here's a realistic pipeline — joining orders with customers, filtering for a region, and selecting three columns:

A typical pipeline
result = (
    orders
    .join(customers, on="customer_id")
    .filter(col("region") == "EU")
    .select("order_id", "name", "amount")
    .collect()
)

This works correctly. But think about what the engine actually does. It runs the join over all orders and all customers — every row, every column. Then it filters out non-EU rows. Then it drops every column except three. The join was the most expensive operation, and we forced it to process rows and columns we knew we didn't need.

The waste
The filter sits above the join, so the join processes all rows. The select sits at the top, so every node below it carries all 20 columns through the entire pipeline. Both problems are invisible to the user — but they're visible in the plan tree.

Here's what the unoptimized plan looks like:

Project [order_id, name, amount]
  Filter [(col("region") == 'EU')]      ← filters AFTER the join
    Join [inner] on customer_id
      Scan [order_id, customer_id, amount, date, ...] (100K rows)
      Scan [customer_id, name, region, segment, ...] (5K rows)

A query optimizer is a program that takes this plan tree as input and produces a better plan tree as output — one that computes the same result, but does less work to get there. It can't change what you asked for. It can only change how the engine computes it.

Two families of optimizers

There are two fundamentally different approaches to query optimization:

Rule-Based

Applies fixed rewrite rules: "always push filters down," "always prune unused columns." Simple, predictable, and fast. Used by pyfloe, early versions of Spark, and most teaching engines.

Cost-Based

Estimates the cost of different plans using statistics (row counts, cardinality). Picks the cheapest. Used by PostgreSQL, modern Spark Catalyst, and Polars for join ordering.

We're building a rule-based optimizer — pyfloe's approach. It applies two rewrite passes to the plan tree:

Pass 1: Filter pushdown — moves FilterNodes closer to the data source, so fewer rows flow through expensive operations like joins.

Pass 2: Column pruning — removes columns that no downstream node needs, so each row carries only the data that matters.

Why laziness makes this possible
An optimizer can only rewrite a plan that hasn't run yet. If the engine were eager — executing each operation immediately — the data would already be flowing by the time we realized the filter should have gone first. Lazy execution builds the entire plan tree before executing anything, giving the optimizer a window to inspect and rewrite it. This is the fundamental payoff of everything we built in Module 1.

Here's what the optimized plan looks like:

Project [order_id, name, amount]
  Join [inner] on customer_id
    Filter [(col("region") == 'EU')]           ← pushed into the left branch!
      Project [order_id, customer_id, amount]  ← only 3 cols, not 20
        Scan (100K rows)
    Project [customer_id, name]                ← only 2 cols, not 8
      Scan (5K rows)

Same result. Dramatically less work. The filter now runs before the join, reducing the left side from 100K rows to however many are in the EU. The scans now carry only the columns needed downstream. And the user's code didn't change at all — the optimizer rewrote the plan silently, behind the scenes.


Lesson 5.2 Walking the Tree: Recursive Plan Traversal

Before we can optimize a plan tree, we need to be able to walk it. Every optimization pass follows the same pattern: start at the root, examine each node, decide whether to transform it, then recurse into its children. This is recursive tree traversal — the same technique used by compilers, linters, and every AST-processing tool you've ever used.

Let's start with the structure of our optimizer. At its core, it's remarkably simple:

The optimizer skeleton
class Optimizer:

    def optimize(self, plan: PlanNode) -> PlanNode:
        """Apply all optimization passes to a plan tree."""
        plan = self._push_filters(plan)
        needed = set(plan.schema().column_names)
        plan = self._prune_columns(plan, needed)
        return plan

Three lines of real logic. Two passes, applied in sequence. The order matters: filter pushdown first (to reduce row counts), then column pruning (to reduce row widths). Each pass receives a plan tree and returns a new, rewritten plan tree. The original tree is never mutated.

Immutable rewriting
Notice that optimize() returns a new plan tree — it doesn't modify the existing one. This is the same immutable pattern you saw in Module 3, where .filter() returns a new LazyFrame wrapping a new FilterNode. Immutable trees are easier to reason about and safer to compose.

Pattern matching with isinstance

The core technique for walking the tree is isinstance dispatch — checking what type of node we're looking at and handling each type differently. This is Python's version of pattern matching on an algebraic data type. Here's what the filter pushdown walk looks like at a high level:

The recursive walk — simplified
def _push_filters(self, node: PlanNode) -> PlanNode:
    if isinstance(node, FilterNode):
        # This is where the magic happens —
        # try to push this filter closer to the data
        child = self._push_filters(node.child)
        return self._try_push_filter(node.predicate, child)

    if isinstance(node, ProjectNode):
        # Not a filter — just recurse into the child
        return ProjectNode(
            self._push_filters(node.child),
            node._columns, node._exprs
        )

    if isinstance(node, JoinNode):
        # Recurse into both branches
        return JoinNode(
            self._push_filters(node.left),
            self._push_filters(node.right),
            node.left_on, node.right_on, node.how,
        )

    # Leaf nodes (ScanNode, etc.) — nothing to recurse into
    return node

Read this carefully. The pattern is always the same: check the node type, recurse into children, and rebuild the node with the (potentially transformed) children. For most node types, the logic is "just recurse and rebuild" — only FilterNode gets special treatment, because that's the node type we're trying to move.

There are two critical things happening here that are easy to miss:

First: the walk is top-down. We start at the root and work toward the leaves. When we find a FilterNode, we first recurse into its child (handling any filters below it), then try to push this filter further down. This means filters naturally cascade toward the data source.

Second: we rebuild the tree as we go. Every isinstance branch constructs a new node with the recursively-transformed children. This is what makes the rewrite immutable — we're constructing a new tree, not mutating the old one.

How Polars and Spark do this
Spark's Catalyst optimizer uses this same pattern — each "rule" is a function that pattern-matches on node types and returns a transformed tree. Polars does it in Rust with match expressions. The pattern is universal because the problem is the same everywhere: you have a tree, you want to transform it, and each node type needs different handling.

How LazyFrame triggers the optimizer

Where does the optimizer actually get called? In pyfloe, every LazyFrame has a _exec_plan property that lazily optimizes the plan the first time it's needed:

pyfloe's LazyFrame — triggering optimization
class LazyFrame:
    __slots__ = ("_plan", "_materialized", "_name", "_optimized")

    @property
    def _exec_plan(self) -> PlanNode:
        if self._optimized is None:
            self._optimized = Optimizer().optimize(self._plan)
        return self._optimized

    def collect(self, optimize: bool = True) -> LazyFrame:
        if self._materialized is None:
            plan = self._exec_plan if optimize else self._plan
            data = []
            for chunk in plan.execute_batched():
                data.extend(chunk)
            self._materialized = data
        return self

When .collect() is called, it accesses self._exec_plan, which triggers optimization exactly once. The optimized plan is cached in _optimized, so subsequent calls to .collect(), .head(), or .count() reuse the same plan. The user can also pass optimize=False to bypass the optimizer entirely — useful for debugging or benchmarking.
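Stripped of everything else, that cache-on-first-access pattern looks like this — a toy sketch with a hypothetical class name, where a string transformation stands in for the real Optimizer().optimize call:

```python
# A stripped-down version of the _exec_plan property: optimize once, reuse after.
class LazyThing:
    def __init__(self, plan):
        self._plan = plan
        self._optimized = None
        self.optimize_calls = 0   # counter, just to show the cache works

    @property
    def _exec_plan(self):
        if self._optimized is None:            # only on first access
            self.optimize_calls += 1
            self._optimized = f"optimized({self._plan})"  # stand-in for Optimizer().optimize
        return self._optimized

t = LazyThing("plan")
t._exec_plan
t._exec_plan
print(t.optimize_calls)  # 1 -- the second access hit the cache
```

The same property-with-None-sentinel idiom caches the materialized data in collect() as well.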

And of course, the user can compare plans directly:

Comparing plans
print(pipeline.explain())                  # unoptimized plan
print(pipeline.explain(optimized=True))    # optimized plan

This is the same .explain() you'd find in Polars (lf.explain(optimized=True)) or Spark (df.explain(True)). Every lazy engine gives you a way to inspect the plan before and after optimization. Now let's build what happens between those two calls.

import pyfloe as pf

orders = pf.LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250.0},
    {"order_id": 2, "customer_id": 102, "amount": 45.0},
    {"order_id": 3, "customer_id": 103, "amount": 180.0},
    {"order_id": 4, "customer_id": 101, "amount": 320.0},
])

customers = pf.LazyFrame([
    {"customer_id": 101, "name": "Alice", "region": "EU"},
    {"customer_id": 102, "name": "Bob",   "region": "US"},
    {"customer_id": 103, "name": "Carol", "region": "EU"},
])

# Build the pipeline: join → filter → select
pipeline = (
    orders
    .join(customers, on="customer_id")
    .filter(pf.col("amount") > 100)
    .select("name", "amount")
)

# BEFORE: filter sits above the join
print("=== Unoptimized ===")
print(pipeline.explain())

# AFTER: the optimizer pushes the filter DOWN past the join
# (because amount only depends on orders, not customers)
# and prunes unused columns from the scan
print("=== Optimized ===")
print(pipeline.explain(optimized=True))

# You can also call the Optimizer directly on plan nodes
from pyfloe.plan import Optimizer
opt = Optimizer()
optimized_plan = opt.optimize(pipeline._plan)
print("=== Optimized plan node ===")
print(optimized_plan.explain())

# Execute
pipeline.collect()
print("\n=== Result ===")
for row in pipeline.to_pylist():
    print(row)
Try it yourself
Modify the pipeline above. What happens if you add a second filter — say, .filter(pf.col("region") == "EU")? Where does the optimizer push each filter? Try changing which columns you .select() and compare the optimized plans. Can you predict what columns the optimizer will prune before running .explain(optimized=True)?

Lesson 5.3 Filter Pushdown

Filter pushdown is the single most impactful optimization in a query engine. The idea is deceptively simple: if a filter depends only on columns from one side of a join, move it to that side before the join happens. Fewer input rows means less work for the join, less memory for the hash table, and less data flowing through every node above.

But you can't just blindly shove filters downward. Each type of child node has different rules about what's safe. Let's build the logic piece by piece.

The key enabler: required_columns()

Remember required_columns() from Module 2? Every expression node knows which columns it needs:

Expressions know their dependencies
# col("region") == "EU"
# └── required_columns() → {"region"}

# (col("amount") > 100) & (col("region") == "EU")
# └── required_columns() → {"amount", "region"}

This is the bridge between the two trees. The expression tree tells the optimizer which columns the filter needs. The plan tree tells the optimizer which columns each node provides. If the filter's required columns are a subset of a child node's input columns, the filter can be pushed past that node.

Here's how required_columns() propagates through the expression AST — a quick refresher from Module 2:

How required_columns propagates
class Col:
    def required_columns(self):
        return {self.name}          # just this column

class Lit:
    def required_columns(self):
        return set()                # constants need nothing

class BinaryExpr:
    def required_columns(self):
        return self.left.required_columns() | self.right.required_columns()

The | operator is set union — the BinaryExpr needs whatever its left side needs plus whatever its right side needs. This propagation is what makes filter pushdown possible. Without it, the optimizer would have no way to know which columns a filter touches.
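Here's that propagation as a runnable sketch — stub classes mirroring the snippet above, not pyfloe's real implementation:

```python
# Minimal stand-ins for the three expression classes above.
class Col:
    def __init__(self, name):
        self.name = name

    def required_columns(self):
        return {self.name}          # just this column


class Lit:
    def __init__(self, value):
        self.value = value

    def required_columns(self):
        return set()                # constants need nothing


class BinaryExpr:
    def __init__(self, left, right):
        self.left, self.right = left, right

    def required_columns(self):
        # union of whatever each side needs
        return self.left.required_columns() | self.right.required_columns()


# (col("amount") > 100) & (col("region") == "EU"), built by hand:
pred = BinaryExpr(
    BinaryExpr(Col("amount"), Lit(100)),
    BinaryExpr(Col("region"), Lit("EU")),
)
print(sorted(pred.required_columns()))  # ['amount', 'region']
```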

This is the moment required_columns() earns its keep
Without it — if the filter were still a lambda — the optimizer would have no way to inspect which columns the filter accesses. None of what follows would be possible.

The pushdown logic: _try_push_filter

Now for the core algorithm. When the recursive walk finds a FilterNode, it calls _try_push_filter with the filter's predicate and the (already-recursed) child node. This method asks: "Can I push this filter past this child? If so, do it. If not, leave it in place."

Let's build the rules one child type at a time.

Rule 1: Pushing past ProjectNode

A ProjectNode selects or reorders columns. If the filter only needs columns that exist below the project (in its child's schema), we can safely push the filter underneath:

Pushing a filter past a projection
if isinstance(child, ProjectNode) and child._columns:
    child_has = set(child.child.schema().column_names)
    if needed <= child_has:
        pushed = FilterNode(child.child, predicate)
        return ProjectNode(pushed, child._columns, child._exprs)

The key line is needed <= child_has. This is Python's set subset operator — it checks whether every column the filter needs exists in the project's input. If yes, we create a new FilterNode underneath the ProjectNode, and wrap it back up in a new ProjectNode. If not, the filter stays where it is.
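You can watch that decision in isolation with plain sets (column names borrowed from this module's examples):

```python
# The subset check that drives every pushdown decision.
needed = {"region"}                           # the filter's required_columns()
child_has = {"order_id", "region", "amount"}  # the project's input schema

print(needed <= child_has)  # True -> safe to push below the project

# A cross-table predicate fails the check and stays put:
cross = {"left_price", "right_price"}
print(cross <= {"left_price", "amount"})  # False -> filter not pushed
```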

Before

Filter [region == 'EU']
  Project [order_id, region, amount]
    Scan [...]

After

Project [order_id, region, amount]
  Filter [region == 'EU']
    Scan [...]

Rule 2: Pushing into JoinNode

This is the high-value rule — the one that makes the biggest difference in real pipelines. A JoinNode has two children (left and right), and each has its own set of columns. The optimizer checks: does the filter's required columns come entirely from the left side? Or entirely from the right?

Pushing a filter into a join branch
if isinstance(child, JoinNode):
    left_cols  = set(child.left.schema().column_names)
    right_cols = set(child.right.schema().column_names)

    if needed <= left_cols:
        return JoinNode(
            FilterNode(child.left, predicate),  # push into left
            child.right,
            child.left_on, child.right_on, child.how,
        )
    if needed <= right_cols:
        return JoinNode(
            child.left,
            FilterNode(child.right, predicate),  # push into right
            child.left_on, child.right_on, child.how,
        )

If the filter needs {"region"} and region comes from the left table, we wrap the left child in a new FilterNode and rebuild the JoinNode with the filtered left branch. The right branch is untouched.

If the filter needs columns from both sides (e.g., col("left_price") > col("right_price")), neither subset check passes, and the filter stays above the join. This is correct — you can't evaluate a cross-table predicate on a single side.

Before

Filter [region == 'EU']
  Filter [segment == 'Ent']
    Join [inner]
      Scan(orders)
      Scan(customers)

After

Join [inner]
  Filter [region == 'EU']
    Scan(orders)
  Filter [segment == 'Ent']
    Scan(customers)

Look what happened — two filters that were stacked above the join got pushed into different branches. region belongs to the orders table (left), segment belongs to the customers table (right). The optimizer figured this out using required_columns() and the schema of each branch.

Rule 3: Aggregation and sort boundaries

Not every node is safe to push past. Consider AggNode: if you push a filter below an aggregation, you change which rows get aggregated, which changes the result. A filter on a group-by key is safe, though — filtering by region == 'EU' before grouping by region doesn't change any group's contents, it just drops the non-EU groups wholesale:

Pushing past aggregation — only on group keys
if isinstance(child, (AggNode, SortedAggNode)):
    group_keys = set(child.group_by)
    if needed <= group_keys:
        pushed = FilterNode(child.child, predicate)
        return type(child)(pushed, child.group_by, child.agg_exprs)

If the filter only references group-by keys (like filtering on region after group_by("region")), it's safe to push because the group boundaries don't change — you're just removing entire groups. But a filter on an aggregated result like total > 1000 can't be pushed down because total doesn't exist in the child's output yet — the subset check fails and the filter stays above. Notice the elegant use of type(child)(...) — this works for both AggNode and SortedAggNode since they share the same constructor signature.
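A tiny sketch of why type(child)(...) preserves the subclass — stub classes sharing one constructor signature, not pyfloe's real node classes:

```python
# Stubs with the shared (child, group_by, agg_exprs) signature.
class AggNode:
    def __init__(self, child, group_by, agg_exprs):
        self.child, self.group_by, self.agg_exprs = child, group_by, agg_exprs

class SortedAggNode(AggNode):
    pass

node = SortedAggNode("scan", ["region"], [])

# type(node) is SortedAggNode, so the rebuilt node keeps the subclass:
rebuilt = type(node)("filtered-scan", node.group_by, node.agg_exprs)
print(type(rebuilt).__name__)  # SortedAggNode
```

Writing AggNode(...) here instead would silently downgrade a SortedAggNode to a plain AggNode — type(child) rebuilds whichever class it was given.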

The fallback

If none of the rules match — the child is a SortNode, a ScanNode, or any node type the optimizer doesn't have a special rule for — the filter stays where it is:

When nothing matches, leave the filter in place
# If no rule matched, keep the filter above the child
return FilterNode(child, predicate)

This is the conservative default. An optimizer should never change the semantics of a query — if it's not sure a rewrite is safe, it does nothing. The query still produces correct results; it just doesn't get this particular optimization.

The complete pushdown rules

Child Node       Rule                                        Safety Check
ProjectNode      Push filter below the projection            Filter columns exist in project's input
JoinNode         Push into the branch that owns the columns  All filter columns come from one side
AggNode          Push below aggregation                      Filter only references group-by keys
PivotNode        Push below pivot                            Filter only references index columns
UnpivotNode      Push below unpivot                          Filter only references id columns
Everything else  Leave in place                              —
In the wild
In Polars, lf.explain(optimized=True) shows this in action. Polars calls it "predicate pushdown" — the concept is identical. Spark's Catalyst has the same pass, extended to handle UNION and subqueries.

Now let's look at pyfloe's complete implementation. It handles a few more node types than our simplified version, but the structure is identical:

pyfloe's _push_filters — the full recursive walk
def _push_filters(self, node: PlanNode) -> PlanNode:
    if isinstance(node, FilterNode):
        child = self._push_filters(node.child)
        return self._try_push_filter(node.predicate, child)
    if isinstance(node, ProjectNode):
        return ProjectNode(self._push_filters(node.child), node._columns, node._exprs)
    if isinstance(node, JoinNode):
        return JoinNode(
            self._push_filters(node.left),
            self._push_filters(node.right),
            node.left_on, node.right_on, node.how,
        )
    if isinstance(node, SortNode):
        return SortNode(self._push_filters(node.child), node.by, node.ascending)
    if isinstance(node, WithColumnNode):
        return WithColumnNode(self._push_filters(node.child), node._name, node._expr)
    if isinstance(node, (AggNode, SortedAggNode)):
        return type(node)(self._push_filters(node.child), node.group_by, node.agg_exprs)
    # PivotNode, UnpivotNode — same recurse-and-rebuild pattern
    return node

And here's the full _try_push_filter:

pyfloe's _try_push_filter — the decision logic
def _try_push_filter(self, predicate: Expr, child: PlanNode) -> PlanNode:
    needed = predicate.required_columns()

    if isinstance(child, ProjectNode) and child._columns:
        child_has = set(child.child.schema().column_names)
        if needed <= child_has:
            return ProjectNode(FilterNode(child.child, predicate), child._columns, child._exprs)

    if isinstance(child, (AggNode, SortedAggNode)):
        if needed <= set(child.group_by):
            return type(child)(FilterNode(child.child, predicate), child.group_by, child.agg_exprs)

    if isinstance(child, JoinNode):
        left_cols  = set(child.left.schema().column_names)
        right_cols = set(child.right.schema().column_names)
        if needed <= left_cols:
            return JoinNode(
                FilterNode(child.left, predicate), child.right,
                child.left_on, child.right_on, child.how)
        if needed <= right_cols:
            return JoinNode(
                child.left, FilterNode(child.right, predicate),
                child.left_on, child.right_on, child.how)

    # PivotNode, UnpivotNode — same subset-check pattern

    return FilterNode(child, predicate)  # fallback

That's the entire filter pushdown implementation. Two methods, about 50 lines of code. The complexity isn't in the code — it's in understanding which rewrites are safe and why. Every rule boils down to one question: "Does the filter's required_columns() fit within the columns available at the target location?"

The subset operator is doing all the work
Count how many times <= appears in this code. Every pushdown decision is a set-subset check. Python's set class gives us <= (subset), | (union), and & (intersection) — and those three operations are the entire mathematical vocabulary of filter pushdown. The expression AST provides required_columns() as a set. The schema provides column_names as a set. The optimizer is just comparing sets.

Lesson 5.4 Column Pruning

Filter pushdown reduces the number of rows. Column pruning reduces the width of each row — how many values it carries. If your pipeline ends with .select("order_id", "name", "amount"), why should the engine carry date, segment, internal_notes, and 15 other columns through every node?

Column pruning walks the tree top-down, tracking which columns are actually needed at each level. When it reaches a ScanNode, it inserts a ProjectNode to discard unused columns right at the source — before any data flows.

The approach: propagate "needed" sets downward

The algorithm starts at the root and asks: "What columns does the output need?" Then it walks down the tree, and at each node, it computes: "Given what my parent needs, what do I need from my child?" Each node type has its own rule for this computation.

The pruning entry point
def optimize(self, plan: PlanNode) -> PlanNode:
    plan = self._push_filters(plan)       # pass 1
    needed = set(plan.schema().column_names) # root's output columns
    plan = self._prune_columns(plan, needed) # pass 2
    return plan

The initial needed set is the root's output columns — whatever the final ProjectNode or ScanNode at the top produces. This set flows downward through the tree, expanding at each node to include whatever extra columns that node requires.

Pruning at ScanNode — the payoff

The simplest and most impactful rule: when we reach a ScanNode, compare what it provides to what's actually needed. If there are unused columns, insert a ProjectNode to strip them:

Pruning at the data source
if isinstance(node, ScanNode):
    available = set(node._columns)
    prune_to = [c for c in node._columns if c in needed]
    if len(prune_to) < len(available) and prune_to:
        return ProjectNode(node, prune_to)
    return node

Two details are worth noting. First, the list comprehension preserves the original column order — [c for c in node._columns if c in needed] iterates over the scan's columns, not the needed set, so the output order is deterministic. Second, the guard prune_to (truthy if non-empty) prevents inserting an empty projection that would discard all columns.
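Both details are easy to verify with plain lists and sets (hypothetical column names):

```python
# The list comprehension keeps the scan's column order, not the set's.
scan_columns = ["order_id", "customer_id", "amount", "date", "notes"]
needed = {"amount", "order_id", "customer_id"}  # sets have no useful order

prune_to = [c for c in scan_columns if c in needed]
print(prune_to)  # ['order_id', 'customer_id', 'amount'] -- scan order, always
```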

Expanding the needed set at each node

As the algorithm walks down the tree, each node type may add columns to the needed set. A FilterNode needs whatever its parent needs plus the columns its predicate references:

FilterNode expands the needed set
if isinstance(node, FilterNode):
    filter_needs = node.predicate.required_columns()
    return FilterNode(
        self._prune_columns(node.child, needed | filter_needs),
        node.predicate,
    )

The | is set union — the child of a FilterNode must provide both the columns the parent needs and the columns the filter evaluates. This is required_columns() doing its job again — the same method that powered filter pushdown now drives column pruning.

Splitting needs at JoinNode

Joins are the most interesting case. A join's output is the combination of left and right columns. The pruner needs to figure out which needed columns come from the left side and which from the right — then tell each child to keep only its portion, plus the join keys:

Splitting needs across a join
if isinstance(node, JoinNode):
    left_cols  = set(node.left.schema().column_names)
    right_cols = set(node.right.schema().column_names)
    left_needed  = (needed & left_cols)  | set(node.left_on)
    right_needed = (needed & right_cols) | set(node.right_on)
    return JoinNode(
        self._prune_columns(node.left, left_needed),
        self._prune_columns(node.right, right_needed),
        node.left_on, node.right_on, node.how,
    )

Three set operations in two lines tell the whole story. needed & left_cols is set intersection — which of the downstream-needed columns come from the left side? We union that with the join keys (node.left_on) because the join always needs its keys, even if no downstream node references them. Same logic for the right side.
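Here's that split computed by hand, using the schemas from this module's running example:

```python
# Splitting the needed set across a join, step by step.
needed     = {"order_id", "name", "amount"}   # what the parent needs
left_cols  = {"order_id", "customer_id", "amount", "date", "region", "notes"}
right_cols = {"customer_id", "name", "segment", "tier"}
left_on, right_on = ["customer_id"], ["customer_id"]

left_needed  = (needed & left_cols)  | set(left_on)   # intersect, then add join keys
right_needed = (needed & right_cols) | set(right_on)

print(sorted(left_needed))   # ['amount', 'customer_id', 'order_id']
print(sorted(right_needed))  # ['customer_id', 'name']
```

Note that customer_id survives on both sides purely because it's a join key — no downstream node asked for it.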

The WithColumnNode case

WithColumnNode computes a new column from an expression. Its child doesn't need the output column (it doesn't exist yet), but it does need whatever columns the expression references:

WithColumnNode — subtract output, add expression deps
if isinstance(node, WithColumnNode):
    expr_needs = node._expr.required_columns()
    child_needed = (needed - {node._name}) | expr_needs
    return WithColumnNode(
        self._prune_columns(node.child, child_needed),
        node._name, node._expr,
    )

needed - {node._name} removes the computed column name (it doesn't exist in the child's output). | expr_needs adds the columns the expression reads. So if WithColumnNode computes tax = amount * 0.2, the child needs amount but not tax.
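The same computation with plain sets, for the tax example:

```python
# tax = amount * 0.2: the child must provide 'amount', never 'tax'.
needed     = {"order_id", "tax"}  # what the parent needs from this node
name       = "tax"                # the column WithColumnNode computes
expr_needs = {"amount"}           # (col("amount") * 0.2).required_columns()

child_needed = (needed - {name}) | expr_needs  # drop the output, add the inputs
print(sorted(child_needed))  # ['amount', 'order_id']
```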

Aggregation — a natural boundary

AggNode is a natural pruning boundary. Its output is the group-by keys plus the aggregation results. Its input needs the group-by keys plus whatever columns the aggregate expressions read:

pyfloe's AggNode pruning
if isinstance(node, AggNode):
    child_needed = set(node.group_by)
    for agg in node.agg_exprs:
        child_needed |= agg.required_columns()
    return AggNode(
        self._prune_columns(node.child, child_needed),
        node.group_by, node.agg_exprs,
    )

Notice that child_needed here is computed entirely from the AggNode's own parameters — it ignores the needed set from above. That's because aggregation completely reshapes the data: the child's columns bear no direct relation to the parent's columns. The aggregation itself determines what it needs.
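The loop in miniature, with stand-in sets for each aggregate expression's required_columns():

```python
# AggNode's child_needed, computed entirely from the node's own parameters.
group_by = ["region"]
agg_required = [{"amount"}, {"order_id"}]  # required_columns() of each agg expr

child_needed = set(group_by)      # start from the group-by keys
for req in agg_required:
    child_needed |= req           # union in each aggregate's inputs

print(sorted(child_needed))  # ['amount', 'order_id', 'region']
```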

The two-pass interaction
Filter pushdown and column pruning are designed to compose. After pushdown moves filters closer to the source, column pruning sees a simpler tree to work with — potentially with fewer nodes between the scan and the root. The pushed-down filters also expand the needed-column set at the right level, ensuring that filter columns don't get pruned too early. Running pushdown first, then pruning, is a deliberate design choice.

Lesson 5.5 Seeing the Difference

Let's bring it all together. Here's a realistic pipeline — the kind a data engineer writes every day:

A pipeline with optimization potential
orders = LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250,
     "date": "2024-01-15", "region": "EU", "notes": "..."},
    # ... thousands more rows, 6 columns each
])

customers = LazyFrame([
    {"customer_id": 101, "name": "Alice",
     "segment": "Enterprise", "tier": "Gold"},
    # ... hundreds of customers, 4 columns each
])

pipeline = (
    orders
    .join(customers, on="customer_id")
    .filter(col("region") == "EU")
    .filter(col("segment") == "Enterprise")
    .select("order_id", "name", "amount")
)

Now let's see what .explain() reveals — before and after:

pipeline.explain()

Project [order_id, name, amount]
  Filter [segment == 'Enterprise']
    Filter [region == 'EU']
      Join [inner] on customer_id
        Scan [6 cols] (N rows)
        Scan [4 cols] (M rows)

pipeline.explain(optimized=True)

Project [order_id, name, amount]
  Join [inner] on customer_id
    Filter [region == 'EU']
      Project [order_id, customer_id,
              amount]
        Scan (N rows)
    Filter [segment == 'Enterprise']
      Project [customer_id, name]
        Scan (M rows)

Count the changes:

Filter pushdown: region == 'EU' was pushed into the left branch (orders), because region is an orders column. segment == 'Enterprise' was pushed into the right branch (customers), because segment is a customers column. The join now operates on pre-filtered data from both sides.

Column pruning: The left scan originally carried 6 columns; now it carries 3 (order_id, customer_id, amount). The right scan originally carried 4 columns; now it carries 2 (customer_id, name). Everything the pipeline doesn't need — date, notes, tier — is stripped at the source.

Why this matters in practice

The impact is multiplicative. Suppose the orders table has 100,000 rows and the EU filter reduces that to 30,000. The customers table has 5,000 rows and the Enterprise filter reduces that to 800.

Without optimizer

The join builds a hash table of 5,000 customers (4 cols each), then probes it with 100,000 orders (6 cols each). After the join, the two filters run on 100,000+ joined rows carrying 10 columns. The project at the top finally strips columns.

With optimizer

The left filter reduces orders to 30,000 rows (3 cols). The right filter reduces customers to 800 rows (2 cols). The join builds a hash table of 800 rows — not 5,000. It probes with 30,000 rows — not 100,000. Less memory, less CPU, same result.

For large pipelines, these optimizations can be the difference between a query taking seconds and taking minutes. And the user's code didn't change at all — they wrote the pipeline in whatever order felt natural, and the optimizer silently rewrote it to be efficient.

Show the magic
This is exactly what happens when you write a Polars pipeline and call .collect(). Polars runs predicate pushdown, projection pushdown (column pruning), and several other optimization passes before executing anything. You can see it with lf.explain(optimized=True). Spark does the same in its Catalyst optimizer — and now you understand the fundamental mechanism behind both. You built it.

The full optimizer, in perspective

Let's take stock of what the Optimizer class actually is. It's about 80 lines of code — three methods. optimize() coordinates the two passes. _push_filters() walks the tree top-down, and for every FilterNode it finds, delegates to _try_push_filter() which uses subset checks on required_columns() to decide where the filter can go. _prune_columns() walks the tree top-down with a needed set, expanding it at each node, and inserts ProjectNodes at ScanNodes to strip unused columns.

Two recursive walks. A handful of isinstance checks. Set operations — <=, |, &, -. And required_columns(), the method you built in Module 2, is the thread that ties everything together. That's all it takes.

What pyfloe adds beyond our simplified version
pyfloe's actual optimizer handles a few more node types than we walked through in detail: PivotNode, UnpivotNode, WindowNode. The pattern is identical in every case — check whether the filter's columns fit, or compute the needed set for the child. pyfloe also handles right_-prefixed columns in join pruning (for disambiguating columns that appear on both sides of a join). These are refinements, not new concepts.

Quick check

1. Why can't an eager engine support query optimization?

2. A filter uses col("amount") > 100. The child is a JoinNode where amount exists in the left schema. What happens?

3. What does required_columns() return for col("price") * lit(0.2)?

4. In column pruning, what does the child of WithColumnNode(tax = amount * 0.2) need?

5. Why does pyfloe run filter pushdown before column pruning?


You built a query engine
Take a moment to appreciate what you've done across these five modules. You started with Python lists and generators. You built an expression AST with operator overloading. You connected it to a volcano-model execution engine with batched processing and schema propagation. You implemented hash joins and hash aggregation. And now you've written an optimizer that rewrites plan trees for efficiency — using the architecture that powers Polars, Spark, and every serious database engine in production today.

But this is a pure Python engine. The real world wants more. What if you could process batches across multiple CPU cores? What if you could stream results to disk without ever holding the full dataset in memory? What if you rewrote the hot loops in Rust or C? Those are the questions that separate a teaching engine from a production one — and they're exactly where you go next.