Wiring It All Together
You've spent four modules building the parts: expressions, plan nodes, batched execution, hash joins, aggregation. Now it's time to step back, wire everything into a real pipeline, and watch the engine run end to end.
This isn't a new module. No new concepts. Think of it as a pit stop — a chance to see the machine you've built actually work, to read the plan tree it produces, and to notice, with fresh eyes, the waste that our engine doesn't yet know how to avoid. That waste is what motivates Module 5.
A quick inventory
Before we wire anything together, let's take stock. Across four modules, you've built or studied every piece of this system:

- Expressions: pf.col() and operator overloading build inspectable expression ASTs instead of opaque lambdas.
- Plan nodes: ScanNode, FilterNode, JoinNode, WithColumnNode, and ProjectNode, composed into a tree by LazyFrame.
- Schema propagation: each node's .schema() derives its output types from its children, with no data touched.
- Batched execution: the volcano model pulls rows through the tree in batches of 1,024.
- Hash joins and aggregation: build-and-probe joins and grouped accumulation over those batches.
Individually, you understand each piece. The question now is: what happens when they all work together?
Section I.1 Building a Real Pipeline
Let's build something realistic. Imagine an e-commerce company with two tables: orders (what customers bought) and customers (who they are). We want to answer a simple business question: "For each region, what's the total revenue from orders over $100?"
Here's the data. It's small enough to fit in your head — that's deliberate. The point isn't volume; it's watching the pipeline run.
```python
import pyfloe as pf

orders = pf.LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250.0, "product": "Widget A"},
    {"order_id": 2, "customer_id": 102, "amount": 45.0, "product": "Widget B"},
    {"order_id": 3, "customer_id": 103, "amount": 180.0, "product": "Widget C"},
    {"order_id": 4, "customer_id": 101, "amount": 320.0, "product": "Widget D"},
    {"order_id": 5, "customer_id": 104, "amount": 90.0, "product": "Widget A"},
    {"order_id": 6, "customer_id": 102, "amount": 150.0, "product": "Widget E"},
])

customers = pf.LazyFrame([
    {"customer_id": 101, "name": "Alice", "region": "EU", "segment": "Enterprise"},
    {"customer_id": 102, "name": "Bob", "region": "US", "segment": "SMB"},
    {"customer_id": 103, "name": "Carlos", "region": "EU", "segment": "Enterprise"},
    {"customer_id": 104, "name": "Diana", "region": "US", "segment": "Enterprise"},
])

# Build the pipeline — each call adds a plan node
result = (
    orders
    .join(customers, on="customer_id")
    .filter(pf.col("amount") > 100)
    .with_column("tax", pf.col("amount") * 0.2)
    .select("region", "amount", "tax")
)

# Check schema BEFORE running (lazy!)
print("Schema:", result.schema)
print()

# Now materialize
result.collect()
for row in result.to_pylist():
    print(row)
```
At this point, just after the two constructor calls, nothing has happened yet. Both orders and customers are LazyFrame objects wrapping ScanNode instances. No data has been processed and no rows have been touched. This is the "lazy" in lazy evaluation.
Now let's build the pipeline — each method call adds a node to the plan tree:
```python
result = (
    orders
    .join(customers, on="customer_id")             # JoinNode
    .filter(pf.col("amount") > 100)                # FilterNode
    .with_column("tax", pf.col("amount") * 0.2)    # WithColumnNode
    .select("region", "amount", "tax")             # ProjectNode
)
```
Four method calls. Four plan nodes created. Zero rows processed. The variable
result is a LazyFrame that holds the plan
for a computation, not the result of one.
Each of .join(), .filter(), .with_column(), and .select() creates a new plan node and wraps it in a new LazyFrame. The original orders and customers frames are untouched. This is the _from_plan classmethod from Module 3 in action.
We can even query the output schema before running anything:
```python
result.schema
# Schema(
#     region: str
#     amount: float
#     tax: float
# )

result.is_materialized  # False
```
Three columns, all typed, no data touched. Each node's .schema()
method propagated types from the leaves to the root — exactly the
LazySchema machinery from Module 3, Lesson 3.5.
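The propagation itself is easy to sketch in plain Python, with dicts standing in for the real Schema objects (illustrative, not pyfloe's actual code):

```python
# Bottom-up schema propagation, sketched with plain dicts
orders_schema = {"order_id": int, "customer_id": int, "amount": float, "product": str}
customers_schema = {"customer_id": int, "name": str, "region": str, "segment": str}

# Join: the union of both sides' columns
joined = {**orders_schema, **customers_schema}
# Filter: schema passes through unchanged
filtered = dict(joined)
# WithColumn: adds one derived column; its type comes from the expression
with_tax = {**filtered, "tax": float}
# Project: keeps only the requested columns, in order
final = {name: with_tax[name] for name in ("region", "amount", "tax")}

print(final)  # {'region': <class 'str'>, 'amount': <class 'float'>, 'tax': <class 'float'>}
```

No row ever exists in this computation; types flow from the leaves to the root exactly as the real nodes' .schema() methods do.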
Now let's pull the trigger:
```python
result.collect()
result.to_pylist()
# [
#     {'region': 'EU', 'amount': 250.0, 'tax': 50.0},
#     {'region': 'EU', 'amount': 180.0, 'tax': 36.0},
#     {'region': 'EU', 'amount': 320.0, 'tax': 64.0},
#     {'region': 'US', 'amount': 150.0, 'tax': 30.0},
# ]
```
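As a sanity check against the opening business question (total revenue per region), a plain-Python group-and-sum over those collected rows:

```python
# Total the revenue per region from the collected output above
rows = [
    {"region": "EU", "amount": 250.0, "tax": 50.0},
    {"region": "EU", "amount": 180.0, "tax": 36.0},
    {"region": "EU", "amount": 320.0, "tax": 64.0},
    {"region": "US", "amount": 150.0, "tax": 30.0},
]
revenue = {}
for row in rows:
    revenue[row["region"]] = revenue.get(row["region"], 0.0) + row["amount"]

print(revenue)  # {'EU': 750.0, 'US': 150.0}
```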
That's the complete lifecycle: build the plan lazily, inspect the schema
instantly, materialize when you need the data. The order of execution was
not top-to-bottom through the code — it was bottom-to-top through the
plan tree. .collect() called
ProjectNode.execute_batched(), which pulled from
WithColumnNode, which pulled from FilterNode,
which pulled from JoinNode, which pulled from the two
ScanNode leaves. The volcano model, exactly as we built it.
Section I.2 Reading the Explain Plan
Every LazyFrame has an .explain() method that
prints the plan tree as an indented string. Each line is one node; each
level of indentation is one layer deeper in the tree. Let's look at
what our pipeline produced:
```python
print(result.explain())
# Project [region, amount, tax]
#   WithColumn [tax = (col("amount") * 0.2)]
#     Filter [(col("amount") > 100)]
#       Join [inner] ['customer_id'] = ['customer_id']
#         Scan [order_id, customer_id, amount, product] (6 rows)
#         Scan [customer_id, name, region, segment] (4 rows)
```
Read it from the bottom up — that's the direction data flows. Let's walk through every line:
The two ScanNode leaves are the data sources. The
JoinNode is the first node with two children —
this is where the plan becomes a true tree rather than a simple chain.
Above the join, every node has one child, forming a linear stack: filter,
then compute, then project.
The explain() output is produced by the recursive
PlanNode.explain() method you saw in Module 3. Each node
calls _explain_self() for its own line, then calls
.explain(indent + 1) on each child. The indentation
is the tree structure.
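The recursion fits in a few lines; here is a minimal stand-in (not pyfloe's actual class):

```python
# Each node emits its own line, then recurses into its children
# one indent level deeper.
class Node:
    def __init__(self, label, children=()):
        self.label = label
        self.children = list(children)

    def explain(self, indent=0):
        lines = ["  " * indent + self.label]       # this node's own line
        for child in self.children:
            lines.extend(child.explain(indent + 1))
        return lines

plan = Node("Project", [Node("Join", [Node("Scan orders"), Node("Scan customers")])])
print("\n".join(plan.explain()))
# Project
#   Join
#     Scan orders
#     Scan customers
```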
Connecting it to the volcano model
When .collect() runs, execution starts at the top
of this tree and pulls downward. The ProjectNode asks its
child (the WithColumnNode) for a batch. The
WithColumnNode asks the FilterNode. The
FilterNode asks the JoinNode. The
JoinNode asks both ScanNode leaves. Rows
ripple up through the tree in batches of 1,024.
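That pull-based flow can be sketched with Python generators (batch size shrunk to 2 for readability; the real nodes use 1,024):

```python
# Each "node" is a generator that pulls batches from its child on demand
def scan(rows, batch_size=2):
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

def filter_node(child, predicate):
    for batch in child:                     # pull one batch from below
        kept = [row for row in batch if predicate(row)]
        if kept:
            yield kept                      # pass survivors upward

def project_node(child, keys):
    for batch in child:
        yield [{k: row[k] for k in keys} for row in batch]

rows = [{"amount": 250.0}, {"amount": 45.0}, {"amount": 180.0}, {"amount": 90.0}]
pipeline = project_node(filter_node(scan(rows), lambda r: r["amount"] > 100), ["amount"])
print(list(pipeline))  # [[{'amount': 250.0}], [{'amount': 180.0}]]
```

Nothing runs until the final list() pulls on the top generator, which pulls on its child, and so on down to the scan.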
The plan is a recipe. The volcano model is the kitchen
that executes the recipe. And .explain() is the recipe card
you can read before anything starts cooking.
Section I.3 Spotting the Waste
Our engine is correct. It produces the right answer. But look at the plan again — really look at it — and ask: is it doing unnecessary work?
Waste #1: The filter is too late
Look at where the FilterNode sits in the plan:
The filter sits above the join. That means the join processes all 6 orders — including order #2 ($45) and order #5 ($90), which will be thrown away immediately afterward. With our tiny dataset, that's 2 wasted rows. With 10 million orders, it's potentially millions of wasted join probes.
The predicate pf.col("amount") > 100 depends only
on the amount column, which comes from the orders
table. It needs nothing from customers, so
there's no reason the filter must wait until after the join.
It could run on the orders table directly, before any joining
happens — reducing the join's input from 6 rows to 4.
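The eligibility test an optimizer makes here is just set containment: the filter can move below the join if every column its predicate references comes from one side. A plain-Python sketch (helper name illustrative):

```python
# A filter can be pushed below a join iff the columns its predicate
# references all come from a single join input.
def can_push_below_join(predicate_columns, side_columns):
    return set(predicate_columns) <= set(side_columns)

orders_columns = {"order_id", "customer_id", "amount", "product"}

# col("amount") > 100 touches only "amount", which orders provides:
print(can_push_below_join({"amount"}, orders_columns))            # True
# A predicate that also needed "region" (from customers) could not
# move below the orders side:
print(can_push_below_join({"amount", "region"}, orders_columns))  # False
```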
What would a smarter plan look like?
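Sketched by hand in the explain format from Section I.2 (not generated output), the filter now sits directly on the orders scan:

```
Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Join [inner] ['customer_id'] = ['customer_id']
      Filter [(col("amount") > 100)]
        Scan [order_id, customer_id, amount, product] (6 rows)
      Scan [customer_id, name, region, segment] (4 rows)
```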
Same result. Fewer rows entering the join. This transformation has a name: filter pushdown.
Waste #2: Too many columns
Look at the ScanNode for orders. It reads four columns:
order_id, customer_id, amount,
and product.
Now look at the final ProjectNode: it keeps only
region, amount, and tax. The
order_id column is never used by any node in the plan.
The product column is never used either. And from the
customers table, name and segment are carried
through the entire join only to be discarded at the end.
Four columns (order_id and product from orders, name and segment from customers) are read from the data sources, carried through the join as extra elements in every tuple, passed through the filter and the with-column computation, and only then dropped by the ProjectNode at the very top. By then, the work is done: every one of those columns consumed memory in the join's hash table, occupied a slot in every tuple, and was copied across batch boundaries — for nothing. A smarter engine would figure out the needed columns starting from the top and prune unused ones as early as possible — ideally right at the ScanNode.
What would an optimal plan look like?
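Sketched by hand in the explain format from Section I.2 (not generated output):

```
Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Join [inner] ['customer_id'] = ['customer_id']
      Filter [(col("amount") > 100)]
        Project [customer_id, amount]
          Scan [order_id, customer_id, amount, product] (6 rows)
      Project [customer_id, region]
        Scan [customer_id, name, region, segment] (4 rows)
```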
The filter was pushed below the join (filter pushdown). And
ProjectNode nodes were inserted right above each
ScanNode to drop unused columns as early as possible. This
transformation is called column pruning.
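Computing which columns to keep is a top-down walk that accumulates what each node needs; a plain-Python sketch of the bookkeeping (illustrative, not pyfloe's optimizer):

```python
# Walk the plan from the root, accumulating every column any node needs
needed = {"region", "amount", "tax"}        # ProjectNode output
needed = (needed - {"tax"}) | {"amount"}    # WithColumn: tax is derived from amount
needed = needed | {"amount"}                # Filter predicate reads amount
needed = needed | {"customer_id"}           # Join needs its key column

# Intersect with each source's schema to get the per-scan column lists
orders_needed = needed & {"order_id", "customer_id", "amount", "product"}
customers_needed = needed & {"customer_id", "name", "region", "segment"}

print(sorted(orders_needed))     # ['amount', 'customer_id']
print(sorted(customers_needed))  # ['customer_id', 'region']
```

Everything not in those final sets (order_id, product, name, segment) can be dropped at the scans.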
Section I.4 Our Engine vs. Polars — A Side-by-Side
At this point, you might be wondering: how close is what we've built to a "real" library? Let's put pyfloe and Polars side by side.
pyfloe

```python
import pyfloe as pf

query = (
    pf.LazyFrame(data)
    .filter(pf.col("amount") > 100)
    .join(customers, on="id")
    .select("id", "amount")
)
query.explain()
query.schema
result = query.collect()
```

Polars

```python
import polars as pl

query = (
    pl.LazyFrame(data)
    .filter(pl.col("amount") > 100)
    .join(customers, on="id")
    .select("id", "amount")
)
query.explain()
query.schema
result = query.collect()
```

(Note that in both libraries, .explain() and .schema belong to the lazy query, inspected before .collect() materializes it.)
The shapes are nearly identical. Both use pf.col() / pl.col() to reference
columns. Both use .filter(), .join(),
.select() to build the plan. Both use .collect()
to trigger execution. Both have .explain() to print the plan
tree and .schema to inspect types without running anything.
The differences are under the hood:
| Dimension | pyfloe | Polars |
|---|---|---|
| Language | Pure Python | Rust core via PyO3 |
| Row format | Tuples (row-oriented, in batches of 1,024) | Arrow arrays (columnar) |
| Hash tables | Python dict / defaultdict | Custom hash maps with SIMD probing |
| Optimizer | Rule-based, 2 passes | Cost-based + rule-based |
| Parallelism | Single-threaded | Work-stealing (rayon) |
| Scale | In-memory only | Streaming engine for out-of-core |
| Dependencies | Zero (~6K lines) | Polars + Arrow ecosystem |
The architecture is the same: a plan tree of nodes, an expression AST inside those nodes, a schema that propagates without data, and an optimizer that rewrites the tree. Polars is orders of magnitude faster, but the design — the thing you can learn by building it yourself — is exactly what you've built over the last four modules.
When you write pl.col("amount") > 100 in Polars, you
know what's actually happening. A __gt__ dunder creates a
BinaryExpr node. That expression gets embedded inside a
FilterNode. The optimizer inspects it, checks
required_columns(), and decides whether to push it past a
join. The volcano model pulls rows through the tree in batches.
You've built every piece of that machinery. The "magic" is just
engineering — and now you understand it.
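That __gt__ machinery fits in a few lines of plain Python (illustrative class names, not pyfloe's or Polars' real ones):

```python
# Operator overloading turns a comparison into a data structure (an AST node)
class BinaryExpr:
    def __init__(self, op, left, right):
        self.op, self.left, self.right = op, left, right

class Col:
    def __init__(self, name):
        self.name = name
    def __gt__(self, other):            # col("x") > 100 builds a node...
        return BinaryExpr(">", self, other)

expr = Col("amount") > 100              # ...instead of evaluating to a bool
print(type(expr).__name__, expr.op)     # BinaryExpr >
```

Because the comparison returns a tree rather than a boolean, an optimizer can inspect it later, which is exactly what a lambda would have made impossible.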
Deep dive: What about PySpark?
To understand PySpark's architecture, you need a bit of history. Before Spark, the dominant big-data framework was Hadoop MapReduce. Hadoop's insight was simple but powerful: split a huge dataset across many machines (using HDFS, the Hadoop Distributed File System), then process each chunk in two phases — a map step (transform each row independently) and a reduce step (aggregate results across all chunks). The problem? Every MapReduce job wrote intermediate results back to disk. If your pipeline had five steps, that was five rounds of disk I/O across the cluster. Slow.
Spark was created to fix exactly this. Its foundational abstraction
was the RDD — the Resilient Distributed Dataset. An
RDD is a lazy, distributed collection that tracks its own lineage:
instead of materializing intermediate results to disk, it remembers
the chain of transformations (.map(), .filter(),
.reduceByKey()) and only executes them when an
action like .collect() is called. Sound
familiar? It should — that's the same lazy plan-tree architecture
you've been building. RDDs were the original plan nodes, and
.collect() was the original materialisation trigger.
But RDDs had a limitation: they were opaque. Spark could see that you
called .filter(), but it couldn't inspect the lambda
inside the filter — exactly the problem we identified with
lambdas in Module 1. Without inspectable expressions, there was no way
to do filter pushdown or column pruning.
That's why Spark introduced DataFrames (and later the
Dataset API), backed by the Catalyst
optimizer. PySpark's modern API uses
spark.createDataFrame() and F.col() instead
of our pf.LazyFrame() and pf.col() — but the
architecture is strikingly similar. Spark builds a logical
plan (our plan tree), transforms it with Catalyst (their version
of filter pushdown and column pruning, plus many more rules), and
executes it with a physical plan that distributes work
across a cluster.
The key difference is distribution: Spark partitions data across machines and coordinates joins and aggregations over the network. Our engine runs on a single machine. But the plan tree, the expression AST, and the optimizer concepts are directly transferable. The evolution from Hadoop's disk-heavy MapReduce → Spark's lazy RDDs → Spark's inspectable DataFrames mirrors the exact progression we followed in this course: eager execution → lazy generators → expression ASTs that an optimizer can reason about. If you understand what we've built, you understand the architectural bones of Catalyst.
Exercises
These exercises are designed to cement your ability to read and reason about plan trees — the skill you'll need most in Module 5.
Exercise 1: Annotate the plan
Below is an .explain() output for a different pipeline.
For each node, identify: (a) what kind of node it is, and (b) what
data flows into it.
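For concreteness, here is a plan in the same format as Section I.2 to annotate (the Aggregate label is illustrative; your engine's exact spelling may differ):

```
Aggregate [sum(amount) by region]
  Filter [(col("segment") == "Enterprise")]
    Join [inner] ['customer_id'] = ['customer_id']
      Scan [order_id, customer_id, amount, product] (6 rows)
      Scan [customer_id, name, region, segment] (4 rows)
```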
Exercise 2: Spot the waste
Look at this plan. Can you identify at least two sources of waste?
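For concreteness, here is a deliberately wasteful plan in the same format (the tables and row counts are made up for the exercise):

```
Project [name, amount]
  WithColumn [discount = (col("amount") * 0.1)]
    Filter [(col("amount") > 500)]
      Join [inner] ['customer_id'] = ['customer_id']
        Scan [order_id, customer_id, amount, product, ship_date] (10000 rows)
        Scan [customer_id, name, region, segment] (500 rows)
```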
Exercise 3: Predict the optimized plan
Given the wasteful plan from Exercise 2, draw (or describe) what the
optimized plan should look like after filter pushdown and
column pruning. Where does the filter end up? Where do the
ProjectNode insertions go?