flowgraph/docs/architecture/consumer-integration.md

status: reviewed
last_updated: 2026-05-19

Consumer Integration Guide

End-to-end walkthrough: from operation specs to a running workflow. This document shows how a consumer (alkhub, OpenCode, cograph) uses flowgraph's components together.

Overview

The integration path follows five phases:

1. Register operations → Build operation graph
2. Define workflow template → Validate against operation graph
3. Render template to DAG → Validate DAG structure
4. Create reactive execution → Drive workflow via signals
5. Subscribe to status changes → Respond to completion/failure

Each phase uses a different flowgraph module. The complete integration uses all modules; partial integrations are possible.

Phase 1: Register Operations → Build Operation Graph

import { OperationRegistry } from "@alkdev/operations";
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { buildTypeEdges } from "@alkdev/flowgraph/analysis";

// 1. Create the registry with operation specs
const registry = new OperationRegistry([
  { namespace: "task", name: "classify", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", inputSchema: {...}, outputSchema: {...} },
  // ... more operations
]);

// 2. Build the operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 3. The graph now has type-compatibility edges
operationGraph.hasEdge("task.classify", "task.enrich");  // → true (if compatible)
operationGraph.getEdgeAttributes("task.classify", "task.enrich");
// → { edgeType: "typed", compatible: true, detail: "classify.output → enrich.input" }

What happens internally:

  • fromSpecs() creates a node for each operation (key: namespace.name)
  • buildTypeEdges() compares each ordered pair's outputSchema against the other operation's inputSchema and adds an edge where they are compatible
  • Cycles are rejected at construction time (DAG invariant)
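
The three steps above can be sketched in a few lines. This is only an illustration under simplifying assumptions: schemas are reduced to flat field-to-type maps, and `Spec`, `isCompatible`, `reaches`, and `buildGraph` are hypothetical names, not flowgraph's API.

```typescript
// Hypothetical, simplified stand-ins for operation specs and schemas.
type FieldMap = Record<string, string>; // field name -> type name
interface Spec {
  namespace: string;
  name: string;
  inputSchema: FieldMap;
  outputSchema: FieldMap;
}

// An output fits an input when every field the input asks for is
// present in the output with the same type name.
function isCompatible(output: FieldMap, input: FieldMap): boolean {
  return Object.entries(input).every(([field, type]) => output[field] === type);
}

// Depth-first search: is `target` reachable from `start`?
function reaches(adj: Map<string, Set<string>>, start: string, target: string): boolean {
  const stack = [start];
  const seen = new Set<string>();
  while (stack.length > 0) {
    const current = stack.pop()!;
    if (current === target) return true;
    if (seen.has(current)) continue;
    seen.add(current);
    adj.get(current)?.forEach((next) => stack.push(next));
  }
  return false;
}

function buildGraph(specs: Spec[]): Map<string, Set<string>> {
  const adj = new Map<string, Set<string>>();
  const key = (s: Spec) => `${s.namespace}.${s.name}`; // node key: namespace.name
  for (const s of specs) adj.set(key(s), new Set());
  for (const from of specs) {
    for (const to of specs) {
      // Skip self-pairs and incompatible pairs.
      if (from === to || !isCompatible(from.outputSchema, to.inputSchema)) continue;
      // Adding from -> to would close a cycle if `to` already reaches `from`.
      if (reaches(adj, key(to), key(from))) {
        throw new Error(`cycle: ${key(from)} -> ${key(to)}`);
      }
      adj.get(key(from))!.add(key(to));
    }
  }
  return adj;
}

const graph = buildGraph([
  { namespace: "task", name: "classify", inputSchema: { text: "string" }, outputSchema: { label: "string" } },
  { namespace: "task", name: "enrich", inputSchema: { label: "string" }, outputSchema: { summary: "string" } },
]);
```

Here only `task.classify → task.enrich` is added, because the output of `enrich` lacks the `text` field that `classify` expects.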

Partial integration: If you only need the operation graph (no workflows), stop here. The operation graph is useful for type-compatibility queries and topological ordering without defining any templates.

Phase 2: Define Workflow Template → Validate

import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "@alkdev/flowgraph/component";
import { validateTemplate } from "@alkdev/flowgraph/analysis";

// Define a template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    // High-confidence path
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    // Low-confidence fallback
    h(Operation, { name: "task.classify" }),  // re-classify with different params
  ),
);

// Validate against the operation graph
const errors = validateTemplate(template, operationGraph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`Validation error: ${error.type}`, error);
  }
  // Handle errors...
}

Validation checks:

  1. All Operation names exist in the registry
  2. No cycles in the rendered DAG
  3. Type compatibility between sequential operations
  4. All operations are reachable from the start

Template serialization (for storage/transmission):

// Serialize to JSON
const json = JSON.stringify(template);

// Deserialize and validate
const parsed = JSON.parse(json);
const templateErrors = validateTemplate(parsed, operationGraph);

Note: function-valued props (like Conditional.test) don't survive JSON serialization. Use string references for stored templates and resolve them at render time.
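
One way to apply that advice is a lookup table keyed by string, consulted when the template is rendered. The sketch below is an assumption about how a consumer might do this; `predicates`, `resolveTest`, and the `classify.highConfidence` key are hypothetical and not part of flowgraph.

```typescript
type Results = Record<string, { output: Record<string, unknown> }>;
type Predicate = (results: Results) => boolean;

// Hypothetical lookup table: stored templates carry the key, not the function.
const predicates: Record<string, Predicate> = {
  "classify.highConfidence": (results) =>
    Number(results["task.classify"]?.output.confidence) > 0.8,
};

// Resolve a `test` prop that is either a function or a string reference.
function resolveTest(test: string | Predicate): Predicate {
  if (typeof test !== "string") return test;
  const fn = predicates[test];
  if (!fn) throw new Error(`unknown predicate reference: ${test}`);
  return fn;
}

// The stored form survives a JSON round trip because it holds only strings.
const stored = JSON.parse(
  JSON.stringify({ type: "Conditional", props: { test: "classify.highConfidence" } }),
);
const predicate = resolveTest(stored.props.test);
const taken = predicate({ "task.classify": { output: { confidence: 0.93 } } });
```

Failing loudly on an unknown key keeps a stale stored template from silently taking the wrong branch.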

Phase 3: Render Template to DAG → Validate Structure

import { createRoot } from "@alkdev/ujsx";
import { GraphologyHostConfig } from "@alkdev/flowgraph/host/graphology";
import { DirectedGraph } from "graphology";

// Create the GraphologyHostConfig
const hostConfig = new GraphologyHostConfig();
const root = createRoot(hostConfig, new DirectedGraph());

// Render the template to a DAG
root.render(template);

// The DAG is now available in the root context
const dag = root.ctx.graph;

// Validate the DAG
dag.hasCycles();                  // → false (always, if template is valid)
dag.nodes();                      // → ["task.classify", "task.enrich", "task.summarize"]
dag.edges();                      // → ["task.classify->task.enrich", "task.classify->task.summarize"]

// Query the DAG
dag.inNeighbors("task.enrich");    // → ["task.classify"]
dag.outNeighbors("task.classify"); // → ["task.enrich", "task.summarize"]

What happens internally:

  • The GraphologyHostConfig renders each Operation as a node and each structural relationship (Sequential, Parallel, Conditional) as edges
  • Structural containers (Sequential, Parallel, Conditional) are transparent — they produce edges, not nodes
  • The result is a pure DAG that can be analyzed, serialized, or used for validation
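
To make the "containers produce edges, not nodes" idea concrete, here is a small self-contained sketch that flattens a template tree into nodes and edges. It is not the GraphologyHostConfig implementation: `TemplateNode`, `Fragment`, and `render` are hypothetical, and Conditional is left out for brevity.

```typescript
type TemplateNode =
  | { kind: "operation"; name: string }
  | { kind: "sequential" | "parallel"; children: TemplateNode[] };

// What a rendered subtree exposes to its parent container.
interface Fragment {
  entries: string[]; // nodes with no predecessor inside the subtree
  exits: string[];   // nodes with no successor inside the subtree
}

const nodes = new Set<string>();
const edges: string[] = [];

function render(node: TemplateNode): Fragment {
  if (node.kind === "operation") {
    nodes.add(node.name); // only Operations become nodes
    return { entries: [node.name], exits: [node.name] };
  }
  const parts = node.children.map((child) => render(child));
  if (parts.length === 0) return { entries: [], exits: [] };
  if (node.kind === "parallel") {
    // Parallel adds no edges: its children share predecessors and successors.
    return {
      entries: parts.reduce<string[]>((acc, p) => acc.concat(p.entries), []),
      exits: parts.reduce<string[]>((acc, p) => acc.concat(p.exits), []),
    };
  }
  // Sequential links every exit of one child to every entry of the next.
  for (let i = 0; i + 1 < parts.length; i++) {
    for (const from of parts[i].exits) {
      for (const to of parts[i + 1].entries) edges.push(`${from}->${to}`);
    }
  }
  return { entries: parts[0].entries, exits: parts[parts.length - 1].exits };
}

render({
  kind: "sequential",
  children: [
    { kind: "operation", name: "task.classify" },
    {
      kind: "parallel",
      children: [
        { kind: "operation", name: "task.enrich" },
        { kind: "operation", name: "task.summarize" },
      ],
    },
  ],
});
// edges → ["task.classify->task.enrich", "task.classify->task.summarize"]
```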

Phase 4: Create Reactive Execution → Drive Workflow

import { effect } from "@preact/signals-core";
import { createRoot } from "@alkdev/ujsx";
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
import { ReactiveHostConfig } from "@alkdev/flowgraph/host/reactive";

// 1. Create the ReactiveRoot from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag);

// 2. Create the ReactiveHostConfig 
const reactiveHost = new ReactiveHostConfig(registry, workflowRoot);

// 3. Render the template to create reactive state
const reactiveRoot = createRoot(reactiveHost, {});
reactiveRoot.render(template);

// 4. Drive execution via the event log (ADR-005 pattern)
// The hub coordinator appends call protocol events; the projections derive state.
for (const [nodeId, node] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    if (node.preconditions.value && (node.status.value === "idle" || node.status.value === "waiting")) {
      // All preconditions satisfied: start the call by appending to the event log.
      // getInput and parentRequestId are not defined in this guide; they are
      // assumed to come from the consumer's own code.
      const operationId = dag.getNodeAttributes(nodeId).name;
      const requestId = crypto.randomUUID();
      const input = getInput(nodeId, workflowRoot); // resolve once, reuse below
      workflowRoot.nodeKeyToRequestId.set(nodeId, requestId);

      // Append call.requested; the status projection derives "running" from this
      workflowRoot.append({
        type: "call.requested",
        requestId,
        operationId,
        input,
        timestamp: new Date().toISOString(),
      });

      // Start the actual call; when it settles, append the result event
      registry.execute(operationId, input, { parentRequestId })
        .then(result => {
          // Append call.responded event — the status projection derives "completed" from this
          workflowRoot.append({
            type: "call.responded",
            requestId,
            output: result,
            timestamp: new Date().toISOString(),
          });
        })
        .catch(error => {
          // Append call.error event — the status projection derives "failed" from this
          workflowRoot.append({
            type: "call.error",
            requestId,
            error: { code: error.code, message: error.message },
            timestamp: new Date().toISOString(),
          });
        });
    }
  });
  
  // Track failures for logging
  effect(() => {
    if (node.status.value === "failed") {
      console.error(`Node ${nodeId} failed`);
    }
  });
}

// 5. Kick off the workflow: no explicit call is needed.
// Root nodes have no predecessors, so their preconditions are true by default.
// They become ready immediately and the effects above start them.

What happens automatically:

  • Node status changes propagate reactively through computed preconditions
  • When a predecessor completes, dependents automatically transition to ready
  • When a predecessor fails, dependents' blockedByFailure triggers and they transition to aborted
  • The entire workflow progresses without manual orchestration
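
The event-append pattern can be pictured as a fold over the log. The sketch below uses plain functions instead of signals so that it stays self-contained. The event-to-status mapping for requested, responded, and error follows the comments in the Phase 4 code; the mapping for `call.aborted`, the `NodeStatus` subset, and the exact rules inside `preconditionsMet` and `blockedByFailure` are assumptions for illustration.

```typescript
type NodeStatus = "running" | "completed" | "failed" | "aborted";
type CallEvent = {
  type: "call.requested" | "call.responded" | "call.error" | "call.aborted";
  requestId: string;
};

// Event type -> status derived from it.
const statusFor: Record<CallEvent["type"], NodeStatus> = {
  "call.requested": "running",
  "call.responded": "completed",
  "call.error": "failed",
  "call.aborted": "aborted", // assumed mapping
};

// Projection: fold the append-only log into the latest status per request.
function projectStatus(log: CallEvent[]): Map<string, NodeStatus> {
  const statuses = new Map<string, NodeStatus>();
  for (const entry of log) statuses.set(entry.requestId, statusFor[entry.type]);
  return statuses;
}

// Assumption: a node may start once every predecessor has completed.
function preconditionsMet(predecessors: string[], statuses: Map<string, NodeStatus>): boolean {
  return predecessors.every((p) => statuses.get(p) === "completed");
}

// Assumption: a failed or aborted predecessor blocks the node.
function blockedByFailure(predecessors: string[], statuses: Map<string, NodeStatus>): boolean {
  return predecessors.some((p) => {
    const s = statuses.get(p);
    return s === "failed" || s === "aborted";
  });
}

const log: CallEvent[] = [
  { type: "call.requested", requestId: "a" },
  { type: "call.responded", requestId: "a" },
  { type: "call.requested", requestId: "b" },
  { type: "call.error", requestId: "b" },
];
const statuses = projectStatus(log);
```

Because status is always derived from the log, replaying the same events yields the same state, which is what makes appending safer than mutating a status signal directly.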

Phase 5: Handle Completion → Cleanup

// Track overall workflow status
const allNodes = Array.from(workflowRoot.statusMap.values());
// True once every node is in a terminal state (not only "completed")
const allSettled = () => allNodes.every(s =>
  s.value === "completed" || s.value === "failed" || s.value === "aborted" || s.value === "skipped"
);

// Report the outcome once the workflow has settled
effect(() => {
  if (allSettled()) {
    const failed = allNodes.filter(s => s.value === "failed");
    const aborted = allNodes.filter(s => s.value === "aborted");
    const completed = allNodes.filter(s => s.value === "completed");
    const skipped = allNodes.filter(s => s.value === "skipped");
    
    console.log(`Workflow complete: ${completed.length} completed, ${failed.length} failed, ${aborted.length} aborted, ${skipped.length} skipped`);
    
    // Cleanup
    workflowRoot.dispose();
  }
});

// Handle system-level abort (e.g., provider outage, auth failure)
function handleSystemFailure(error: Error) {
  workflowRoot.abortAll();
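  // prm and pendingRequestIds are not defined in this guide; they are
  // assumed to come from the consumer's call layer.
  prm.abortAll(pendingRequestIds);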
  workflowRoot.dispose();
  console.error(`Workflow aborted: ${error.message}`);
}

Export/Import for Persistence

import { FlowGraph } from "@alkdev/flowgraph/graph";

// Export the call graph for persistence
// (callGraph is built as shown under "Call Graph Population" below)
const serialized = callGraph.export();
// → FlowGraphSerialized format (graphology native JSON)

// Store in Postgres (hub's responsibility)
await db.query('INSERT INTO call_graphs (id, data) VALUES ($1, $2)', [workflowId, JSON.stringify(serialized)]);

// Restore from persistence
const restored = FlowGraph.fromJSON(serialized);
// → FlowGraph<CallNodeAttrs, CallEdgeAttrs> with all nodes and edges
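
Persisted data can be edited or corrupted between export and import, so it is worth re-checking the DAG invariant on the way back in. The sketch below shows one way to do that with Kahn's algorithm. It assumes a reduced serialized shape (node keys plus source/target pairs) modeled loosely on graphology's serialized form; `Serialized` and `assertAcyclic` are hypothetical names, not flowgraph's API.

```typescript
// Reduced, hypothetical shape of a serialized graph.
interface Serialized {
  nodes: { key: string }[];
  edges: { source: string; target: string }[];
}

// Kahn's algorithm: repeatedly remove nodes with no remaining incoming edges.
// If some nodes can never be removed, the graph contains a cycle.
function assertAcyclic(data: Serialized): string[] {
  const indegree = new Map<string, number>();
  const out = new Map<string, string[]>();
  for (const n of data.nodes) {
    indegree.set(n.key, 0);
    out.set(n.key, []);
  }
  for (const e of data.edges) {
    if (!indegree.has(e.source) || !indegree.has(e.target)) {
      throw new Error(`edge references unknown node: ${e.source} -> ${e.target}`);
    }
    indegree.set(e.target, indegree.get(e.target)! + 1);
    out.get(e.source)!.push(e.target);
  }
  const queue = data.nodes.map((n) => n.key).filter((k) => indegree.get(k) === 0);
  const order: string[] = [];
  while (queue.length > 0) {
    const current = queue.shift()!;
    order.push(current);
    for (const next of out.get(current)!) {
      const remaining = indegree.get(next)! - 1;
      indegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }
  if (order.length !== data.nodes.length) {
    throw new Error("cycle detected in deserialized graph");
  }
  return order; // a valid topological order
}

// Round trip through JSON, then validate before use.
const roundTripped: Serialized = JSON.parse(
  JSON.stringify({ nodes: [{ key: "a" }, { key: "b" }], edges: [{ source: "a", target: "b" }] }),
);
const order = assertAcyclic(roundTripped);
```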

Call Graph Population (Real-Time)

The call graph can be populated incrementally from call protocol events:

import { FlowGraph } from "@alkdev/flowgraph/graph";

// Create empty call graph
const callGraph = new FlowGraph<CallNodeAttrs, CallEdgeAttrs>();

// Subscribe to call protocol events
pubsub.subscribe("call.requested", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.responded", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.error", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.aborted", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.completed", (event) => callGraph.updateFromEvent(event));

// Query the call graph for observability
callGraph.filterByStatus("running");  // What's currently running
callGraph.children("req_abc123");      // Children of a call
callGraph.lineage("req_xyz789");        // Ancestor chain
callGraph.duration("req_abc123");       // How long a call took
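
For intuition, an event-driven call graph can be approximated with a map of records. The sketch below is a simplified stand-in, not the FlowGraph implementation: `CallRecord` is hypothetical, and carrying `parentRequestId` on the `call.requested` event is an assumption made so that lineage can be derived.

```typescript
interface CallRecord {
  status: "running" | "completed" | "failed";
  parent?: string;
  startedAt: number; // epoch milliseconds
  endedAt?: number;
}

type CallEvent =
  | { type: "call.requested"; requestId: string; parentRequestId?: string; timestamp: string }
  | { type: "call.responded" | "call.error"; requestId: string; timestamp: string };

const calls = new Map<string, CallRecord>();

function updateFromEvent(e: CallEvent): void {
  const at = Date.parse(e.timestamp);
  if (e.type === "call.requested") {
    calls.set(e.requestId, { status: "running", parent: e.parentRequestId, startedAt: at });
    return;
  }
  const record = calls.get(e.requestId);
  if (!record) return; // ignore results for calls that were never requested
  record.status = e.type === "call.responded" ? "completed" : "failed";
  record.endedAt = at;
}

// Ancestor chain, nearest parent first.
function lineage(requestId: string): string[] {
  const chain: string[] = [];
  let current = calls.get(requestId)?.parent;
  while (current !== undefined) {
    chain.push(current);
    current = calls.get(current)?.parent;
  }
  return chain;
}

// Duration in milliseconds, or undefined while the call is still running.
function duration(requestId: string): number | undefined {
  const r = calls.get(requestId);
  return r && r.endedAt !== undefined ? r.endedAt - r.startedAt : undefined;
}

updateFromEvent({ type: "call.requested", requestId: "req_root", timestamp: "2026-05-19T00:00:00.000Z" });
updateFromEvent({ type: "call.requested", requestId: "req_child", parentRequestId: "req_root", timestamp: "2026-05-19T00:00:01.000Z" });
updateFromEvent({ type: "call.responded", requestId: "req_child", timestamp: "2026-05-19T00:00:03.500Z" });
// lineage("req_child") → ["req_root"]; duration("req_child") → 2500
```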

Minimal Integration Example

For consumers that only need the operation graph and template validation (no reactive execution):

import { FlowGraph } from "@alkdev/flowgraph/graph";
import { h } from "@alkdev/ujsx";
import { Operation, Sequential } from "@alkdev/flowgraph/component";
import { validateTemplate, typeCompat } from "@alkdev/flowgraph/analysis";

// 1. Build operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 2. Define and validate template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Operation, { name: "task.enrich" }),
);
const errors = validateTemplate(template, operationGraph);

// 3. Query type compatibility
const result = typeCompat(
  registry.get("task.classify").outputSchema,
  registry.get("task.enrich").inputSchema,
);
console.log(result.compatible);  // → true or false
console.log(result.mismatches);  // → TypeMismatch[] if incompatible

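Of flowgraph's own modules, this integration requires only @alkdev/flowgraph/graph, @alkdev/flowgraph/component, and @alkdev/flowgraph/analysis; h comes from @alkdev/ujsx and the registry from @alkdev/operations. It needs no reactive execution, no ujsx HostConfig, and no signals.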

Module Dependency Map

┌──────────────────────────────────────────────────┐
│ Consumer (hub coordinator, OpenCode plugin)      │
└────────┬────────────────┬─────────────────┬──────┘
         │                │                 │
    ┌────▼────┐    ┌──────▼──────┐  ┌───────▼──────┐
    │  graph  │    │  component  │  │   analysis   │
    │         │    │             │  │              │
    │FlowGraph│    │Operation    │  │typeCompat    │
    │fromSpecs│    │Sequential   │  │validate      │
    │queries  │    │Parallel     │  │topological   │
    │mutations│    │Conditional  │  │parallelGroups│
    └────┬────┘    │Map          │  └───────┬──────┘
         │         └──────┬──────┘          │
         │                │                 │
    ┌────▼────────────────▼─────────────────▼──────┐
    │                    schema                    │
    │  OperationNodeAttrs  CallNodeAttrs           │
    │  OperationEdgeAttrs  CallEdgeAttrs           │
    │  TemplateEdgeAttrs  NodeStatus  EdgeType     │
    └──────────────────────┬───────────────────────┘
                           │
    ┌──────────────────────▼───────────────────────┐
    │                     host                     │
    │  GraphologyHostConfig  ReactiveHostConfig    │
    └──────────────────────┬───────────────────────┘
                           │
    ┌──────────────────────▼───────────────────────┐
    │                   reactive                   │
    │  WorkflowReactiveRoot  WorkflowNode          │
    │  NodeStatus signals  computed preconditions  │
    └──────────────────────┬───────────────────────┘
                           │
    ┌──────────────────────▼───────────────────────┐
    │                    error                     │
    │  FlowgraphError hierarchy                    │
    └──────────────────────────────────────────────┘

    External dependencies:
    ┌────────────────┐  ┌───────────────┐  ┌──────────────────────────┐
    │ graphology     │  │ ujsx          │  │ @preact/signals-core     │
    │ graphology-dag │  │ h, createRoot │  │ signal, computed, effect │
    └────────────────┘  └───────────────┘  └──────────────────────────┘

Common Patterns

Pattern: SDD Pipeline

// The archetypal SDD (Spec-Driven Development) pipeline
const sddPipeline = h(Sequential, {},
  h(Operation, { name: "task.architect" }),
  h(Conditional, {
    test: (results) => results["task.architect"].output.approved,
  },
    h(Sequential, {},
      h(Operation, { name: "task.decomposer" }),
      h(Operation, { name: "task.coordinator" }),
    ),
    // else-branch: architect disapproved, loop back or stop
    h(Operation, { name: "task.notify-stakeholder" }),
  ),
);

Pattern: Fan-Out/Fan-In

// Process items in parallel, then aggregate
const fanOut = h(Sequential, {},
  h(Operation, { name: "task.fetch-items" }),
  h(Map, {
    over: (results) => results["task.fetch-items"].output.items,
    as: "item",
  },
    h(Operation, { name: "task.process-item" }),
  ),
  h(Operation, { name: "task.aggregate-results" }),
);
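
Because `over` depends on a runtime result, the number of `task.process-item` instances is only known once `task.fetch-items` has completed. The sketch below shows one possible expansion into edges at that point. The `name[index]` instance keys, the `expandMap` helper, and the direct source-to-sink edge for an empty list are all assumptions for illustration, not documented flowgraph behaviour.

```typescript
// Hypothetical runtime expansion of a Map body: one instance per item.
function expandMap(source: string, body: string, sink: string, items: unknown[]): string[] {
  // Assumption: with no items, the sink depends directly on the source.
  if (items.length === 0) return [`${source}->${sink}`];
  const mapEdges: string[] = [];
  items.forEach((_item, i) => {
    const instance = `${body}[${i}]`;
    mapEdges.push(`${source}->${instance}`); // fan-out
    mapEdges.push(`${instance}->${sink}`);   // fan-in
  });
  return mapEdges;
}

const expanded = expandMap(
  "task.fetch-items",
  "task.process-item",
  "task.aggregate-results",
  ["first", "second"],
);
// expanded has 4 edges: two fanning out of fetch-items, two fanning into aggregate-results
```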

Pattern: Error Boundary with Conditional

// Critical operation with graceful degradation
const withFallback = h(Sequential, {},
  h(Conditional, {
    test: (results) => results["task.fetch-data"].status !== "failed",
  },
    // Happy path
    h(Operation, { name: "task.transform" }),
    // Fallback
    h(Operation, { name: "task.use-cache" }),
  ),
  // This operation runs regardless — the Conditional resolves
  // whether the then or else branch was taken
  h(Operation, { name: "task.notify" }),
);

Constraints on Consumers

  • The hub coordinator drives execution — flowgraph provides reactive state (signals, computed), not call execution. The coordinator reads preconditions and blockedByFailure and calls registry.execute() when appropriate.
  • Dispose is mandatory: WorkflowReactiveRoot.dispose() must be called when the workflow completes or is cancelled. Without disposal, signal subscriptions leak.
  • Template rendering is currently one-shot — until the ujsx reconciler is implemented, createRoot(host, container).render(template) can only be called once per root. To re-render, create a new root.
  • Function props don't survive serialization: Conditional.test and Map.over with function values require runtime resolution. Use string references for stored templates.
  • Call graph is independent of reactive execution — you can build a call graph from events without using the reactive layer. The reactive layer is optional for consumers that only need observability.
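
The disposal constraint is easiest to honour when disposal sits in a `finally` block, so it runs on both the success and the failure path. The following sketch uses a stand-in class rather than WorkflowReactiveRoot; `FakeRoot` and `withRoot` are hypothetical names.

```typescript
// Hypothetical stand-in for a reactive root that owns subscriptions.
class FakeRoot {
  private disposers: Array<() => void> = [];
  activeSubscriptions = 0;

  subscribe(): void {
    this.activeSubscriptions++;
    this.disposers.push(() => {
      this.activeSubscriptions--;
    });
  }

  // Idempotent: a second call finds no disposers left to run.
  dispose(): void {
    for (const d of this.disposers.splice(0)) d();
  }
}

// Run a workflow body and dispose the root whether it returns or throws.
function withRoot<T>(root: FakeRoot, body: (r: FakeRoot) => T): T {
  try {
    return body(root);
  } finally {
    root.dispose();
  }
}

const fakeRoot = new FakeRoot();
let caught = false;
try {
  withRoot(fakeRoot, (r) => {
    r.subscribe();
    r.subscribe();
    throw new Error("provider outage");
  });
} catch {
  caught = true;
}
// fakeRoot.activeSubscriptions is back to 0 even though the body threw
```

Real workflows are asynchronous, so an equivalent helper would need to `await` the body inside the `try` before the `finally` runs.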

References