@alkdev/flowgraph

DAG-based workflow orchestration over graphology, with ujsx template composition and reactive signal-driven execution.

What This Does

Flowgraph sits between @alkdev/operations (which defines what can be called) and @alkdev/alkhub (which records what was called). Flowgraph defines how calls are orchestrated — the structure, validation, and execution of workflows.

Three conceptual graphs, each for a different purpose:

  1. Operation Graph — static graph built from OperationSpecs at startup. Nodes are operations, edges are type-compatibility relationships. Enables cycle detection, topological ordering, and validation.
  2. Call Graph — dynamic graph built from call protocol events at runtime. Nodes are call invocations with status/timestamps, edges are parent-child relationships. Enables abort cascading and observability.
  3. Workflow Template — declarative ujsx tree defining a reusable workflow structure. A validated path through the operation graph, instantiated as a call graph at runtime.

The graph is the specification. The template is the authoring surface. The call graph is the execution record.

Installation

npm install @alkdev/flowgraph

Peer dependency: @alkdev/operations ^0.1.0

Quick Start

Build an Operation Graph

import { FlowGraph } from "@alkdev/flowgraph/graph";
import type { OperationSpec } from "@alkdev/flowgraph/graph";

const specs: OperationSpec[] = [
  { namespace: "task", name: "classify", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
];

const graph = FlowGraph.fromSpecs(specs);
// Type-compatibility edges added automatically
graph.hasEdge("task.classify", "task.enrich");

Define a Workflow Template

import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional } from "@alkdev/flowgraph/component";

const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    h(Operation, { name: "task.classify" }),
  ),
);

Validate the Template

import { validateTemplate } from "@alkdev/flowgraph/analysis";

const errors = validateTemplate(template, graph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`[${error.type}]`, error);
  }
}

Populate a Call Graph from Events

import { FlowGraph } from "@alkdev/flowgraph/graph";

const callGraph = FlowGraph.fromCallEvents(eventArray);
callGraph.filterByStatus("running");
callGraph.children("req_abc123");
callGraph.lineage("req_xyz789");
callGraph.duration("req_abc123");

Drive Reactive Execution

import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";

const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents",
});

// Append call protocol events — status derives reactively
workflow.append({ type: "call.requested", requestId: "req_1", operationId: "task.classify", input: {}, timestamp: "..." });
workflow.append({ type: "call.responded", requestId: "req_1", output: { confidence: 0.95 }, timestamp: "..." });

// Read reactive state
workflow.getStatus("task.enrich");
workflow.getResult("task.classify");

// Abort cascading
workflow.abortAll();
workflow.dispose();

Subpath Exports

Subpath Purpose Key Exports
@alkdev/flowgraph Root — re-exports everything All public types and functions
@alkdev/flowgraph/graph Core DAG class FlowGraph, FlowGraphOptions, OperationSpec, CallEventMapValue
@alkdev/flowgraph/schema TypeBox schemas and types CallStatus, NodeStatus, EdgeType, OperationType, CallNodeAttrs, OperationNodeAttrs, OperationEdgeAttrs, CallEdgeAttrs, TemplateEdgeAttrs, CallResult, FlowGraphSerialized
@alkdev/flowgraph/component ujsx workflow components Operation, Sequential, Parallel, Conditional, Map
@alkdev/flowgraph/host Rendering backends GraphologyHostConfig, ReactiveHostConfig
@alkdev/flowgraph/analysis Validation and analysis functions typeCompat, buildTypeEdges, validateGraph, validateSchema, validate, validateTemplate, validatePreconditions, topologicalOrder, parallelGroups, criticalPath, reachableFrom
@alkdev/flowgraph/reactive Reactive execution engine WorkflowReactiveRoot, EventLogProjection, WorkflowNode, ReactiveContext, FailurePolicy, AggregateStatus
@alkdev/flowgraph/error Error hierarchy FlowgraphError, ConstructionError, DuplicateNodeError, DuplicateEdgeError, NodeNotFoundError, CycleError, InvalidInputError, InvalidTransitionError

