The Architecture Story: How Flowfile Bridges Code and Visual Worlds
📋 TL;DR - Key Takeaways
Key Points
- Two ways to build pipelines: Write Python code or use drag-and-drop UI - both create the same thing
- Settings-based design: Every transformation is just a configuration object (Pydantic model)
- Clear separation: Graph structure, settings, and execution are handled separately
- Happy accident: Started as a UI project, ended up with an architecture that works great for both UI and code
Navigation
- This page: Architecture overview and design decisions
- Core Developer Guide: Technical implementation details
- Python API: How to use Flowfile in your projects
👥 Who Should Read This?
Target Audience
- Contributors who want to understand the codebase
- Users curious about how things work internally
- Developers building similar dual-interface tools
- Anyone interested in bridging UI and code approaches
The Problem We Solved
Most data tools force you to choose: either use a visual interface (easy but limited) or write code (powerful but complex). We wanted both in the same tool.
The challenge: How do you make a visual drag-and-drop interface that creates the exact same pipelines as writing code?
The platform started with a clean, settings-based backend where every transformation is a declarative configuration object. This design is perfect for a UI. But developers don't think in configuration objects—they think in code:
# How developers want to write data code
df.filter(col("price") > 100).group_by("region").sum()
The breakthrough came from realizing that a Polars-style API could be translated into those same settings objects, so code produces exactly what the UI produces. Both interfaces become different ways to build the same underlying configuration, giving developers the expressiveness they want while maintaining the structured settings the UI needs.
How It Actually Happened
This wasn't some grand plan. I started building a drag-and-drop UI and needed a clean way to configure nodes, and settings objects made sense for that. The development of Flowfile was never carefully planned; it was about building what sounded fun. Later, while looking at other projects, I realized I could simply have the API methods create the same settings objects, which sounded like fun too. Suddenly there were two equivalent interfaces, almost by accident. Since Polars does the actual data processing, our settings just configure what Polars should do. This turned out to be a simple abstraction layer that showed its potential from the start.
The result is a Python API that constructs the exact same configuration objects as the visual editor:
- The Python API `df.filter(...)` translates directly to a `NodeFilter` object
- The Visual Editor creates an identical `NodeFilter` object through clicks and drags
Both interfaces are different ways to build the same Directed Acyclic Graph (DAG), providing the experience of a code-native API combined with the accessibility of a visual editor.
One Pipeline, Two Ways
Let's build the same pipeline using both approaches to see how they produce identical results.
Sample Data
import flowfile as ff
raw_data = [
{"id": 1, "region": "North", "quantity": 10, "price": 150},
{"id": 2, "region": "South", "quantity": 5, "price": 300},
{"id": 3, "region": "East", "quantity": 8, "price": 200},
{"id": 4, "region": "West", "quantity": 12, "price": 100},
{"id": 5, "region": "North", "quantity": 20, "price": 250},
{"id": 6, "region": "South", "quantity": 15, "price": 400},
{"id": 7, "region": "East", "quantity": 18, "price": 350},
{"id": 8, "region": "West", "quantity": 25, "price": 500},
]
Method 1: The Flowfile API (Developer Experience)
Code:
import flowfile as ff
from flowfile_core.flowfile.flow_graph import FlowGraph
graph: FlowGraph = ff.create_flow_graph()
# Create pipeline with fluent API
df_1 = ff.FlowFrame(raw_data, flow_graph=graph)
df_2 = df_1.with_columns(
flowfile_formulas=['[quantity] * [price]'],
output_column_names=["total"]
)
df_3 = df_2.filter(flowfile_formula="[total]>1500")
df_4 = df_3.group_by(['region']).agg([
ff.col("total").sum().alias("total_revenue"),
ff.col("total").mean().alias("avg_transaction"),
])
Inspecting the graph
Graph Introspection:
# Access all nodes that were created in the graph
print(graph._node_db)
# {1: Node id: 1 (manual_input),
# 3: Node id: 3 (formula),
# 4: Node id: 4 (filter),
# 5: Node id: 5 (group_by)}
# Find the starting node(s) of the graph
print(graph._flow_starts)
# [Node id: 1 (manual_input)]
# From every node, access the next node that depends on it
print(graph.get_node(1).leads_to_nodes)
# [Node id: 3 (formula)]
# The other way around works too
print(graph.get_node(3).node_inputs)
# NodeStepInputs(Left Input: None, Right Input: None,
# Main Inputs: [Node id: 1 (manual_input)])
# Access the settings and type of any node
print(graph.get_node(4).setting_input)
print(graph.get_node(4).node_type)
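Since the graph now exists, you can also execute it; a short sketch using the execution calls covered in the Execution Methods section below (node 5 is the final group_by node shown in the introspection output above):

```python
# Run the whole graph (push-based execution, see "Execution Methods" below)
graph.run_graph()

# Or pull the result straight from the final node
result = graph.get_node(5).get_resulting_data()
print(result.data_frame.collect())  # assumes .data_frame is lazy here, as in the plan example below
```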
Method 2: Direct Graph Construction (What Happens Internally)
Code:
from flowfile_core.schemas import node_interface, transformation_settings, RawData
from flowfile_core.flowfile.flow_graph import add_connection
flow = ff.create_flow_graph()
# Node 1: Manual input
node_manual_input = node_interface.NodeManualInput(
flow_id=flow.flow_id,
node_id=1,
raw_data_format=RawData.from_pylist(raw_data)
)
flow.add_manual_input(node_manual_input)
# Node 2: Add formula for total
formula_node = node_interface.NodeFormula(
flow_id=1,
node_id=2,
function=transformation_settings.FunctionInput(
field=transformation_settings.FieldInput(
name="total",
data_type="Double"
),
function="[quantity] * [price]"
)
)
flow.add_formula(formula_node)
add_connection(flow,
node_interface.NodeConnection.create_from_simple_input(1, 2))
# Node 3: Filter high value transactions
filter_node = node_interface.NodeFilter(
flow_id=1,
node_id=3,
filter_input=transformation_settings.FilterInput(
filter_type="advanced",
advanced_filter="[total]>1500"
)
)
flow.add_filter(filter_node)
add_connection(flow,
node_interface.NodeConnection.create_from_simple_input(2, 3))
# Node 4: Group by region
group_by_node = node_interface.NodeGroupBy(
flow_id=1,
node_id=4,
groupby_input=transformation_settings.GroupByInput(
agg_cols=[
transformation_settings.AggColl("region", "groupby"),
transformation_settings.AggColl("total", "sum", "total_revenue"),
transformation_settings.AggColl("total", "mean", "avg_transaction")
]
)
)
flow.add_group_by(group_by_node)
add_connection(flow,
node_interface.NodeConnection.create_from_simple_input(3, 4))
Schema Inspection:
# Check the schema at any node
print([s.get_minimal_field_info() for s in flow.get_node(4).schema])
# [MinimalFieldInfo(name='region', data_type='String'),
# MinimalFieldInfo(name='total_revenue', data_type='Float64'),
# MinimalFieldInfo(name='avg_transaction', data_type='Float64')]
Both methods produce the exact same Polars query plan:
```
AGGREGATE[maintain_order: false]
[col("total").sum().alias("total_revenue"),
col("total").mean().alias("avg_transaction")] BY [col("region")]
FROM
FILTER [(col("total")) > (1500)]
FROM
WITH_COLUMNS:
[[(col("quantity")) * (col("price"))].alias("total")]
DF ["id", "region", "quantity", "price"]; PROJECT 3/4 COLUMNS
```
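If you want to verify this yourself, compare the plans from the two graphs; a small sketch assuming both snippets on this page were run in the same session (`explain()` is the same call shown in the Execution Methods section below):

```python
# Node 5 is the group_by node in the Method 1 graph, node 4 in the Method 2 graph
plan_from_api = graph.get_node(5).get_resulting_data().data_frame.explain()
plan_from_settings = flow.get_node(4).get_resulting_data().data_frame.explain()
assert plan_from_api == plan_from_settings  # identical optimized Polars plans
```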
Core Architecture
Three Fundamental Concepts
1. The DAG is Everything
Every Flowfile pipeline is a Directed Acyclic Graph, captured in the `FlowGraph`, where:
- Nodes are transformations (filter, join, group_by, etc.)
- Edges represent data flow between nodes
- Settings are Pydantic models configuring each transformation
2. Settings Drive Everything
Every node is composed of two parts: the Node class (a Pydantic BaseModel) that holds metadata and the Settings (often dataclasses) that configure the transformation:
Read more about Nodes and the transformations
# The Node: metadata and graph position
class NodeGroupBy(NodeSingleInput):
groupby_input: transform_schema.GroupByInput = None
class NodeSingleInput(NodeBase):
depending_on_id: Optional[int] = -1 # Parent node reference
class NodeBase(BaseModel):
flow_id: int
node_id: int
cache_results: Optional[bool] = False
pos_x: Optional[float] = 0
pos_y: Optional[float] = 0
description: Optional[str] = None
# ... graph metadata ...
# The Settings: transformation configuration (dataclass)
@dataclass
class GroupByInput:
"""Defines how to perform the group by operation"""
agg_cols: List[AggColl]
@dataclass
class AggColl:
"""Single aggregation operation"""
old_name: str # Column to aggregate
agg: str # Aggregation function ('sum', 'mean', etc.)
new_name: Optional[str] # Output column name
output_type: Optional[str] = None
Settings Power The Backend
This dual structure—Nodes for graph metadata, Settings for transformation logic—drives the backend:
- 🔧 Code generation (method signatures match settings)
- 💾 Serialization (graphs can be saved/loaded; see the sketch after this list)
- 🔮 Schema prediction (output types are inferred from AggColl)
- 🎨 UI structure (defines what the frontend needs to collect, though forms are manually built)
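As a small illustration of the serialization point, a minimal sketch assuming the objects from Method 2 above (`model_dump_json` is the Pydantic v2 call; on Pydantic v1 it would be `.json()`):

```python
import dataclasses

# The Node is a Pydantic model, so the full node configuration serializes to JSON
print(group_by_node.model_dump_json(indent=2))

# The Settings are plain dataclasses and serialize just as easily
print(dataclasses.asdict(group_by_node.groupby_input))
```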
3. Execution is Everything
The FlowDataEngine
orchestrates everything about execution. While the DAG defines structure and settings define configuration, FlowDataEngine is the runtime brain that makes it all happen.
FlowDataEngine handles:

- Compute location (worker service vs local execution)
- Caching strategy (when to materialize, where to store)
- Schema caching (avoiding redundant schema calculations)
- Lazy vs eager evaluation (performance vs debugging modes)
- Data movement (passing LazyFrames between transformations)
This separation is powerful: the DAG remains a pure specification, settings stay declarative, and FlowDataEngine owns all execution concerns. It wraps a Polars LazyFrame/DataFrame but is really the execution orchestrator—deciding where, when, and how transformations run.
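A tiny sketch of that wrapping, assuming the import path below (it may differ; check flowfile_core for the actual module layout). The constructor and `.data_frame` attribute match their use in the closure examples further down:

```python
import polars as pl

# Import path is an assumption; check flowfile_core for the real module location
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine

engine = FlowDataEngine(pl.LazyFrame({"price": [100, 300], "quantity": [10, 5]}))
print(engine.data_frame.collect_schema())  # schema is known without executing anything
```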
Understanding FlowNode
The FlowNode
class is the heart of each transformation in the graph. Each node encapsulates everything needed for a single transformation step:
Core FlowNode Components
Essential State:

- `_function`: The closure containing the transformation logic
- `leads_to_nodes`: List of downstream nodes that depend on this one
- `node_information`: Metadata (id, type, position, connections)
- `_hash`: Unique identifier based on settings and parent hashes

Runtime State:

- `results`: Holds the resulting data, errors, and example data paths
- `node_stats`: Tracks execution status (has_run, is_canceled, etc.)
- `node_settings`: Runtime settings (cache_results, streamable, etc.)
- `state_needs_reset`: Flag indicating if the node needs recalculation

Schema Information:

- `node_schema`: Input/output columns and predicted schemas
- `schema_callback`: Function to calculate schema without execution
The beauty is that FlowNode doesn't know about specific transformations—it just orchestrates the execution of its _function
closure with the right inputs and manages the resulting state.
Flowfile: The Use of Closures
When a method like .filter()
is called, no data is actually filtered. Instead, a FlowNode
is created containing a function—a closure that remembers its settings.
Visual: How Closures Build the Execution Chain
graph LR
subgraph "Node 1: manual_input"
direction TB
settings1("<b>Settings</b><br/>raw_data = [...]")
func1("<b>_func()</b><br/><i>closure</i>")
settings1 -.-> |remembered by| func1
end
subgraph "Node 2: with_columns<br/>(formula)"
direction TB
settings2("<b>Settings</b><br/>formula = '[q] * [p]'")
func2("<b>_func(fl)</b><br/><i>closure</i>")
settings2 -.-> |remembered by| func2
end
subgraph "Node 3: filter"
direction TB
settings3("<b>Settings</b><br/>filter = '[total] > 1500'")
func3("<b>_func(fl)</b><br/><i>closure</i>")
settings3 -.-> |remembered by| func3
end
subgraph "Node 4: group_by"
direction TB
settings4("<b>Settings</b><br/>agg = sum(total)")
func4("<b>_func(fl)</b><br/><i>closure</i>")
settings4 -.-> |remembered by| func4
end
Result([Schema / Data])
func1 ==> |FlowDataEngine| func2
func2 ==> |FlowDataEngine| func3
func3 ==> |FlowDataEngine| func4
func4 ==> |Final FlowDataEngine<br/>with full LazyFrame plan| Result
Each _func
is a closure that wraps around the previous one, building up a chain. The beauty is that Polars can track the schema through this entire chain without executing any data transformations—it just builds the query plan!
The Closure Pattern in Practice
Here's how closures are actually created in FlowGraph:
# From the FlowGraph implementation
def add_group_by(self, group_by_settings: input_schema.NodeGroupBy):
# The closure: captures group_by_settings
def _func(fl: FlowDataEngine) -> FlowDataEngine:
return fl.do_group_by(group_by_settings.groupby_input, False)
self.add_node_step(
node_id=group_by_settings.node_id,
function=_func, # This closure remembers group_by_settings!
node_type='group_by',
setting_input=group_by_settings,
input_node_ids=[group_by_settings.depending_on_id]
)
def add_union(self, union_settings: input_schema.NodeUnion):
# Another closure: captures union_settings
def _func(*flowfile_tables: FlowDataEngine):
dfs = [flt.data_frame for flt in flowfile_tables]
return FlowDataEngine(pl.concat(dfs, how='diagonal_relaxed'))
self.add_node_step(
node_id=union_settings.node_id,
function=_func, # This closure has everything it needs
node_type='union',
setting_input=union_settings,
input_node_ids=union_settings.depending_on_ids
)
Each _func
is a closure that captures its specific settings. When these functions are composed during execution, they form a chain:
# Conceptual composition of the closures
result = group_by._func(
filter._func(
formula._func(
manual_input._func()
)
)
)
# Result is a FlowDataEngine with a LazyFrame that knows its schema
print(result.data_frame.collect_schema())
# Schema([('region', String), ('total_revenue', Float64), ('avg_transaction', Float64)])
Why This Works
- Each `_func` is a closure containing the node's settings
- Functions only need FlowDataEngine as input (or multiple for joins/unions)
- LazyFrame tracks schema changes through the entire chain
- No data is processed—Polars just builds the query plan
The result: instant schema feedback without running expensive computations!
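Stripped of the Flowfile specifics, the pattern is just Python closures over lazy frames; a minimal standalone sketch:

```python
import polars as pl

def make_filter_step(expr: pl.Expr):
    # The returned function "remembers" expr, just like _func remembers its settings
    def _func(lf: pl.LazyFrame) -> pl.LazyFrame:
        return lf.filter(expr)
    return _func

step = make_filter_step(pl.col("total") > 1500)
lf = pl.LazyFrame({"total": [1000, 2000]})
print(step(lf).collect_schema())  # schema is available before any row is processed
```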
Fallback: Schema Callbacks
For nodes that can't infer schemas automatically (external data sources), each FlowNode can have a `schema_callback`:
def schema_callback(settings, input_schema):
"""Pure function: settings + input schema → output schema"""
# Calculate output schema without data
return new_schema
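For example, a transformation that never changes the column layout (a sort or a row filter) can use a trivial callback; a conceptual sketch in the same spirit as the snippet above:

```python
def passthrough_schema_callback(settings, input_schema):
    # A sort or filter keeps the incoming columns untouched, so the
    # predicted output schema is simply the input schema (illustrative only)
    return input_schema
```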
Execution Methods
Flowfile offers flexible execution strategies depending on your needs:
🚀 Available Execution Methods
Performance Mode
When to use: Production pipelines, large datasets
# Get the final result efficiently
result = flow.get_node(final_node_id).get_resulting_data()
Characteristics:
- ⚡ Pull-based execution from the final node
- 🎯 Polars optimizes the entire pipeline
- 💨 Data flows once through optimized plan
- 🚫 No intermediate materialization
Development Mode
When to use: Debugging, inspection, incremental development
# Execute with caching enabled
import flowfile as ff
flow = ff.create_flow_graph()
flow.flow_settings.execution_mode = "Development"
# Add transformations here
flow.run_graph()
# Inspect intermediate results
node_3_result = flow.get_node(3).results.get_example_data()
flow.get_node(3).needs_run(performance_mode=False) # False
Characteristics:

- 📝 Push-based execution in topological order
- 💾 Each node's output written to disk
- 🔍 Inspect any intermediate result
- 🔄 When re-running the flow, only the steps that have changed (directly or indirectly) will run
Explain Plan
When to use: Optimization, understanding the execution plan in depth

!!! warning
    This feature uses the Polars implementation directly. When the full flow cannot be fully converted to Polars, it will only show partial execution plans.
# See what Polars will actually do
plan = flow.get_node(node_id).get_resulting_data().data_frame.explain()
print(plan)
Characteristics:

- 📊 Shows optimized query plan
- 🔍 Understand Polars optimizations
- 📈 Identify performance bottlenecks
- 🎯 No actual execution
System Architecture
Service Architecture
graph LR
subgraph "Frontend"
A[Designer<br/>Vue/Electron]
end
subgraph "Backend"
B[Core Service<br/>FastAPI]
C[Worker Service<br/>FastAPI]
end
subgraph "Storage"
D[Arrow IPC<br/>Cache]
end
A <-->|Settings/Schema| B
B <-->|Execution| C
C <-->|Data| D
Service Responsibilities
Designer:

- Visual graph building interface
- Node configuration forms (manually implemented)
- Real-time schema feedback

Core:

- DAG management
- Execution orchestration
- Schema prediction

Worker:

- Polars transformations
- Data caching (Arrow IPC)
- Isolated from Core for scalability
Project Structure
flowfile/
├── flowfile_core/
│ ├── nodes/ # Node implementations
│ ├── schemas/ # Pydantic models
│ └── flowfile/ # Graph management
├── flowfile_worker/
│ ├── execution/ # Polars execution
│ └── cache/ # Arrow IPC caching
└── flowfile_frontend/
├── components/ # Vue components
└── electron/ # Desktop app
Contributing
Current State of Node Development
While the backend architecture elegantly uses settings-driven nodes, adding new nodes requires work across multiple layers. The frontend currently requires manual implementation for each node type—the visual editor doesn't automatically generate forms from Pydantic schemas yet.
However, there are also opportunities for more focused contributions! Integration with databases and cloud services is needed—these are smaller, more targeted tasks since the core structure is already in place. There's a lot of active development happening, so it's an exciting time to contribute!
Adding a New Node: The Full Picture
Adding a node isn't as simple as defining settings and a function. Here's what's actually required:
Backend Requirements
- Define the Pydantic settings model in `schemas/`
- Implement the transformation method on `FlowDataEngine`
- Add the node method to `FlowGraph` (e.g., `add_custom_transform()`)
- Create the closure function that captures settings
- Define schema callbacks for predicting output schemas
- Register the node in the node registry
Example of what's really needed in FlowGraph:
def add_custom_transform(self, transform_settings: input_schema.NodeCustomTransform):
# Create the closure that captures settings
def _func(fl: FlowDataEngine) -> FlowDataEngine:
return fl.do_custom_transform(transform_settings.transform_input)
# Register with the graph
self.add_node_step(
node_id=transform_settings.node_id,
function=_func,
node_type='custom_transform',
setting_input=transform_settings,
input_node_ids=[transform_settings.depending_on_id]
)
# Don't forget schema prediction!
node = self.get_node(transform_settings.node_id)
# ... schema callback setup ...
Frontend Requirements
Currently, you'll need to:
- Create a new Vue component for the node's configuration form
- Handle the visual representation in the graph editor
- Map the UI inputs to the backend settings structure
- Add the node type to the visual editor's palette
This manual process ensures full control over the UI/UX but requires significant development effort.
Future Vision
The goal is to eventually auto-generate UI from Pydantic schemas, which would complete the settings-driven architecture. This would make adding new nodes closer to just defining the backend settings and transformation logic, with the UI automatically following.
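A rough sketch of what that could look like: Pydantic v2's `model_json_schema()` already exposes enough structure to drive a form generator (purely illustrative, not part of Flowfile today; `NodeFilterSettings` is a hypothetical stand-in for a real settings model):

```python
from typing import Optional
from pydantic import BaseModel

class NodeFilterSettings(BaseModel):  # hypothetical stand-in for a real settings model
    filter_type: str = "advanced"
    advanced_filter: Optional[str] = None

# A UI generator could walk the JSON Schema and emit one form field per property
for name, spec in NodeFilterSettings.model_json_schema()["properties"].items():
    print(name, "->", spec.get("type", "any"), "| default:", spec.get("default"))
```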
The beauty of Flowfile's architecture—discovered through the organic evolution from a UI-first approach—is that even though adding nodes requires work across multiple layers today, the settings-based design provides a clear contract between visual and code interfaces.
I hope you enjoyed learning about Flowfile's architecture and find the dual-interface approach as exciting as I do! If you have questions, ideas, or want to contribute, feel free to reach out via GitHub or check our Core Developer Guide. Happy building!