Files
flowgraph/docs/architecture/consumer-integration.md
glm-5.1 eaeba38e71 resolve architecture review round 2: criticals, warnings, suggestions
- C-05: Add flowgraph-api.md with complete public API surface
- C-06: Document <Map> component in workflow-templates.md
- C-07: Specify Conditional else-branch behavior
- C-08: Add lifecycle/ownership section to reactive-execution.md
- C-09: Add consumer-integration.md end-to-end walkthrough
- W-02: Add reactive error boundary semantics (3 levels)
- W-03: Complete ReactiveContext interface definition
- W-04: Add template composition rules (8 rules)
- W-05: Document removeChild for both HostConfigs
- W-06: Document signal/effect disposal lifecycle
- W-07: Add ADR-004 (no schema version field)
- W-08: Add type compatibility depth/contract to analysis.md
- W-11: Add performance characteristics section
- S-01: Getting Started merged into consumer-integration.md
- S-02: Add flow diagrams for template rendering pipeline
- S-03: Add node status state machine diagram
- S-04: Add testing strategy section
- S-06: Validate source structure cross-references

Review round 2 fixes:
- Define TemplateNodeAttrs as alias for OperationNodeAttrs
- Document CallEventMapValue and CallResult types in schema.md
- Standardize CycleError naming (replace CircularDependencyError)
- Add function form to Map.over type definition
- Define Map aggregate completion/failure semantics
- Fix immutability claim for fromCallEvents
- Clarify edgeType storage alongside OperationEdgeAttrs
- Clarify WorkflowNode.status === statusMap (same Signal)
- Add component-to-tag mapping for WorkflowTag
2026-05-19 13:05:35 +00:00

17 KiB

status, last_updated
status last_updated
draft 2026-05-19

Consumer Integration Guide

End-to-end walkthrough: from operation specs to a running workflow. This document shows how a consumer (alkhub, OpenCode, cograph) uses flowgraph's components together.

Overview

The integration path follows five phases:

1. Register operations → Build operation graph
2. Define workflow template → Validate against operation graph
3. Render template to DAG → Validate DAG structure
4. Create reactive execution → Drive workflow via signals
5. Subscribe to status changes → Respond to completion/failure

Each phase uses a different flowgraph module. The complete integration uses all modules; partial integrations are possible.

Phase 1: Register Operations → Build Operation Graph

import { OperationRegistry } from "@alkdev/operations";
import { FlowGraph } from "@alkdev/flowgraph/graph";
import { buildTypeEdges } from "@alkdev/flowgraph/analysis";

// 1. Create the registry with operation specs
const registry = new OperationRegistry([
  { namespace: "task", name: "classify", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", inputSchema: {...}, outputSchema: {...} },
  // ... more operations
]);

// 2. Build the operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 3. The graph now has type-compatibility edges
operationGraph.hasEdge("task.classify", "task.enrich");  // → true (if compatible)
operationGraph.getEdgeAttributes("task.classify", "task.enrich");
// → { edgeType: "typed", compatible: true, detail: "classify.output → enrich.input" }

What happens internally:

  • fromSpecs() creates a node for each operation (key: namespace.name)
  • buildTypeEdges() compares each pair's outputSchemainputSchema and adds edges
  • Cycles are rejected at construction time (DAG invariant)

Partial integration: If you only need the operation graph (no workflows), stop here. The operation graph is useful for type-compatibility queries and topological ordering without defining any templates.

Phase 2: Define Workflow Template → Validate

import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "@alkdev/flowgraph/component";
import { validateTemplate } from "@alkdev/flowgraph/analysis";

// Define a template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    // High-confidence path
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    // Low-confidence fallback
    h(Operation, { name: "task.classify" }),  // re-classify with different params
  ),
);

// Validate against the operation graph
const errors = validateTemplate(template, operationGraph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`Validation error: ${error.type}`, error);
  }
  // Handle errors...
}

Validation checks:

  1. All Operation names exist in the registry
  2. No cycles in the rendered DAG
  3. Type compatibility between sequential operations
  4. All operations are reachable from the start

Template serialization (for storage/transmission):

// Serialize to JSON
const json = JSON.stringify(template);

// Deserialize and validate
const parsed = JSON.parse(json);
const templateErrors = validateTemplate(parsed, operationGraph);

Note: function-valued props (like Conditional.test) don't survive JSON serialization. Use string references for stored templates and resolve them at render time.

Phase 3: Render Template to DAG → Validate Structure

import { createRoot } from "@alkdev/ujsx";
import { GraphologyHostConfig } from "@alkdev/flowgraph/host/graphology";
import { DirectedGraph } from "graphology";

// Create the GraphologyHostConfig
const hostConfig = new GraphologyHostConfig();
const root = createRoot(hostConfig, new DirectedGraph());

// Render the template to a DAG
root.render(template);

// The DAG is now available in the root context
const dag = root.ctx.graph;

