Critical fixes:
- C1: Create standalone ADR-006 file (edge type consistency), extract from open-questions.md inline content
- C2: Convert CallResult from plain interface to TypeBox schema, aligning with 'TypeBox as single source of truth' constraint
- C3: Add fromJSON() cycle detection specification - enforce ADR-002 DAG invariant even on deserialized input
- C4: Rewrite consumer-integration.md Phase 4 to use ADR-005 event-append pattern instead of direct signal mutation
- C5: Fix operator precedence bug in consumer-integration.md (missing parentheses around OR condition)

Warnings addressed:
- W1: Fix immutability claim - operation graph is 'conventionally immutable', not prevented by API
- W2: Add EventLogProjection to reactive exports map
- W3: Add CallResult/CallResultSchema to schema exports map
- W4: Fix reactive-execution.md Level 1 error handling to use event-append pattern instead of direct signal mutation
- W5: Remove duplicate dataFlow inference description in schema.md
- W6: Clarify ADR-006 project context (flowgraph vs taskgraph)

Suggestions implemented:
- S1: Add 'reviewed' document lifecycle status between draft/stable, update all docs to reviewed status
- S2: Add carve-out note for analysis result types in schema.md constraints (they are ephemeral, not serialized)
- S3: Add isComplete() and getAggregateStatus() convenience methods to WorkflowReactiveRoot specification
445 lines
18 KiB
Markdown
---
status: reviewed
last_updated: 2026-05-19
---

# Consumer Integration Guide

End-to-end walkthrough: from operation specs to a running workflow. This document shows how a consumer (alkhub, OpenCode, cograph) uses flowgraph's components together.

## Overview

The integration path follows five phases:

```
1. Register operations          → Build operation graph
2. Define workflow template     → Validate against operation graph
3. Render template to DAG       → Validate DAG structure
4. Create reactive execution    → Drive workflow via signals
5. Subscribe to status changes  → Respond to completion/failure
```

Each phase uses a different flowgraph module. The complete integration uses all modules; partial integrations are possible.

## Phase 1: Register Operations → Build Operation Graph

```typescript
import { OperationRegistry } from "@alkdev/operations";
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { buildTypeEdges } from "@alkdev/flowgraph/analysis";

// 1. Create the registry with operation specs
const registry = new OperationRegistry([
  { namespace: "task", name: "classify", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", inputSchema: {...}, outputSchema: {...} },
  // ... more operations
]);

// 2. Build the operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 3. The graph now has type-compatibility edges
operationGraph.hasEdge("task.classify", "task.enrich"); // → true (if compatible)
operationGraph.getEdgeAttributes("task.classify", "task.enrich");
// → { edgeType: "typed", compatible: true, detail: "classify.output → enrich.input" }
```

**What happens internally**:
- `fromSpecs()` creates a node for each operation (key: `namespace.name`)
- `buildTypeEdges()` compares each pair's `outputSchema` → `inputSchema` and adds edges
- Cycles are rejected at construction time (DAG invariant)

**Partial integration**: If you only need the operation graph (no workflows), stop here. The operation graph is useful for type-compatibility queries and topological ordering without defining any templates.

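The three internal steps above (one node per operation, pairwise output-to-input comparison, cycle rejection) can be sketched without any library. This is a simplified illustration, not flowgraph's implementation: `SimpleSchema`, `isCompatible`, `reaches`, and `buildOperationGraph` are hypothetical names, schemas are reduced to flat field-to-type maps, and throwing on a cycle is an assumption about what "rejected" means.

```typescript
// Hypothetical, simplified stand-ins: a "schema" is just field name -> primitive type name.
type SimpleSchema = Record<string, string>;

interface SimpleSpec {
  namespace: string;
  name: string;
  inputSchema: SimpleSchema;
  outputSchema: SimpleSchema;
}

const keyOf = (spec: SimpleSpec): string => `${spec.namespace}.${spec.name}`;

// An output can feed an input when every field the input declares is present with the same type.
function isCompatible(output: SimpleSchema, input: SimpleSchema): boolean {
  return Object.entries(input).every(([field, type]) => output[field] === type);
}

// Depth-first search: is `target` reachable from `start` along existing edges?
function reaches(adjacency: Map<string, Set<string>>, start: string, target: string): boolean {
  const stack = [start];
  const seen = new Set<string>();
  while (stack.length > 0) {
    const current = stack.pop()!;
    if (current === target) return true;
    if (seen.has(current)) continue;
    seen.add(current);
    for (const next of adjacency.get(current) ?? []) stack.push(next);
  }
  return false;
}

function buildOperationGraph(specs: SimpleSpec[]): Map<string, Set<string>> {
  const adjacency = new Map<string, Set<string>>();
  for (const spec of specs) adjacency.set(keyOf(spec), new Set());

  for (const from of specs) {
    for (const to of specs) {
      if (from === to || !isCompatible(from.outputSchema, to.inputSchema)) continue;
      // The edge from -> to would close a cycle if `from` is already reachable from `to`.
      if (reaches(adjacency, keyOf(to), keyOf(from))) {
        throw new Error(`Cycle detected: ${keyOf(from)} -> ${keyOf(to)}`);
      }
      adjacency.get(keyOf(from))!.add(keyOf(to));
    }
  }
  return adjacency;
}

// Illustrative specs (field names are made up for this sketch).
const specs: SimpleSpec[] = [
  { namespace: "task", name: "classify", inputSchema: { text: "string" }, outputSchema: { label: "string", confidence: "number" } },
  { namespace: "task", name: "enrich", inputSchema: { label: "string" }, outputSchema: { label: "string", details: "string" } },
  { namespace: "task", name: "summarize", inputSchema: { details: "string" }, outputSchema: { summary: "string" } },
];
const graph = buildOperationGraph(specs);
console.log(graph.get("task.classify")?.has("task.enrich")); // true
```

Checking reachability before each insertion keeps the graph acyclic at every step, at the cost of one traversal per candidate edge; that is acceptable for registries of modest size.
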
## Phase 2: Define Workflow Template → Validate

```typescript
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "@alkdev/flowgraph/component";
import { validateTemplate } from "@alkdev/flowgraph/analysis";

// Define a template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    // High-confidence path
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    // Low-confidence fallback
    h(Operation, { name: "task.classify" }), // re-classify with different params
  ),
);

// Validate against the operation graph
const errors = validateTemplate(template, operationGraph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`Validation error: ${error.type}`, error);
  }
  // Handle errors...
}
```

**Validation checks**:
1. All `Operation` names exist in the registry
2. No cycles in the rendered DAG
3. Type compatibility between sequential operations
4. All operations are reachable from the start

**Template serialization** (for storage/transmission):

```typescript
// Serialize to JSON
const json = JSON.stringify(template);

// Deserialize and validate
const parsed = JSON.parse(json);
const templateErrors = validateTemplate(parsed, operationGraph);
```

Note: function-valued props (like `Conditional.test`) don't survive JSON serialization. Use string references for stored templates and resolve them at render time.

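The serialization note above can be made concrete with a small lookup table. The sketch below is one possible way to resolve string references at render time; `predicates`, `resolveTest`, and the reference name `"classify.highConfidence"` are hypothetical and not part of flowgraph.

```typescript
// Shape of the results object passed to a Conditional test (simplified for this sketch).
type Results = Record<string, { output: Record<string, unknown> }>;
type Predicate = (results: Results) => boolean;

// Hypothetical lookup table; the key is an illustrative reference name.
const predicates: Record<string, Predicate> = {
  "classify.highConfidence": (results) =>
    Number(results["task.classify"]?.output.confidence) > 0.8,
};

// Accept either an inline function (in-memory templates) or a string reference (stored templates).
function resolveTest(test: string | Predicate): Predicate {
  if (typeof test === "function") return test;
  const predicate = predicates[test];
  if (!predicate) throw new Error(`Unknown predicate reference: ${test}`);
  return predicate;
}

// A function-valued prop is silently dropped by JSON.stringify ...
const withFunction = JSON.stringify({ test: (r: Results) => r !== undefined });
console.log(withFunction); // {}

// ... while a string reference round-trips and can be resolved afterwards.
const stored = JSON.stringify({ test: "classify.highConfidence" });
const restoredProps = JSON.parse(stored) as { test: string };
const resolvedTest = resolveTest(restoredProps.test);
```

Failing loudly on an unknown reference surfaces stale stored templates at load time rather than at the moment the branch is evaluated.
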
## Phase 3: Render Template to DAG → Validate Structure

```typescript
import { createRoot } from "@alkdev/ujsx";
import { GraphologyHostConfig } from "@alkdev/flowgraph/host/graphology";
import { DirectedGraph } from "graphology";

// Create the GraphologyHostConfig
const hostConfig = new GraphologyHostConfig();
const root = createRoot(hostConfig, new DirectedGraph());

// Render the template to a DAG
root.render(template);

// The DAG is now available in the root context
const dag = root.ctx.graph;

// Validate the DAG
dag.hasCycles();  // → false (always, if template is valid)
dag.nodes();      // → ["task.classify", "task.enrich", "task.summarize"]
dag.edges();      // → ["task.classify->task.enrich", "task.classify->task.summarize"]

// Query the DAG
dag.inNeighbors("task.enrich");     // → ["task.classify"]
dag.outNeighbors("task.classify");  // → ["task.enrich", "task.summarize"]
```

**What happens internally**:
- The `GraphologyHostConfig` renders each `Operation` as a node and each structural relationship (`Sequential`, `Parallel`, `Conditional`) as edges
- Structural containers (`Sequential`, `Parallel`, `Conditional`) are transparent: they produce edges, not nodes
- The result is a pure DAG that can be analyzed, serialized, or used for validation

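To illustrate how transparent containers can yield edges without becoming nodes themselves, here is a minimal sketch that folds a template into node and edge sets. It is not the `GraphologyHostConfig` implementation: `TemplateNode`, `Fragment`, and `renderToEdges` are hypothetical, and `Conditional` is left out to keep the example short.

```typescript
// Hypothetical template shape, simplified from the h(...) calls used in this guide.
type TemplateNode =
  | { kind: "operation"; name: string }
  | { kind: "sequential" | "parallel"; children: TemplateNode[] };

interface Fragment {
  entries: string[]; // nodes with no predecessor inside the fragment
  exits: string[];   // nodes with no successor inside the fragment
}

function renderToEdges(node: TemplateNode, nodes: Set<string>, edges: Set<string>): Fragment {
  if (node.kind === "operation") {
    nodes.add(node.name);
    return { entries: [node.name], exits: [node.name] };
  }
  const fragments = node.children.map((child) => renderToEdges(child, nodes, edges));
  if (node.kind === "parallel") {
    // Parallel adds no edges: its children simply share predecessors and successors.
    return {
      entries: fragments.flatMap((f) => f.entries),
      exits: fragments.flatMap((f) => f.exits),
    };
  }
  // Sequential: connect every exit of one child to every entry of the next.
  for (let i = 0; i + 1 < fragments.length; i++) {
    for (const from of fragments[i]!.exits) {
      for (const to of fragments[i + 1]!.entries) edges.add(`${from}->${to}`);
    }
  }
  return {
    entries: fragments[0]?.entries ?? [],
    exits: fragments[fragments.length - 1]?.exits ?? [],
  };
}

const sketchNodes = new Set<string>();
const sketchEdges = new Set<string>();
renderToEdges(
  {
    kind: "sequential",
    children: [
      { kind: "operation", name: "task.classify" },
      {
        kind: "parallel",
        children: [
          { kind: "operation", name: "task.enrich" },
          { kind: "operation", name: "task.summarize" },
        ],
      },
    ],
  },
  sketchNodes,
  sketchEdges,
);
console.log([...sketchEdges]); // ["task.classify->task.enrich", "task.classify->task.summarize"]
```

The entry/exit bookkeeping is what lets containers nest arbitrarily: each container only needs to know the boundary nodes of its children, never their internals.
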
## Phase 4: Create Reactive Execution → Drive Workflow

```typescript
import { effect } from "@preact/signals-core";
import { createRoot } from "@alkdev/ujsx";
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
import { ReactiveHostConfig } from "@alkdev/flowgraph/host/reactive";

// `registry`, `template`, and `dag` come from Phases 1-3.
// `getInput` and `parentRequestId` are assumed to be in scope (supplied by the consumer).

// 1. Create the ReactiveRoot from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag);

// 2. Create the ReactiveHostConfig
const reactiveHost = new ReactiveHostConfig(registry, workflowRoot);

// 3. Render the template to create reactive state
const reactiveRoot = createRoot(reactiveHost, {});
reactiveRoot.render(template);

// 4. Drive execution via the event log (ADR-005 pattern)
// The hub coordinator appends call protocol events; the projections derive state.
for (const [nodeId, node] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    if (node.preconditions.value && (node.status.value === "idle" || node.status.value === "waiting")) {
      // All preconditions satisfied: start the call by appending to the event log
      const operationId = dag.getNodeAttributes(nodeId).name;
      const requestId = crypto.randomUUID();
      const input = getInput(nodeId, workflowRoot); // resolve once, reuse for event and call
      workflowRoot.nodeKeyToRequestId.set(nodeId, requestId);

      // Append call.requested event; the status projection derives "running" from this
      workflowRoot.append({
        type: "call.requested",
        requestId,
        operationId,
        input,
        timestamp: new Date().toISOString(),
      });

      // Start the actual call; when it settles, append the result event
      registry.execute(operationId, input, { parentRequestId })
        .then(result => {
          // Append call.responded event; the status projection derives "completed" from this
          workflowRoot.append({
            type: "call.responded",
            requestId,
            output: result,
            timestamp: new Date().toISOString(),
          });
        })
        .catch(error => {
          // Append call.error event; the status projection derives "failed" from this
          workflowRoot.append({
            type: "call.error",
            requestId,
            error: { code: error.code, message: error.message },
            timestamp: new Date().toISOString(),
          });
        });
    }
  });

  // Track failures for logging
  effect(() => {
    if (node.status.value === "failed") {
      console.error(`Node ${nodeId} failed`);
    }
  });
}

// 5. Kick off the workflow: root nodes start as "ready"
// (The effect-driven execution above handles the rest automatically)
// Root nodes' preconditions are true by default (no predecessors)
// so they transition to "ready" immediately
```

**What happens automatically**:
- Node status changes propagate reactively through `computed` preconditions
- When a predecessor completes, dependents automatically transition to `ready`
- When a predecessor fails, dependents' `blockedByFailure` triggers and they transition to `aborted`
- The entire workflow progresses without manual orchestration

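The automatic behavior listed above follows from deriving state out of the event log rather than mutating it. The sketch below shows that derivation with plain functions instead of signals, so it can run on its own. It is an illustration under assumptions: `projectStatus`, `deriveStatuses`, `preconditionsMet`, and the reduced `Status` set are hypothetical and do not mirror flowgraph's projection types.

```typescript
// Reduced status set for the sketch; the real state machine has more states.
type Status = "waiting" | "running" | "completed" | "failed" | "aborted";

type CallEvent =
  | { type: "call.requested"; requestId: string }
  | { type: "call.responded"; requestId: string }
  | { type: "call.error"; requestId: string };

interface SketchDag {
  order: string[];                        // node keys in topological order
  predecessors: Record<string, string[]>; // node key -> direct predecessors
}

// Project one node's own status from the events recorded for its request.
function projectStatus(events: CallEvent[], requestId: string | undefined): Status {
  let status: Status = "waiting";
  for (const event of events) {
    if (event.requestId !== requestId) continue;
    if (event.type === "call.requested") status = "running";
    if (event.type === "call.responded") status = "completed";
    if (event.type === "call.error") status = "failed";
  }
  return status;
}

// Derive every node's status; a failed or aborted predecessor aborts its dependents.
function deriveStatuses(
  dag: SketchDag,
  events: CallEvent[],
  requestIds: Record<string, string>,
): Record<string, Status> {
  const statuses: Record<string, Status> = {};
  for (const nodeId of dag.order) {
    const preds = dag.predecessors[nodeId] ?? [];
    const blockedByFailure = preds.some(
      (p) => statuses[p] === "failed" || statuses[p] === "aborted",
    );
    statuses[nodeId] = blockedByFailure ? "aborted" : projectStatus(events, requestIds[nodeId]);
  }
  return statuses;
}

// A node may start once all of its predecessors have completed.
function preconditionsMet(dag: SketchDag, statuses: Record<string, Status>, nodeId: string): boolean {
  return (dag.predecessors[nodeId] ?? []).every((p) => statuses[p] === "completed");
}

const sketchDag: SketchDag = {
  order: ["task.classify", "task.enrich"],
  predecessors: { "task.enrich": ["task.classify"] },
};
const successLog: CallEvent[] = [
  { type: "call.requested", requestId: "r1" },
  { type: "call.responded", requestId: "r1" },
];
const failureLog: CallEvent[] = [
  { type: "call.requested", requestId: "r1" },
  { type: "call.error", requestId: "r1" },
];
const afterSuccess = deriveStatuses(sketchDag, successLog, { "task.classify": "r1" });
const afterFailure = deriveStatuses(sketchDag, failureLog, { "task.classify": "r1" });
```

In the reactive layer the same derivations would be wrapped in `computed` values so that they re-evaluate whenever an event is appended; here they are recomputed from scratch on each call for clarity.
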
## Phase 5: Handle Completion → Cleanup

```typescript
// Continues from Phase 4: `effect` and `workflowRoot` are already in scope.

// Track overall workflow status
const allNodes = Array.from(workflowRoot.statusMap.values());
// True once every node has reached a terminal status (not only "completed")
const allCompleted = () => allNodes.every(s =>
  s.value === "completed" || s.value === "failed" || s.value === "aborted" || s.value === "skipped"
);

// Check for success
effect(() => {
  if (allCompleted()) {
    const failed = allNodes.filter(s => s.value === "failed");
    const aborted = allNodes.filter(s => s.value === "aborted");
    const completed = allNodes.filter(s => s.value === "completed");
    const skipped = allNodes.filter(s => s.value === "skipped");

    console.log(`Workflow complete: ${completed.length} completed, ${failed.length} failed, ${aborted.length} aborted, ${skipped.length} skipped`);

    // Cleanup
    workflowRoot.dispose();
  }
});

// Handle system-level abort (e.g., provider outage, auth failure)
// `prm` and `pendingRequestIds` are assumed to be in scope; they are not defined in this guide.
function handleSystemFailure(error: Error) {
  workflowRoot.abortAll();
  prm.abortAll(pendingRequestIds);
  workflowRoot.dispose();
  console.error(`Workflow aborted: ${error.message}`);
}
```

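The completion check above can be factored into small pure helpers, which makes the terminal-state rule easy to test in isolation. The following is a sketch with hypothetical names (`isTerminalSet`, `countByStatus`, `aggregateStatus`); treating an aborted node as a workflow failure is an assumption of this example, not a documented rule.

```typescript
// Status names as they appear in this guide.
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

const TERMINAL: ReadonlySet<NodeStatus> = new Set<NodeStatus>([
  "completed", "failed", "aborted", "skipped",
]);

// True once every node has reached a terminal status.
function isTerminalSet(statuses: NodeStatus[]): boolean {
  return statuses.every((s) => TERMINAL.has(s));
}

// Tally how many nodes are in each status.
function countByStatus(statuses: NodeStatus[]): Partial<Record<NodeStatus, number>> {
  const counts: Partial<Record<NodeStatus, number>> = {};
  for (const s of statuses) counts[s] = (counts[s] ?? 0) + 1;
  return counts;
}

// Collapse per-node statuses into a single workflow-level status.
function aggregateStatus(statuses: NodeStatus[]): "running" | "completed" | "failed" {
  if (!isTerminalSet(statuses)) return "running";
  // Assumption: any failed or aborted node marks the whole workflow as failed.
  return statuses.some((s) => s === "failed" || s === "aborted") ? "failed" : "completed";
}

const inFlight = aggregateStatus(["completed", "running", "waiting"]);
const succeeded = aggregateStatus(["completed", "completed", "skipped"]);
const brokenRun = aggregateStatus(["completed", "failed", "aborted"]);
const tally = countByStatus(["completed", "failed", "aborted", "completed"]);
```

Inside an `effect`, these helpers would be called with the current `.value` of each status signal, for example `aggregateStatus(allNodes.map(s => s.value))`.
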
## Export/Import for Persistence

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";

// `callGraph` is a populated FlowGraph (see "Call Graph Population" below);
// `db` and `workflowId` are provided by the hub.

// Export the call graph for persistence
const serialized = callGraph.export();
// → FlowGraphSerialized format (graphology native JSON)

// Store in Postgres (hub's responsibility)
await db.query('INSERT INTO call_graphs (id, data) VALUES ($1, $2)', [workflowId, JSON.stringify(serialized)]);

// Restore from persistence
const restored = FlowGraph.fromJSON(serialized);
// → FlowGraph<CallNodeAttrs, CallEdgeAttrs> with all nodes and edges
```

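Persisted data can be edited or corrupted between export and restore, so a consumer may want to confirm that a payload is still acyclic before relying on it. The sketch below runs Kahn's algorithm over a reduced shape modeled on graphology's serialized format (an array of nodes with `key`, an array of edges with `source` and `target`). `SerializedGraph` and `assertAcyclic` are hypothetical names, and this is not a description of what `FlowGraph.fromJSON()` does internally.

```typescript
// Reduced, assumed shape of a serialized graph: only the fields this check needs.
interface SerializedGraph {
  nodes: { key: string }[];
  edges: { source: string; target: string }[];
}

// Kahn's algorithm: returns a topological order, or throws if the graph has a cycle.
function assertAcyclic(graph: SerializedGraph): string[] {
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const { key } of graph.nodes) {
    inDegree.set(key, 0);
    successors.set(key, []);
  }
  for (const { source, target } of graph.edges) {
    if (!inDegree.has(source) || !inDegree.has(target)) {
      throw new Error(`Edge references unknown node: ${source} -> ${target}`);
    }
    successors.get(source)!.push(target);
    inDegree.set(target, inDegree.get(target)! + 1);
  }

  // Start from nodes without incoming edges and peel the graph layer by layer.
  const queue = graph.nodes.map((n) => n.key).filter((k) => inDegree.get(k) === 0);
  const order: string[] = [];
  while (queue.length > 0) {
    const key = queue.shift()!;
    order.push(key);
    for (const next of successors.get(key)!) {
      const remaining = inDegree.get(next)! - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }

  // Any node never reaching in-degree zero sits on (or behind) a cycle.
  if (order.length !== graph.nodes.length) {
    throw new Error("Serialized graph contains a cycle");
  }
  return order;
}

const acyclicOrder = assertAcyclic({
  nodes: [{ key: "a" }, { key: "b" }, { key: "c" }],
  edges: [{ source: "a", target: "b" }, { source: "b", target: "c" }],
});
console.log(acyclicOrder); // ["a", "b", "c"]
```

Running the check on the parsed JSON, before constructing any graph object, means a bad payload is rejected without leaving a partially built graph behind.
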
## Call Graph Population (Real-Time)

The call graph can be populated incrementally from call protocol events:

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";

// Create empty call graph
const callGraph = new FlowGraph<CallNodeAttrs, CallEdgeAttrs>();

// Subscribe to call protocol events
pubsub.subscribe("call.requested", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.responded", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.error", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.aborted", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.completed", (event) => callGraph.updateFromEvent(event));

// Query the call graph for observability
callGraph.filterByStatus("running");  // What's currently running
callGraph.children("req_abc123");      // Children of a call
callGraph.lineage("req_xyz789");       // Ancestor chain
callGraph.duration("req_abc123");      // How long a call took
```

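To show what incremental population might involve, here is a toy in-memory stand-in that folds events into per-call records and answers the same kinds of queries. `TinyCallGraph` is hypothetical, handles only three event types, and assumes each `call.requested` event may carry an optional `parentRequestId`; flowgraph's actual event and attribute schemas may differ.

```typescript
// Assumed, simplified event shape for this sketch.
interface TinyCallEvent {
  type: "call.requested" | "call.responded" | "call.error";
  requestId: string;
  parentRequestId?: string;
  timestamp: string; // ISO 8601
}

interface TinyCallNode {
  status: "running" | "completed" | "failed";
  parent: string | undefined;
  startedAt: string;
  endedAt?: string;
}

class TinyCallGraph {
  private nodes = new Map<string, TinyCallNode>();

  updateFromEvent(event: TinyCallEvent): void {
    if (event.type === "call.requested") {
      this.nodes.set(event.requestId, {
        status: "running",
        parent: event.parentRequestId,
        startedAt: event.timestamp,
      });
      return;
    }
    const node = this.nodes.get(event.requestId);
    if (!node) return; // result for a call we never saw requested: ignore
    node.status = event.type === "call.responded" ? "completed" : "failed";
    node.endedAt = event.timestamp;
  }

  // Direct children of a call.
  children(requestId: string): string[] {
    return Array.from(this.nodes.entries())
      .filter(([, node]) => node.parent === requestId)
      .map(([id]) => id);
  }

  // Ancestor chain, nearest parent first.
  lineage(requestId: string): string[] {
    const chain: string[] = [];
    let current = this.nodes.get(requestId)?.parent;
    while (current !== undefined) {
      chain.push(current);
      current = this.nodes.get(current)?.parent;
    }
    return chain;
  }

  // Elapsed milliseconds, or undefined while the call is still running or unknown.
  duration(requestId: string): number | undefined {
    const node = this.nodes.get(requestId);
    if (!node || node.endedAt === undefined) return undefined;
    return Date.parse(node.endedAt) - Date.parse(node.startedAt);
  }
}

const tiny = new TinyCallGraph();
tiny.updateFromEvent({ type: "call.requested", requestId: "root", timestamp: "2026-05-19T10:00:00.000Z" });
tiny.updateFromEvent({ type: "call.requested", requestId: "child", parentRequestId: "root", timestamp: "2026-05-19T10:00:00.000Z" });
tiny.updateFromEvent({ type: "call.responded", requestId: "child", timestamp: "2026-05-19T10:00:01.500Z" });
console.log(tiny.duration("child")); // 1500
```

Because each event only touches the record for its own `requestId`, the same handler can be attached to every subscription, as in the guide's `pubsub.subscribe` calls.
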
## Minimal Integration Example

For consumers that only need the operation graph and template validation (no reactive execution):

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { h } from "@alkdev/ujsx";
import { Operation, Sequential } from "@alkdev/flowgraph/component";
import { validateTemplate, typeCompat } from "@alkdev/flowgraph/analysis";

// `registry` is the OperationRegistry created in Phase 1.

// 1. Build operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 2. Define and validate template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Operation, { name: "task.enrich" }),
);
const errors = validateTemplate(template, operationGraph);

// 3. Query type compatibility
const result = typeCompat(
  registry.get("task.classify").outputSchema,
  registry.get("task.enrich").inputSchema,
);
console.log(result.compatible);  // → true or false
console.log(result.mismatches);   // → TypeMismatch[] if incompatible
```

Of flowgraph's own modules, this integration only requires `@alkdev/flowgraph/graph`, `@alkdev/flowgraph/component`, and `@alkdev/flowgraph/analysis`; the template is still built with `h` from `@alkdev/ujsx`. No reactive execution, no ujsx HostConfig, no signals.

## Module Dependency Map

```
┌─────────────────────────────────────────────────────┐
│  Consumer (hub coordinator, OpenCode plugin)        │
└────────┬────────────────┬───────────────────┬───────┘
         │                │                   │
    ┌────▼────┐    ┌──────▼──────┐    ┌───────▼──────┐
    │  graph  │    │  component  │    │  analysis    │
    │         │    │             │    │              │
    │FlowGraph│    │Operation    │    │typeCompat    │
    │fromSpecs│    │Sequential   │    │validate      │
    │queries  │    │Parallel     │    │topological   │
    │mutations│    │Conditional  │    │parallelGroups│
    └────┬────┘    │Map          │    └───────┬──────┘
         │         └──────┬──────┘            │
         │                │                   │
    ┌────▼────────────────▼───────────────────▼──────┐
    │                     schema                     │
    │  OperationNodeAttrs    CallNodeAttrs           │
    │  OperationEdgeAttrs    CallEdgeAttrs           │
    │  TemplateEdgeAttrs     NodeStatus    EdgeType  │
    └───────────────────────┬────────────────────────┘
                            │
    ┌───────────────────────▼────────────────────────┐
    │                      host                      │
    │  GraphologyHostConfig    ReactiveHostConfig    │
    └───────────────────────┬────────────────────────┘
                            │
    ┌───────────────────────▼────────────────────────┐
    │                    reactive                    │
    │  WorkflowReactiveRoot    WorkflowNode          │
    │  NodeStatus signals    computed preconditions  │
    └───────────────────────┬────────────────────────┘
                            │
    ┌───────────────────────▼────────────────────────┐
    │                     error                      │
    │  FlowgraphError hierarchy                      │
    └────────────────────────────────────────────────┘

External dependencies:
┌────────────────┐  ┌────────────┐  ┌──────────────────┐
│ graphology     │  │ ujsx       │  │ @preact/         │
│ graphology-dag │  │ h,         │  │ signals-core     │
│                │  │ createRoot │  │ signal, computed │
└────────────────┘  └────────────┘  │ effect           │
                                    └──────────────────┘
```

## Common Patterns

### Pattern: SDD Pipeline

```typescript
// The archetypal SDD (Spec-Driven Development) pipeline
const sddPipeline = h(Sequential, {},
  h(Operation, { name: "task.architect" }),
  h(Conditional, {
    test: (results) => results["task.architect"].output.approved,
  },
    h(Sequential, {},
      h(Operation, { name: "task.decomposer" }),
      h(Operation, { name: "task.coordinator" }),
    ),
    // else-branch: architect disapproved, loop back or stop
    h(Operation, { name: "task.notify-stakeholder" }),
  ),
);
```

### Pattern: Fan-Out/Fan-In

```typescript
// Process items in parallel, then aggregate
const fanOut = h(Sequential, {},
  h(Operation, { name: "task.fetch-items" }),
  h(Map, {
    over: (results) => results["task.fetch-items"].output.items,
    as: "item",
  },
    h(Operation, { name: "task.process-item" }),
  ),
  h(Operation, { name: "task.aggregate-results" }),
);
```

### Pattern: Error Boundary with Conditional

```typescript
// Critical operation with graceful degradation
const withFallback = h(Sequential, {},
  // The operation whose status the Conditional inspects must run first
  h(Operation, { name: "task.fetch-data" }),
  h(Conditional, {
    test: (results) => results["task.fetch-data"].status !== "failed",
  },
    // Happy path
    h(Operation, { name: "task.transform" }),
    // Fallback
    h(Operation, { name: "task.use-cache" }),
  ),
  // This operation runs regardless of which branch (then or else)
  // the Conditional resolved to
  h(Operation, { name: "task.notify" }),
);
```

## Constraints on Consumers

- **The hub coordinator drives execution**: flowgraph provides reactive state (signals, computed), not call execution. The coordinator reads `preconditions` and `blockedByFailure` and calls `registry.execute()` when appropriate.
- **Dispose is mandatory**: `WorkflowReactiveRoot.dispose()` must be called when the workflow completes or is cancelled. Without disposal, signal subscriptions leak.
- **Template rendering is currently one-shot**: until the ujsx reconciler is implemented, `createRoot(host, container).render(template)` can only be called once per root. To re-render, create a new root.
- **Function props don't survive serialization**: `Conditional.test` and `Map.over` with function values require runtime resolution. Use string references for stored templates.
- **Call graph is independent of reactive execution**: you can build a call graph from events without using the reactive layer. The reactive layer is optional for consumers that only need observability.

## References

- Architecture overview: [README.md](README.md)
- FlowGraph API: [flowgraph-api.md](flowgraph-api.md)
- Schema: [schema.md](schema.md)
- Workflow templates: [workflow-templates.md](workflow-templates.md)
- Host configs: [host-configs.md](host-configs.md)
- Reactive execution: [reactive-execution.md](reactive-execution.md)
- Call graph: [call-graph.md](call-graph.md)
- Analysis: [analysis.md](analysis.md)