Core API: FlowGraph Class

FlowGraph<NodeAttrs, EdgeAttrs> wraps a graphology DirectedGraph and enforces DAG invariants. It delegates graph operations to graphology while adding flowgraph-specific construction, mutation, and query methods.

Factory Methods

FlowGraph.fromSpecs(specs: OperationSpec[]): OperationGraph
FlowGraph.fromCallEvents(events: CallEventMapValue[]): CallGraph
FlowGraph.fromJSON(data: FlowGraphSerialized): FlowGraph

Node Operations

graph.addNode(key, attrs)            // throws DuplicateNodeError
graph.removeNode(key)                // throws NodeNotFoundError
graph.updateNode(key, partialAttrs)  // throws NodeNotFoundError
graph.hasNode(key): boolean
graph.getNodeAttributes(key): NodeAttrs
graph.forEachNode(callback): void

Edge Operations

graph.addEdge(source, target, attrs?) // throws NodeNotFoundError, DuplicateEdgeError, CycleError
graph.removeEdge(source, target)      // no-op if not found
graph.hasEdge(source, target): boolean
graph.getEdgeAttributes(source, target): EdgeAttrs
graph.forEachEdge(callback): void

Traversal

graph.topologicalOrder(): string[]
graph.ancestors(nodeId): string[]
graph.descendants(nodeId): string[]
graph.predecessors(nodeId): string[]
graph.successors(nodeId): string[]
graph.reachableFrom(nodeIds): Set<string>
graph.hasCycles(): boolean
graph.findCycles(): string[][]

Call Graph Convenience

graph.addCall(attrs: CallNodeAttrs): void
graph.addDependency(source, target): void
graph.updateStatus(requestId, status, extra?): void  // throws InvalidTransitionError
graph.updateCall(requestId, partialAttrs): void
graph.removeCall(requestId): void
graph.updateFromEvent(event: CallEventMapValue): void
graph.filterByStatus(status: CallStatus): string[]
graph.getRoots(): string[]
graph.children(requestId): string[]
graph.duration(requestId): number
graph.lineage(requestId): string[]

Serialization

graph.export(): FlowGraphSerialized
graph.toJSON(): FlowGraphSerialized
graph.toString(): string

Escape Hatch

graph.graph  // → DirectedGraph (raw graphology instance)

Direct mutation via graph.graph bypasses flowgraph validation. Use with caution.

Schema Enums

Enum Values
CallStatus pending, running, completed, failed, aborted
NodeStatus idle, waiting, ready, running, completed, failed, skipped, aborted
EdgeType triggered, depends_on, typed, sequential, conditional
OperationType query, mutation, subscription

Call status transitions: pending → running → completed|failed|aborted. Terminal states are immutable. InvalidTransitionError is thrown on invalid transitions.

Workflow Components

Component Props Behavior
<Operation> name, input?, retries?, timeout? Declares an operation node in the workflow
<Sequential> id? Children execute in order; edges are sequential
<Parallel> id?, maxConcurrency? Children execute concurrently
<Conditional> test, else? Branches on test(results). Children = then-branch, else prop = else-branch
<Map> over, as Iterates over over collection, binding each item as as variable

Analysis Functions

import { typeCompat, validateTemplate, topologicalOrder, parallelGroups, criticalPath } from "@alkdev/flowgraph/analysis";

typeCompat(outputSchema, inputSchema): TypeCompatResult | undefined
validateTemplate(template, operationGraph): AnyValidationError[]
topologicalOrder(graph): string[]
parallelGroups(graph): string[][]  // topological generations
criticalPath(graph): string[]     // longest path
validateGraph(graph): GraphValidationError[]
validateSchema(graph, schema): ValidationError[]
validate(graph, schema): AnyValidationError[]  // combined

Reactive Execution

WorkflowReactiveRoot implements EventLogProjection — call protocol events are the source of truth, status/results are derived projections.

