# Phase 5: Flowgraph HostConfig

## Status: Spec (Draft)

## Problem

Flowgraph is a separate project (`@alkdev/flowgraph`) that uses `@alkdev/ujsx` as a dependency. It renders workflow templates (which are `UNode` trees) to different targets using `HostConfig`. This requires two new host implementations:

1. **Graphology DAG host**: renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, and topological sort
2. **Reactive execution host**: renders a workflow template to an in-memory reactive execution engine for runtime workflow execution

## Conceptual Mapping

| Workflow concept | UJSX concept |
|---|---|
| Workflow template | `URoot` / `UElement` tree (`{ type, props, children }`) |
| Operation step | `<Operation>` → `UElement` |
| Sequential flow | Parent → child relationship |
| Parallel branches | Multiple children of a `Parallel` component |
| Status propagation | `signal`/`computed` reactive layer |
| Template validation | `TransformRegistry` rule pass |
| Template → DAG | `HostConfig` rendering into graphology |
| Template → execution engine | Different `HostConfig` for different runners |

## Host 1: Graphology DAG

### Purpose

Convert a declarative workflow template into a graphology DAG for:

- Structural analysis (connected components, reachability)
- Cycle detection (prevent invalid workflows)
- Topological sort (execution ordering)
- Critical path analysis
- Visualization

### Type Parameters

```typescript
HostConfig<WorkflowTag, GraphologyNode, GraphologyContext>
```

Where:

- `WorkflowTag` = string literal union: `"operation" | "sequential" | "parallel" | "conditional" | "workflow"`
- `GraphologyNode` = key of the graphology node (a string ID; the node's attributes are stored on the graph)
- `GraphologyContext` = `{ graph: Graph; nodeIdCounter: number; parentStack: string[] }`

### Implementation

```typescript
import Graph from "graphology";
import { HostConfig } from "@alkdev/ujsx/host";
import { hasCycle } from "graphology-dag";
// CycleError is listed under error/index.ts in the package structure;
// the relative path assumes this file lives at src/host/graphology.ts.
import { CycleError } from "../error";

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";

interface
GraphNodeAttrs {
  label: string;
  type: WorkflowTag;
  status: "pending" | "running" | "completed" | "failed" | "aborted";
  props: Record<string, unknown>;
}

// Instances are node keys; attributes live on the graph itself
type GraphologyNode = string;

interface GraphologyContext {
  graph: Graph;
  nodeIdCounter: number;
  parentStack: string[]; // stack of parent node IDs for edge creation
}

export function createGraphologyHost(): HostConfig<WorkflowTag, GraphologyNode, GraphologyContext> {
  return {
    name: "graphology-dag",

    createRootContext(container, options, context) {
      return {
        graph: new Graph({ type: "directed", multi: false }),
        nodeIdCounter: 0,
        parentStack: [],
      };
    },

    finalizeRoot(ctx) {
      // Reject templates whose rendered graph is not acyclic
      if (hasCycle(ctx.graph)) {
        throw new CycleError("Workflow contains a cycle");
      }
    },

    createInstance(tag, props, ctx, parent) {
      const id = `node_${ctx.nodeIdCounter++}`;
      ctx.graph.addNode(id, {
        label: String(props.name ?? id),
        type: tag,
        status: "pending",
        props,
      });
      return id;
    },

    createTextInstance(text, ctx, parent) {
      // Workflows don't have text nodes; return a placeholder ID
      // that is never added to the graph
      return `text_${ctx.nodeIdCounter++}`;
    },

    appendChild(parent, child, ctx) {
      // Text placeholders are not graph nodes, so they get no edge
      if (!ctx.graph.hasNode(child)) return;
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },

    insertBefore(parent, child, before, ctx) {
      if (!ctx.graph.hasNode(child)) return;
      ctx.graph.addEdge(parent, child, { type: "dependency" });
      // graphology has no sibling order; an ordinal edge attribute could record it
    },

    removeChild(parent, child, ctx) {
      // dropNode also removes every edge attached to the node
      if (ctx.graph.hasNode(child)) ctx.graph.dropNode(child);
    },

    prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
      // Collect only the props whose values changed
      const changed: Record<string, unknown> = {};
      for (const key of Object.keys(nextProps)) {
        if (nextProps[key] !== prevProps[key]) {
          changed[key] = nextProps[key];
        }
      }
      return Object.keys(changed).length > 0 ?
        changed : null;
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Merge the changed props into the node's stored props
      ctx.graph.updateNodeAttributes(instance, (attrs) => ({
        ...attrs,
        props: { ...attrs.props, ...payload },
      }));
    },
  };
}
```

### What the DAG Host Enables

```typescript
import { h, createRoot } from "@alkdev/ujsx";
import { topologicalSort } from "graphology-dag";
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";

const workflow = h("workflow", { name: "document-pipeline" },
  h("sequential", { name: "ingest" },
    h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
    h("operation", { name: "parse", type: "json-parse" }),
  ),
  h("parallel", { name: "analyze" },
    h("operation", { name: "classify", type: "llm-classify" }),
    h("operation", { name: "extract", type: "llm-extract" }),
  ),
  h("conditional", { name: "route", condition: "classify.result" },
    h("operation", { name: "escalate", type: "notify" }),
    h("operation", { name: "archive", type: "store" }),
  ),
);

const host = createGraphologyHost();
const root = createRoot(host, {});
root.render(workflow);

// ctx.graph is now a graphology DAG
const { graph } = root.ctx;
console.log(graph.order);            // number of nodes
console.log(topologicalSort(graph)); // execution order
```

## Host 2: Reactive Execution Engine

### Purpose

Render a workflow template to a reactive execution engine where:

- Each operation node has a `signal` for its status
- Downstream operations use `computed` signals that check upstream completion
- Status propagation is automatic: when `operation_A.status` becomes `completed`, dependent operations start automatically (if their preconditions are met)

### Type Parameters

```typescript
HostConfig<WorkflowTag, OperationInstance, ExecutionContext>
```

Where:

- `OperationInstance` = `{ id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }`
- `ExecutionContext` = `{ operations: Map<string, OperationInstance>; parentStack: string[]; runner: OperationRunner }`

### Implementation Sketch

```typescript
import { signal, computed, effect, batch } from "@preact/signals-core";
import type { Signal } from "@preact/signals-core";

type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";
interface OperationInstance {
  id: string;
  status: Signal<OpStatus>;
  result: Signal<unknown>;
  definition: Record<string, unknown>;
}

// Assumed runner shape, inferred from the execute() call in appendChild
interface OperationRunner {
  execute(op: OperationInstance): Promise<unknown>;
}

interface ExecutionContext {
  operations: Map<string, OperationInstance>;
  parentStack: string[];
  runner: OperationRunner;
}

export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
  return {
    name: "reactive-execution",

    createRootContext(container, options) {
      return {
        operations: new Map(),
        parentStack: [],
        runner,
      };
    },

    createInstance(tag, props, ctx) {
      const instance: OperationInstance = {
        id: String(props.name ?? `op_${ctx.operations.size}`),
        status: signal<OpStatus>("pending"),
        result: signal<unknown>(null),
        definition: props,
      };
      ctx.operations.set(instance.id, instance);
      return instance;
    },

    appendChild(parent, child, ctx) {
      // Establish dependency: child depends on parent.
      // When parent completes, child's preconditions are re-evaluated.
      const parentInstance = typeof parent === "string"
        ? ctx.operations.get(parent)!
        : parent as OperationInstance;
      const childInstance = child as OperationInstance;

      // Computed that watches parent status.
      // For sequential flow: parent must be completed.
      const preconditionsMet = computed(
        () => parentInstance.status.value === "completed",
      );

      // Effect: when preconditions are met, start the child operation.
      // The "pending" check keeps the effect from starting the child twice.
      effect(() => {
        if (preconditionsMet.value && childInstance.status.value === "pending") {
          batch(() => {
            childInstance.status.value = "running";
            ctx.runner.execute(childInstance).then(
              (result) => batch(() => {
                childInstance.result.value = result;
                childInstance.status.value = "completed";
              }),
              (error) => batch(() => {
                childInstance.result.value = error;
                childInstance.status.value = "failed";
              }),
            );
          });
        }
      });
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Update the operation's definition
      instance.definition = { ...instance.definition, ...payload };
    },

    // ... remaining methods
  };
}
```

### Abort Cascade

The reactive host naturally supports abort cascading.
When a parent operation fails or is aborted, pending downstream operations can be aborted automatically:

```typescript
// In appendChild, also watch for parent failure:
const abortTrigger = computed(() => {
  return parentInstance.status.value === "failed"
    || parentInstance.status.value === "aborted";
});

effect(() => {
  if (abortTrigger.value && childInstance.status.value === "pending") {
    batch(() => {
      childInstance.status.value = "aborted";
    });
  }
});
```

This is the key advantage of the signal-based approach: no manual event wiring for status propagation. The reactive graph handles it automatically.

## Integration with Flowgraph

### Flowgraph Package Structure

```
@alkdev/flowgraph/
  src/
    component/           # ujsx components for workflow definition
      operation.ts       # <Operation> component
      sequential.ts      # <Sequential>: sequential execution
      parallel.ts        # <Parallel>: concurrent execution
      conditional.ts     # <Conditional>: branching
      index.ts
    host/
      graphology.ts      # HostConfig: ujsx template → graphology DAG
      reactive.ts        # HostConfig: ujsx template → reactive execution engine
    schema/
      enums.ts           # CallStatus, NodeStatus, EdgeType
      node.ts            # WorkflowNode schema (OperationNode, CallNode)
      edge.ts            # WorkflowEdge schema
      graph.ts           # SerializedGraph factory + flowgraph schemas
    graph/
      construction.ts    # FlowGraph class (wraps graphology)
      validation.ts      # Cycle detection, type-compat checking
      queries.ts         # topological sort, ancestors/descendants, critical path
      mutation.ts        # addNode, addEdge, updateStatus
    reactive/
      workflow.ts        # ReactiveRoot for workflow state
      node.ts            # Per-node signal management
    analysis/
      type-compat.ts     # Output→Input schema compatibility checking
      workflow.ts        # Execution ordering, precondition validation
      defaults.ts        # Enum defaults
    error/
      index.ts           # FlowgraphError, CycleError, TypeCompatError
    index.ts             # Public API
```

### Dependencies

```
@alkdev/typebox        # Schema definitions
@alkdev/pubsub         # Event transport (peer dep, optional)
@alkdev/operations     # OperationSpec types for type-compat edges (peer dep)
@alkdev/ujsx           # Template definition, reactive layer,
                       #   transform
@preact/signals-core   # Transitive via ujsx, reactive state
graphology             # DAG structural analysis
graphology-dag         # Cycle detection, topological sort
```

### Key Design Decision

`@alkdev/ujsx` should be a **direct dependency** of `@alkdev/flowgraph`. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple: one package and a clear dependency chain.

The `from_ujsx` adapter for operations stays in the operations package as a separate export path (consistent with how it already handles `from-mcp` and `from-openapi`).

## Changes to UJSX

| File | Change |
|------|--------|
| None specific | Flowgraph is a consumer, not a modification of ujsx. |

However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the Phase 1-3 reconciler, the reconciler design needs adjustment. Build both hosts early as integration tests.

## Dependencies

- Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs `removeChild`)
- Phase 4 is optional but highly beneficial (the graphology host benefits from the `Value.Equal` bail-out on unchanged workflows)
- `graphology` and `graphology-dag` are new external dependencies for flowgraph (not ujsx)

## Open Questions

1. **Should flowgraph components be in `@alkdev/flowgraph` or `@alkdev/ujsx`?** The workflow components (`<Operation>`, `<Sequential>`, `<Parallel>`) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like `@alkdev/ujsx/workflow`) makes them reusable. Recommendation: keep them in flowgraph, since they are domain-specific.
2. **How does the reactive host handle async operations?** `effect()` is synchronous, while the `runner.execute()` call is async. The `effect` that starts execution must not re-run on every signal read inside the async chain.
3.
   **Should the graphology host support incremental updates?** If a workflow template changes (add/remove operations), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.

## Test Cases

For graphology host:

1. Render simple sequential workflow → correct DAG structure
2. Render parallel branches → correct multi-child edges
3. Cycle detection on invalid workflow
4. Topological sort order matches declaration order
5. Update an operation's props → graphology node attributes update
6. Remove an operation → graphology node and edges removed

For reactive host:

1. Execute sequential workflow → operations run in order
2. Execute parallel branches → operations start concurrently
3. Operation failure → downstream operations aborted
4. Status propagation is automatic (no manual wiring)
5. Disposing a workflow stops all pending operations
6. Signal subscriptions are cleaned up on unmount
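
The first few graphology test cases (DAG structure, cycle detection, ordering) can be prototyped before the real host exists. The block below is a minimal, dependency-free sketch and not the flowgraph implementation: `TemplateNode`, `collectEdges`, and `topoSort` are hypothetical names, and a hand-written Kahn's algorithm stands in for `graphology-dag`. It assumes each node's `props.name` is unique.

```typescript
// Hypothetical stand-in for a rendered template: { type, props, children }
interface TemplateNode {
  type: string;
  props: { name: string };
  children: TemplateNode[];
}

type Edge = [string, string]; // [parent name, child name]

// Walk the template in declaration order and emit one parent -> child edge
// per nesting relationship, similar to what appendChild records in the host.
function collectEdges(node: TemplateNode, edges: Edge[] = []): Edge[] {
  for (const child of node.children) {
    edges.push([node.props.name, child.props.name]);
    collectEdges(child, edges);
  }
  return edges;
}

// Kahn's algorithm: returns a topological order, or throws if a cycle exists.
function topoSort(nodes: string[], edges: Edge[]): string[] {
  const indegree = new Map<string, number>();
  const outgoing = new Map<string, string[]>();
  for (const n of nodes) {
    indegree.set(n, 0);
    outgoing.set(n, []);
  }
  for (const [from, to] of edges) {
    outgoing.get(from)!.push(to);
    indegree.set(to, indegree.get(to)! + 1);
  }
  // Start from nodes with no incoming edges, in the order they were given
  const queue = nodes.filter((n) => indegree.get(n) === 0);
  const order: string[] = [];
  while (queue.length > 0) {
    const n = queue.shift()!;
    order.push(n);
    for (const next of outgoing.get(n)!) {
      const remaining = indegree.get(next)! - 1;
      indegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }
  // Any node left unvisited sits on (or behind) a cycle
  if (order.length !== nodes.length) {
    throw new Error("Workflow contains a cycle");
  }
  return order;
}

const template: TemplateNode = {
  type: "workflow",
  props: { name: "pipeline" },
  children: [
    {
      type: "sequential",
      props: { name: "ingest" },
      children: [
        { type: "operation", props: { name: "fetch" }, children: [] },
        { type: "operation", props: { name: "parse" }, children: [] },
      ],
    },
  ],
};

const names = ["pipeline", "ingest", "fetch", "parse"];
const edges = collectEdges(template);
const order = topoSort(names, edges);
// order is ["pipeline", "ingest", "fetch", "parse"]
```

Adding a back edge such as `["parse", "pipeline"]` leaves no node with zero in-degree, so `topoSort` throws. That is the behaviour test case 3 expects from `finalizeRoot` in the real host.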