Build Your Own DataFrame
Interlude

Wiring It All Together

You've spent four modules building the parts: expressions, plan nodes, batched execution, hash joins, aggregation. Now it's time to step back, wire everything into a real pipeline, and watch the engine run end to end.

This isn't a new module. No new concepts. Think of it as a pit stop — a chance to see the machine you've built actually work, to read the plan tree it produces, and to notice, with fresh eyes, the waste that our engine doesn't yet know how to avoid. That waste is what motivates Module 5.

A quick inventory

Before we wire anything together, let's take stock. Across four modules, you've built or studied every piece of this system:

📄 ScanNode (Module 1)
🔍 FilterNode (Modules 1 & 3)
📐 ProjectNode (Module 3)
WithColumnNode (Module 3)
🔗 JoinNode (Module 4)
📊 AggNode (Module 4)
🌳 Expression AST (Module 2)
🏷️ LazySchema (Module 3)

Individually, you understand each piece. The question now is: what happens when they all work together?


Section I.1 Building a Real Pipeline

Let's build something realistic. Imagine an e-commerce company with two tables: orders (what customers bought) and customers (who they are). We want to answer a simple business question: "For each region, what's the total revenue from orders over $100?"

Here's the data. It's small enough to fit in your head — that's deliberate. The point isn't volume; it's watching the pipeline run.

Our data — using pyfloe's real API
import pyfloe as pf

orders = pf.LazyFrame([
    {"order_id": 1, "customer_id": 101, "amount": 250.0, "product": "Widget A"},
    {"order_id": 2, "customer_id": 102, "amount": 45.0,  "product": "Widget B"},
    {"order_id": 3, "customer_id": 103, "amount": 180.0, "product": "Widget C"},
    {"order_id": 4, "customer_id": 101, "amount": 320.0, "product": "Widget D"},
    {"order_id": 5, "customer_id": 104, "amount": 90.0,  "product": "Widget A"},
    {"order_id": 6, "customer_id": 102, "amount": 150.0, "product": "Widget E"},
])

customers = pf.LazyFrame([
    {"customer_id": 101, "name": "Alice",  "region": "EU", "segment": "Enterprise"},
    {"customer_id": 102, "name": "Bob",    "region": "US", "segment": "SMB"},
    {"customer_id": 103, "name": "Carlos", "region": "EU", "segment": "Enterprise"},
    {"customer_id": 104, "name": "Diana",  "region": "US", "segment": "Enterprise"},
])

# Build the pipeline — each call adds a plan node
result = (
    orders
    .join(customers, on="customer_id")
    .filter(pf.col("amount") > 100)
    .with_column("tax", pf.col("amount") * 0.2)
    .select("region", "amount", "tax")
)

# Check schema BEFORE running (lazy!)
print("Schema:", result.schema)
print()

# Now materialize
result.collect()
for row in result.to_pylist():
    print(row)

Pause at the point just after the two constructors run. Nothing has happened yet: both orders and customers are LazyFrame objects wrapping ScanNode instances. No data has been processed. No rows have been touched. This is the "lazy" in lazy evaluation.

Now let's build the pipeline — each method call adds a node to the plan tree:

The full pipeline
result = (
    orders
    .join(customers, on="customer_id")           # JoinNode
    .filter(pf.col("amount") > 100)              # FilterNode
    .with_column("tax", pf.col("amount") * 0.2)  # WithColumnNode
    .select("region", "amount", "tax")           # ProjectNode
)

Four method calls. Four plan nodes created. Zero rows processed. The variable result is a LazyFrame that holds the plan for a computation, not the result of one.

Notice
Each method call — .join(), .filter(), .with_column(), .select() — creates a new plan node and wraps it in a new LazyFrame. The original orders and customers frames are untouched. This is the _from_plan classmethod from Module 3 in action.
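The pattern is easy to sketch. The snippet below is a minimal stand-in, not pyfloe's actual source: the names PlanNode, ScanNode, FilterNode, and _from_plan follow the text above, but the bodies are simplified assumptions.

```python
# Minimal sketch of the immutable-wrap pattern: every method call
# builds a new plan node and a new frame; nothing is mutated.
# Class/method bodies here are simplified assumptions, not pyfloe's code.

class PlanNode:
    pass

class ScanNode(PlanNode):
    def __init__(self, rows):
        self.rows = rows

class FilterNode(PlanNode):
    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate

class LazyFrame:
    def __init__(self, rows):
        self._plan = ScanNode(rows)

    @classmethod
    def _from_plan(cls, plan):
        # Bypass __init__ and wrap an existing plan node in a fresh frame.
        frame = cls.__new__(cls)
        frame._plan = plan
        return frame

    def filter(self, predicate):
        # Returns a NEW frame; self._plan is never touched.
        return LazyFrame._from_plan(FilterNode(self._plan, predicate))

orders = LazyFrame([{"amount": 250.0}, {"amount": 45.0}])
filtered = orders.filter(lambda row: row["amount"] > 100)

assert filtered is not orders
assert isinstance(filtered._plan, FilterNode)
assert isinstance(orders._plan, ScanNode)  # original frame untouched
```

Because each call only wraps, never mutates, you can branch a pipeline from any intermediate frame without copying data.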

We can even query the output schema before running anything:

Schema — known instantly
result.schema
# Schema(
#   region: str
#   amount: float
#   tax: float
# )

result.is_materialized   # False

Three columns, all typed, no data touched. Each node's .schema() method propagated types from the leaves to the root — exactly the LazySchema machinery from Module 3, Lesson 3.5.
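That leaf-to-root propagation can be sketched in a few lines. This is a simplified model under assumed class shapes, not pyfloe's real schema machinery: each node's schema() asks its child, then describes its own effect.

```python
# Sketch of leaf-to-root schema propagation: each node derives its
# output schema from its child's schema, with no data involved.
# Simplified stand-in classes; pyfloe's real signatures may differ.

class ScanNode:
    def __init__(self, schema):
        self._schema = schema            # e.g. {"amount": float}

    def schema(self):
        return dict(self._schema)

class WithColumnNode:
    def __init__(self, child, name, dtype):
        self.child, self.name, self.dtype = child, name, dtype

    def schema(self):
        # Child's schema plus the new column.
        out = self.child.schema()
        out[self.name] = self.dtype
        return out

class ProjectNode:
    def __init__(self, child, columns):
        self.child, self.columns = child, columns

    def schema(self):
        # Keep only the projected columns, in the projected order.
        child_schema = self.child.schema()
        return {c: child_schema[c] for c in self.columns}

scan = ScanNode({"region": str, "amount": float, "product": str})
plan = ProjectNode(WithColumnNode(scan, "tax", float),
                   ["region", "amount", "tax"])
print(plan.schema())
# {'region': <class 'str'>, 'amount': <class 'float'>, 'tax': <class 'float'>}
```

The recursion bottoms out at the scans, which know their schema by construction; everything above is pure bookkeeping.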

Now let's pull the trigger:

Materialise with .collect()
result.collect()
result.to_pylist()
# [
#   {'region': 'EU', 'amount': 250.0, 'tax': 50.0},
#   {'region': 'EU', 'amount': 180.0, 'tax': 36.0},
#   {'region': 'EU', 'amount': 320.0, 'tax': 64.0},
#   {'region': 'US', 'amount': 150.0, 'tax': 30.0},
# ]

That's the complete lifecycle: build the plan lazily, inspect the schema instantly, materialise when you need the data. The order of execution was not top-to-bottom through the code — it was bottom-to-top through the plan tree. .collect() called ProjectNode.execute_batched(), which pulled from WithColumnNode, which pulled from FilterNode, which pulled from JoinNode, which pulled from the two ScanNode leaves. The volcano model, exactly as we built it.
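The pull loop itself is worth seeing in miniature. The sketch below assumes simplified nodes whose execute_batched() is a generator, with a toy batch size of 3 instead of 1,024; it is an illustration of the volcano model, not pyfloe's exact implementation.

```python
# Pull-based (volcano) execution sketch: each node's execute_batched()
# is a generator that pulls batches from its child on demand.
# Toy batch size of 3 (the engine described above uses 1,024).

BATCH_SIZE = 3

class ScanNode:
    def __init__(self, rows):
        self.rows = rows

    def execute_batched(self):
        for i in range(0, len(self.rows), BATCH_SIZE):
            yield self.rows[i:i + BATCH_SIZE]

class FilterNode:
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate

    def execute_batched(self):
        for batch in self.child.execute_batched():   # pull from child
            kept = [row for row in batch if self.predicate(row)]
            if kept:
                yield kept

plan = FilterNode(
    ScanNode([{"amount": a} for a in (250, 45, 180, 320, 90, 150)]),
    lambda row: row["amount"] > 100,
)
rows = [row for batch in plan.execute_batched() for row in batch]
print([r["amount"] for r in rows])  # [250, 180, 320, 150]
```

Nothing runs until the consumer iterates; the generators sit idle exactly like the lazy plan they implement.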


Section I.2 Reading the Explain Plan

Every LazyFrame has an .explain() method that prints the plan tree as an indented string. Each line is one node; each level of indentation is one layer deeper in the tree. Let's look at what our pipeline produced:

pyfloe's .explain() output
print(result.explain())
# Project [region, amount, tax]
#   WithColumn [tax = (col("amount") * 0.2)]
#     Filter [(col("amount") > 100)]
#       Join [inner] ['customer_id'] = ['customer_id']
#         Scan [order_id, customer_id, amount, product] (6 rows)
#         Scan [customer_id, name, region, segment] (4 rows)

Read it from the bottom up — that's the direction data flows. Let's walk through every line:

Project [region, amount, tax]                  ← root: keep only 3 columns
  WithColumn [tax = (col("amount") * 0.2)]     ← compute tax per row
    Filter [(col("amount") > 100)]             ← discard rows ≤ 100
      Join [inner] ['customer_id'] = …         ← hash join on customer_id
        Scan [order_id, customer_id, …]        ← left: 6 order rows
        Scan [customer_id, name, …]            ← right: 4 customer rows

The two ScanNode leaves are the data sources. The JoinNode is the first node with two children — this is where the plan becomes a true tree rather than a simple chain. Above the join, every node has one child, forming a linear stack: filter, then compute, then project.

Key insight
The explain() output is produced by the recursive PlanNode.explain() method you saw in Module 3. Each node calls _explain_self() for its own line, then calls .explain(indent + 1) on each child. The indentation is the tree structure.
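The recursion described in the box is short enough to sketch whole. Names like _explain_self follow the text above; the node classes are simplified stand-ins, not pyfloe's source.

```python
# Sketch of the recursive explain(): each node prints its own line,
# then asks each child to print itself one level deeper.
# Simplified stand-in classes following the method names in the text.

class PlanNode:
    children = ()

    def explain(self, indent=0):
        lines = ["  " * indent + self._explain_self()]
        for child in self.children:
            lines.append(child.explain(indent + 1))
        return "\n".join(lines)

class ScanNode(PlanNode):
    def __init__(self, columns):
        self.columns = columns

    def _explain_self(self):
        return f"Scan [{', '.join(self.columns)}]"

class FilterNode(PlanNode):
    def __init__(self, child, desc):
        self.children, self.desc = (child,), desc

    def _explain_self(self):
        return f"Filter [{self.desc}]"

plan = FilterNode(ScanNode(["customer_id", "amount"]),
                  'col("amount") > 100')
print(plan.explain())
# Filter [col("amount") > 100]
#   Scan [customer_id, amount]
```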

Connecting it to the volcano model

When .collect() runs, execution starts at the top of this tree and pulls downward. The ProjectNode asks its child (the WithColumnNode) for a batch. The WithColumnNode asks the FilterNode. The FilterNode asks the JoinNode. The JoinNode asks both ScanNode leaves. Rows ripple up through the tree in batches of 1,024.

The plan is a recipe. The volcano model is the kitchen that executes the recipe. And .explain() is the recipe card you can read before anything starts cooking.


Section I.3 Spotting the Waste

Our engine is correct. It produces the right answer. But look at the plan again — really look at it — and ask: is it doing unnecessary work?

Waste #1: The filter is too late

Look at where the FilterNode sits in the plan:

Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Filter [(col("amount") > 100)]             ← HERE
      Join [inner] ['customer_id'] = ['customer_id']
        Scan [order_id, customer_id, amount, product] (6 rows)
        Scan [customer_id, name, region, segment] (4 rows)

The filter sits above the join. That means the join processes all 6 orders — including order #2 ($45) and order #5 ($90), which will be thrown away immediately afterward. With our tiny dataset, that's 2 wasted rows. With 10 million orders, it's potentially millions of wasted join probes.

The problem
The filter expression pf.col("amount") > 100 only depends on the amount column, which comes from the orders table. It doesn't need anything from customers. So there's no reason the filter must wait until after the join. It could run on the orders table directly, before any joining happens — reducing the input from 6 rows to 4.

What would a smarter plan look like?

Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Join [inner] ['customer_id'] = ['customer_id']
      Filter [(col("amount") > 100)]           ← PUSHED DOWN
        Scan [order_id, customer_id, amount, product] (6 rows)
      Scan [customer_id, name, region, segment] (4 rows)

Same result. Fewer rows entering the join. This transformation has a name: filter pushdown.
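The safety condition behind the rewrite is mechanical: a filter may move below a join only if every column it references comes from one side. Here is a hedged sketch of that check, using toy expression classes (the real engine's required_columns() lives on its expression AST; these stand-ins only mimic its shape).

```python
# Sketch of the filter-pushdown safety check: a predicate can be
# pushed below a join iff its required columns all come from one side.
# Col/BinaryExpr are toy stand-ins for the real expression AST.

class Col:
    def __init__(self, name):
        self.name = name

    def required_columns(self):
        return {self.name}

class BinaryExpr:
    def __init__(self, left, right):
        self.left, self.right = left, right

    def required_columns(self):
        req = set()
        for side in (self.left, self.right):
            if hasattr(side, "required_columns"):
                req |= side.required_columns()   # literals contribute nothing
        return req

def can_push_to(predicate, side_schema):
    return predicate.required_columns() <= set(side_schema)

pred = BinaryExpr(Col("amount"), 100)   # models col("amount") > 100
orders_cols = ["order_id", "customer_id", "amount", "product"]
customers_cols = ["customer_id", "name", "region", "segment"]

print(can_push_to(pred, orders_cols))      # True  -> push below the join
print(can_push_to(pred, customers_cols))   # False -> must stay above
```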

Waste #2: Too many columns

Look at the ScanNode for orders. It reads four columns: order_id, customer_id, amount, and product.

Now look at the final ProjectNode: it keeps only region, amount, and tax. The order_id column is never used by any node in the plan. The product column is never used either. And from the customers table, name and segment are carried through the entire join only to be discarded at the end.

Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Filter [(col("amount") > 100)]
      Join [inner] ['customer_id'] = ['customer_id']
        Scan [order_id, customer_id, amount, product] (6 rows)
        Scan [customer_id, name, region, segment] (4 rows)

Four columns (order_id and product from orders, name and segment from customers) are read from the data sources, carried through the join as extra elements in every tuple, passed through the filter and the with-column computation, and only then dropped by the project at the very top. Every one of those columns consumed memory in the hash table, occupied a slot in every tuple, and was copied across batch boundaries, all for nothing.

The problem
The engine doesn't know which columns are needed until it reaches the ProjectNode at the top. But by then, the work is done. A smarter engine would figure out the needed columns starting from the top and prune unused ones as early as possible — ideally right at the ScanNode.

What would an optimal plan look like?

Project [region, amount, tax]
  WithColumn [tax = (col("amount") * 0.2)]
    Join [inner] ['customer_id'] = ['customer_id']
      Filter [(col("amount") > 100)]
        Project [customer_id, amount]          ← only what's needed
          Scan [order_id, customer_id, amount, product] (6 rows)
      Project [customer_id, region]            ← only what's needed
        Scan [customer_id, name, region, segment] (4 rows)

The filter was pushed below the join (filter pushdown). And ProjectNode nodes were inserted right above each ScanNode to drop unused columns as early as possible. This transformation is called column pruning.

Two optimizations, one insight
Both filter pushdown and column pruning share the same core idea: do less work by doing it earlier. Filters reduce rows; pruning reduces columns. Both push their logic as close to the data source as possible.
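The pruning computation can be sketched as a small set intersection. This is a deliberately simplified model: the required-column sets are written out by hand here, whereas a real optimizer would collect them by walking the plan top-down.

```python
# Sketch of column pruning: a scan only needs the columns that some
# node above it will actually use. The "used" set below is written
# by hand for this example; an optimizer would collect it by walking
# the plan from the root downward.

def required_at_scan(final_columns, used_by_nodes, scan_columns):
    """Columns a scan must produce: everything needed above,
    restricted to what this scan actually has, in scan order."""
    needed = set(final_columns) | set(used_by_nodes)
    return [c for c in scan_columns if c in needed]

final = ["region", "amount", "tax"]
# Referenced by nodes above the scans: the join key and the filter /
# with_column input. ("tax" is computed later, never scanned.)
used = ["customer_id", "amount"]

orders_scan = ["order_id", "customer_id", "amount", "product"]
customers_scan = ["customer_id", "name", "region", "segment"]

print(required_at_scan(final, used, orders_scan))     # ['customer_id', 'amount']
print(required_at_scan(final, used, customers_scan))  # ['customer_id', 'region']
```

Those two results are exactly the Project nodes inserted above each scan in the optimal plan.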

Section I.4 Our Engine vs. Polars — A Side-by-Side

At this point, you might be wondering: how close is what we've built to a "real" library? Let's put pyfloe and Polars side by side.

pyfloe

import pyfloe as pf

result = (
  pf.LazyFrame(data)
  .filter(pf.col("amount") > 100)
  .join(customers, on="id")
  .select("id", "amount")
  .collect()
)

result.explain()
result.schema

Polars

import polars as pl

result = (
  pl.LazyFrame(data)
  .filter(pl.col("amount") > 100)
  .join(customers, on="id")
  .select("id", "amount")
  .collect()
)

result.explain()
result.schema

The shapes are nearly identical. Both use pf.col() / pl.col() to reference columns. Both use .filter(), .join(), .select() to build the plan. Both use .collect() to trigger execution. Both have .explain() to print the plan tree and .schema to inspect types without running anything.

The differences are under the hood:

Dimension     pyfloe                                     Polars
Language      Pure Python                                Rust core via PyO3
Row format    Tuples (row-oriented, batches of 1,024)    Arrow arrays (columnar)
Hash tables   Python dict / defaultdict                  Custom hash maps with SIMD probing
Optimizer     Rule-based, 2 passes                       Cost-based + rule-based
Parallelism   Single-threaded                            Work-stealing (rayon)
Scale         In-memory only                             Streaming engine for out-of-core
Dependencies  Zero (~6K lines)                           Polars + Arrow ecosystem

The architecture is the same: a plan tree of nodes, an expression AST inside those nodes, a schema that propagates without data, and an optimizer that rewrites the tree. Polars is orders of magnitude faster, but the design — the thing you can learn by building it yourself — is exactly what you've built over the last four modules.

The production version
Next time you type pl.col("amount") > 100 in Polars, you know what's actually happening. A __gt__ dunder creates a BinaryExpr node. That expression gets embedded inside a FilterNode. The optimizer inspects it, checks required_columns(), and decides whether to push it past a join. The volcano model pulls rows through the tree in batches. You've built every piece of that machinery. The "magic" is just engineering — and now you understand it.
Deep dive: What about PySpark?

To understand PySpark's architecture, you need a bit of history. Before Spark, the dominant big-data framework was Hadoop MapReduce. Hadoop's insight was simple but powerful: split a huge dataset across many machines (using HDFS, the Hadoop Distributed File System), then process each chunk in two phases — a map step (transform each row independently) and a reduce step (aggregate results across all chunks). The problem? Every MapReduce job wrote intermediate results back to disk. If your pipeline had five steps, that was five rounds of disk I/O across the cluster. Slow.

Spark was created to fix exactly this. Its foundational abstraction was the RDD — the Resilient Distributed Dataset. An RDD is a lazy, distributed collection that tracks its own lineage: instead of materializing intermediate results to disk, it remembers the chain of transformations (.map(), .filter(), .reduceByKey()) and only executes them when an action like .collect() is called. Sound familiar? It should — that's the same lazy plan-tree architecture you've been building. RDDs were the original plan nodes, and .collect() was the original materialisation trigger.

But RDDs had a limitation: they were opaque. Spark could see that you called .filter(), but it couldn't inspect the lambda inside the filter — exactly the problem we identified with lambdas in Module 1. Without inspectable expressions, there was no way to do filter pushdown or column pruning.

That's why Spark introduced DataFrames (and later the Dataset API), backed by the Catalyst optimizer. PySpark's modern API uses spark.createDataFrame() and F.col() instead of our pf.LazyFrame() and pf.col() — but the architecture is strikingly similar. Spark builds a logical plan (our plan tree), transforms it with Catalyst (their version of filter pushdown and column pruning, plus many more rules), and executes it with a physical plan that distributes work across a cluster.

The key difference is distribution: Spark partitions data across machines and coordinates joins and aggregations over the network. Our engine runs on a single machine. But the plan tree, the expression AST, and the optimizer concepts are directly transferable. The evolution from Hadoop's disk-heavy MapReduce → Spark's lazy RDDs → Spark's inspectable DataFrames mirrors the exact progression we followed in this course: eager execution → lazy generators → expression ASTs that an optimizer can reason about. If you understand what we've built, you understand the architectural bones of Catalyst.


Exercises

These exercises are designed to cement your ability to read and reason about plan trees — the skill you'll need most in Module 5.

Exercise 1: Annotate the plan

Below is an .explain() output for a different pipeline. For each node, identify: (a) what kind of node it is, and (b) what data flows into it.

Project [name, total]
  Aggregate by=[name] aggs=[col("amount").sum()]
    Join [inner] ['id'] = ['id']
      Scan [id, amount, status] (1000 rows)
      Scan [id, name, city] (200 rows)

Exercise 2: Spot the waste

Look at this plan. Can you identify at least two sources of waste?

Project [region, total_amount]
  Filter [(col("region") == "EU")]
    Aggregate by=[region] aggs=[col("amount").sum()]
      Join [inner] ['cust_id'] = ['cust_id']
        Scan [order_id, cust_id, amount, product, notes] (50000 rows)
        Scan [cust_id, name, region, email, phone] (5000 rows)

Exercise 3: Predict the optimized plan

Given the wasteful plan from Exercise 2, draw (or describe) what the optimized plan should look like after filter pushdown and column pruning. Where does the filter end up? Where do the ProjectNode insertions go?


Coming next
You can see the waste. You can describe exactly what the engine should do differently. Now let's teach the engine to see it too. Module 5 builds the Optimizer — a class that walks the plan tree, inspects the expressions inside each node, and rewrites the tree for efficiency. It's the capstone of the entire course, and it brings together everything: the expression AST from Module 2, the plan tree from Module 3, and the join/aggregation nodes from Module 4. All of it converges in ~80 lines of Python.
Source References
core.py — LazyFrame API
expr.py — Expression AST
schema.py — Type System