flowgraph/docs/architecture/consumer-integration.md
glm-5.1 907c33650f fix: architecture review - address 5 critical issues, 6 warnings, 3 suggestions
Critical fixes:
- C1: Create standalone ADR-006 file (edge type consistency),
  extract from open-questions.md inline content
- C2: Convert CallResult from plain interface to TypeBox schema,
  aligning with 'TypeBox as single source of truth' constraint
- C3: Add fromJSON() cycle detection specification - enforce
  ADR-002 DAG invariant even on deserialized input
- C4: Rewrite consumer-integration.md Phase 4 to use ADR-005
  event-append pattern instead of direct signal mutation
- C5: Fix operator precedence bug in consumer-integration.md
  (missing parentheses around OR condition)

Warnings addressed:
- W1: Fix immutability claim - operation graph is 'conventionally
  immutable', not prevented by API
- W2: Add EventLogProjection to reactive exports map
- W3: Add CallResult/CallResultSchema to schema exports map
- W4: Fix reactive-execution.md Level 1 error handling to use
  event-append pattern instead of direct signal mutation
- W5: Remove duplicate dataFlow inference description in schema.md
- W6: Clarify ADR-006 project context (flowgraph vs taskgraph)

Suggestions implemented:
- S1: Add 'reviewed' document lifecycle status between draft/stable,
  update all docs to reviewed status
- S2: Add carve-out note for analysis result types in schema.md
  constraints (they are ephemeral, not serialized)
- S3: Add isComplete() and getAggregateStatus() convenience methods
  to WorkflowReactiveRoot specification
2026-05-21 19:40:45 +00:00


---
status: reviewed
last_updated: 2026-05-19
---
# Consumer Integration Guide
End-to-end walkthrough: from operation specs to a running workflow. This document shows how a consumer (alkhub, OpenCode, cograph) uses flowgraph's components together.
## Overview
The integration path follows five phases:
```
1. Register operations → Build operation graph
2. Define workflow template → Validate against operation graph
3. Render template to DAG → Validate DAG structure
4. Create reactive execution → Drive workflow via signals
5. Subscribe to status changes → Respond to completion/failure
```
Each phase uses a different flowgraph module. The complete integration uses all modules; partial integrations are possible.
## Phase 1: Register Operations → Build Operation Graph
```typescript
import { OperationRegistry } from "@alkdev/operations";
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { buildTypeEdges } from "@alkdev/flowgraph/analysis";
// 1. Create the registry with operation specs
const registry = new OperationRegistry([
  { namespace: "task", name: "classify", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", inputSchema: {...}, outputSchema: {...} },
  // ... more operations
]);
// 2. Build the operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());
// 3. The graph now has type-compatibility edges
operationGraph.hasEdge("task.classify", "task.enrich"); // → true (if compatible)
operationGraph.getEdgeAttributes("task.classify", "task.enrich");
// → { edgeType: "typed", compatible: true, detail: "classify.output → enrich.input" }
```
**What happens internally**:
- `fromSpecs()` creates a node for each operation (key: `namespace.name`)
- `buildTypeEdges()` compares each pair's `outputSchema` against the other operation's `inputSchema` and adds an edge where they are compatible
- Cycles are rejected at construction time (DAG invariant)
**Partial integration**: If you only need the operation graph (no workflows), stop here. The operation graph is useful for type-compatibility queries and topological ordering without defining any templates.
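The three internal steps listed above can be sketched in isolation. This is a minimal illustration, not flowgraph's implementation: `OpSpec`, `FieldMap`, `isCompatible`, `reaches`, and `buildOperationGraph` are hypothetical names, schemas are reduced to flat field-to-type maps instead of TypeBox schemas, and the compatibility rule (every input field must appear in the producer's output with the same type) is an assumption.

```typescript
// Hypothetical, simplified schema: real specs carry TypeBox schemas.
type FieldMap = Record<string, "string" | "number" | "boolean">;

interface OpSpec {
  namespace: string;
  name: string;
  inputSchema: FieldMap;
  outputSchema: FieldMap;
}

// Assumed rule: every input field exists in the output with the same type.
function isCompatible(output: FieldMap, input: FieldMap): boolean {
  return Object.entries(input).every(([field, type]) => output[field] === type);
}

// Depth-first search: is `target` reachable from `start`?
function reaches(edges: Map<string, Set<string>>, start: string, target: string): boolean {
  const stack = [start];
  const seen = new Set<string>();
  while (stack.length > 0) {
    const current = stack.pop()!;
    if (current === target) return true;
    if (seen.has(current)) continue;
    seen.add(current);
    edges.get(current)?.forEach((next) => stack.push(next));
  }
  return false;
}

function buildOperationGraph(specs: OpSpec[]): Map<string, Set<string>> {
  const key = (spec: OpSpec) => `${spec.namespace}.${spec.name}`;
  const edges = new Map<string, Set<string>>();
  // One node per operation, keyed by `namespace.name`.
  for (const spec of specs) edges.set(key(spec), new Set());
  for (const from of specs) {
    for (const to of specs) {
      if (from === to || !isCompatible(from.outputSchema, to.inputSchema)) continue;
      // Adding from -> to closes a cycle if `from` is already reachable from `to`.
      if (reaches(edges, key(to), key(from))) {
        throw new Error(`Cycle detected: ${key(from)} -> ${key(to)}`);
      }
      edges.get(key(from))!.add(key(to));
    }
  }
  return edges;
}

const specs: OpSpec[] = [
  { namespace: "task", name: "classify", inputSchema: { text: "string" }, outputSchema: { label: "string", confidence: "number" } },
  { namespace: "task", name: "enrich", inputSchema: { label: "string" }, outputSchema: { label: "string", details: "string" } },
  { namespace: "task", name: "summarize", inputSchema: { details: "string" }, outputSchema: { summary: "string" } },
];
const graph = buildOperationGraph(specs);
```

With these specs the sketch produces the edges `task.classify → task.enrich` and `task.enrich → task.summarize`, and no edge from `task.classify` to `task.summarize` because `details` is missing from the classifier's output.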
## Phase 2: Define Workflow Template → Validate
```typescript
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "@alkdev/flowgraph/component";
import { validateTemplate } from "@alkdev/flowgraph/analysis";
// Define a template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
      test: (results) => results["task.classify"].output.confidence > 0.8,
    },
    // High-confidence path
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    // Low-confidence fallback
    h(Operation, { name: "task.classify" }), // re-classify with different params
  ),
);
// Validate against the operation graph
const errors = validateTemplate(template, operationGraph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`Validation error: ${error.type}`, error);
  }
  // Handle errors...
}
```
**Validation checks**:
1. All `Operation` names exist in the registry
2. No cycles in the rendered DAG
3. Type compatibility between sequential operations
4. All operations are reachable from the start
**Template serialization** (for storage/transmission):
```typescript
// Serialize to JSON
const json = JSON.stringify(template);
// Deserialize and validate
const parsed = JSON.parse(json);
const templateErrors = validateTemplate(parsed, operationGraph);
```
Note: function-valued props (like `Conditional.test`) don't survive JSON serialization. Use string references for stored templates and resolve them at render time.
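One way to apply the string-reference advice is a small lookup table that is consulted when the stored template is loaded. The sketch below is an assumption about how a consumer might do this, not a flowgraph API: `predicates`, `StoredConditional`, and `resolveTest` are hypothetical names.

```typescript
type Results = Record<string, { output: Record<string, unknown> }>;
type Predicate = (results: Results) => boolean;

// Hypothetical registry mapping stable string names to predicate functions.
const predicates = new Map<string, Predicate>([
  [
    "classify.highConfidence",
    (results) => Number(results["task.classify"]?.output.confidence) > 0.8,
  ],
]);

// A stored Conditional: `test` holds a reference name instead of a function.
interface StoredConditional {
  type: "Conditional";
  test: string;
}

// Resolve the reference at render time; fail loudly on unknown names.
function resolveTest(node: StoredConditional): Predicate {
  const predicate = predicates.get(node.test);
  if (!predicate) throw new Error(`Unknown predicate reference: ${node.test}`);
  return predicate;
}

// JSON.stringify omits function-valued properties, so the test is lost:
const lossy = JSON.stringify({ type: "Conditional", test: () => true });
// lossy === '{"type":"Conditional"}'

// A string reference survives the round trip and can be resolved afterwards.
const stored: StoredConditional = { type: "Conditional", test: "classify.highConfidence" };
const restored = JSON.parse(JSON.stringify(stored)) as StoredConditional;
const highConfidence = resolveTest(restored);
const takeThenBranch = highConfidence({ "task.classify": { output: { confidence: 0.93 } } });
```

Throwing on an unknown reference keeps a stale stored template from silently rendering with a missing condition.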
## Phase 3: Render Template to DAG → Validate Structure
```typescript
import { createRoot } from "@alkdev/ujsx";
import { GraphologyHostConfig } from "@alkdev/flowgraph/host/graphology";
import { DirectedGraph } from "graphology";
// Create the GraphologyHostConfig
const hostConfig = new GraphologyHostConfig();
const root = createRoot(hostConfig, new DirectedGraph());
// Render the template to a DAG
root.render(template);
// The DAG is now available in the root context
const dag = root.ctx.graph;
// Validate the DAG
dag.hasCycles(); // → false (always, if template is valid)
dag.nodes(); // → ["task.classify", "task.enrich", "task.summarize"]
dag.edges(); // → ["task.classify->task.enrich", "task.classify->task.summarize"]
// Query the DAG
dag.inNeighbors("task.enrich"); // → ["task.classify"]
dag.outNeighbors("task.classify"); // → ["task.enrich", "task.summarize"]
```
**What happens internally**:
- The `GraphologyHostConfig` renders each `Operation` as a node and each structural relationship (`Sequential`, `Parallel`, `Conditional`) as edges
- Structural containers (`Sequential`, `Parallel`, `Conditional`) are transparent — they produce edges, not nodes
- The result is a pure DAG that can be analyzed, serialized, or used for validation
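The idea that containers are transparent can be illustrated with a small recursive renderer. This is a sketch under simplifying assumptions, not the `GraphologyHostConfig` implementation: `TemplateNode`, `Fragment`, `Dag`, and `render` are hypothetical, the template is a plain object tree instead of ujsx elements, and `Conditional` is omitted for brevity.

```typescript
type TemplateNode =
  | { kind: "Operation"; name: string }
  | { kind: "Sequential"; children: TemplateNode[] }
  | { kind: "Parallel"; children: TemplateNode[] };

// What a rendered subtree exposes to its parent container.
interface Fragment {
  entries: string[]; // nodes with no predecessor inside the subtree
  exits: string[];   // nodes with no successor inside the subtree
}

interface Dag {
  nodes: Set<string>;
  edges: Set<string>; // encoded as "from->to"
}

function render(node: TemplateNode, dag: Dag): Fragment {
  switch (node.kind) {
    case "Operation":
      // Only operations become nodes.
      dag.nodes.add(node.name);
      return { entries: [node.name], exits: [node.name] };
    case "Parallel": {
      // No edges between siblings: the container just merges its children.
      const parts = node.children.map((child) => render(child, dag));
      return {
        entries: parts.flatMap((part) => part.entries),
        exits: parts.flatMap((part) => part.exits),
      };
    }
    case "Sequential": {
      // Connect every exit of one child to every entry of the next.
      const parts = node.children.map((child) => render(child, dag));
      let previous: Fragment | undefined;
      for (const part of parts) {
        if (previous) {
          for (const from of previous.exits) {
            for (const to of part.entries) dag.edges.add(`${from}->${to}`);
          }
        }
        previous = part;
      }
      return { entries: parts[0]?.entries ?? [], exits: previous?.exits ?? [] };
    }
  }
}

const dag: Dag = { nodes: new Set(), edges: new Set() };
const fragment = render(
  {
    kind: "Sequential",
    children: [
      { kind: "Operation", name: "task.classify" },
      {
        kind: "Parallel",
        children: [
          { kind: "Operation", name: "task.enrich" },
          { kind: "Operation", name: "task.summarize" },
        ],
      },
    ],
  },
  dag,
);
```

For this template the sketch yields three nodes and the two edges `task.classify->task.enrich` and `task.classify->task.summarize`; neither container appears as a node.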
## Phase 4: Create Reactive Execution → Drive Workflow
```typescript
import { effect } from "@preact/signals-core";
import { createRoot } from "@alkdev/ujsx";
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
import { ReactiveHostConfig } from "@alkdev/flowgraph/host/reactive";
// 1. Create the ReactiveRoot from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag);
// 2. Create the ReactiveHostConfig
const reactiveHost = new ReactiveHostConfig(registry, workflowRoot);
// 3. Render the template to create reactive state
const reactiveRoot = createRoot(reactiveHost, {});
reactiveRoot.render(template);
// 4. Drive execution via the event log (ADR-005 pattern)
// The hub coordinator appends call protocol events; the projections derive state.
// `getInput` and `parentRequestId` are not defined in this snippet: the consumer is expected to supply them.
for (const [nodeId, node] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    if (node.preconditions.value && (node.status.value === "idle" || node.status.value === "waiting")) {
      // All preconditions satisfied: start the call by appending to the event log
      const operationId = dag.getNodeAttributes(nodeId).name;
      const requestId = crypto.randomUUID();
      // Compute the input once so the logged input and the executed input are identical
      const input = getInput(nodeId, workflowRoot);
      workflowRoot.nodeKeyToRequestId.set(nodeId, requestId);
      // Append call.requested event: the status projection derives "running" from this
      workflowRoot.append({
        type: "call.requested",
        requestId,
        operationId,
        input,
        timestamp: new Date().toISOString(),
      });
      // Start the actual call; when it settles, append the result event
      registry.execute(operationId, input, { parentRequestId })
        .then(result => {
          // Append call.responded event: the status projection derives "completed" from this
          workflowRoot.append({
            type: "call.responded",
            requestId,
            output: result,
            timestamp: new Date().toISOString(),
          });
        })
        .catch(error => {
          // Append call.error event: the status projection derives "failed" from this
          workflowRoot.append({
            type: "call.error",
            requestId,
            error: { code: error.code, message: error.message },
            timestamp: new Date().toISOString(),
          });
        });
    }
  });
  // Track failures for logging
  effect(() => {
    if (node.status.value === "failed") {
      console.error(`Node ${nodeId} failed`);
    }
  });
}
// 5. Kick off the workflow: root nodes start as "ready"
// (The effect-driven execution above handles the rest automatically)
// Root nodes' preconditions are true by default (no predecessors)
// so they transition to "ready" immediately
```
**What happens automatically**:
- Node status changes propagate reactively through `computed` preconditions
- When a predecessor completes, dependents automatically transition to `ready`
- When a predecessor fails, dependents' `blockedByFailure` triggers and they transition to `aborted`
- The entire workflow progresses without manual orchestration
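The propagation rules above can be read as a pure projection from the event log to node statuses. The sketch below models that idea without signals; it is an assumption-laden illustration and not the `WorkflowReactiveRoot` implementation. `projectStatuses`, the reduced `CallEvent` shape, and the reduced `Status` set are hypothetical, and the predecessor map is assumed to be acyclic.

```typescript
type CallEvent =
  | { type: "call.requested"; requestId: string }
  | { type: "call.responded"; requestId: string }
  | { type: "call.error"; requestId: string };

type Status = "waiting" | "ready" | "running" | "completed" | "failed" | "aborted";

function projectStatuses(
  predecessors: Record<string, string[]>, // nodeId -> predecessor nodeIds (must be a DAG)
  requestIds: Map<string, string>,        // nodeId -> requestId, set when a call starts
  log: CallEvent[],
): Map<string, Status> {
  // Fold the log: the latest event for a request determines its call state.
  const callState = new Map<string, Status>();
  for (const event of log) {
    callState.set(
      event.requestId,
      event.type === "call.requested" ? "running"
        : event.type === "call.responded" ? "completed"
        : "failed",
    );
  }
  const statuses = new Map<string, Status>();
  const resolve = (nodeId: string): Status => {
    const cached = statuses.get(nodeId);
    if (cached) return cached;
    const requestId = requestIds.get(nodeId);
    let status = requestId === undefined ? undefined : callState.get(requestId);
    if (!status) {
      // No call yet: derive the status from the predecessors.
      const upstream = (predecessors[nodeId] ?? []).map(resolve);
      if (upstream.some((s) => s === "failed" || s === "aborted")) status = "aborted";
      else if (upstream.every((s) => s === "completed")) status = "ready";
      else status = "waiting";
    }
    statuses.set(nodeId, status);
    return status;
  };
  Object.keys(predecessors).forEach(resolve);
  return statuses;
}

const predecessors = {
  "task.classify": [],
  "task.enrich": ["task.classify"],
  "task.summarize": ["task.classify"],
};
const requestIds = new Map([["task.classify", "req_1"]]);
const afterFailure = projectStatuses(predecessors, requestIds, [
  { type: "call.requested", requestId: "req_1" },
  { type: "call.error", requestId: "req_1" },
]);
```

Here `afterFailure` maps `task.classify` to `failed` and both dependents to `aborted`. Because the statuses are recomputed from the log alone, replaying the same events always yields the same state, which is the property the event-append pattern relies on.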
## Phase 5: Handle Completion → Cleanup
```typescript
// Track overall workflow status
const allNodes = Array.from(workflowRoot.statusMap.values());
// True once every node has reached a terminal status (not only "completed")
const allCompleted = () => allNodes.every(s =>
  s.value === "completed" || s.value === "failed" || s.value === "aborted" || s.value === "skipped"
);
// Report the outcome and clean up once every node is terminal
effect(() => {
  if (allCompleted()) {
    const failed = allNodes.filter(s => s.value === "failed");
    const aborted = allNodes.filter(s => s.value === "aborted");
    const completed = allNodes.filter(s => s.value === "completed");
    const skipped = allNodes.filter(s => s.value === "skipped");
    console.log(`Workflow complete: ${completed.length} completed, ${failed.length} failed, ${aborted.length} aborted, ${skipped.length} skipped`);
    // Cleanup
    workflowRoot.dispose();
  }
});
// Handle system-level abort (e.g., provider outage, auth failure)
function handleSystemFailure(error: Error) {
  workflowRoot.abortAll();
  // `prm` and `pendingRequestIds` are not defined in this snippet; they are assumed to be in scope in the consumer
  prm.abortAll(pendingRequestIds);
  workflowRoot.dispose();
  console.error(`Workflow aborted: ${error.message}`);
}
```
## Export/Import for Persistence
```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
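// Export the call graph for persistence
// (`callGraph` is the FlowGraph populated from call protocol events; see "Call Graph Population" below)
const serialized = callGraph.export();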
// → FlowGraphSerialized format (graphology native JSON)
// Store in Postgres (hub's responsibility)
await db.query('INSERT INTO call_graphs (id, data) VALUES ($1, $2)', [workflowId, JSON.stringify(serialized)]);
// Restore from persistence
const restored = FlowGraph.fromJSON(serialized);
// → FlowGraph<CallNodeAttrs, CallEdgeAttrs> with all nodes and edges
```
## Call Graph Population (Real-Time)
The call graph can be populated incrementally from call protocol events:
```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
// Create empty call graph
const callGraph = new FlowGraph<CallNodeAttrs, CallEdgeAttrs>();
// Subscribe to call protocol events
pubsub.subscribe("call.requested", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.responded", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.error", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.aborted", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.completed", (event) => callGraph.updateFromEvent(event));
// Query the call graph for observability
callGraph.filterByStatus("running"); // What's currently running
callGraph.children("req_abc123"); // Children of a call
callGraph.lineage("req_xyz789"); // Ancestor chain
callGraph.duration("req_abc123"); // How long a call took
```
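To make the incremental update and the observability queries concrete, here is a self-contained sketch of what such a structure might track. It is not the `FlowGraph` implementation: `MiniCallGraph` and `CallRecord` are hypothetical, and carrying `parentRequestId` on the `call.requested` event is an assumption about the event shape.

```typescript
type CallEvent =
  | { type: "call.requested"; requestId: string; parentRequestId?: string; timestamp: string }
  | { type: "call.responded" | "call.error" | "call.aborted"; requestId: string; timestamp: string };

type CallStatus = "running" | "completed" | "failed" | "aborted";

interface CallRecord {
  parentRequestId: string | undefined;
  status: CallStatus;
  startedAt: number;             // epoch milliseconds
  endedAt: number | undefined;
}

class MiniCallGraph {
  private readonly calls = new Map<string, CallRecord>();

  updateFromEvent(event: CallEvent): void {
    const at = Date.parse(event.timestamp);
    if (event.type === "call.requested") {
      this.calls.set(event.requestId, {
        parentRequestId: event.parentRequestId,
        status: "running",
        startedAt: at,
        endedAt: undefined,
      });
      return;
    }
    const call = this.calls.get(event.requestId);
    if (!call) return; // this sketch ignores events for unknown calls
    call.status =
      event.type === "call.responded" ? "completed"
        : event.type === "call.error" ? "failed"
        : "aborted";
    call.endedAt = at;
  }

  // Direct children of a call.
  children(requestId: string): string[] {
    const result: string[] = [];
    this.calls.forEach((call, id) => {
      if (call.parentRequestId === requestId) result.push(id);
    });
    return result;
  }

  // Ancestor chain, nearest parent first.
  lineage(requestId: string): string[] {
    const ancestors: string[] = [];
    let parent = this.calls.get(requestId)?.parentRequestId;
    while (parent !== undefined) {
      ancestors.push(parent);
      parent = this.calls.get(parent)?.parentRequestId;
    }
    return ancestors;
  }

  // Milliseconds between request and terminal event, if the call has ended.
  duration(requestId: string): number | undefined {
    const call = this.calls.get(requestId);
    if (!call || call.endedAt === undefined) return undefined;
    return call.endedAt - call.startedAt;
  }
}

const sketchGraph = new MiniCallGraph();
sketchGraph.updateFromEvent({ type: "call.requested", requestId: "req_root", timestamp: "2026-01-01T00:00:00.000Z" });
sketchGraph.updateFromEvent({ type: "call.requested", requestId: "req_child", parentRequestId: "req_root", timestamp: "2026-01-01T00:00:01.000Z" });
sketchGraph.updateFromEvent({ type: "call.responded", requestId: "req_child", timestamp: "2026-01-01T00:00:03.000Z" });
```

After these three events, `sketchGraph.duration("req_child")` is 2000 ms and `sketchGraph.lineage("req_child")` is `["req_root"]`, while `req_root` has no duration because it is still running.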
## Minimal Integration Example
For consumers that only need the operation graph and template validation (no reactive execution):
```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { h } from "@alkdev/ujsx";
import { Operation, Sequential } from "@alkdev/flowgraph/component";
import { validateTemplate, typeCompat } from "@alkdev/flowgraph/analysis";
// 1. Build operation graph (`registry` is an OperationRegistry created as in Phase 1)
const operationGraph = FlowGraph.fromSpecs(registry.getAll());
// 2. Define and validate template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Operation, { name: "task.enrich" }),
);
const errors = validateTemplate(template, operationGraph);
// 3. Query type compatibility
const result = typeCompat(
  registry.get("task.classify").outputSchema,
  registry.get("task.enrich").inputSchema,
);
console.log(result.compatible); // → true or false
console.log(result.mismatches); // → TypeMismatch[] if incompatible
```
Apart from an operation registry and the `h` element factory from `@alkdev/ujsx`, this integration only requires `@alkdev/flowgraph/graph`, `@alkdev/flowgraph/component`, and `@alkdev/flowgraph/analysis`. No reactive execution, no ujsx HostConfig, no signals.
## Module Dependency Map
```
┌────────────────────────────────────────────────────────┐
│      Consumer (hub coordinator, OpenCode plugin)       │
└───────┬───────────────────┬───────────────────┬────────┘
        │                   │                   │
┌───────▼────────┐  ┌───────▼────────┐  ┌───────▼────────┐
│ graph          │  │ component      │  │ analysis       │
│                │  │                │  │                │
│ FlowGraph      │  │ Operation      │  │ typeCompat     │
│ fromSpecs      │  │ Sequential     │  │ validate       │
│ queries        │  │ Parallel       │  │ topological    │
│ mutations      │  │ Conditional    │  │ parallelGroups │
│                │  │ Map            │  │                │
└───────┬────────┘  └───────┬────────┘  └───────┬────────┘
        │                   │                   │
┌───────▼───────────────────▼───────────────────▼────────┐
│                         schema                         │
│  OperationNodeAttrs   CallNodeAttrs                    │
│  OperationEdgeAttrs   CallEdgeAttrs                    │
│  TemplateEdgeAttrs    NodeStatus   EdgeType            │
└───────────────────────────┬────────────────────────────┘
┌───────────────────────────▼────────────────────────────┐
│                          host                          │
│  GraphologyHostConfig   ReactiveHostConfig             │
└───────────────────────────┬────────────────────────────┘
┌───────────────────────────▼────────────────────────────┐
│                        reactive                        │
│  WorkflowReactiveRoot   WorkflowNode                   │
│  NodeStatus signals   computed preconditions           │
└───────────────────────────┬────────────────────────────┘
┌───────────────────────────▼────────────────────────────┐
│                         error                          │
│  FlowgraphError hierarchy                              │
└────────────────────────────────────────────────────────┘
External dependencies:
┌────────────────┐  ┌────────────────┐  ┌──────────────────────────┐
│ graphology     │  │ ujsx           │  │ @preact/signals-core     │
│ graphology-dag │  │ h, createRoot  │  │ signal, computed, effect │
└────────────────┘  └────────────────┘  └──────────────────────────┘
```
## Common Patterns
### Pattern: SDD Pipeline
```typescript
// The archetypal SDD (Spec-Driven Development) pipeline
const sddPipeline = h(Sequential, {},
  h(Operation, { name: "task.architect" }),
  h(Conditional, {
      test: (results) => results["task.architect"].output.approved,
    },
    h(Sequential, {},
      h(Operation, { name: "task.decomposer" }),
      h(Operation, { name: "task.coordinator" }),
    ),
    // else-branch: architect disapproved, loop back or stop
    h(Operation, { name: "task.notify-stakeholder" }),
  ),
);
```
### Pattern: Fan-Out/Fan-In
```typescript
// Process items in parallel, then aggregate
const fanOut = h(Sequential, {},
  h(Operation, { name: "task.fetch-items" }),
  h(Map, {
      over: (results) => results["task.fetch-items"].output.items,
      as: "item",
    },
    h(Operation, { name: "task.process-item" }),
  ),
  h(Operation, { name: "task.aggregate-results" }),
);
```
### Pattern: Error Boundary with Conditional
```typescript
// Critical operation with graceful degradation
const withFallback = h(Sequential, {},
  // The operation whose outcome the Conditional inspects
  h(Operation, { name: "task.fetch-data" }),
  h(Conditional, {
      test: (results) => results["task.fetch-data"].status !== "failed",
    },
    // Happy path
    h(Operation, { name: "task.transform" }),
    // Fallback
    h(Operation, { name: "task.use-cache" }),
  ),
  // This operation runs regardless of whether the Conditional
  // took the then branch or the else branch
  h(Operation, { name: "task.notify" }),
);
```
## Constraints on Consumers
- **The hub coordinator drives execution** — flowgraph provides reactive state (signals, computed), not call execution. The coordinator reads `preconditions` and `blockedByFailure` and calls `registry.execute()` when appropriate.
- **Dispose is mandatory** — `WorkflowReactiveRoot.dispose()` must be called when the workflow completes or is cancelled. Without disposal, signal subscriptions leak.
- **Template rendering is currently one-shot** — until the ujsx reconciler is implemented, `createRoot(host, container).render(template)` can only be called once per root. To re-render, create a new root.
- **Function props don't survive serialization** — `Conditional.test` and `Map.over` with function values require runtime resolution. Use string references for stored templates.
- **Call graph is independent of reactive execution** — you can build a call graph from events without using the reactive layer. The reactive layer is optional for consumers that only need observability.
## References
- Architecture overview: [README.md](README.md)
- FlowGraph API: [flowgraph-api.md](flowgraph-api.md)
- Schema: [schema.md](schema.md)
- Workflow templates: [workflow-templates.md](workflow-templates.md)
- Host configs: [host-configs.md](host-configs.md)
- Reactive execution: [reactive-execution.md](reactive-execution.md)
- Call graph: [call-graph.md](call-graph.md)
- Analysis: [analysis.md](analysis.md)