const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents",  // or "continue-running"
  parallelGroups: { group1: { siblings: ["a", "b"], maxConcurrency: 2 } },
});

// Per-node reactive signals
workflow.statusMap      // Map<string, Signal<NodeStatus>>
workflow.preconditions  // Map<string, ReadonlySignal<boolean>> — all predecessors completed/skipped
workflow.canStart       // Map<string, ReadonlySignal<boolean>> — preconditions + concurrency
workflow.blockedByFailure // Map<string, ReadonlySignal<boolean>> — any predecessor failed/aborted
workflow.resultMap      // Map<string, ReadonlySignal<CallResult | undefined>>

// Event-driven updates
workflow.append(event: CallEventMapValue): void

// Queries
workflow.getStatus(nodeId): NodeStatus
workflow.getResult(nodeId): CallResult | undefined
workflow.isComplete(): boolean
workflow.getAggregateStatus(): AggregateStatus

// Lifecycle
workflow.abortAll(): void
workflow.abortNode(nodeId): void
workflow.dispose(): void  // mandatory cleanup — releases signal subscriptions

Error Hierarchy

FlowgraphError (base)
├── ConstructionError
│   ├── DuplicateNodeError    (readonly key)
│   ├── DuplicateEdgeError    (readonly source, target)
│   ├── NodeNotFoundError     (readonly key)
│   ├── CycleError           (readonly cycles: string[][])
│   └── InvalidInputError    (readonly errors: ValidationError[])
└── InvalidTransitionError   (readonly requestId, from, to)

Design Principles

  1. DAG-only, no cyclesaddEdge() rejects cycle-creating edges at mutation time (ADR-002). This differs from taskgraph, which allows cycles and detects them after the fact.

  2. Storage is decoupled — flowgraph handles in-memory graph construction, validation, and analysis. Persistence is the caller's concern. export()/fromJSON() provides the serialization boundary.

  3. Template → DAG → Execution is a pipeline — each representation serves a different phase and can exist independently. Validate a template without executing it. Build a call graph from events without a template. Run reactive execution directly from a DAG.

  4. Event log as source of truth — call protocol events (call.requested, call.responded, call.error, call.aborted, call.completed) are the ground truth. Status, results, and the call graph are projections derived from the event log (ADR-005).

  5. Delegation, not inheritanceFlowGraph wraps a graphology DirectedGraph, exposing a curated API. The raw graphology instance is available via the .graph escape hatch.

For AI Agents

When working with this library programmatically:

  • Use subpath imports@alkdev/flowgraph/graph, @alkdev/flowgraph/analysis, etc. The root export re-exports everything but subpath imports make dependencies explicit.
  • Always call dispose() on WorkflowReactiveRoot — signal subscriptions leak without it.
  • Function-valued props don't survive JSON serializationConditional.test and Map.over with function values need runtime resolution. Use string references for stored templates.
  • fromSpecs() graphs are conventionally immutable — don't mutate operation graphs after construction. If the registry changes, rebuild via fromSpecs().
  • Call graph mutation uses event protocol — use updateFromEvent() or addCall()/updateStatus(), not direct node mutation.
  • typeCompat() returns undefined for unknown/any schemas — this means "no meaningful check possible", not "incompatible".
  • Architecture specs are in docs/architecture/ — detailed design decisions, ADRs, and open questions live there. This README is a surface-level guide. Consult the architecture docs for anything non-trivial.

Dependencies

Package Relationship
graphology Direct — the underlying directed graph data structure
graphology-dag Direct — topological sort, cycle detection, DAG traversal
@alkdev/ujsx Direct — UNode trees and HostConfig for workflow template rendering
@alkdev/typebox Direct — all schemas are TypeBox Modules
@preact/signals-core Direct — reactive state management for WorkflowReactiveRoot
@alkdev/operations Peer — provides OperationSpec, OperationRegistry, call event types

License

Dual-licensed under MIT or Apache-2.0 at your option.

Description
No description provided
Readme 1 MiB
Languages
TypeScript 100%