---
status: draft
last_updated: 2026-05-19
---

# Consumer Integration Guide

End-to-end walkthrough: from operation specs to a running workflow. This document shows how a consumer (alkhub, OpenCode, cograph) uses flowgraph's components together.

## Overview

The integration path follows five phases:

```
1. Register operations         → Build operation graph
2. Define workflow template    → Validate against operation graph
3. Render template to DAG      → Validate DAG structure
4. Create reactive execution   → Drive workflow via signals
5. Subscribe to status changes → Respond to completion/failure
```

Each phase uses a different flowgraph module. The complete integration uses all modules; partial integrations are possible.

## Phase 1: Register Operations → Build Operation Graph

```typescript
import { OperationRegistry } from "@alkdev/operations";
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { buildTypeEdges } from "@alkdev/flowgraph/analysis";

// 1. Create the registry with operation specs
const registry = new OperationRegistry([
  { namespace: "task", name: "classify", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", inputSchema: {...}, outputSchema: {...} },
  // ... more operations
]);

// 2. Build the operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 3. The graph now has type-compatibility edges
operationGraph.hasEdge("task.classify", "task.enrich"); // → true (if compatible)
operationGraph.getEdgeAttributes("task.classify", "task.enrich");
// → { edgeType: "typed", compatible: true, detail: "classify.output → enrich.input" }
```

**What happens internally**:

- `fromSpecs()` creates a node for each operation (key: `namespace.name`)
- `buildTypeEdges()` compares each pair's `outputSchema` → `inputSchema` and adds edges
- Cycles are rejected at construction time (DAG invariant)

**Partial integration**: If you only need the operation graph (no workflows), stop here. The operation graph is useful for type-compatibility queries and topological ordering without defining any templates.

## Phase 2: Define Workflow Template → Validate

```typescript
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "@alkdev/flowgraph/component";
import { validateTemplate } from "@alkdev/flowgraph/analysis";

// Define a template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    // High-confidence path
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    // Low-confidence fallback
    h(Operation, { name: "task.classify" }), // re-classify with different params
  ),
);

// Validate against the operation graph
const errors = validateTemplate(template, operationGraph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`Validation error: ${error.type}`, error);
  }
  // Handle errors...
}
```

**Validation checks**:

1. All `Operation` names exist in the registry
2. No cycles in the rendered DAG
3. Type compatibility between sequential operations
4. All operations are reachable from the start

**Template serialization** (for storage/transmission):

```typescript
// Serialize to JSON
const json = JSON.stringify(template);

// Deserialize and validate
const parsed = JSON.parse(json);
const templateErrors = validateTemplate(parsed, operationGraph);
```

Note: function-valued props (like `Conditional.test`) don't survive JSON serialization. Use string references for stored templates and resolve them at render time.

## Phase 3: Render Template to DAG → Validate Structure

```typescript
import { createRoot } from "@alkdev/ujsx";
import { GraphologyHostConfig } from "@alkdev/flowgraph/host/graphology";
import { DirectedGraph } from "graphology";
import { hasCycle } from "graphology-dag";

// Create the GraphologyHostConfig
const hostConfig = new GraphologyHostConfig();
const root = createRoot(hostConfig, new DirectedGraph());

// Render the template to a DAG
root.render(template);

// The DAG is now available in the root context
const dag = root.ctx.graph;

// Validate the DAG
// (graphology's DirectedGraph has no cycle check of its own; graphology-dag provides hasCycle)
hasCycle(dag); // → false (always, if template is valid)
dag.nodes();   // → ["task.classify", "task.enrich", "task.summarize"]
dag.edges();   // → ["task.classify->task.enrich", "task.classify->task.summarize"]

// Query the DAG
dag.inNeighbors("task.enrich");    // → ["task.classify"]
dag.outNeighbors("task.classify"); // → ["task.enrich", "task.summarize"]
```

**What happens internally**:

- The `GraphologyHostConfig` renders each `Operation` as a node and each structural relationship (`Sequential`, `Parallel`, `Conditional`) as edges
- Structural containers (`Sequential`, `Parallel`, `Conditional`) are transparent: they produce edges, not nodes
- The result is a pure DAG that can be analyzed, serialized, or used for validation

## Phase 4: Create Reactive Execution → Drive Workflow

```typescript
import { effect } from "@preact/signals-core";
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
import { ReactiveHostConfig } from "@alkdev/flowgraph/host/reactive";

// 1. Create the ReactiveRoot from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag);

// 2. Create the ReactiveHostConfig
const reactiveHost = new ReactiveHostConfig(registry, workflowRoot);

// 3. Render the template to create reactive state
const reactiveRoot = createRoot(reactiveHost, {});
reactiveRoot.render(template);

// 4. Subscribe to status changes and drive execution with effects.
// `effect` comes from @preact/signals-core; `resolveInput` and `parentCallId`
// are assumed to be supplied by the consumer and are not shown here.
for (const [nodeId, node] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    // Parenthesized so that preconditions gate both "idle" and "waiting" nodes
    const startable = node.status.value === "idle" || node.status.value === "waiting";
    if (node.preconditions.value && startable) {
      node.status.value = "running";
      // resolveInput resolves the node's input from predecessor outputs and static config.
      // For Operation nodes, input comes from the template props or aggregated predecessor results.
      const input = resolveInput(nodeId, workflowRoot);
      registry.execute(node.operationId, input, { parentRequestId: parentCallId })
        .then(result => {
          // Publish the output before the status so dependents that react
          // to "completed" can already read it
          node.output.value = result;
          node.status.value = "completed";
        })
        .catch(error => {
          node.status.value = "failed";
        });
    }
  });

  // Track failures
  effect(() => {
    if (node.status.value === "failed") {
      console.error(`Node ${nodeId} failed`);
    }
  });
}

// 5. Kick off the workflow: root nodes start as "ready".
// (The effect-driven execution above handles the rest automatically.)
// Root nodes' preconditions are true by default (no predecessors),
// so they transition to "ready" immediately.
```

**What happens automatically**:

- Node status changes propagate reactively through `computed` preconditions
- When a predecessor completes, dependents automatically transition to `ready`
- When a predecessor fails, dependents' `blockedByFailure` triggers and they transition to `aborted`
- The entire workflow progresses without manual orchestration

## Phase 5: Handle Completion → Cleanup

```typescript
// Track overall workflow status
const allNodes = Array.from(workflowRoot.statusMap.values());
const allCompleted = () => allNodes.every(s =>
  s.value === "completed" || s.value === "failed" || s.value === "aborted" || s.value === "skipped"
);

// Report the outcome once every node has reached a terminal status
effect(() => {
  if (allCompleted()) {
    const failed = allNodes.filter(s => s.value === "failed");
    const aborted = allNodes.filter(s => s.value === "aborted");
    const completed = allNodes.filter(s => s.value === "completed");
    const skipped = allNodes.filter(s => s.value === "skipped");

    console.log(`Workflow complete: ${completed.length} completed, ${failed.length} failed, ${aborted.length} aborted, ${skipped.length} skipped`);

    // Cleanup
    workflowRoot.dispose();
  }
});

// Handle system-level abort (e.g., provider outage, auth failure).
// `prm` and `pendingRequestIds` are assumed to be in scope from the consumer's call layer (not shown).
function handleSystemFailure(error: Error) {
  workflowRoot.abortAll();
  prm.abortAll(pendingRequestIds);
  workflowRoot.dispose();
  console.error(`Workflow aborted: ${error.message}`);
}
```

## Export/Import for Persistence

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";

// Export the call graph for persistence
// (`callGraph` is a FlowGraph populated from call events; see "Call Graph Population (Real-Time)")
const serialized = callGraph.export();
// → FlowGraphSerialized format (graphology native JSON)

// Store in Postgres (hub's responsibility)
await db.query('INSERT INTO call_graphs (id, data) VALUES ($1, $2)', [
  workflowId,
  JSON.stringify(serialized),
]);

// Restore from persistence
const restored = FlowGraph.fromJSON(serialized);
// → FlowGraph with all nodes and edges
```

## Call Graph Population (Real-Time)

The call graph can be populated incrementally from call protocol events:

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";

// Create empty call graph
const callGraph = new FlowGraph();

// Subscribe to call protocol events
pubsub.subscribe("call.requested", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.responded", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.error", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.aborted", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.completed", (event) => callGraph.updateFromEvent(event));

// Query the call graph for observability
callGraph.filterByStatus("running"); // What's currently running
callGraph.children("req_abc123");    // Children of a call
callGraph.lineage("req_xyz789");     // Ancestor chain
callGraph.duration("req_abc123");    // How long a call took
```

## Minimal Integration Example

For consumers that only need the operation graph and template validation (no reactive execution):

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { h } from "@alkdev/ujsx";
import { Operation, Sequential } from "@alkdev/flowgraph/component";
import { validateTemplate, typeCompat } from "@alkdev/flowgraph/analysis";

// `registry` is the OperationRegistry created in Phase 1

// 1. Build operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 2. Define and validate template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Operation, { name: "task.enrich" }),
);
const errors = validateTemplate(template, operationGraph);

// 3. Query type compatibility
const result = typeCompat(
  registry.get("task.classify").outputSchema,
  registry.get("task.enrich").inputSchema,
);
console.log(result.compatible); // → true or false
console.log(result.mismatches); // → TypeMismatch[] if incompatible
```

Of flowgraph's modules, this integration only requires `@alkdev/flowgraph/graph`, `@alkdev/flowgraph/component`, and `@alkdev/flowgraph/analysis` (plus `h` from `@alkdev/ujsx` and the operation registry). No reactive execution, no ujsx HostConfig, no signals.

## Module Dependency Map

```
┌──────────────────────────────────────────────────┐
│  Consumer (hub coordinator, OpenCode plugin)     │
└────────┬────────────────┬────────────────┬───────┘
         │                │                │
    ┌────▼────┐    ┌──────▼──────┐  ┌──────▼───────┐
    │  graph  │    │  component  │  │   analysis   │
    │         │    │             │  │              │
    │FlowGraph│    │Operation    │  │typeCompat    │
    │fromSpecs│    │Sequential   │  │validate      │
    │queries  │    │Parallel     │  │topological   │
    │mutations│    │Conditional  │  │parallelGroups│
    └────┬────┘    │Map          │  └──────┬───────┘
         │         └──────┬──────┘         │
         │                │                │
    ┌────▼────────────────▼────────────────▼───────┐
    │                    schema                    │
    │  OperationNodeAttrs   CallNodeAttrs          │
    │  OperationEdgeAttrs   CallEdgeAttrs          │
    │  TemplateEdgeAttrs    NodeStatus   EdgeType  │
    └─────────────────────┬────────────────────────┘
                          │
    ┌─────────────────────▼────────────────────────┐
    │                     host                     │
    │  GraphologyHostConfig    ReactiveHostConfig  │
    └─────────────────────┬────────────────────────┘
                          │
    ┌─────────────────────▼────────────────────────┐
    │                   reactive                   │
    │  WorkflowReactiveRoot    WorkflowNode        │
    │  NodeStatus signals   computed preconditions │
    └─────────────────────┬────────────────────────┘
                          │
    ┌─────────────────────▼────────────────────────┐
    │                    error                     │
    │           FlowgraphError hierarchy           │
    └──────────────────────────────────────────────┘

External dependencies:
┌────────────────┐  ┌───────────────┐  ┌──────────────────────────┐
│ graphology     │  │ ujsx          │  │ @preact/signals-core     │
│ graphology-dag │  │ h, createRoot │  │ signal, computed, effect │
└────────────────┘  └───────────────┘  └──────────────────────────┘
```

## Common Patterns

### Pattern: SDD Pipeline

```typescript
// The archetypal SDD (Spec-Driven
// Development) pipeline
const sddPipeline = h(Sequential, {},
  h(Operation, { name: "task.architect" }),
  h(Conditional, {
    test: (results) => results["task.architect"].output.approved,
  },
    h(Sequential, {},
      h(Operation, { name: "task.decomposer" }),
      h(Operation, { name: "task.coordinator" }),
    ),
    // else-branch: architect disapproved, loop back or stop
    h(Operation, { name: "task.notify-stakeholder" }),
  ),
);
```

### Pattern: Fan-Out/Fan-In

```typescript
// Process items in parallel, then aggregate
const fanOut = h(Sequential, {},
  h(Operation, { name: "task.fetch-items" }),
  h(Map, {
    over: (results) => results["task.fetch-items"].output.items,
    as: "item",
  },
    h(Operation, { name: "task.process-item" }),
  ),
  h(Operation, { name: "task.aggregate-results" }),
);
```

### Pattern: Error Boundary with Conditional

```typescript
// Critical operation with graceful degradation
const withFallback = h(Sequential, {},
  // The critical operation whose status the Conditional inspects
  h(Operation, { name: "task.fetch-data" }),
  h(Conditional, {
    test: (results) => results["task.fetch-data"].status !== "failed",
  },
    // Happy path
    h(Operation, { name: "task.transform" }),
    // Fallback
    h(Operation, { name: "task.use-cache" }),
  ),
  // This operation runs regardless of which branch (then or else)
  // the Conditional took
  h(Operation, { name: "task.notify" }),
);
```

## Constraints on Consumers

- **The hub coordinator drives execution**: flowgraph provides reactive state (signals, computed), not call execution. The coordinator reads `preconditions` and `blockedByFailure` and calls `registry.execute()` when appropriate.
- **Dispose is mandatory**: `WorkflowReactiveRoot.dispose()` must be called when the workflow completes or is cancelled. Without disposal, signal subscriptions leak.
- **Template rendering is currently one-shot**: until the ujsx reconciler is implemented, `createRoot(host, container).render(template)` can only be called once per root. To re-render, create a new root.
- **Function props don't survive serialization**: `Conditional.test` and `Map.over` with function values require runtime resolution. Use string references for stored templates.
- **Call graph is independent of reactive execution**: you can build a call graph from events without using the reactive layer. The reactive layer is optional for consumers that only need observability.

## References

- Architecture overview: [README.md](README.md)
- FlowGraph API: [flowgraph-api.md](flowgraph-api.md)
- Schema: [schema.md](schema.md)
- Workflow templates: [workflow-templates.md](workflow-templates.md)
- Host configs: [host-configs.md](host-configs.md)
- Reactive execution: [reactive-execution.md](reactive-execution.md)
- Call graph: [call-graph.md](call-graph.md)
- Analysis: [analysis.md](analysis.md)
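
## Appendix: Resolving String References (Sketch)

The guide recommends string references in place of function-valued props for stored templates, but does not show how a consumer might resolve them. The following is a minimal sketch of one possible approach, not flowgraph's actual mechanism. Everything in it is an assumption made for illustration: the `StoredNode` shape, the `testRef` prop name, the `predicates` table, and the `resolveRefs` helper are all hypothetical, and flowgraph's real template types may differ.

```typescript
// Hypothetical shapes for illustration only; not flowgraph's real types.
type Results = Record<string, { status: string; output?: Record<string, unknown> }>;
type Predicate = (results: Results) => boolean;

interface StoredNode {
  type: string;                   // e.g. "Sequential", "Operation", "Conditional"
  props: Record<string, unknown>; // JSON-safe values only
  children: StoredNode[];
}

// Consumer-owned lookup table: string reference -> predicate function.
const predicates = new Map<string, Predicate>([
  [
    "classify.highConfidence",
    (r) => Number(r["task.classify"]?.output?.confidence) > 0.8,
  ],
]);

// Walk the stored tree and swap `testRef: "<name>"` for `test: <function>`.
// Unknown references fail loudly instead of silently producing a no-op.
function resolveRefs(node: StoredNode, table: Map<string, Predicate>): StoredNode {
  const { testRef, ...rest } = node.props;
  const props: Record<string, unknown> = { ...rest };
  if (typeof testRef === "string") {
    const fn = table.get(testRef);
    if (!fn) throw new Error(`Unknown predicate reference: ${testRef}`);
    props.test = fn;
  }
  return {
    type: node.type,
    props,
    children: node.children.map((child) => resolveRefs(child, table)),
  };
}

// The stored form is plain JSON, so it survives a serialization round trip.
const stored: StoredNode = {
  type: "Conditional",
  props: { testRef: "classify.highConfidence" },
  children: [
    { type: "Operation", props: { name: "task.enrich" }, children: [] },
    { type: "Operation", props: { name: "task.classify" }, children: [] },
  ],
};

const roundTripped = JSON.parse(JSON.stringify(stored)) as StoredNode;
const resolved = resolveRefs(roundTripped, predicates);
const test = resolved.props.test as Predicate;

console.log(test({ "task.classify": { status: "completed", output: { confidence: 0.93 } } })); // true
console.log(test({ "task.classify": { status: "completed", output: { confidence: 0.41 } } })); // false
```

Keeping the lookup table on the consumer side means a stored template carries only names, so the same JSON can be resolved against different predicate sets (for example, in tests). Resolution would have to happen before the template is handed to `validateTemplate` or a root's `render`, since both expect function-valued props to be usable.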