// Validate the DAG
dag.hasCycles();                  // → false (always, if template is valid)
dag.nodes();                      // → ["task.classify", "task.enrich", "task.summarize"]
dag.edges();                      // → ["task.classify->task.enrich", "task.classify->task.summarize"]

// Query the DAG
dag.inNeighbors("task.enrich");    // → ["task.classify"]
dag.outNeighbors("task.classify"); // → ["task.enrich", "task.summarize"]

What happens internally:

  • The GraphologyHostConfig renders each Operation as a node and each structural relationship (Sequential, Parallel, Conditional) as edges
  • Structural containers (Sequential, Parallel, Conditional) are transparent — they produce edges, not nodes
  • The result is a pure DAG that can be analyzed, serialized, or used for validation

Phase 4: Create Reactive Execution → Drive Workflow

import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
import { ReactiveHostConfig } from "@alkdev/flowgraph/host/reactive";

// 1. Create the ReactiveRoot from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag);

// 2. Create the ReactiveHostConfig 
const reactiveHost = new ReactiveHostConfig(registry, workflowRoot);

// 3. Render the template to create reactive state
const reactiveRoot = createRoot(reactiveHost, {});
reactiveRoot.render(template);

// 4. Subscribe to status changes and effect-driven execution
for (const [nodeId, node] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    if (node.preconditions.value && node.status.value === "idle" || node.status.value === "waiting") {
      node.status.value = "running";
      // getInput resolves the node's input from predecessor outputs and static config
      // For Operation nodes, input comes from the template props or aggregated predecessor results
      const input = resolveInput(nodeId, workflowRoot);
      registry.execute(node.operationId, input, { parentRequestId: parentCallId })
        .then(result => { node.status.value = "completed"; node.output.value = result; })
        .catch(error => { node.status.value = "failed"; });
    }
  });
  
  // Track failures
  effect(() => {
    if (node.status.value === "failed") {
      console.error(`Node ${nodeId} failed`);
    }
  });
}

// 5. Kick off the workflow — root nodes start as "ready"
// (The effect-driven execution above handles the rest automatically)
// Root nodes' preconditions are true by default (no predecessors)
// so they transition to "ready" immediately

What happens automatically:

  • Node status changes propagate reactively through computed preconditions
  • When a predecessor completes, dependents automatically transition to ready
  • When a predecessor fails, dependents' blockedByFailure triggers and they transition to aborted
  • The entire workflow progresses without manual orchestration

Phase 5: Handle Completion → Cleanup

// Track overall workflow status
const allNodes = Array.from(workflowRoot.statusMap.values());
const allCompleted = () => allNodes.every(s => 
  s.value === "completed" || s.value === "failed" || s.value === "aborted" || s.value === "skipped"
);

// Check for success
effect(() => {
  if (allCompleted()) {
    const failed = allNodes.filter(s => s.value === "failed");
    const aborted = allNodes.filter(s => s.value === "aborted");
    const completed = allNodes.filter(s => s.value === "completed");
    const skipped = allNodes.filter(s => s.value === "skipped");
    
    console.log(`Workflow complete: ${completed.length} completed, ${failed.length} failed, ${aborted.length} aborted, ${skipped.length} skipped`);
    
    // Cleanup
    workflowRoot.dispose();
  }
});

// Handle system-level abort (e.g., provider outage, auth failure)
function handleSystemFailure(error: Error) {
  workflowRoot.abortAll();
  prm.abortAll(pendingRequestIds);
  workflowRoot.dispose();
  console.error(`Workflow aborted: ${error.message}`);
}

Export/Import for Persistence

import { FlowGraph } from "@alkdev/flowgraph/graph";

// Export the call graph for persistence
const serialized = callGraph.export();
// → FlowGraphSerialized format (graphology native JSON)

// Store in Postgres (hub's responsibility)
await db.query('INSERT INTO call_graphs (id, data) VALUES ($1, $2)', [workflowId, JSON.stringify(serialized)]);

// Restore from persistence
const restored = FlowGraph.fromJSON(serialized);
// → FlowGraph<CallNodeAttrs, CallEdgeAttrs> with all nodes and edges

Call Graph Population (Real-Time)

The call graph can be populated incrementally from call protocol events:

import { FlowGraph } from "@alkdev/flowgraph/graph";

// Create empty call graph
const callGraph = new FlowGraph<CallNodeAttrs, CallEdgeAttrs>();

// Subscribe to call protocol events
pubsub.subscribe("call.requested", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.responded", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.error", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.aborted", (event) => callGraph.updateFromEvent(event));
pubsub.subscribe("call.completed", (event) => callGraph.updateFromEvent(event));

// Query the call graph for observability
callGraph.filterByStatus("running");  // What's currently running
callGraph.children("req_abc123");      // Children of a call
callGraph.lineage("req_xyz789");        // Ancestor chain
callGraph.duration("req_abc123");       // How long a call took

Minimal Integration Example

For consumers that only need the operation graph and template validation (no reactive execution):

import { FlowGraph } from "@alkdev/flowgraph/graph";
import { h } from "@alkdev/ujsx";
import { Operation, Sequential } from "@alkdev/flowgraph/component";
import { validateTemplate, typeCompat } from "@alkdev/flowgraph/analysis";

// 1. Build operation graph
const operationGraph = FlowGraph.fromSpecs(registry.getAll());

// 2. Define and validate template
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Operation, { name: "task.enrich" }),
);
const errors = validateTemplate(template, operationGraph);

