AI Integration Architecture
This page is the developer's tour of the AI subsystem under flowfile_core/flowfile_core/ai/ and flowfile_frontend/src/renderer/app/features/ai/. Read AI Assistant and Provider Setup first if you need user-facing context.
The AI integration is organised in five layers: routes translate HTTP into Pydantic shapes, sessions/context hold per-run state and assemble prompts, the planner runs the agent loop, tools validate and stage graph edits, and providers abstract the LLM. A handful of cross-cutting modules — feature flag, prompt log, scheduler, audit, safety — wrap the whole thing.
Module map
flowfile_core/flowfile_core/ai/
├── routes.py # Aggregates all sub-routers; flag gate
├── feature_flag.py # FEATURE_FLAG_AI dependency
├── admin_routes.py # POST /system/feature_flags/ai
│
├── chat_routes.py # /ai/chat/stream + /ai/chat/preview
├── agent_routes.py # /ai/agent/* (start/resume/followup/abort/state)
├── inline_action_routes.py # /ai/inline_action
├── command_palette_routes.py # /ai/command_palette
├── autocomplete_routes.py # /ai/autocomplete/{formula,join_keys}
├── suggest_next_node_routes.py # /ai/suggest_next_node
├── lineage_routes.py # /ai/lineage_question
├── run_failure_routes.py # /ai/explain_run_failure
├── docgen_routes.py # /ai/generate_documentation
├── intent_router_routes.py # /ai/route (chat ↔ agent classifier)
├── diff_routes.py # /ai/diff/{stage,{id}/accept,{id}/reject}
├── byok_routes.py # /ai/providers (CRUD + test)
│
├── sessions.py # AgentSession, snapshot, drift detection
├── session_store.py # In-memory + disk-backed repos
├── byok.py # get_configured_provider, env fallback
├── credentials.py # ProviderCredential CRUD, Fernet
├── intent_router.py # Lightweight chat-vs-edit classifier
├── command_palette.py # Cmd+K orchestration
├── autocomplete.py # Formula / join-key suggestions
├── suggest_next_node.py # Ghost-node suggestions
│
├── agents/
│ ├── planner.py # run_planner_session — the agent loop
│ ├── copilot.py # Chat surface helpers
│ ├── assist.py # Inline / cmd_k assist helpers
│ └── live_observation.py # agent_live runtime feedback
│
├── context/
│ ├── builder.py # render_prompt_context
│ ├── budget.py # Token-budget truncation
│ └── mentions.py # @flow / @node:id parsing
│
├── tools/
│ ├── executor.py # execute_tool_call dispatcher
│ ├── registry.py # build_tool_catalog (per-surface narrowing)
│ ├── graph_ops.py # add_*, connect, delete_*
│ ├── schema_ops.py # update_column_*
│ ├── codegen_ops.py # regenerate / refactor for code nodes
│ ├── meta_ops.py # classify_intent, pick_*, fill_*, verify
│ ├── classification.py # node-type classifier
│ ├── predictor.py # Schema prediction (mirror + dry-run)
│ ├── dry_run.py # Polars / Python dry-run cache
│ └── node_docs.py # NODE_AGENT_PAYLOAD_EXAMPLES
│
├── providers/
│ ├── base.py # Provider Protocol, Message, ToolCall, ToolSpec
│ ├── _litellm_base.py # LiteLLMProvider base
│ ├── registry.py # PROVIDERS dict, provider_factory
│ ├── anthropic.py # AnthropicProvider
│ ├── openai.py # OpenAIProvider
│ ├── google.py # GoogleProvider
│ ├── groq.py # GroqProvider
│ ├── openrouter.py # OpenRouterProvider
│ └── ollama.py # OllamaProvider
│
├── prompts/ # Markdown system prompts
│ ├── base.md, copilot.md, planner.md, assist.md
│ └── stage_{plan,classify,pick_type,pick_upstream,fill_settings,verify_completion}.md
│
├── diff.py # GraphDiff, bundle_staged_results, apply_diff
├── diff_store.py # Diff persistence
├── audit.py # AuditEvent + record_event
├── safety.py # PII regex scrub, samples prep
├── streaming.py # SSE wrapping for async generators
├── scheduler.py # Per-provider RPM/RPD scheduler
├── prompt_log.py # Daily JSONL prompt log
└── replay_buffer.py # Per-session replay buffer
Frontend mirror:
flowfile_frontend/src/renderer/app/
├── api/ai.api.ts # Axios client for /ai/*
├── stores/
│ ├── ai-store.ts # Chat drawer + mode toggle
│ ├── ai-agent-store.ts # Agent session state + events
│ ├── ai-autocomplete-store.ts # Formula/join suggestions
│ ├── ai-command-palette-store.ts # Cmd+K state
│ ├── ai-diff-store.ts # Pending diffs
│ └── ai-ghost-node-store.ts # Ghost-node suggestions
├── features/ai/
│ ├── AiAssistant.vue # Drawer (chat + agent panel)
│ ├── AiAgentRun.vue / AiAgentEvent.vue
│ ├── AiCommandPalette.vue
│ ├── AiInlineActions.vue
│ ├── AiDiffPanel.vue / AiDiffPreview.vue
│ ├── AiGhostNode.vue
│ ├── AiMentionAutocomplete.vue / mentionVocabulary.ts
│ └── markdown.ts (+ test)
└── views/AiProvidersView/AiSettingsTab.vue # BYOK panel
Layer overview
flowchart TD
UI[Frontend stores<br/>ai-store / ai-agent-store / ai-diff-store]
R[Routes layer<br/>chat / agent / inline_action / cmd_k / autocomplete /<br/>suggest_next_node / lineage / run_failure / docgen / diff / byok]
S[Sessions + Context<br/>sessions.py · session_store.py · context/builder.py]
P[Planner loop<br/>agents/planner.py — run_planner_session]
T[Tools<br/>tools/executor.py · registry.py · predictor.py · meta_ops]
PR[Providers<br/>providers/_litellm_base.py + per-vendor subclasses]
L[litellm → LLM]
UI --> R
R --> S
R --> P
P --> T
P --> PR
T -.staged_payload.-> P
PR --> L
S --> P
The agent loop calls into both tools (to stage graph edits) and providers (to ask the LLM what to do next). Sessions and context are read by both routes (for setup) and the planner (during the loop).
Feature flag
flowfile_core/flowfile_core/ai/feature_flag.py exposes:
is_ai_enabled() -> bool— live read ofsettings.FEATURE_FLAG_AI, aMutableBoolinitialised from theFEATURE_FLAG_AIenv var at process start (default"1"— on; setFEATURE_FLAG_AI=falseto opt out).require_ai_enabled()— FastAPI dependency raisingHTTPException(503)when the flag is off.
The dependency is wired once on the aggregate router:
# flowfile_core/flowfile_core/ai/routes.py
router = APIRouter(dependencies=[Depends(require_ai_enabled)])
router.include_router(byok_router)
router.include_router(chat_router)
# ... all other AI sub-routers
So every AI endpoint inherits the gate without per-route boilerplate. The MutableBool can also be flipped at runtime via POST /system/feature_flags/ai (see admin_routes.py) without restarting the process — useful for incident response.
Routes catalog
| Route | Method | Request | Response | Source |
|---|---|---|---|---|
/ai/health |
GET | — | {"status": "skeleton"} |
routes.py |
/ai/chat/stream |
POST | ChatStreamRequest |
SSE text stream | chat_routes.py |
/ai/chat/preview |
POST | ChatStreamRequest |
ChatPreviewResponse (no LLM call) |
chat_routes.py |
/ai/agent/start |
POST | AgentStartRequest |
SSE planner events | agent_routes.py |
/ai/agent/{session_id}/resume |
POST | AgentResumeRequest |
SSE planner events | agent_routes.py |
/ai/agent/{session_id}/followup |
POST | AgentFollowupRequest |
SSE planner events | agent_routes.py |
/ai/agent/{session_id}/abort |
POST | — | AgentAbortResponse |
agent_routes.py |
/ai/agent/{session_id} |
GET | — | AgentStateResponse |
agent_routes.py |
/ai/inline_action |
POST | InlineActionRequest |
SSE text stream | inline_action_routes.py |
/ai/command_palette |
POST | CommandPaletteRequest |
CommandPaletteResponse |
command_palette_routes.py |
/ai/route |
POST | RouteRequest |
RouteResponse |
intent_router_routes.py |
/ai/autocomplete/formula |
POST | FormulaAutocompleteRequest |
JSON | autocomplete_routes.py |
/ai/autocomplete/join_keys |
POST | JoinKeyAutocompleteRequest |
JSON | autocomplete_routes.py |
/ai/suggest_next_node |
POST | SuggestNextNodeRequest |
NextNodeSuggestionsResponse |
suggest_next_node_routes.py |
/ai/lineage_question |
POST | LineageQuestionRequest |
SSE text stream | lineage_routes.py |
/ai/explain_run_failure |
POST | ExplainRunFailureRequest |
SSE text stream | run_failure_routes.py |
/ai/generate_documentation |
POST | GenerateDocumentationRequest |
SSE markdown stream | docgen_routes.py |
/ai/diff/stage |
POST | StageDiffRequest |
StageDiffResponse |
diff_routes.py |
/ai/diff/{id}/accept |
POST | AcceptDiffRequest |
AcceptDiffResponse |
diff_routes.py |
/ai/diff/{id}/reject |
POST | — | RejectDiffResponse |
diff_routes.py |
/ai/providers |
GET | — | list[ProviderListItem] |
byok_routes.py |
/ai/providers/{name} |
POST | ProviderCredentialInput |
ProviderCredentialPublic |
byok_routes.py |
/ai/providers/{name} |
DELETE | — | 204 |
byok_routes.py |
/ai/providers/{name}/test |
POST | — | ProviderTestResult |
byok_routes.py |
/system/feature_flags/ai |
POST/GET | FeatureFlagState |
FeatureFlagState |
admin_routes.py |
Streaming endpoints emit Server-Sent Events with a 15-second keepalive frame so the connection survives long thinking. See streaming.py.
Provider abstraction
The Provider Protocol lives in providers/base.py:
class Provider(Protocol):
name: ClassVar[str]
default_model: ClassVar[str]
supports_tools: ClassVar[bool]
supports_streaming: ClassVar[bool]
surface_models: ClassVar[dict[str, str]]
async def chat(
self,
*,
messages: list[Message],
tools: list[ToolSpec] | None = None,
max_tokens: int | None = None,
...
) -> ChatResponse: ...
async def stream(
self,
*,
messages: list[Message],
tools: list[ToolSpec] | None = None,
...
) -> AsyncIterator[StreamChunk]: ...
LiteLLMProvider (in _litellm_base.py) is the concrete base. It builds a litellm-compatible kwargs dict, calls litellm.acompletion (lazily imported — the module deliberately keeps litellm out of the import graph until first use), and translates the response back into ChatResponse / StreamChunk. The base class is also where the prompt log hooks in.
Per-vendor subclasses override class-level fields:
| Class | name |
model_prefix |
default_model |
Notable surface defaults |
|---|---|---|---|---|
AnthropicProvider |
anthropic |
anthropic/ |
claude-sonnet-4-6 |
Haiku 4.5 for Cmd+K / ghost / autocomplete / agent_staged; Opus 4.7 for agent_complex |
OpenAIProvider |
openai |
"" |
gpt-4.1-mini |
gpt-4.1 for explain / agent_complex / docgen / lineage |
GoogleProvider |
google |
gemini/ |
gemini-2.5-flash |
gemini-2.5-pro for agent_complex |
GroqProvider |
groq |
groq/ |
qwen/qwen3-coder-30b-a3b-instruct |
One model across surfaces |
OpenRouterProvider |
openrouter |
openrouter/ |
qwen/qwen3-coder-30b-a3b-instruct |
meta-llama/llama-3.3-70b-instruct for agent_staged (free tier) |
OllamaProvider |
ollama |
ollama_chat/ |
llama3.1:8b |
llama3.1:70b for agent_complex. default_api_base="http://localhost:11434" |
provider_factory(name, *, model, surface, api_key, api_base) in providers/registry.py is the construction entry point. The PROVIDERS dict registers all six classes; list_supported_providers() returns the names in registration order.
BYOK and model resolution
flowfile_core/flowfile_core/ai/byok.py::get_configured_provider(db, user_id, provider, *, surface=None, model=None) is the runtime entry point that downstream code calls when it needs a configured Provider. The resolution rules:
Credential lookup. get_provider_credential(db, user_id, provider) returns the (encrypted) credential row. If a row exists, the API key is decrypted via Fernet, api_base is read from the row, and cred.default_model is consulted.
Model resolution order:
- Explicit
model=argument (caller wins — e.g., per-request override from the chat drawer). cred.default_model(user preference).cls.surface_models[surface]if it appears in the user's curatedcred.modelslist.- First entry of
cred.models(curated list default). cls.surface_models[surface](class-level per-surface default).cls.default_model(terminal fallback).
Steps 3 and 4 (added in workstream W29) make multi-model curation work for OpenRouter and similar providers — users can pin a list of free-tier models and still get sensible per-surface routing when one of those models matches the surface preference.
Env-var fallback. If no credential row exists, provider_factory(name) is called with no api_key / api_base; litellm picks up the standard env var (e.g., ANTHROPIC_API_KEY). detect_env_fallback(provider) exists separately so GET /ai/providers can show users the distinction between "configured via key" and "configured via env" in the UI.
Failure mode. ProviderNotConfiguredError is raised only when the provider is completely unset — no row, no env var, and not Ollama (which has a sensible default_api_base).
Credentials live in flowfile_core.ai.credentials. The DB row is keyed (user_id, provider). The encrypted key is stored in the secrets table; the credential row carries api_key_secret_id. Deletion drops both atomically.
Sessions and context
AgentSession
Defined in flowfile_core/flowfile_core/ai/sessions.py. The shape (abridged):
class AgentSession(BaseModel):
session_id: str
flow_id: int
user_id: int
surface: PlannerSurface # "agent_complex" | "agent_staged" | "agent_live"
status: SessionStatus # "running" | "paused_drift" | "paused_user_action" |
# "awaiting_user_input" | "completed" | "aborted" | "failed"
samples_mode: SamplesMode # "off" | "regex"
stage: PlannerStage # "plan" | "classify" | "pick_type" | "pick_upstream" |
# "fill_settings" | "single_stage_op" | "verify_completion"
picked_op_kind: PlannerOpKind | None # "add" | "modify" | "delete" | "connect" |
# "disconnect" | "other"
picked_node_type: str | None
messages: list[Message] # full LLM conversation
snapshot: GraphSnapshot # graph state at start, for drift detection
staged_results: list[ToolExecutionResult]
diff_id: str | None
step_count: int
max_steps: int
rationale: str | None
pause_reason: str | None
drift_detail: DriftDetail | None
Sessions are persisted via session_store.py. The default repo is in-memory; a disk-backed DiskSessionRepository writes JSON files under base_directory/ai_sessions/ so a session survives a backend restart and the user can re-attach via POST /ai/agent/{session_id}/resume.
Drift detection
GraphSnapshot captures the set of node ids and node types at session start. Before each tool dispatch, the planner re-snapshots the live graph and compares — if the id set changed (or a node type flipped) the session flips to paused_drift, emits a drift_detected event, and waits for the user to pick Resume or Discard. This is intentionally id-set-only — settings drift was deliberately removed because it produced too many false-positive pauses.
Context assembly
context/builder.py::render_prompt_context(flow, pinned_node_ids, surface, samples_mode) produces:
- A system prompt assembled from
prompts/base.mdplus the surface-specific overlay (prompts/copilot.md,prompts/planner.md,prompts/stage_*.md). - A user message containing a deterministic JSON-markdown rendering of the flow: every node's id, type, settings, predicted output schema, plus optional sample rows. The walk is BFS upstream from pinned nodes (or the whole flow when nothing is pinned), bounded by
context/budget.py's token budget.
context/mentions.py parses @flow and @node:42 references in user messages; mentions become pin signals for the BFS.
The planner state machine
agents/planner.py::run_planner_session(session, flow, provider, max_tokens, max_retries) -> AsyncIterator[PlannerEvent] is the agent loop. It's an async generator that yields one PlannerEvent per significant step.
Event types
PlannerEventName = Literal[
"thinking",
"tool_call_proposed",
"tool_call_staged",
"tool_call_warned",
"tool_call_rejected",
"tool_call_applied",
"drift_detected",
"paused",
"retry",
"abort",
"complete",
"awaiting_user_input",
"stage_advanced",
"error",
"info",
]
awaiting_user_input is emitted instead of complete when the planner ends with no tool calls AND no staged results AND the last assistant message looks like a clarifying question. The frontend renders "Agent waiting for your reply…" instead of "Finished — nothing to stage."
Three surfaces
The surface field on the start request picks the loop shape. Note: the backend default on POST /ai/agent/start is "agent_staged", but the frontend drawer default (per-flow preference, persisted) is "agent_live" — that's the one most user sessions land on unless explicitly switched. A request with surface="agent_complex" against a supports_tools=False model returns 422 at validation.
agent_staged. The big idea: smaller models can't reliably function-call against a wide tool catalog. Solution: expose one tool per LLM round so each round is a tightly-scoped enum / shape decision. Mode is "stage" — proposals accumulate and are bundled into a GraphDiff for accept/reject. State machine:
plan ──▶ classify ─[op_kind=add]──▶ pick_type ──▶ pick_upstream ──▶ fill_settings ──▶ classify
├─[modify/delete/connect/disconnect]──▶ single_stage_op ──▶ classify
└─[other]──▶ (terminate or → verify_completion if opted in)
planexposesflowfile.meta.emit_plan— the LLM outlines the full multi-step plan as a structured narrative. Skipped (skip_plan=True) when the session was promoted from chat (the chat-mode response already produced a plan-shaped narrative).classifyexposesflowfile.meta.classify_intentand asks the LLM to pick aPlannerOpKindfor the next step.pick_typeexposesflowfile.meta.pick_node_type(add path only).pick_upstreamexposesflowfile.meta.pick_upstreamwith the upstream id enum populated fromlive_ids ∪ session.staged_node_ids.fill_settingsexposes the picked node type'sflowfile.graph.add_<type>tool with planner-injected fields stripped from its parameter schema.single_stage_op(non-add path) exposes exactly one ops tool —update_node_settings,delete_node,connect, ordelete_connection.verify_completion(opt-in viaverify_plan_completion=True) runs one extra round at the end so the LLM can confirmis_complete=trueagainst the chat-mode plan or send control back toclassify.
agent_complex. Single-shot, full tool catalog in one call. Best for big models (Sonnet, Opus, GPT-4.1, Gemini Pro). The state machine field is unused; the planner runs each round as a free-form tool-using turn. Same mode="stage" and accept/reject GraphDiff flow as agent_staged. Higher rate-limit pressure, fewer round-trips.
agent_live. Same state machine as agent_staged, but mode="apply" — each tool call mutates the live graph immediately, the affected subgraph is run (Performance) or a sample is evaluated (Development), and the runtime observation is fed back as the next tool reply. On runtime failure the just-added node is auto-deleted and the LLM retries up to max_retries_per_step. There is no bundled staged_results GraphDiff and no accept/reject step — applied_results on the session is the record of truth, and the canvas itself is the running record. This is the variant the frontend drawer defaults to.
Session lifecycle
Cold start: a running session with no live SSE stream is detected by _looks_cold_started and flipped to paused_user_action. The user re-attaches via POST /ai/agent/{session_id}/resume?action=continue. This avoids two SSE streams racing against the same session.
Followups: after complete or awaiting_user_input, the user can send another message via POST /ai/agent/{session_id}/followup with action="user_message". Rejecting a staged diff fires the same endpoint with action="rejected_diff" and an optional rejection note that becomes context for the next round.
Tool execution
flowfile_core/flowfile_core/ai/tools/executor.py::execute_tool_call(tool_name, tool_args, mode, ...) is the single entry point for every tool the LLM emits. It dispatches by the tool-name domain prefix:
| Domain | Operations | Handler |
|---|---|---|
flowfile.graph.* |
add_<node_type>, connect, delete_node, delete_connection, update_node_settings |
graph_ops.py |
flowfile.schema.* |
update_column_name, update_column_type |
schema_ops.py |
flowfile.codegen.* |
regenerate_polars_code, refactor_python_script |
codegen_ops.py |
flowfile.meta.* |
classify_intent, emit_plan, pick_node_type, pick_upstream, fill_<node>_settings, verify_completion |
meta_ops.py |
mode is one of:
stage(default for the agent). Validate the args via the Pydantic settings model, run schema prediction, build aStagedToolEntry, and return it without touching the live graph. The planner accumulates these intosession.staged_resultsand bundles them into aGraphDiffat the end of the run.apply. Validate, then mutate the live graph immediately. Used byagent_liveand by the Cmd+K palette / inline actions when the user accepts a staged diff.
The result shape is ToolExecutionResult:
class ToolExecutionResult(BaseModel):
status: Literal["staged", "applied", "rejected"]
staged_node_payload: dict[str, Any] | None
refusal_detail: str | None
audit_id: str | None
...
Schema prediction
tools/predictor.py predicts the output schema of an add_<node> before the node is added, so the planner can keep building on a node without committing first. Two strategies:
- Mirror for static / source / passthrough nodes — read settings, return the deterministic schema.
- Kernel dry-run for dynamic nodes (
polars_code,python_script,sql_query) — execute the code in a sandboxed kernel, capture the output schema, cache the result by(code_hash, input_schema_hash)intools/dry_run.py'sDryRunCache.
Refusal paths
The executor rejects rather than passes through certain shapes:
polars_codebodies containingimportstatements (security boundary — the kernel runtime is sandboxed but Flowfile-level bans are clearer).- Node settings that fail Pydantic validation (with a fully-formed error message including the JSON path and an example payload pulled from
tools/node_docs.py::NODE_AGENT_PAYLOAD_EXAMPLES). - Sink nodes used as upstreams (
_detect_sink_upstreams/_format_sink_upstream_refusal).
Refusals come back as ToolExecutionResult(status="rejected", refusal_detail=...), which the planner echoes to the LLM as the next tool reply. The model can correct course and retry within max_retries_per_step.
Tool catalog narrowing
tools/registry.py::build_tool_catalog(*, surface=None) returns the list[ToolSpec] to expose for the current surface. agent_complex gets the full catalog; agent_staged gets one or two tools depending on the current PlannerStage; cmd_k and inline surfaces get the single tool they need.
GraphDiff and accept/reject
diff.py::GraphDiff is the unit of stageable change:
class GraphDiff(BaseModel):
diff_id: str
flow_id: int
operations: list[StagedToolEntry] # tool_name + audit_id + staged_node_payload
applied_results: list[AppliedNodeRecord] = []
drifted_at: datetime | None = None
...
bundle_staged_results(staged_results) packs the planner's session.staged_results into a GraphDiff at session end. apply_diff(flow, diff) is the atomic apply — drift check first, then mutate, then create one undo point. Reject just discards the diff.
diff_store.py keeps a UUID-keyed dict of pending diffs, with optional disk persistence so restarts don't lose work.
diff_routes.py exposes:
POST /ai/diff/stage— used by Cmd+K / inline surfaces to stage a non-agent diff.POST /ai/diff/{diff_id}/accept— drift-checks then applies.POST /ai/diff/{diff_id}/reject— discards. Body optionally includes a rejection note that the agent reads on its next round.
Audit, safety, scheduler
Audit (audit.py). Every tool call records an AuditEvent (session_id, user_id, tool_name, result_status, provider, model, redacted args). Used for compliance, replay, and the per-session activity log in the UI.
Safety (safety.py). scrub_value_regex(value) runs a configurable set of regexes for common PII patterns (emails, credit cards, SSNs, etc.) and replaces matches with markers. Also exposes prepare_samples(...) which the agent uses when samples_mode="regex" to scrub sample rows before they're shown to the LLM.
Scheduler (scheduler.py). RateLimitScheduler enforces per-provider RPM/RPD via in-memory deques. Read from env: FLOWFILE_AI_<PROVIDER>_RPM / FLOWFILE_AI_<PROVIDER>_RPD (e.g., FLOWFILE_AI_ANTHROPIC_RPM=30). On 429 the scheduler honors Retry-After independently of the local caps. with_provider_retry(provider, ...) and stream_with_provider_retry(provider, ...) are the opt-in wrappers; byok.get_configured_provider is not auto-wrapped — callers compose explicitly. A RateLimitHint callback piped into the SSE stream surfaces "rate-limited, retrying in Ns" toasts to the UI.
Prompt logging (debug)
prompt_log.py is the dev-mode "what did the model actually see?" hatch. Off by default in production.
Enable (env vars on the core process):
export FLOWFILE_AI_LOG_PROMPTS=true # daily JSONL writes
export FLOWFILE_AI_LOG_PROMPTS_SCRUB=true # optional: scrub PII in user/tool messages
Both flags are also MutableBools and can be flipped at runtime via the settings module without restart.
Log location. {base_directory}/ai_prompts/YYYY-MM-DD.jsonl — one file per day, one line per LLM round-trip. {base_directory} is the same dir that owns master_key.txt / temp/ / system_logs/ (see shared.storage_config.storage.base_directory); on a local install this resolves to ~/.flowfile/.
One line per LLM call. For streaming calls the wrapper accumulates content + tool-call deltas as the iterator yields and emits the entry at stream-end. If the stream errors mid-flight, the entry still lands with whatever was accumulated and error set.
Truncation. Entries past 256 KiB get older user/assistant message bodies replaced with [...truncated, len=N chars] markers, preserving the system prompt and the most recent 5 turns. Each line stays parseable by jq.
Scrubbing. When FLOWFILE_AI_LOG_PROMPTS_SCRUB=true, user/tool message bodies are run through safety.scrub_value_regex before write. System and assistant content are not scrubbed — those are exactly what you need to read to debug model behaviour.
Failure isolation. A logging error never crashes the LLM call; the wrapper swallows + warns on file-IO / serialisation failures.
CLI:
# tail the last 20 entries
python -m flowfile_core.ai.prompt_log tail 20
# grep for a substring across recent entries (optional surface filter)
python -m flowfile_core.ai.prompt_log grep "filter for Q4"
python -m flowfile_core.ai.prompt_log grep "agent_staged" agent_staged
The CLI emits one JSON object per line; pipe through jq for ergonomic filtering.
Streaming
streaming.py exposes:
make_streaming_response(generator)/sse_stream(generator)— wrap an async generator of strings or events as an SSE response.planner_events_sse(events)— same shape but specialised forPlannerEvent, including theevent:SSE header per event type so frontend handlers can dispatch onEventSource.addEventListener('tool_call_staged', ...).
A 15-second keepalive frame is emitted on every stream so connections survive long thinking.
Intent router
intent_router.py::classify_intent(message, *, provider, surface="intent_classifier", ...) runs a small fast model (Haiku, Flash, 4.1-mini) and returns a discriminated decision:
class RouteResponse(BaseModel):
target: Literal["chat", "agent"]
confidence: Literal["low", "medium", "high"]
reason: str
The chat drawer's Auto mode hits POST /ai/route first, then dispatches to /ai/chat/stream or /ai/agent/start based on the result. Failure (timeout, parse error) defaults to "chat" — auto-promotion is a convenience, not a correctness requirement.
Frontend integration
The frontend is Vue 3 + Pinia. The store / component split:
stores/ai-store.ts— chat drawer state, mode toggle (Chat / Auto / Agent), in-flight stream handle, message history.stores/ai-agent-store.ts— agent session id, currentstage/picked_op_kind/picked_node_type, event timeline, staged diff id. Also handles SSE subscription, including reconnect on cold-start.stores/ai-diff-store.ts— pending diffs by id, accept/reject actions.stores/ai-autocomplete-store.ts— formula / join-key suggestion cache with timeouts and degraded-flag handling.stores/ai-command-palette-store.ts— Cmd+K palette state.stores/ai-ghost-node-store.ts— edge-stub suggestion cache.
Components in features/ai/:
AiAssistant.vue— the drawer (chat + agent).AiAgentRun.vue/AiAgentEvent.vue— agent panel and per-event renderers.AiDiffPanel.vue/AiDiffPreview.vue— diff review UI.AiInlineActions.vue— the ✨ menu on a node.AiCommandPalette.vue— Cmd+K.AiGhostNode.vue— edge-stub suggestions.AiMentionAutocomplete.vue(+mentionVocabulary.ts) —@flow/@node:idcompletion.
The BYOK panel lives in views/AiProvidersView/AiSettingsTab.vue and talks to the BYOK routes through api/ai.api.ts.
Adding a new surface
A typical recipe for a new AI feature:
- Decide whether it's read-only (chat-shape) or mutating (agent-shape).
- Pick a surface name (e.g.
my_new_feature) and add a per-surface entry to each provider'ssurface_modelsdict inproviders/{anthropic,openai,...}.py. - Add a system prompt under
flowfile_core/flowfile_core/ai/prompts/. Reusebase.mdas a foundation. - Add a route file
flowfile_core/flowfile_core/ai/my_feature_routes.pywith a Pydantic request model and a handler that:- Calls
byok.get_configured_provider(db, current_user.id, payload.provider, surface="my_new_feature", model=payload.model). - Calls
context.builder.render_prompt_context(...)for the user message. - Either
await provider.chat(...)for non-streaming, orstreaming.sse_stream(provider.stream(...))for streaming. - For mutating features: build a tool catalog via
tools.registry.build_tool_catalog(surface="my_new_feature"), dispatch tool calls throughtools.executor.execute_tool_call(..., mode="stage"), accumulate intostaged_results, bundle into aGraphDiffviadiff.bundle_staged_results(...), register withdiff_store, and return thediff_idso the frontend can render an accept/reject UI.
- Calls
- Mount the new sub-router on
routes.py::router.include_router(my_feature_router)— therequire_ai_enableddependency is inherited automatically. - Add the corresponding Pinia store and Vue components on the frontend.
- Don't forget to update AI Assistant overview and Provider Setup once the user-visible behaviour is finalised.
Existing surfaces are good templates — chat_routes.py for streaming chat, command_palette_routes.py for stage-and-return-diff, agent_routes.py for the full agent loop.