Skip to content

Quick Start Guide

Flowfile Logo

Get Started with Flowfile in 5 Minutes

Installation

Recommended quickstart: Install from PyPI

pip install flowfile

This installs everything you need - the Python API, visual editor, and all services.

Alternative Installation Methods

Desktop Application (Pre-built Installer)

Download the latest installer for your platform: - Windows: Flowfile-Setup.exe - macOS: Flowfile.dmg

Note: You may see security warnings since the installer isn't signed. On Windows, click "More info" → "Run anyway". On macOS, right-click → "Open" → confirm.

Development Setup (From Source)
# Clone repository
git clone https://github.com/edwardvaneechoud/Flowfile.git
cd Flowfile

# Install with Poetry
poetry install

# Start services
poetry run flowfile_worker  # Terminal 1 (port 63579)
poetry run flowfile_core    # Terminal 2 (port 63578)

# Start frontend
cd flowfile_frontend
npm install
npm run dev:web  # Terminal 3 (port 8080)

Choose Your Path

Non-Technical Users

Perfect for: Analysts, business users, Excel power users

No coding required!

  • ✅ Drag and drop interface
  • ✅ Visual data preview
  • ✅ Export to Excel/CSV
  • ✅ Built-in transformations
Start Visual Tutorial →

Technical Users

Perfect for: Developers, data scientists, engineers

Full programmatic control!

  • ✅ Polars-compatible API
  • ✅ Cloud storage integration
  • ✅ Version control friendly
  • ✅ Complex dynamic logic
Start Python Tutorial →

Quick Start for Non-Technical Users

Goal: Clean and analyze sales data without writing any code

Step 1: Start Flowfile, and create a Flow

Open your terminal (Command Prompt on Windows, Terminal on Mac) and type:

flowfile run ui
Your browser should automatically open to the Flowfile UI.

If the browser does not open automatically

If the browser does not open automatically, you can manually navigate to http://127.0.0.1:63578/ui#/main/designer in your web browser.

Creating your First Flow:

  1. Click "Create" to create a new data pipeline
  2. Click "Create New File Here"
  3. Name your flow (e.g., "Sales Data Analysis")
  4. Click on "Settings" in the top right to configure your flow
  5. Set the Execution mode to "Development"

Your should see now an empty flow: Flowfile InterfaceNew clean flow interface

Step 2: Load Your Data

Loading a CSV or Excel file:

  1. Find the "Read Data" node in the left panel under "Input"
  2. Drag it onto the canvas (center area)
  3. Click the node to open settings on the right
  4. Click "Browse" and select your file
  5. Configure options (if needed):
    • For CSV: Check "Has Headers" if your file has column names
    • For Excel: Select the sheet name
  6. Click "Run" (top toolbar) to load the data
  7. Click the node to preview your data in the bottom panel

Step 3: Clean Your Data

Let's remove duplicate records and filter for high-value transactions:

Remove Duplicates

  1. Drag "Drop Duplicates" node from Transform section
  2. Connect it to your Read Data node
  3. Select columns to check for duplicates
  4. Click Run

Filter Data

  1. Drag "Filter Data" node from Transform section
  2. Connect it to Drop Duplicates node
  3. Enter formula: [Quantity] > 7
  4. Click Run

Step 4: Analyze Your Data

Create a summary by city:

  1. Add a Group By node from the Aggregate section
  2. Connect it to your Filter node
  3. Configure the aggregation:
  4. Group by: city
  5. Aggregations:
    • gross income → Sum → Name it total_sales
    • gross income → Average → Name it avg_sale
    • gross income → Count → Name it number_of_sales
  6. Click Run to see your summary
Data after group by

Group By Configuration

Step 5: Save Your Results

Export your cleaned data:

  1. Add a "Write Data" node from Output section
  2. Connect it to your final transformation
  3. Choose format:
    • Excel: Best for sharing with colleagues
    • CSV: Best for Excel/Google Sheets
    • Parquet: Best for large datasets
  4. Set file path (e.g., cleaned_sales.xlsx)
  5. Click Run to save

Here's what your complete flow should look like:

Group By Configuration

Congratulations!

You've just built your first data pipeline! You can: - Save this flow using File → Save (creates a .flowfile) - Share it with colleagues who can run it without any setup - Schedule it to run automatically (coming soon) - Export as Python code if you want to see what's happening behind the scenes