// 3. Query type compatibility
const result = typeCompat(
  registry.get("task.classify").outputSchema,
  registry.get("task.enrich").inputSchema,
);
console.log(result.compatible);  // → true or false
console.log(result.mismatches);  // → TypeMismatch[] if incompatible

This integration only requires @alkdev/flowgraph/graph, @alkdev/flowgraph/component, and @alkdev/flowgraph/analysis. No reactive execution, no ujsx HostConfig, no signals.

Module Dependency Map

┌─────────────────────────────────────────────────┐
│ Consumer (hub coordinator, OpenCode plugin)      │
└────────┬────────────────┬────────────────┬───────┘
         │                │                │
    ┌────▼────┐    ┌──────▼──────┐  ┌──────▼──────┐
    │  graph  │    │  component  │  │  analysis   │
    │         │    │              │  │              │
    │FlowGraph│    │Operation     │  │typeCompat    │
    │fromSpecs│    │Sequential    │  │validate      │
    │queries  │    │Parallel      │  │topological   │
    │mutations│    │Conditional   │  │parallelGroups│
    └────┬────┘    │Map           │  └──────┬──────┘
         │         └──────┬───────┘         │
         │                │                 │
    ┌────▼────────────────▼─────────────────▼─────┐
    │              schema                          │
    │  OperationNodeAttrs  CallNodeAttrs           │
    │  OperationEdgeAttrs  CallEdgeAttrs           │
    │  TemplateEdgeAttrs  NodeStatus  EdgeType     │
    └──────────────────┬──────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────┐
    │              host                             │
    │  GraphologyHostConfig  ReactiveHostConfig    │
    └──────────────────┬──────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────┐
    │              reactive                         │
    │  WorkflowReactiveRoot  WorkflowNode          │
    │  NodeStatus signals   computed preconditions │
    └──────────────────┬──────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────┐
    │              error                            │
    │  FlowgraphError hierarchy                     │
    └──────────────────────────────────────────────┘
    
    External dependencies:
    ┌────────────┐  ┌────────────┐  ┌──────────────┐
    │ graphology  │  │    ujsx    │  │@preact/sign  │
    │  graphology │  │  h, create │  │   als-core    │
    │    -dag     │  │   Root     │  │ signal,comp,  │
    └─────────────┘  └────────────┘  │   effect      │
                                     └──────────────┘

Common Patterns

Pattern: SDD Pipeline

// The archetypal SDD (Spec-Driven Development) pipeline
const sddPipeline = h(Sequential, {},
  h(Operation, { name: "task.architect" }),
  h(Conditional, {
    test: (results) => results["task.architect"].output.approved,
  },
    h(Sequential, {},
      h(Operation, { name: "task.decomposer" }),
      h(Operation, { name: "task.coordinator" }),
    ),
    // else-branch: architect disapproved, loop back or stop
    h(Operation, { name: "task.notify-stakeholder" }),
  ),
);

Pattern: Fan-Out/Fan-In

// Process items in parallel, then aggregate
const fanOut = h(Sequential, {},
  h(Operation, { name: "task.fetch-items" }),
  h(Map, {
    over: (results) => results["task.fetch-items"].output.items,
    as: "item",
  },
    h(Operation, { name: "task.process-item" }),
  ),
  h(Operation, { name: "task.aggregate-results" }),
);

Pattern: Error Boundary with Conditional

// Critical operation with graceful degradation
const withFallback = h(Sequential, {},
  h(Conditional, {
    test: (results) => results["task.fetch-data"].status !== "failed",
  },
    // Happy path
    h(Operation, { name: "task.transform" }),
    // Fallback
    h(Operation, { name: "task.use-cache" }),
  ),
  // This operation runs regardless — the Conditional resolves
  // whether the then or else branch was taken
  h(Operation, { name: "task.notify" }),
);

Constraints on Consumers

  • The hub coordinator drives execution — flowgraph provides reactive state (signals, computed), not call execution. The coordinator reads preconditions and blockedByFailure and calls registry.execute() when appropriate.
  • Dispose is mandatoryWorkflowReactiveRoot.dispose() must be called when the workflow completes or is cancelled. Without disposal, signal subscriptions leak.
  • Template rendering is currently one-shot — until the ujsx reconciler is implemented, createRoot(host, container).render(template) can only be called once per root. To re-render, create a new root.
  • Function props don't survive serializationConditional.test and Map.over with function values require runtime resolution. Use string references for stored templates.
  • Call graph is independent of reactive execution — you can build a call graph from events without using the reactive layer. The reactive layer is optional for consumers that only need observability.

References