@alkdev/flowgraph
DAG-based workflow orchestration over graphology, with ujsx template composition and reactive signal-driven execution.
What This Does
Flowgraph sits between @alkdev/operations (which defines what can be called) and @alkdev/alkhub (which records what was called). Flowgraph defines how calls are orchestrated — the structure, validation, and execution of workflows.
Three conceptual graphs, each for a different purpose:
- Operation Graph: static graph built from OperationSpecs at startup. Nodes are operations, edges are type-compatibility relationships. Enables cycle detection, topological ordering, and validation.
- Call Graph: dynamic graph built from call protocol events at runtime. Nodes are call invocations with status/timestamps, edges are parent-child relationships. Enables abort cascading and observability.
- Workflow Template: declarative ujsx tree defining a reusable workflow structure. A validated path through the operation graph, instantiated as a call graph at runtime.
The graph is the specification. The template is the authoring surface. The call graph is the execution record.
Installation
npm install @alkdev/flowgraph
Peer dependency: @alkdev/operations ^0.1.0
Quick Start
Build an Operation Graph
import { FlowGraph } from "@alkdev/flowgraph/graph";
import type { OperationSpec } from "@alkdev/flowgraph/graph";
const specs: OperationSpec[] = [
  { namespace: "task", name: "classify", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
];
const graph = FlowGraph.fromSpecs(specs);
// Type-compatibility edges added automatically
graph.hasEdge("task.classify", "task.enrich");
Define a Workflow Template
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional } from "@alkdev/flowgraph/component";
const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
      test: (results) => results["task.classify"].output.confidence > 0.8,
    },
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    h(Operation, { name: "task.classify" }),
  ),
);
Validate the Template
import { validateTemplate } from "@alkdev/flowgraph/analysis";
const errors = validateTemplate(template, graph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`[${error.type}]`, error);
  }
}
Populate a Call Graph from Events
import { FlowGraph } from "@alkdev/flowgraph/graph";
const callGraph = FlowGraph.fromCallEvents(eventArray);
callGraph.filterByStatus("running");
callGraph.children("req_abc123");
callGraph.lineage("req_xyz789");
callGraph.duration("req_abc123");
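The queries above all follow from the parent-child edges and timestamps stored on call nodes. The sketch below illustrates that idea over plain records rather than the FlowGraph class. The record shape, the helper names (childrenOf, lineageOf, durationOf), the root-first ordering of a lineage, and the millisecond unit are all assumptions made for illustration; the library's actual semantics may differ.

```typescript
// Hypothetical call records; the real CallNodeAttrs schema may differ.
interface CallRecord {
  requestId: string;
  parentId?: string;
  startedAt: number; // epoch milliseconds (assumed unit)
  endedAt?: number;
}

const calls: Record<string, CallRecord> = {
  req_root: { requestId: "req_root", startedAt: 1000, endedAt: 1900 },
  req_abc123: { requestId: "req_abc123", parentId: "req_root", startedAt: 1100, endedAt: 1600 },
  req_xyz789: { requestId: "req_xyz789", parentId: "req_abc123", startedAt: 1200 },
};

// Direct children: every call whose parent is the given request.
function childrenOf(id: string): string[] {
  return Object.keys(calls).filter((key) => calls[key].parentId === id);
}

// Ancestor chain from the root down to the call itself (assumed ordering).
function lineageOf(id: string): string[] {
  const chain: string[] = [];
  let current: string | undefined = id;
  while (current !== undefined && calls[current] !== undefined) {
    chain.unshift(current);
    current = calls[current].parentId;
  }
  return chain;
}

// Elapsed time for a finished call; undefined while the call is still open.
function durationOf(id: string): number | undefined {
  const call = calls[id];
  return call.endedAt === undefined ? undefined : call.endedAt - call.startedAt;
}
```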
Drive Reactive Execution
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents",
});
// Append call protocol events — status derives reactively
workflow.append({ type: "call.requested", requestId: "req_1", operationId: "task.classify", input: {}, timestamp: "..." });
workflow.append({ type: "call.responded", requestId: "req_1", output: { confidence: 0.95 }, timestamp: "..." });
// Read reactive state
workflow.getStatus("task.enrich");
workflow.getResult("task.classify");
// Abort cascading
workflow.abortAll();
workflow.dispose();
Subpath Exports
| Subpath | Purpose | Key Exports |
|---|---|---|
| @alkdev/flowgraph | Root: re-exports everything | All public types and functions |
| @alkdev/flowgraph/graph | Core DAG class | FlowGraph, FlowGraphOptions, OperationSpec, CallEventMapValue |
| @alkdev/flowgraph/schema | TypeBox schemas and types | CallStatus, NodeStatus, EdgeType, OperationType, CallNodeAttrs, OperationNodeAttrs, OperationEdgeAttrs, CallEdgeAttrs, TemplateEdgeAttrs, CallResult, FlowGraphSerialized |
| @alkdev/flowgraph/component | ujsx workflow components | Operation, Sequential, Parallel, Conditional, Map |
| @alkdev/flowgraph/host | Rendering backends | GraphologyHostConfig, ReactiveHostConfig |
| @alkdev/flowgraph/analysis | Validation and analysis functions | typeCompat, buildTypeEdges, validateGraph, validateSchema, validate, validateTemplate, validatePreconditions, topologicalOrder, parallelGroups, criticalPath, reachableFrom |
| @alkdev/flowgraph/reactive | Reactive execution engine | WorkflowReactiveRoot, EventLogProjection, WorkflowNode, ReactiveContext, FailurePolicy, AggregateStatus |
| @alkdev/flowgraph/error | Error hierarchy | FlowgraphError, ConstructionError, DuplicateNodeError, DuplicateEdgeError, NodeNotFoundError, CycleError, InvalidInputError, InvalidTransitionError |
Core API: FlowGraph Class
FlowGraph<NodeAttrs, EdgeAttrs> wraps a graphology DirectedGraph and enforces DAG invariants. It delegates graph operations to graphology while adding flowgraph-specific construction, mutation, and query methods.
Factory Methods
FlowGraph.fromSpecs(specs: OperationSpec[]): OperationGraph
FlowGraph.fromCallEvents(events: CallEventMapValue[]): CallGraph
FlowGraph.fromJSON(data: FlowGraphSerialized): FlowGraph
Node Operations
graph.addNode(key, attrs) // throws DuplicateNodeError
graph.removeNode(key) // throws NodeNotFoundError
graph.updateNode(key, partialAttrs) // throws NodeNotFoundError
graph.hasNode(key): boolean
graph.getNodeAttributes(key): NodeAttrs
graph.forEachNode(callback): void
Edge Operations
graph.addEdge(source, target, attrs?) // throws NodeNotFoundError, DuplicateEdgeError, CycleError
graph.removeEdge(source, target) // no-op if not found
graph.hasEdge(source, target): boolean
graph.getEdgeAttributes(source, target): EdgeAttrs
graph.forEachEdge(callback): void
Traversal
graph.topologicalOrder(): string[]
graph.ancestors(nodeId): string[]
graph.descendants(nodeId): string[]
graph.predecessors(nodeId): string[]
graph.successors(nodeId): string[]
graph.reachableFrom(nodeIds): Set<string>
graph.hasCycles(): boolean
graph.findCycles(): string[][]
Call Graph Convenience
graph.addCall(attrs: CallNodeAttrs): void
graph.addDependency(source, target): void
graph.updateStatus(requestId, status, extra?): void // throws InvalidTransitionError
graph.updateCall(requestId, partialAttrs): void
graph.removeCall(requestId): void
graph.updateFromEvent(event: CallEventMapValue): void
graph.filterByStatus(status: CallStatus): string[]
graph.getRoots(): string[]
graph.children(requestId): string[]
graph.duration(requestId): number
graph.lineage(requestId): string[]
Serialization
graph.export(): FlowGraphSerialized
graph.toJSON(): FlowGraphSerialized
graph.toString(): string
Escape Hatch
graph.graph // → DirectedGraph (raw graphology instance)
Direct mutation via graph.graph bypasses flowgraph validation. Use with caution.
Schema Enums
| Enum | Values |
|---|---|
| CallStatus | pending, running, completed, failed, aborted |
| NodeStatus | idle, waiting, ready, running, completed, failed, skipped, aborted |
| EdgeType | triggered, depends_on, typed, sequential, conditional |
| OperationType | query, mutation, subscription |
Call status transitions: pending → running → completed|failed|aborted. Terminal states are immutable. InvalidTransitionError is thrown on invalid transitions.
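The transition rule above can be sketched as a small lookup table. This is an illustration, not the library's implementation: allowedTransitions, canTransition, transition, and TransitionError are hypothetical names, and the table takes a strict reading of the rule in which pending may only move to running.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Strict reading of: pending → running → completed|failed|aborted.
// Assumption: pending may only move to running; the library may allow more.
const allowedTransitions: Record<CallStatus, readonly CallStatus[]> = {
  pending: ["running"],
  running: ["completed", "failed", "aborted"],
  completed: [], // terminal
  failed: [],    // terminal
  aborted: [],   // terminal
};

function canTransition(from: CallStatus, to: CallStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// Hypothetical stand-in for the library's InvalidTransitionError.
class TransitionError extends Error {
  readonly from: CallStatus;
  readonly to: CallStatus;
  constructor(from: CallStatus, to: CallStatus) {
    super(`invalid transition: ${from} -> ${to}`);
    this.from = from;
    this.to = to;
  }
}

// Returns the new status, or throws when the move is not in the table.
function transition(from: CallStatus, to: CallStatus): CallStatus {
  if (!canTransition(from, to)) throw new TransitionError(from, to);
  return to;
}
```

Because the terminal states map to empty arrays, any attempt to leave completed, failed, or aborted is rejected by the same check that rejects skipped steps.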
Workflow Components
| Component | Props | Behavior |
|---|---|---|
| <Operation> | name, input?, retries?, timeout? | Declares an operation node in the workflow |
| <Sequential> | id? | Children execute in order; edges are sequential |
| <Parallel> | id?, maxConcurrency? | Children execute concurrently |
| <Conditional> | test, else? | Branches on test(results). Children = then-branch, else prop = else-branch |
| <Map> | over, as | Iterates over the collection given by the over prop, binding each item to the variable named by the as prop |
Analysis Functions
import { typeCompat, validateTemplate, topologicalOrder, parallelGroups, criticalPath } from "@alkdev/flowgraph/analysis";
typeCompat(outputSchema, inputSchema): TypeCompatResult | undefined
validateTemplate(template, operationGraph): AnyValidationError[]
topologicalOrder(graph): string[]
parallelGroups(graph): string[][] // topological generations
criticalPath(graph): string[] // longest path
validateGraph(graph): GraphValidationError[]
validateSchema(graph, schema): ValidationError[]
validate(graph, schema): AnyValidationError[] // combined
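The comments on parallelGroups and criticalPath name two standard DAG computations: topological generations and longest path. The sketch below shows both over a plain adjacency map. It is not the library's code: the Dag type, generations, and longestPath are hypothetical names, the node task.report is invented for the example, and every node is assumed to have unit weight because the README does not say how criticalPath weighs nodes.

```typescript
// Adjacency list: node id -> ids of its direct successors. Hypothetical shape.
type Dag = Record<string, string[]>;

// Topological generations: generation 0 has no predecessors; a node joins the
// generation right after the one holding its last remaining predecessor.
function generations(dag: Dag): string[][] {
  const indegree: Record<string, number> = {};
  for (const node of Object.keys(dag)) indegree[node] = 0;
  for (const node of Object.keys(dag)) {
    for (const next of dag[node]) indegree[next] = (indegree[next] ?? 0) + 1;
  }
  const result: string[][] = [];
  let current = Object.keys(indegree).filter((n) => indegree[n] === 0);
  let visited = 0;
  while (current.length > 0) {
    result.push(current);
    visited += current.length;
    const following: string[] = [];
    for (const node of current) {
      for (const next of dag[node] ?? []) {
        indegree[next] -= 1;
        if (indegree[next] === 0) following.push(next);
      }
    }
    current = following;
  }
  if (visited !== Object.keys(indegree).length) throw new Error("graph has a cycle");
  return result;
}

// Longest path by node count, using the generation order as a topological order.
function longestPath(dag: Dag): string[] {
  const order = ([] as string[]).concat(...generations(dag));
  const length: Record<string, number> = {};
  const previous: Record<string, string | undefined> = {};
  for (const node of order) length[node] = 1;
  for (const node of order) {
    for (const next of dag[node] ?? []) {
      if (length[node] + 1 > length[next]) {
        length[next] = length[node] + 1;
        previous[next] = node;
      }
    }
  }
  let end: string | undefined;
  for (const node of order) {
    if (end === undefined || length[node] > length[end]) end = node;
  }
  const path: string[] = [];
  for (let at = end; at !== undefined; at = previous[at]) path.unshift(at);
  return path;
}

const exampleDag: Dag = {
  "task.classify": ["task.enrich", "task.summarize"],
  "task.enrich": ["task.report"],
  "task.summarize": [],
  "task.report": [],
};
const exampleGroups = generations(exampleDag);
const examplePath = longestPath(exampleDag);
```

Nodes in the same generation have no path between them, which is why each generation can be treated as a candidate parallel group.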
Reactive Execution
WorkflowReactiveRoot implements EventLogProjection — call protocol events are the source of truth, status/results are derived projections.
const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents", // or "continue-running"
  parallelGroups: { group1: { siblings: ["a", "b"], maxConcurrency: 2 } },
});
// Per-node reactive signals
workflow.statusMap // Map<string, Signal<NodeStatus>>
workflow.preconditions // Map<string, ReadonlySignal<boolean>> — all predecessors completed/skipped
workflow.canStart // Map<string, ReadonlySignal<boolean>> — preconditions + concurrency
workflow.blockedByFailure // Map<string, ReadonlySignal<boolean>> — any predecessor failed/aborted
workflow.resultMap // Map<string, ReadonlySignal<CallResult | undefined>>
// Event-driven updates
workflow.append(event: CallEventMapValue): void
// Queries
workflow.getStatus(nodeId): NodeStatus
workflow.getResult(nodeId): CallResult | undefined
workflow.isComplete(): boolean
workflow.getAggregateStatus(): AggregateStatus
// Lifecycle
workflow.abortAll(): void
workflow.abortNode(nodeId): void
workflow.dispose(): void // mandatory cleanup — releases signal subscriptions
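The idea that status and results are projections of the event log can be sketched as a pure fold over an event array, without signals. This is a simplified illustration, not WorkflowReactiveRoot itself: the event union covers only a subset of the protocol, the project function and Projection shape are hypothetical, and the mapping from event type to status (for example, call.requested marking a node as running) is an assumption.

```typescript
// Simplified event shapes. Field names follow the Quick Start examples;
// the error field and everything else here is assumed for illustration.
type CallEvent =
  | { type: "call.requested"; requestId: string; operationId: string }
  | { type: "call.responded"; requestId: string; output: unknown }
  | { type: "call.error"; requestId: string; error: string }
  | { type: "call.aborted"; requestId: string };

type Status = "running" | "completed" | "failed" | "aborted";

interface Projection {
  status: Record<string, Status>;   // keyed by operationId
  results: Record<string, unknown>; // keyed by operationId
}

// Rebuild the whole projection from the log; no state lives outside the events.
function project(log: readonly CallEvent[]): Projection {
  const operationOf: Record<string, string> = {};
  const projection: Projection = { status: {}, results: {} };
  for (const event of log) {
    if (event.type === "call.requested") {
      operationOf[event.requestId] = event.operationId;
      projection.status[event.operationId] = "running";
      continue;
    }
    const operationId = operationOf[event.requestId];
    if (operationId === undefined) continue; // event for an unknown request
    if (event.type === "call.responded") {
      projection.status[operationId] = "completed";
      projection.results[operationId] = event.output;
    } else if (event.type === "call.error") {
      projection.status[operationId] = "failed";
    } else {
      projection.status[operationId] = "aborted";
    }
  }
  return projection;
}

const eventLog: CallEvent[] = [
  { type: "call.requested", requestId: "req_1", operationId: "task.classify" },
  { type: "call.responded", requestId: "req_1", output: { confidence: 0.95 } },
  { type: "call.requested", requestId: "req_2", operationId: "task.enrich" },
];
const projected = project(eventLog);
```

Replaying a prefix of the log yields the state as it was at that point, which is the practical benefit of keeping the events as the only source of truth.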
Error Hierarchy
FlowgraphError (base)
├── ConstructionError
│ ├── DuplicateNodeError (readonly key)
│ ├── DuplicateEdgeError (readonly source, target)
│ ├── NodeNotFoundError (readonly key)
│ ├── CycleError (readonly cycles: string[][])
│ └── InvalidInputError (readonly errors: ValidationError[])
└── InvalidTransitionError (readonly requestId, from, to)
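One consequence of this tree is that callers can catch at whatever level of detail they need. The sketch below mirrors part of the hierarchy with locally defined stand-in classes so that it runs on its own. The real classes are exported from @alkdev/flowgraph/error; the constructor signatures and the describe helper here are assumptions.

```typescript
// Local stand-ins that mirror part of the tree above. Constructor signatures
// are assumptions, not the library's.
class FlowgraphError extends Error {}
class ConstructionError extends FlowgraphError {}

class DuplicateNodeError extends ConstructionError {
  readonly key: string;
  constructor(key: string) {
    super(`duplicate node: ${key}`);
    this.key = key;
  }
}

class CycleError extends ConstructionError {
  readonly cycles: string[][];
  constructor(cycles: string[][]) {
    super(`cycle detected: ${cycles.map((c) => c.join(" -> ")).join("; ")}`);
    this.cycles = cycles;
  }
}

// Check the most specific classes first, then fall back to the base classes.
function describe(error: unknown): string {
  if (error instanceof CycleError) return `cycle through ${error.cycles[0].join(", ")}`;
  if (error instanceof DuplicateNodeError) return `node ${error.key} already exists`;
  if (error instanceof ConstructionError) return "graph construction failed";
  if (error instanceof FlowgraphError) return "flowgraph error";
  return "unrelated error";
}
```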
Design Principles
- DAG-only, no cycles: addEdge() rejects cycle-creating edges at mutation time (ADR-002). This differs from taskgraph, which allows cycles and detects them after the fact.
- Storage is decoupled: flowgraph handles in-memory graph construction, validation, and analysis. Persistence is the caller's concern. export()/fromJSON() provides the serialization boundary.
- Template → DAG → Execution is a pipeline: each representation serves a different phase and can exist independently. Validate a template without executing it. Build a call graph from events without a template. Run reactive execution directly from a DAG.
- Event log as source of truth: call protocol events (call.requested, call.responded, call.error, call.aborted, call.completed) are the ground truth. Status, results, and the call graph are projections derived from the event log (ADR-005).
- Delegation, not inheritance: FlowGraph wraps a graphology DirectedGraph, exposing a curated API. The raw graphology instance is available via the .graph escape hatch.
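Rejecting cycle-creating edges at mutation time amounts to a reachability check before insertion: adding source → target closes a loop exactly when source is already reachable from target. The sketch below shows that check on a minimal adjacency structure. TinyDag is a hypothetical class written for this example; it does not show how flowgraph or graphology implements the check, and it throws plain Error rather than the library's error classes.

```typescript
class TinyDag {
  private readonly successors = new Map<string, Set<string>>();

  addNode(key: string): void {
    if (!this.successors.has(key)) this.successors.set(key, new Set());
  }

  // True when `to` can be reached from `from` by following existing edges.
  private reaches(from: string, to: string): boolean {
    const stack = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const node = stack.pop() as string;
      if (node === to) return true;
      if (seen.has(node)) continue;
      seen.add(node);
      for (const next of this.successors.get(node) ?? []) stack.push(next);
    }
    return false;
  }

  addEdge(source: string, target: string): void {
    if (!this.successors.has(source) || !this.successors.has(target)) {
      throw new Error("both endpoints must exist");
    }
    // source -> target closes a loop exactly when target already reaches
    // source (a self-loop is the case source === target).
    if (this.reaches(target, source)) {
      throw new Error(`edge ${source} -> ${target} would create a cycle`);
    }
    this.successors.get(source)!.add(target);
  }

  hasEdge(source: string, target: string): boolean {
    return this.successors.get(source)?.has(target) ?? false;
  }
}

const tiny = new TinyDag();
for (const key of ["a", "b", "c"]) tiny.addNode(key);
tiny.addEdge("a", "b");
tiny.addEdge("b", "c");
// tiny.addEdge("c", "a") would throw here, because a already reaches c.
```

Checking at insertion keeps the graph acyclic at every moment, at the cost of one traversal per added edge.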
For AI Agents
When working with this library programmatically:
- Use subpath imports: @alkdev/flowgraph/graph, @alkdev/flowgraph/analysis, etc. The root export re-exports everything but subpath imports make dependencies explicit.
- Always call dispose() on WorkflowReactiveRoot: signal subscriptions leak without it.
- Function-valued props don't survive JSON serialization: Conditional.test and Map.over with function values need runtime resolution. Use string references for stored templates.
- fromSpecs() graphs are conventionally immutable: don't mutate operation graphs after construction. If the registry changes, rebuild via fromSpecs().
- Call graph mutation uses event protocol: use updateFromEvent() or addCall()/updateStatus(), not direct node mutation.
- typeCompat() returns undefined for unknown/any schemas: this means "no meaningful check possible", not "incompatible".
- Architecture specs are in docs/architecture/: detailed design decisions, ADRs, and open questions live there. This README is a surface-level guide. Consult the architecture docs for anything non-trivial.
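The point about function-valued props follows from how JSON.stringify treats functions: an object property whose value is a function is omitted from the output. The sketch below demonstrates that and one possible string-reference workaround. The predicates registry and the testRef prop name are hypothetical and not part of the library; how flowgraph resolves string references is not specified here.

```typescript
type Results = Record<string, { output: { confidence: number } }>;
type Predicate = (results: Results) => boolean;

// A props object with a function-valued `test`, as in the Quick Start template.
const inlineProps = {
  test: (results: Results) => results["task.classify"].output.confidence > 0.8,
};

// JSON.stringify omits function-valued properties, so `test` disappears.
const serializedInline = JSON.stringify(inlineProps);
const roundTripped = JSON.parse(serializedInline);

// Hypothetical workaround: store a string reference and resolve it at runtime.
const predicates: Record<string, Predicate> = {
  highConfidence: (results) => results["task.classify"].output.confidence > 0.8,
};
const storedProps = { testRef: "highConfidence" };
const restored = JSON.parse(JSON.stringify(storedProps)) as { testRef: string };
const resolvedTest = predicates[restored.testRef];
```

A stored template that carries only the reference survives the round trip, and the predicate is looked up again when the template is loaded.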
Dependencies
| Package | Relationship |
|---|---|
| graphology | Direct: the underlying directed graph data structure |
| graphology-dag | Direct: topological sort, cycle detection, DAG traversal |
| @alkdev/ujsx | Direct: UNode trees and HostConfig for workflow template rendering |
| @alkdev/typebox | Direct: all schemas are TypeBox Modules |
| @preact/signals-core | Direct: reactive state management for WorkflowReactiveRoot |
| @alkdev/operations | Peer: provides OperationSpec, OperationRegistry, call event types |
License
Dual-licensed under MIT or Apache-2.0 at your option.