Internals
This section describes the algorithms behind each major subsystem. Everything is implemented in pure Python using only stdlib data structures.
Execution model: volcano / iterator
LazyFrame uses the volcano model (also called the iterator model or Graefe model), the same execution strategy used by most SQL databases. Each plan node implements an execute() method that returns a Python iterator. When a parent node asks for data, it calls execute() on its child, which calls its child, and so on down to the leaf (data source). Rows are pulled up one at a time through the tree.
to_pylist() calls → FilterNode.execute()
    calls → JoinNode.execute()
        calls → ScanNode.execute() (left)
        calls → ScanNode.execute() (right)
Rows are never buffered between stages unless an operation requires it (sort, group-by, window functions, the build side of a hash join). For a pipeline like read_csv → filter → select → to_csv, exactly one row is in memory at a time.
Complexity: O(1) memory for streaming operations, since only the current row is held. O(n) only when materialization is forced.
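The pull-based flow can be illustrated with a minimal sketch. The two classes below are simplified stand-ins written for this example, not the library's actual node classes; the `pulled` list exists only to show how many rows the scan had to produce.

```python
from typing import Callable, Iterable, Iterator


class ScanNode:
    """Leaf node: yields rows from a source callable (illustrative only)."""

    def __init__(self, source: Callable[[], Iterable[tuple]]):
        self.source = source

    def execute(self) -> Iterator[tuple]:
        yield from self.source()


class FilterNode:
    """Pulls rows from its child and passes on those matching the predicate."""

    def __init__(self, child, predicate: Callable[[tuple], bool]):
        self.child = child
        self.predicate = predicate

    def execute(self) -> Iterator[tuple]:
        for row in self.child.execute():
            if self.predicate(row):
                yield row


pulled = []  # records which rows the scan actually produced


def source():
    for i in range(1_000_000):
        pulled.append(i)
        yield (i, i * i)


plan = FilterNode(ScanNode(source), lambda row: row[0] % 2 == 0)
rows = plan.execute()
first_two = [next(rows), next(rows)]
```

Asking for two output rows pulls only three rows from the scan; the remaining rows of the source are never generated.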
Join: hash join vs sort-merge join
LazyFrame provides two join algorithms, chosen via the sorted= parameter:
Hash join (default): materializes the right table into a hash map keyed on join columns, then streams the left table probing the map.
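from collections import defaultdict

def hash_join(left, right, key):
    right_index = defaultdict(list)  # Build: index the right table, O(m)
    for row in right:
        right_index[key(row)].append(row)
    for row in left:  # Probe: stream the left table, O(n)
        for match in right_index.get(key(row), ()):  # .get() avoids inserting missing keys
            yield row + match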
Sort-merge join (sorted=True): for inputs already sorted on the join key, two cursors advance in lockstep. Whichever has the smaller key advances; when keys match, it emits the cross product.
while left and right not exhausted:
    if left_key < right_key: advance left (for left/full joins, first emit the left row padded with nulls)
    elif left_key > right_key: advance right (for full joins, first emit the right row padded with nulls)
    else: collect matching group, emit cross product
| | Hash join | Sort-merge join |
|---|---|---|
| Time | O(n + m) | O(n + m) if pre-sorted; O(n log n + m log m) if the inputs must be sorted first |
| Memory | O(m) for hash table | O(1) base (O(g) for many-to-many groups) |
| Best when | Unsorted data, small right side | Pre-sorted data, memory-constrained |
| API | .join(other, on="id") | .join(other, on="id", sorted=True) |
For a streaming pipeline like read_csv("sorted_log.csv").join(lookup, on="key", sorted=True).to_csv(...), the merge join never materializes either side: rows flow through with constant memory, apart from the O(g) buffer that holds the current group of matching keys.
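As a rough illustration of the two-cursor idea, here is a minimal inner-join sketch. It assumes both inputs are already sorted by the join key and that rows are tuples; `merge_join` and its signature are hypothetical, and the left/full variants that emit null-padded rows are omitted.

```python
def merge_join(left, right, key):
    """Inner sort-merge join over two iterables sorted by key(row)."""
    left_it, right_it = iter(left), iter(right)
    lrow = next(left_it, None)
    rrow = next(right_it, None)
    while lrow is not None and rrow is not None:
        lk, rk = key(lrow), key(rrow)
        if lk < rk:
            lrow = next(left_it, None)
        elif lk > rk:
            rrow = next(right_it, None)
        else:
            # Buffer the right-hand rows sharing this key: O(g) memory
            group = []
            while rrow is not None and key(rrow) == lk:
                group.append(rrow)
                rrow = next(right_it, None)
            # Pair every left row with this key against the buffered group
            while lrow is not None and key(lrow) == lk:
                for match in group:
                    yield lrow + match
                lrow = next(left_it, None)


left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
result = list(merge_join(left, right, key=lambda row: row[0]))
```

Only the current group of equal-key rows from the right side is held in memory; everything else is consumed one row at a time from each cursor.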
Aggregation: hash vs sorted streaming
Like joins, LazyFrame provides two aggregation strategies:
Hash aggregation (default): groups rows into a dict keyed by group columns, maintaining running accumulators per group. Unlike a naive implementation that stores all rows per group, this stores only the accumulator state (a running sum, count, min, etc.), so memory is O(k) where k = number of groups, not O(n).
accumulators = {}  # key → [running_sum, running_count, ...]
for row in child.execute():
    key = (row[group_col_1], ...)
    if key not in accumulators:
        accumulators[key] = init()
    update(accumulators[key], row)  # O(1) per row
for key, acc in accumulators.items():
    yield key + finalize(acc)
Sorted aggregation (sorted=True): when input is pre-sorted by the group key, groups appear as contiguous runs. A single cursor watches for key changes and emits each group as soon as the key changes. Memory: O(1) overall, because only one accumulator lives at a time.
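prev_key = None  # None means no group has started yet
for row in sorted_input:
    key = (row[group_col_1], ...)
    if key != prev_key:
        if prev_key is not None:
            yield prev_key + finalize(acc)
        acc = init()
        prev_key = key
    update(acc, row)
if prev_key is not None:  # nothing to emit for empty input
    yield prev_key + finalize(acc)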
| | Hash aggregation | Sorted aggregation |
|---|---|---|
| Time | O(n) | O(n) (O(n log n) if you need to sort first) |
| Memory | O(k) groups | O(1): one accumulator at a time |
| Best when | Unsorted data | Pre-sorted data, streaming sources |
| API | .group_by("k").agg(...) | .group_by("k", sorted=True).agg(...) |
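Accumulator types: sum (running total), count (increment), mean (sum + count, divide at finalize), min/max (running extremum), first (first seen), last (overwrite), n_unique (set). All of these hold constant-size state except n_unique, whose set grows with the number of distinct values in the group.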
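To make the accumulator idea concrete, here is a small sketch with two of the listed accumulator kinds. The class names and the `hash_aggregate` helper are invented for this example and group on a single column for brevity; they are not the library's internals.

```python
class MeanAcc:
    """Running mean: keeps only a sum and a count, divides at finalize."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, value):
        self.total += value
        self.count += 1

    def finalize(self):
        return self.total / self.count


class NUniqueAcc:
    """Distinct count: state is a set, so it grows with distinct values."""

    def __init__(self):
        self.seen = set()

    def update(self, value):
        self.seen.add(value)

    def finalize(self):
        return len(self.seen)


def hash_aggregate(rows, key_idx, value_idx, make_acc):
    """One accumulator per group key; rows themselves are never stored."""
    accumulators = {}
    for row in rows:
        key = row[key_idx]
        if key not in accumulators:
            accumulators[key] = make_acc()
        accumulators[key].update(row[value_idx])
    for key, acc in accumulators.items():
        yield (key, acc.finalize())


rows = [("a", 1), ("b", 10), ("a", 3), ("a", 1), ("a", 3)]
means = list(hash_aggregate(rows, 0, 1, MeanAcc))
uniques = list(hash_aggregate(rows, 0, 1, NUniqueAcc))
```

Groups come out in first-seen order because Python dicts preserve insertion order.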
Sort: Timsort
SortNode materializes all rows and delegates to Python's built-in sorted(), which uses Timsort — a hybrid merge-sort / insertion-sort that is O(n log n) worst-case but adapts to partially-sorted data (O(n) for already-sorted input).
Multi-column sorting composes key tuples. For mixed ascending/descending, a negation wrapper handles numeric columns; string columns use separate stable sorts in reverse priority order, exploiting Timsort's stability guarantee.
Complexity: O(n log n) time, O(n) memory.
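The two techniques for mixed ascending/descending order can be sketched with plain `sorted()` calls. The sample rows and column positions are made up for illustration.

```python
from operator import itemgetter

# Columns: (name, dept, age)
rows = [
    ("bob", "eng", 30),
    ("alice", "eng", 25),
    ("carol", "ops", 30),
    ("dave", "eng", 30),
]

# dept ASC, age DESC: numeric column, so negate it inside one key tuple
by_dept_age_desc = sorted(rows, key=lambda r: (r[1], -r[2]))

# dept ASC, name DESC: strings cannot be negated, so sort in reverse
# priority order. The later pass is stable, so rows with equal dept keep
# the name-descending order established by the earlier pass.
by_name_desc = sorted(rows, key=itemgetter(0), reverse=True)
by_dept_name_desc = sorted(by_name_desc, key=itemgetter(1))
```

The multi-pass form costs one full sort per key, which keeps the overall bound at O(n log n) for a fixed number of sort columns.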
Window functions: sort-partition-scan
WindowNode implements the sort-partition-scan pattern used by SQL window functions:
1. Materialize all rows: O(n)
2. Sort by (partition_key, order_key): O(n log n)
3. Scan each partition (contiguous run):
   - row_number: incrementing counter, O(1) per row
   - rank: counter + gap on tie change, O(1) per row
   - dense_rank: counter, no gap, O(1) per row
   - cumsum/cummax/cummin: running fold, O(1) per row
   - lag/lead: index offset into partition, O(1) per row
   - agg over partition: one pass, O(p) per partition, then broadcast to all rows
4. Restore original row order: O(n log n)
Partition boundaries are detected by checking when the partition key changes in the sorted sequence (no hash map needed).
Complexity: O(n log n) dominated by the two sorts. O(n) memory.
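The four steps can be sketched for the three ranking functions as follows. `rank_window` is a hypothetical helper that takes column positions rather than names; it is meant to show the pattern, not to mirror WindowNode's code.

```python
from itertools import groupby
from operator import itemgetter


def rank_window(rows, partition_idx, order_idx):
    """Append (row_number, rank, dense_rank) to each row, in input order."""
    # Steps 1-2: materialize with original positions, then sort
    indexed = sorted(
        enumerate(rows),
        key=lambda pair: (pair[1][partition_idx], pair[1][order_idx]),
    )
    out = []
    # Step 3: each partition is a contiguous run, found by key change
    for _, part in groupby(indexed, key=lambda pair: pair[1][partition_idx]):
        row_number = rank = dense_rank = 0
        prev = object()  # sentinel that compares unequal to any real value
        for pos, row in part:
            row_number += 1
            if row[order_idx] != prev:
                rank = row_number  # jumps past ties, leaving a gap
                dense_rank += 1    # increments by one, no gap
                prev = row[order_idx]
            out.append((pos, row + (row_number, rank, dense_rank)))
    # Step 4: restore original row order
    out.sort(key=itemgetter(0))
    return [row for _, row in out]


rows = [("a", 10), ("b", 5), ("a", 10), ("a", 20), ("b", 7)]
ranked = rank_window(rows, partition_idx=0, order_idx=1)
```

In partition "a" the two rows with value 10 tie at rank 1, so the row with value 20 gets rank 3 but dense_rank 2.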
Schema propagation
Each PlanNode computes its output schema from its input schema and the operation's semantics, without touching data:
| Node | Schema rule |
|---|---|
| ScanNode | Inferred from data sample or provided |
| FilterNode | Pass through parent unchanged |
| ProjectNode | Select/reorder from parent |
| WithColumnNode | Append new column with inferred type |
| JoinNode | Merge left and right schemas |
| AggNode | Group-by keys + aggregate output types |
| RenameNode | Rename keys in parent schema |
| SortNode, ExplodeNode | Pass through unchanged |
| WindowNode | Append window column with inferred type |
Type inference for expressions walks the AST: col("a") * col("b") resolves both operands' types, then applies promotion rules (int × float → float). when().otherwise() takes the widest branch type. Aggregations have fixed output types (count → int, mean → float).
Complexity: O(d) where d is plan tree depth. Instantaneous for any practical pipeline.
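A small sketch of how a WithColumnNode-style rule might derive its output schema without reading data. The schema is modelled here as a plain dict of column name to Python type, and `promote` and `with_column_schema` are hypothetical helpers covering only the int/float case mentioned above.

```python
# Assumed promotion order for this sketch: wider types have higher rank
_RANK = {int: 0, float: 1}


def promote(left: type, right: type) -> type:
    """Result type of arithmetic over two numeric column types."""
    return left if _RANK[left] >= _RANK[right] else right


def with_column_schema(schema: dict, name: str, left: str, right: str) -> dict:
    """Schema after appending name = col(left) * col(right)."""
    out = dict(schema)  # copy, so the parent's schema stays untouched
    out[name] = promote(schema[left], schema[right])
    return out


schema = {"qty": int, "price": float}
new_schema = with_column_schema(schema, "total", "qty", "price")
```

The computation touches only the schema dictionaries, which is why its cost depends on the plan and not on the number of rows.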
Query optimizer: rule-based rewriting
The optimizer makes two passes over the plan tree:
Pass 1: Filter pushdown
Walks top-down. When it finds a FilterNode, it examines the filter's required columns and tries to push it past the node below:
| Child node | Rule |
|---|---|
| ProjectNode | Push if all filter columns exist in the project's input |
| JoinNode | If all filter columns come from the left side, push into left branch. If all from the right, push into right. If mixed, leave in place. |
| AggNode, SortNode | Leave in place (semantics change across aggregation) |
Compound filters ((a > 1) & (b < 5)) are split into independent filters to maximize pushdown.
Before:                      After:
Filter(region='EU')          Join
  Filter(segment='Ent')        Filter(region='EU')
    Join                         Scan(orders)
      Scan(orders)             Filter(segment='Ent')
      Scan(customers)            Scan(customers)
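The JoinNode rule comes down to a subset test on column sets. The sketch below assumes compound filters have already been split and represents each predicate by its text plus the columns it reads; `Pred`, `place_filters`, and the column names are all hypothetical.

```python
from typing import NamedTuple


class Pred(NamedTuple):
    text: str           # human-readable form of the predicate
    columns: frozenset  # columns the predicate reads


def place_filters(preds, left_cols, right_cols):
    """Assign each predicate to the left branch, right branch, or above the join."""
    placement = {"left": [], "right": [], "stay": []}
    for pred in preds:
        if pred.columns <= left_cols:
            placement["left"].append(pred.text)
        elif pred.columns <= right_cols:
            placement["right"].append(pred.text)
        else:
            placement["stay"].append(pred.text)  # mixed: cannot be pushed
    return placement


orders_cols = {"order_id", "customer_id", "region", "amount"}
customers_cols = {"customer_id", "segment", "credit_limit"}
preds = [
    Pred("region = 'EU'", frozenset({"region"})),
    Pred("segment = 'Ent'", frozenset({"segment"})),
    Pred("amount > credit_limit", frozenset({"amount", "credit_limit"})),
]
placement = place_filters(preds, orders_cols, customers_cols)
```

The third predicate reads one column from each side, so it stays above the join while the first two move into their respective branches.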
Pass 2: Column pruning
Walks top-down, tracking which columns are needed by nodes above:
- Root declares its output columns
- For each child, compute minimal columns needed (filter columns + parent columns + join keys)
- Insert ProjectNode at ScanNode to prune unused columns early
Complexity: Both passes are O(d) where d is tree depth.
Type inference for CSV: two-phase detection
Phase 1 — Basic types: each cell is tested against a type ladder: bool → int → float → str. The first successful parse wins. Per-column types are promoted using a lattice: int + float → float, anything + str → str, T + None → T(nullable).
Phase 2 — Datetime detection: columns still typed as str after phase 1 are tested column-wise. The first successfully-parsed value's format becomes a candidate, then all values are validated against that single format. If ≥80% parse, the column is typed as datetime and the detected format string is cached for fast parsing during streaming.
This avoids the cost of trying strptime with 14 formats on every cell — datetime detection runs once per column on the sample, not per-value.
Complexity: O(s × c) where s = sample size (default 100), c = column count.
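Both phases can be sketched in a few functions. This is an illustrative reconstruction under stated assumptions: the three-entry `FORMATS` tuple is a made-up subset, empty cells are treated as nulls, and any type mix not covered by the lattice above falls back to str.

```python
from datetime import datetime


def infer_cell(text):
    """Phase 1 ladder for one cell: bool, then int, then float, else str."""
    if text == "":
        return None  # empty cell: no type evidence
    if text.lower() in ("true", "false"):
        return bool
    for caster in (int, float):
        try:
            caster(text)
            return caster
        except ValueError:
            pass
    return str


def promote(a, b):
    """Combine two observed types into one column type."""
    if a is None or b is None:
        return a or b  # T + None -> T
    if a is b:
        return a
    if {a, b} == {int, float}:
        return float  # int + float -> float
    return str  # assumption: any other mix falls back to str


def infer_column(cells):
    col_type = None
    for cell in cells:
        col_type = promote(col_type, infer_cell(cell))
    return col_type


FORMATS = ("%Y-%m-%d", "%d/%m/%Y", "%Y-%m-%d %H:%M:%S")  # hypothetical subset


def _parses(value, fmt):
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


def detect_datetime_format(values, threshold=0.8):
    """Phase 2: take the first parseable value's format, then validate it."""
    values = [v for v in values if v]
    candidate = next(
        (fmt for v in values for fmt in FORMATS if _parses(v, fmt)), None
    )
    if candidate is None:
        return None
    hits = sum(_parses(v, candidate) for v in values)
    return candidate if hits / len(values) >= threshold else None
```

The format search stops at the first value that parses, so every other value in the sample is checked against a single format only.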
Streaming: factory-based replay
File readers store a factory function, not data:
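import csv

def make_rows():
    # path and cast are supplied by the enclosing reader
    with open(path, newline="") as f:  # newline="" is what the csv module recommends
        reader = csv.reader(f)
        next(reader)  # skip header
        for row in reader:
            yield cast(row)  # one row at a time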
Each execute() call invokes the factory, reopening the file and yielding a fresh generator. This means:
- read_csv → filter → to_csv uses O(1) memory: one row in flight at a time
- to_pylist() can be called repeatedly: the file is re-read each time
- head(10) yields 10 rows, then the generator is garbage-collected (file closes)
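The replay behaviour is easy to check with a throwaway file. In this sketch `csv_source` is a hypothetical wrapper that returns the factory, and the file contents are invented for the demonstration.

```python
import csv
import os
import tempfile
from itertools import islice


def csv_source(path, cast):
    """Return a factory; each call reopens the file and yields fresh rows."""

    def make_rows():
        with open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader)  # skip header
            for row in reader:
                yield cast(row)

    return make_rows


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "people.csv")
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows([["name", "age"], ["Alice", "30"], ["Bob", "25"]])

    source = csv_source(path, cast=lambda r: (r[0], int(r[1])))
    first_pass = list(source())          # reads the file once
    second_pass = list(source())         # reopens and reads it again
    head = list(islice(source(), 1))     # stops after one row
```

Because the factory holds only the path and the cast function, nothing from an earlier pass is retained between calls.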
The Stream class compiles transforms into a flat loop:
for row in source():
    if not predicate(row): continue
    row = row + (expr(row),)
    row = (row[2], row[0])
    writer.writerow(row)
No intermediate iterators or plan-tree overhead — ~30% faster than the LazyFrame pipeline for pure streaming.
Data representation
Rows are stored as Python tuples internally, not dicts:
- ~40% less memory than dicts (no key storage per row)
- Faster to create and iterate
- Hashable (needed for grouping and join keys)
Column-to-index mapping lives in the schema:
columns = ["name", "age", "score"]
col_map = {"name": 0, "age": 1, "score": 2}
row = ("Alice", 30, 95.5)
row[col_map["age"]] # → 30
Conversion to dicts happens only at the output boundary (to_pylist(), __iter__).
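A short sketch of the boundary conversion and of why hashability matters. `rows_to_dicts` is a hypothetical stand-in for what an output method would do, not the library's function.

```python
columns = ["name", "age", "score"]
rows = [("Alice", 30, 95.5), ("Bob", 25, 88.0), ("Alice", 30, 95.5)]


def rows_to_dicts(rows, columns):
    """Pair each tuple with the column names only when handing data out."""
    return [dict(zip(columns, row)) for row in rows]


records = rows_to_dicts(rows, columns)

# Tuples are hashable, so whole rows (or key sub-tuples) can go into sets
# and dict keys directly; dict rows could not.
distinct_rows = set(rows)
```

Internally the column names live once in the schema, and each row carries only its values.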
Expression evaluation
Expressions form an AST. Each node implements eval(row, col_map):
BinaryExpr(op=*, left=Col("price"), right=Col("qty"))
    Col("price").eval(row, col_map) → row[2] → 25.0
    Col("qty").eval(row, col_map) → row[3] → 4
    result: 25.0 * 4 = 100.0
WhenExpr walks branches in order (identical to SQL CASE WHEN):
for condition, value in branches:
    if condition.eval(row, col_map):
        return value.eval(row, col_map)
return otherwise.eval(row, col_map)
Datetime accessor methods (col("ts").dt.year()) produce _DtUnaryExpr nodes that extract components via stdlib datetime attributes. Null values short-circuit to None.
Complexity: O(expression depth) per row — typically 1–5 levels.
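A minimal runnable version of this evaluation scheme might look like the following. The class names echo the ones used above, but the definitions (including `Lit` and the field layout) are assumptions made for this sketch.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Col:
    name: str

    def eval(self, row, col_map):
        return row[col_map[self.name]]


@dataclass(frozen=True)
class Lit:
    value: Any

    def eval(self, row, col_map):
        return self.value


@dataclass(frozen=True)
class BinaryExpr:
    op: Callable[[Any, Any], Any]
    left: Any
    right: Any

    def eval(self, row, col_map):
        return self.op(self.left.eval(row, col_map), self.right.eval(row, col_map))


@dataclass(frozen=True)
class WhenExpr:
    branches: tuple  # ((condition, value), ...), checked in order
    otherwise: Any

    def eval(self, row, col_map):
        for condition, value in self.branches:
            if condition.eval(row, col_map):
                return value.eval(row, col_map)
        return self.otherwise.eval(row, col_map)


col_map = {"sku": 0, "region": 1, "price": 2, "qty": 3}
row = ("A1", "EU", 25.0, 4)

total = BinaryExpr(operator.mul, Col("price"), Col("qty"))
tier = WhenExpr(
    branches=((BinaryExpr(operator.gt, total, Lit(50)), Lit("large")),),
    otherwise=Lit("small"),
)
```

Each `eval` call recurses once per AST level, which matches the per-row cost being proportional to expression depth.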
Architecture
pyfloe/
├── schema.py (154 lines) LazySchema, ColumnSchema — type propagation
├── expr.py (740 lines) Expression AST — col, lit, when, .str, .dt, aggregations, windows
├── plan.py (660 lines) Plan nodes + optimizer (volcano model)
├── core.py (518 lines) LazyFrame, TypedLazyFrame, LazyGroupBy
├── io.py (610 lines) File readers/writers — CSV, TSV, JSONL, JSON, Parquet
├── stream.py (649 lines) from_iter, from_chunks, Stream pipeline
└── __init__.py (56 lines) Public API
─────────────
~3,400 lines total, zero dependencies
Summary of complexities
| Operation | Time | Memory | Algorithm |
|---|---|---|---|
| filter, select, with_column | O(n) streaming | O(1) | Generator chain (volcano) |
| join | O(n + m) | O(m) right side | Hash join |
| join(sorted=True) | O(n + m) | O(1) | Sort-merge join |
| group_by().agg() | O(n) | O(k groups) | Hash agg (running accumulators) |
| group_by(sorted=True).agg() | O(n) | O(1) | Sorted streaming agg |
| sort | O(n log n) | O(n) | Timsort |
| window functions | O(n log n) | O(n) | Sort-partition-scan |
| explain / schema | O(d tree depth) | O(d) | AST walk |
| optimize | O(d tree depth) | O(d) | Rule-based rewrite |
| CSV type inference | O(s × c) | O(s × c) | Type ladder + datetime detection |
| File streaming | O(n) | O(1) | Factory-based generator replay |