# Python API Quick Start

Get up and running with Flowfile's Python API in 5 minutes.
## Installation

```bash
pip install flowfile
```
## Your First Pipeline

```python
import flowfile as ff

# Load data
df = ff.read_csv("sales.csv", description="Load sales data")

# Transform
result = (
    df
    .filter(ff.col("amount") > 100, description="Filter large sales")
    .with_columns([
        (ff.col("amount") * 1.1).alias("amount_with_tax")
    ], description="Add tax calculation")
    .group_by("region")
    .agg([
        ff.col("amount").sum().alias("total_sales"),
        ff.col("amount").mean().alias("avg_sale")
    ])
)

# Get results as a Polars DataFrame
data = result.collect()
print(data)

# Visualize in the UI
ff.open_graph_in_editor(result.flow_graph)
```
## Key Concepts

### FlowFrame

Your data container: like a Polars LazyFrame, but it tracks all operations:

```python
# Create from various sources
df = ff.FlowFrame({"col1": [1, 2, 3]})  # From a dict
df = ff.read_csv("file.csv")            # From CSV
df = ff.read_parquet("file.parquet")    # From Parquet
```
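Because a FlowFrame records operations instead of executing them, each method call returns a new FlowFrame; data only materializes when you call `.collect()`. A minimal sketch using only the calls shown above:

```python
df = ff.FlowFrame({"col1": [1, 2, 3]})

filtered = df.filter(ff.col("col1") > 1)  # returns a new FlowFrame; nothing runs yet
print(type(filtered))                     # a FlowFrame, not a table
print(filtered.collect())                 # executes the tracked plan -> Polars DataFrame
```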
### Always Lazy

Operations don't execute until you call `.collect()`:

```python
# These operations just build the plan
df = ff.read_csv("huge_file.csv")
df = df.filter(ff.col("status") == "active")
df = df.select(["id", "name", "amount"])

# Now it executes everything efficiently
result = df.collect()
```
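Because nothing runs until `.collect()`, you can sanity-check the plan first. As the Tips below note, `df.schema` shows the resulting structure without executing anything:

```python
# Inspect the planned columns and dtypes; no data is read
print(df.schema)
```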
### Descriptions

Document your pipeline as you build it:

```python
df = (
    ff.read_csv("input.csv", description="Raw customer data")
    .filter(ff.col("active") == True, description="Keep active only")
    .drop_duplicates(description="Remove duplicates")
)
```
## Common Operations

### Filtering

```python
# Polars style
df.filter(ff.col("age") > 21)

# Flowfile formula style
df.filter(flowfile_formula="[age] > 21 AND [status] = 'active'")
```
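The formula above combines two conditions; the Polars-style equivalent chains them with `&` and parenthesized expressions (same columns, shown as a sketch):

```python
# Polars-style equivalent of the formula above
df.filter((ff.col("age") > 21) & (ff.col("status") == "active"))
```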
### Adding Columns

```python
# Standard way
df.with_columns([
    (ff.col("price") * ff.col("quantity")).alias("total")
])

# Formula syntax
df.with_columns(
    flowfile_formulas=["[price] * [quantity]"],
    output_column_names=["total"]
)
```
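Since both parameters take lists, deriving several columns in one call looks like the natural extension; treat the positional pairing below as an assumption rather than documented behavior:

```python
# Assumption: the lists pair up positionally (formula -> output name);
# the tax formula is purely illustrative
df.with_columns(
    flowfile_formulas=["[price] * [quantity]", "[price] * 0.1"],
    output_column_names=["total", "tax"]
)
```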
### Grouping & Aggregation

```python
df.group_by("category").agg([
    ff.col("sales").sum().alias("total_sales"),
    ff.col("sales").mean().alias("avg_sales"),
    ff.col("id").count().alias("count")
])
```
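Grouping by several keys works the same way: pass a list, as the Complete Example below does with a derived year plus `product`. A sketch with hypothetical columns:

```python
# Multiple grouping keys: pass a list instead of a single name
df.group_by(["region", "category"]).agg([
    ff.col("sales").sum().alias("total_sales")
])
```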
### Joining

```python
customers = ff.read_csv("customers.csv")
orders = ff.read_csv("orders.csv")

result = customers.join(
    orders,
    left_on="customer_id",
    right_on="cust_id",
    how="left"
)
```
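When both frames share the key name, Polars accepts a single `on=` argument; assuming Flowfile mirrors that signature (it isn't shown above), the call shortens to:

```python
# Sketch: shared key name, assuming Polars-style `on=` support
result = customers.join(orders, on="customer_id", how="inner")
```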
## Cloud Storage

```python
from pydantic import SecretStr

# Set up S3 connection
ff.create_cloud_storage_connection_if_not_exists(
    ff.FullCloudStorageConnection(
        connection_name="my-s3",
        storage_type="s3",
        auth_method="access_key",
        aws_region="us-east-1",
        aws_access_key_id="your-key",
        aws_secret_access_key=SecretStr("your-secret")
    )
)

# Read from S3
df = ff.scan_parquet_from_cloud_storage(
    "s3://bucket/data.parquet",
    connection_name="my-s3"
)

# Write to S3
df.write_parquet_to_cloud_storage(
    "s3://bucket/output.parquet",
    connection_name="my-s3"
)
```
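A cloud scan is as lazy as any other source: transformations are added to the plan and nothing is transferred until you collect. A sketch (the `status` column is hypothetical):

```python
# The filter is planned first; data moves only when .collect() runs
active = df.filter(ff.col("status") == "active")  # hypothetical column
print(active.collect())
```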
## Visual Integration

### Open in Editor

```python
# Build pipeline in code
pipeline = ff.read_csv("data.csv").filter(ff.col("value") > 100)

# Open in visual editor
ff.open_graph_in_editor(pipeline.flow_graph)
```

### Start Web UI

```python
# Launch the web interface
ff.start_web_ui()  # Opens browser automatically
```
## Complete Example

```python
import flowfile as ff

# Build a complete ETL pipeline
pipeline = (
    ff.read_csv("raw_sales.csv", description="Load raw sales")
    .filter(ff.col("amount") > 0, description="Remove invalid")
    .with_columns([
        ff.col("date").str.strptime(ff.Date, "%Y-%m-%d"),
        (ff.col("amount") * ff.col("quantity")).alias("total")
    ], description="Parse dates and calculate totals")
    .group_by([ff.col("date").dt.year().alias("year"), "product"])
    .agg([
        ff.col("total").sum().alias("revenue"),
        ff.col("quantity").sum().alias("units_sold")
    ])
    .sort("revenue", descending=True)
)

# Execute and get results
results = pipeline.collect()
print(results)

# Visualize the pipeline
ff.open_graph_in_editor(pipeline.flow_graph)

# Save results
pipeline.write_parquet("yearly_sales.parquet")
```
## Next Steps
- 📖 Core Concepts - Understand FlowFrame and FlowGraph
- 📚 API Reference - Detailed documentation
- 🎯 Tutorials - Real-world examples
- 🔄 Visual Integration - Working with the UI
## Tips

- **Use descriptions** - They appear in the visual editor
- **Think lazy** - Build your entire pipeline before collecting
- **Leverage formulas** - Use `[column]` syntax for simpler expressions
- **Visualize often** - `open_graph_in_editor()` helps debug
- **Check schemas** - Use `df.schema` to see structure without running
Ready for more? Check out the full API reference or learn about core concepts. Want to see another example? Check out the quickstart guide!