Writing Data
Flowfile provides Polars-compatible writers with additional cloud storage integration and visual workflow features.
Polars Compatibility
Local file writers work identically to their Polars counterparts, with an optional description parameter for visual documentation.
Local File Writing
CSV Files
import flowfile as ff
# Basic usage (same as Polars)
df = ff.read_csv("input.csv")
df.write_csv("output.csv")
# With Flowfile description
df.write_csv("processed_data.csv", description="Save cleaned customer data")
# Polars parameters work identically
df.write_csv(
    "output.csv",
    separator=";",
    encoding="utf-8",
    description="Export with semicolon delimiter"
)
Key Parameters (same as Polars):
- separator: Field delimiter (default: ,)
- encoding: File encoding (default: utf-8)
Parquet Files
# Basic usage
df.write_parquet("output.parquet")
# With description and compression
df.write_parquet(
    "compressed_data.parquet",
    description="Save with high compression",
    compression="gzip"
)
Cloud Storage Writing
Flowfile extends writing capabilities with specialized cloud storage writers that integrate with secure connection management.
Unified Cloud Storage Writer
write_to_cloud_storage() is a single entry point for writing any supported format to cloud storage.
import flowfile as ff
# Write Parquet (default format)
ff.write_to_cloud_storage(
    df, "s3://bucket/output.parquet",
    connection_name="my-conn",
)
# Write CSV
ff.write_to_cloud_storage(
    df, "s3://bucket/output.csv",
    file_format="csv",
    connection_name="my-conn",
    delimiter=",",
)
# Write Delta with append mode
ff.write_to_cloud_storage(
    df, "s3://warehouse/my_table",
    file_format="delta",
    connection_name="my-conn",
    write_mode="append",
)
Parameters:
- df: The LazyFrame to write
- path: Cloud storage destination path
- file_format: "csv", "parquet", "json", or "delta" (default: "parquet"); a JSON example follows this list
- connection_name: Name of the stored cloud storage connection
- delimiter: CSV field separator (default: ;). Only used for CSV
- encoding: CSV encoding (default: utf8). Only used for CSV
- compression: Parquet compression: "snappy", "gzip", "brotli", "lz4", "zstd" (default: "snappy"). Only used for Parquet
- write_mode: "overwrite" or "append" (default: "overwrite"). Only used for Delta
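The unified writer covers JSON as well, which is not shown in the examples above. A minimal sketch, reusing the placeholder bucket and connection name from the earlier calls:
# Write JSON (placeholder path and connection name)
ff.write_to_cloud_storage(
    df, "s3://bucket/output.json",
    file_format="json",
    connection_name="my-conn",
)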
Recommended Approach
write_to_cloud_storage() is the recommended way to write to cloud storage. The format-specific methods below still work and are useful when you want a more concise call for a known format.
Format-Specific Cloud Writers
Cloud CSV Writing
# Write to S3
df.write_csv_to_cloud_storage(
    "s3://my-bucket/output.csv",
    connection_name="my-aws-connection",
    delimiter=",",
    encoding="utf8",
    description="Export processed data to S3"
)
Parameters:
- path: Full S3 path including bucket and file name
- connection_name: Name of configured cloud storage connection
- delimiter: CSV field separator (default: ;)
- encoding: File encoding (utf8 or utf8-lossy)
Cloud Parquet Writing
# Write to S3 with compression
df.write_parquet_to_cloud_storage(
    "s3://data-lake/processed/results.parquet",
    connection_name="data-lake-connection",
    compression="snappy",
    description="Save analysis results to data lake"
)
Parameters:
- path: Full S3 path for the output file
- connection_name: Name of configured cloud storage connection
- compression: Compression algorithm (snappy, gzip, brotli, lz4, zstd); an alternative codec is sketched below
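As a variation on the example above, a stronger codec can be chosen when storage size matters more than write speed. A minimal sketch; the path and connection name are placeholders:
# Trade write speed for smaller files with zstd (placeholder path and connection)
df.write_parquet_to_cloud_storage(
    "s3://data-lake/archive/results.parquet",
    connection_name="data-lake-connection",
    compression="zstd",
    description="Archive results with stronger compression"
)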
Cloud JSON Writing
# Write JSON to cloud storage
df.write_json_to_cloud_storage(
    "s3://api-data/export.json",
    connection_name="api-storage",
    description="Export for API consumption"
)
Delta Lake Writing
# Write Delta table (supports append mode)
df.write_delta(
    "s3://warehouse/customer_dim",
    connection_name="warehouse-connection",
    write_mode="overwrite",
    description="Update customer dimension table"
)
# Append to existing Delta table
new_data.write_delta(
    "s3://warehouse/customer_dim",
    connection_name="warehouse-connection",
    write_mode="append",
    description="Add new customers to dimension"
)
Parameters:
- path: S3 path for the Delta table
- connection_name: Name of configured cloud storage connection
- write_mode: overwrite (replace) or append (add to existing)
Catalog Writing
Write data to the Flowfile catalog as managed Delta tables. Available as both a standalone function and a FlowFrame method.
Standalone Function
import flowfile as ff
# Resolve (or create) the target schema once
schema = ff.CatalogReference("sales", auto_create=True).schema("staging", auto_create=True)
ff.write_catalog_table(
    df, "output_table",
    schema=schema,
    write_mode="upsert",
    merge_keys=["id"],
)
Parameters:
- df: The LazyFrame to write
- table_name: Name of the catalog table to write to (required)
- schema: A SchemaReference identifying the target catalog/schema. Preferred over namespace_id.
- write_mode: How to handle existing data (default: "overwrite"). See Write Modes
- merge_keys: Column names for merge operations (required for upsert, update, delete)
- description: Optional description for the table
Catalog handles vs raw IDs
schema= accepts a SchemaReference — a validated, name-based handle that resolves to the underlying namespace id once. Construction fails fast if the catalog or schema doesn't exist (or creates it if auto_create=True). The legacy namespace_id=<int> keyword still works for back-compat, but you can't pass both.
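Side by side, the two styles look like this. A minimal sketch; the catalog, schema, and table names and the numeric id are placeholders:
# Preferred: a validated, name-based handle that resolves the namespace id once
schema = ff.CatalogReference("sales").schema("staging")
ff.write_catalog_table(df, "output_table", schema=schema)
# Legacy: a raw namespace id still works, but it cannot be combined with schema=
ff.write_catalog_table(df, "output_table", namespace_id=42)  # placeholder id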
FlowFrame Method
# Convenience: write through the schema handle
schema.write_table(df, "output_table", write_mode="overwrite")
# Or as a method on the FlowFrame
df.write_catalog_table(
    "output_table",
    schema=schema,
    write_mode="overwrite",
)
Returns a new child FlowFrame representing the written data, allowing further chaining.
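Because the result is itself a FlowFrame, the write can sit in the middle of a pipeline. A minimal sketch, reusing the schema handle from above; the output file name is a placeholder:
# The returned child FlowFrame can feed the next step in the workflow
written = df.write_catalog_table("output_table", schema=schema, write_mode="overwrite")
written.write_parquet("output_backup.parquet", description="Local copy of the written table")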
Database Writing
Write data to a SQL database using a stored connection. Available as a method on FlowFrame.
import flowfile as ff
df = ff.read_csv("data.csv")
df.write_database(
    connection_name="my_db",
    table_name="users",
    schema_name="public",
    if_exists="append",
)
Parameters:
- connection_name: Name of the stored database connection (required)
- table_name: Name of the table to write to (required)
- schema_name: Database schema name (e.g., "public")
- if_exists: What to do if the table exists: "append", "replace", or "fail" (default: "append")
- description: Optional description for this operation
Returns a new child FlowFrame.
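A variant that replaces the target table instead of appending; the connection, table, and schema names are placeholders:
# Replace the table entirely instead of appending (placeholder names)
df.write_database(
    connection_name="my_db",
    table_name="users_snapshot",
    schema_name="public",
    if_exists="replace",
    description="Full refresh of the snapshot table"
)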
Write Modes
Overwrite vs Append
# Overwrite existing data (default)
df.write_parquet_to_cloud_storage(
    "s3://bucket/data.parquet",
    connection_name="conn",
    write_mode="overwrite"  # Default for most formats
)
# Append to existing (Delta Lake only)
df.write_delta(
    "s3://warehouse/events",
    connection_name="conn",
    write_mode="append"
)
Append Mode
For cloud storage, append is only supported for Delta Lake format. Other formats always overwrite.
Catalog Write Modes
The catalog supports these write modes:
| Mode | Description |
|---|---|
| overwrite | Replace the entire table |
| error | Fail if the table already exists |
| append | Add rows to the existing table |
| upsert | Insert new rows or update existing rows matched by merge_keys |
| update | Update only existing rows matched by merge_keys |
| delete | Delete rows matching merge_keys |
| virtual | Create a virtual table; no data is written to disk |
# Upsert: insert or update based on merge keys
ff.write_catalog_table(
    df, "customers",
    write_mode="upsert",
    merge_keys=["customer_id"],
)
Merge Keys Required
The upsert, update, and delete modes require merge_keys to be specified.
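The same pattern applies to the other key-based modes. A minimal sketch, reusing the customers table from the example above:
# Update only rows that already exist, matched on customer_id
ff.write_catalog_table(
    df, "customers",
    write_mode="update",
    merge_keys=["customer_id"],
)
# Delete rows whose merge keys appear in df
ff.write_catalog_table(
    df, "customers",
    write_mode="delete",
    merge_keys=["customer_id"],
)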
Virtual Mode
The virtual write mode creates a catalog entry without materializing data to disk. When the virtual table is read, the producer flow is re-executed on demand. This requires the flow to be registered in the catalog. See Virtual Flow Tables for details.
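A minimal sketch of a virtual write, assuming the producing flow is already registered in the catalog; the schema handle and table name are placeholders:
# No data is materialized; reads re-execute the producer flow on demand
ff.write_catalog_table(
    df, "daily_revenue_view",
    schema=schema,
    write_mode="virtual",
)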
Connection Requirements
All cloud storage writing requires a configured connection:
import flowfile as ff
from pydantic import SecretStr
# Set up connection before writing
ff.create_cloud_storage_connection_if_not_exists(
    ff.FullCloudStorageConnection(
        connection_name="data-lake",
        storage_type="s3",
        auth_method="access_key",
        aws_region="us-east-1",
        aws_access_key_id="your-key",
        aws_secret_access_key=SecretStr("your-secret")
    )
)
# Now you can write to cloud storage
df.write_parquet_to_cloud_storage(
    "s3://data-lake/output.parquet",
    connection_name="data-lake"
)