Pro Tips for Non-Technical Users:

  • Use descriptions: Right-click nodes and add descriptions to document your work
  • Preview often: Click nodes after running to see data at each step
  • Start small: Test with a sample of your data first
  • Save versions: Save different versions of your flow as you build

Next Steps


Quick Start for Technical Users

Goal: Build a production-ready ETL pipeline with cloud integration

Step 1: Install and Import

pip install flowfile
import flowfile as ff
from flowfile import col, when, lit
import polars as pl  # Flowfile returns Polars DataFrames

Step 2: Build a Real-World ETL Pipeline

Let's build a production pipeline that reads from S3, transforms data, and writes results:

# Configure S3 connection (one-time setup)
from pydantic import SecretStr

import flowfile as ff

ff.create_cloud_storage_connection_if_not_exists(
    ff.FullCloudStorageConnection(
        connection_name="production-data",
        storage_type="s3",
        auth_method="access_key",
        aws_region="us-east-1",
        aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
        aws_secret_access_key=SecretStr("wJalrXUtnFEMI/K7MDENG")
    )
)

Step 3: Extract and Transform

# Build the pipeline (lazy evaluation - no data loaded yet!)
import flowfile as ff
pipeline = (
    # Extract: Read partitioned parquet files from S3
    ff.scan_parquet_from_cloud_storage(
        "s3://data-lake/sales/year=2024/month=*", 
        connection_name="production-data",
        description="Load Q1-Q4 2024 sales data"
    )

    # Transform: Clean and enrich
    .filter(
        (ff.col("status") == "completed") & 
        (ff.col("amount") > 0),
        description="Keep only valid completed transactions"
    )

    # Add calculated fields
    .with_columns([
        # Business logic
        (ff.col("amount") * ff.col("quantity")).alias("line_total"),
        (ff.col("amount") * ff.col("quantity") * 0.1).alias("tax"),

        # Date features for analytics
        ff.col("order_date").dt.quarter().alias("quarter"),
        ff.col("order_date").dt.day_of_week().alias("day_of_week"),

        # Customer segmentation
        ff.when(ff.col("customer_lifetime_value") > 10000)
            .then(ff.lit("VIP"))
            .when(ff.col("customer_lifetime_value") > 1000)
            .then(ff.lit("Regular"))
            .otherwise(ff.lit("New"))
            .alias("customer_segment"),

        # Region mapping
        ff.when(ff.col("state").is_in(["CA", "OR", "WA"]))
            .then(ff.lit("West"))
            .when(ff.col("state").is_in(["NY", "NJ", "PA"]))
            .then(ff.lit("Northeast"))
            .when(ff.col("state").is_in(["TX", "FL", "GA"]))
            .then(ff.lit("South"))
            .otherwise(ff.lit("Midwest"))
            .alias("region")
    ], description="Add business metrics and segments")

    # Complex aggregation
    .group_by(["region", "quarter", "customer_segment"])
    .agg([
        # Revenue metrics
        ff.col("line_total").sum().alias("total_revenue"),
        ff.col("tax").sum().alias("total_tax"),

        # Order metrics
        ff.col("order_id").n_unique().alias("unique_orders"),
        ff.col("customer_id").n_unique().alias("unique_customers"),

        # Performance metrics
        ff.col("line_total").mean().round(2).alias("avg_order_value"),
        ff.col("quantity").sum().alias("units_sold"),

        # Statistical metrics
        ff.col("line_total").std().round(2).alias("revenue_std"),
        ff.col("line_total").quantile(0.5).alias("median_order_value")
    ])

    # Final cleanup
    .sort(["region", "quarter", "total_revenue"], descending=[False, False, True])
    .filter(ff.col("total_revenue") > 1000)  # Remove noise
)

# Check the execution plan (no data processed yet!)
print(pipeline.explain())  # Shows optimized Polars query plan

Step 4: Load and Monitor

# Option 1: Write to cloud storage
pipeline.write_parquet_to_cloud_storage(
    "s3://data-warehouse/aggregated/sales_summary_2024.parquet",
    connection_name="production-data",
    compression="snappy",
    description="Save aggregated results for BI tools"
)

# Option 2: Write to Delta Lake for versioning
pipeline.write_delta(
    "s3://data-warehouse/delta/sales_summary",
    connection_name="production-data",
    write_mode="append",  # or "overwrite"
    description="Append to Delta table"
)

# Option 3: Collect for analysis
df_result = pipeline.collect()  # NOW it executes everything!
print(f"Processed {len(df_result):,} aggregated records")
print(df_result.head())

Step 5: Advanced Features

Visualize Your Pipeline

# Open in visual editor
ff.open_graph_in_editor(pipeline.flow_graph)

# This shows your entire pipeline
# as a visual flow diagram!
Visual overview of pipeline

Flow visualized

Export as Pure Python

# Generate standalone code
code = pipeline.flow_graph.generate_code()

# Deploy without Flowfile dependency!
# Uses only Polars

Step 6: Production Patterns

Pattern 1: Data Quality Checks

from datetime import datetime
import flowfile as ff

def data_quality_pipeline(df: ff.FlowFrame) -> ff.FlowFrame:
    """Reusable data quality check pipeline"""

    # Record initial count
    initial_count = df.select(ff.col("*").count().alias("count"))

    # Apply quality filters
    clean_df = (
        df
        # Remove nulls in critical fields
        .drop_nulls(subset=["order_id", "customer_id", "amount"])

        # Validate data ranges
        .filter(
            (ff.col("amount").is_between(0, 1000000)) &
            (ff.col("quantity") > 0) &
            (ff.col("order_date") <= datetime.now())
        )

        # Remove duplicates
        .unique(subset=["order_id"], keep="first")
    )

    # Log quality metrics
    final_count = clean_df.select(ff.col("*").count().alias("count"))
    print(f"Initial count: {initial_count.collect()[0]['count']}")
    print(f"Final count after quality checks: {final_count.collect()[0]['count']}")
    return clean_df

Pattern 2: Incremental Processing

# Read only new data since last run
from datetime import datetime
import flowfile as ff
last_processed = datetime(2024, 10, 1)

incremental_pipeline = (
    ff.scan_parquet_from_cloud_storage(
        "s3://data-lake/events/",
        connection_name="production-data"
    )
    .filter(ff.col("event_timestamp") > last_processed)
    .group_by(ff.col("event_timestamp").dt.date().alias("date"))
    .agg([
        ff.col("event_id").count().alias("event_count"),
        ff.col("user_id").n_unique().alias("unique_users")
    ])
)

# Process and append to existing data
incremental_pipeline.write_delta(
    "s3://data-warehouse/delta/daily_metrics",
    connection_name="production-data",
    write_mode="append"
)

Pattern 3: Multi-Source Join

# Combine data from multiple sources
import flowfile as ff

customers = ff.scan_parquet_from_cloud_storage(
    "s3://data-lake/customers/",
    connection_name="production-data"
)

orders = ff.scan_csv_from_cloud_storage(
    "s3://raw-data/orders/",
    connection_name="production-data",
    delimiter="|",
    has_header=True
)

products = ff.read_parquet("local_products.parquet")

# Complex multi-join pipeline
enriched_orders = (
    orders
    .join(customers, on="customer_id", how="left")
    .join(products, on="product_id", how="left")
    .with_columns([
        # Handle missing values from left joins
        ff.col("customer_segment").fill_null("Unknown"),
        ff.col("product_category").fill_null("Other"),

        # Calculate metrics
        (ff.col("unit_price") * ff.col("quantity") * 
         (ff.lit(1) - ff.col("discount_rate").fill_null(0))).alias("net_revenue")
    ])
)

# Materialize results
results = enriched_orders.collect()

Next Steps for Technical Users


🌟 Why Flowfile?

⚡ Performance

Built on Polars - Uses the speed of Polars

🔄 Dual Interface

Same pipeline works in both visual and code. Switch anytime, no lock-in.

📦 Export to Production

Generate pure Python/Polars code. Deploy anywhere without Flowfile.

☁️ Cloud Support

Direct S3/cloud storage support, no need for expensive clusters to analyse your data


Troubleshooting

Installation Issues
# If pip install fails, try:
pip install --upgrade pip
pip install flowfile

# For M1/M2 Macs:
pip install flowfile --no-binary :all:

# Behind corporate proxy:
pip install --proxy http://proxy.company.com:8080 flowfile
Port Already in Use
# Find what's using port 63578
lsof -i :63578  # Mac/Linux
netstat -ano | findstr :63578  # Windows

# Kill the process or use different port:
FLOWFILE_PORT=8080 flowfile run ui

Get Help

Documentation
Full Documentation
Discussions
GitHub Discussions
Issues
GitHub Issues

Ready to Transform Your Data?

Join thousands of users building data pipelines with Flowfile