# @alkdev/flowgraph

DAG-based workflow orchestration over graphology, with ujsx template composition and reactive signal-driven execution.

## What This Does

Flowgraph sits between `@alkdev/operations` (which defines *what can be called*) and `@alkdev/alkhub` (which records *what was called*). Flowgraph defines **how calls are orchestrated**: the structure, validation, and execution of workflows.

Three conceptual graphs, each for a different purpose:

1. **Operation Graph**: static graph built from `OperationSpec`s at startup. Nodes are operations, edges are type-compatibility relationships. Enables cycle detection, topological ordering, and validation.
2. **Call Graph**: dynamic graph built from call protocol events at runtime. Nodes are call invocations with status/timestamps, edges are parent-child relationships. Enables abort cascading and observability.
3. **Workflow Template**: declarative ujsx tree defining a reusable workflow structure. A validated path through the operation graph, instantiated as a call graph at runtime.

**The graph is the specification. The template is the authoring surface. The call graph is the execution record.**

## Installation

```bash
npm install @alkdev/flowgraph
```

Peer dependency: `@alkdev/operations ^0.1.0`

## Quick Start

### Build an Operation Graph

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
import type { OperationSpec } from "@alkdev/flowgraph/graph";

const specs: OperationSpec[] = [
  { namespace: "task", name: "classify", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "enrich", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
  { namespace: "task", name: "summarize", type: "mutation", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
];

const graph = FlowGraph.fromSpecs(specs);

// Type-compatibility edges added automatically
graph.hasEdge("task.classify", "task.enrich");
```

### Define a Workflow Template

```typescript
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional } from "@alkdev/flowgraph/component";

const template = h(Sequential, {},
  h(Operation, { name: "task.classify" }),
  h(Conditional, {
    test: (results) => results["task.classify"].output.confidence > 0.8,
  },
    h(Parallel, {},
      h(Operation, { name: "task.enrich" }),
      h(Operation, { name: "task.summarize" }),
    ),
    h(Operation, { name: "task.classify" }),
  ),
);
```

### Validate the Template

```typescript
import { validateTemplate } from "@alkdev/flowgraph/analysis";

const errors = validateTemplate(template, graph);
if (errors.length > 0) {
  for (const error of errors) {
    console.error(`[${error.type}]`, error);
  }
}
```

### Populate a Call Graph from Events

```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";

const callGraph = FlowGraph.fromCallEvents(eventArray);

callGraph.filterByStatus("running");
callGraph.children("req_abc123");
callGraph.lineage("req_xyz789");
callGraph.duration("req_abc123");
```

### Drive Reactive Execution

```typescript
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";

const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents",
});

// Append call protocol events; status derives reactively
workflow.append({ type: "call.requested", requestId: "req_1", operationId: "task.classify", input: {}, timestamp: "..." });
workflow.append({ type: "call.responded", requestId: "req_1", output: { confidence: 0.95 }, timestamp: "..." });

// Read reactive state
workflow.getStatus("task.enrich");
workflow.getResult("task.classify");

// Abort cascading
workflow.abortAll();
workflow.dispose();
```

## Subpath Exports

| Subpath | Purpose | Key Exports |
|---------|---------|-------------|
| `@alkdev/flowgraph` | Root: re-exports everything | All public types and functions |
| `@alkdev/flowgraph/graph` | Core DAG class | `FlowGraph`, `FlowGraphOptions`, `OperationSpec`, `CallEventMapValue` |
| `@alkdev/flowgraph/schema` | TypeBox schemas and types | `CallStatus`, `NodeStatus`, `EdgeType`, `OperationType`, `CallNodeAttrs`, `OperationNodeAttrs`, `OperationEdgeAttrs`, `CallEdgeAttrs`, `TemplateEdgeAttrs`, `CallResult`, `FlowGraphSerialized` |
| `@alkdev/flowgraph/component` | ujsx workflow components | `Operation`, `Sequential`, `Parallel`, `Conditional`, `Map` |
| `@alkdev/flowgraph/host` | Rendering backends | `GraphologyHostConfig`, `ReactiveHostConfig` |
| `@alkdev/flowgraph/analysis` | Validation and analysis functions | `typeCompat`, `buildTypeEdges`, `validateGraph`, `validateSchema`, `validate`, `validateTemplate`, `validatePreconditions`, `topologicalOrder`, `parallelGroups`, `criticalPath`, `reachableFrom` |
| `@alkdev/flowgraph/reactive` | Reactive execution engine | `WorkflowReactiveRoot`, `EventLogProjection`, `WorkflowNode`, `ReactiveContext`, `FailurePolicy`, `AggregateStatus` |
| `@alkdev/flowgraph/error` | Error hierarchy | `FlowgraphError`, `ConstructionError`, `DuplicateNodeError`, `DuplicateEdgeError`, `NodeNotFoundError`, `CycleError`, `InvalidInputError`, `InvalidTransitionError` |

## Core API: FlowGraph

Class `FlowGraph` wraps a graphology `DirectedGraph` and enforces DAG invariants. It delegates graph operations to graphology while adding flowgraph-specific construction, mutation, and query methods.

### Factory Methods

```typescript
FlowGraph.fromSpecs(specs: OperationSpec[]): OperationGraph
FlowGraph.fromCallEvents(events: CallEventMapValue[]): CallGraph
FlowGraph.fromJSON(data: FlowGraphSerialized): FlowGraph
```

### Node Operations

```typescript
graph.addNode(key, attrs)            // throws DuplicateNodeError
graph.removeNode(key)                // throws NodeNotFoundError
graph.updateNode(key, partialAttrs)  // throws NodeNotFoundError
graph.hasNode(key): boolean
graph.getNodeAttributes(key): NodeAttrs
graph.forEachNode(callback): void
```

### Edge Operations

```typescript
graph.addEdge(source, target, attrs?)  // throws NodeNotFoundError, DuplicateEdgeError, CycleError
graph.removeEdge(source, target)       // no-op if not found
graph.hasEdge(source, target): boolean
graph.getEdgeAttributes(source, target): EdgeAttrs
graph.forEachEdge(callback): void
```

### Traversal

```typescript
graph.topologicalOrder(): string[]
graph.ancestors(nodeId): string[]
graph.descendants(nodeId): string[]
graph.predecessors(nodeId): string[]
graph.successors(nodeId): string[]
graph.reachableFrom(nodeIds): Set
graph.hasCycles(): boolean
graph.findCycles(): string[][]
```

### Call Graph Convenience

```typescript
graph.addCall(attrs: CallNodeAttrs): void
graph.addDependency(source, target): void
graph.updateStatus(requestId, status, extra?): void  // throws InvalidTransitionError
graph.updateCall(requestId, partialAttrs): void
graph.removeCall(requestId): void
graph.updateFromEvent(event: CallEventMapValue): void
graph.filterByStatus(status: CallStatus): string[]
graph.getRoots(): string[]
graph.children(requestId): string[]
graph.duration(requestId): number
graph.lineage(requestId): string[]
```

### Serialization

```typescript
graph.export(): FlowGraphSerialized
graph.toJSON(): FlowGraphSerialized
graph.toString(): string
```

### Escape Hatch

```typescript
graph.graph  // → DirectedGraph (raw graphology instance)
```

Direct mutation via `graph.graph` bypasses flowgraph validation. Use with caution.

## Schema Enums

| Enum | Values |
|------|--------|
| `CallStatus` | `pending`, `running`, `completed`, `failed`, `aborted` |
| `NodeStatus` | `idle`, `waiting`, `ready`, `running`, `completed`, `failed`, `skipped`, `aborted` |
| `EdgeType` | `triggered`, `depends_on`, `typed`, `sequential`, `conditional` |
| `OperationType` | `query`, `mutation`, `subscription` |

Call status transitions: `pending → running → completed|failed|aborted`. Terminal states are immutable. `InvalidTransitionError` is thrown on invalid transitions.

## Workflow Components

| Component | Props | Behavior |
|-----------|-------|----------|
| `<Operation>` | `name`, `input?`, `retries?`, `timeout?` | Declares an operation node in the workflow |
| `<Sequential>` | `id?` | Children execute in order; edges are `sequential` |
| `<Parallel>` | `id?`, `maxConcurrency?` | Children execute concurrently |
| `<Conditional>` | `test`, `else?` | Branches on `test(results)`. Children = then-branch, `else` prop = else-branch |
| `<Map>` | `over`, `as` | Iterates over `over` collection, binding each item as `as` variable |

## Analysis Functions

```typescript
import { typeCompat, validateTemplate, topologicalOrder, parallelGroups, criticalPath } from "@alkdev/flowgraph/analysis";

typeCompat(outputSchema, inputSchema): TypeCompatResult | undefined
validateTemplate(template, operationGraph): AnyValidationError[]
topologicalOrder(graph): string[]
parallelGroups(graph): string[][]              // topological generations
criticalPath(graph): string[]                  // longest path
validateGraph(graph): GraphValidationError[]
validateSchema(graph, schema): ValidationError[]
validate(graph, schema): AnyValidationError[]  // combined
```

## Reactive Execution

`WorkflowReactiveRoot` implements `EventLogProjection`: call protocol events are the source of truth, and status/results are derived projections.

```typescript
const workflow = new WorkflowReactiveRoot(dag, {
  failurePolicy: "abort-dependents",  // or "continue-running"
  parallelGroups: { group1: { siblings: ["a", "b"], maxConcurrency: 2 } },
});

// Per-node reactive signals
workflow.statusMap         // Map of per-node status signals
workflow.preconditions     // Map of per-node signals: all predecessors completed/skipped
workflow.canStart          // Map of per-node signals: preconditions + concurrency
workflow.blockedByFailure  // Map of per-node signals: any predecessor failed/aborted
workflow.resultMap         // Map of per-node result signals

// Event-driven updates
workflow.append(event: CallEventMapValue): void

// Queries
workflow.getStatus(nodeId): NodeStatus
workflow.getResult(nodeId): CallResult | undefined
workflow.isComplete(): boolean
workflow.getAggregateStatus(): AggregateStatus

// Lifecycle
workflow.abortAll(): void
workflow.abortNode(nodeId): void
workflow.dispose(): void  // mandatory cleanup; releases signal subscriptions
```

## Error Hierarchy

```
FlowgraphError (base)
├── ConstructionError
│   ├── DuplicateNodeError (readonly key)
│   ├── DuplicateEdgeError (readonly source, target)
│   ├── NodeNotFoundError (readonly key)
│   ├── CycleError (readonly cycles: string[][])
│   └── InvalidInputError (readonly errors: ValidationError[])
└── InvalidTransitionError (readonly requestId, from, to)
```

## Design Principles

1. **DAG-only, no cycles**: `addEdge()` rejects cycle-creating edges at mutation time (ADR-002). This differs from taskgraph, which allows cycles and detects them after the fact.
2. **Storage is decoupled**: flowgraph handles in-memory graph construction, validation, and analysis. Persistence is the caller's concern. `export()`/`fromJSON()` provides the serialization boundary.
3. **Template → DAG → Execution is a pipeline**: each representation serves a different phase and can exist independently. Validate a template without executing it. Build a call graph from events without a template. Run reactive execution directly from a DAG.
4. **Event log as source of truth**: call protocol events (`call.requested`, `call.responded`, `call.error`, `call.aborted`, `call.completed`) are the ground truth. Status, results, and the call graph are projections derived from the event log (ADR-005).
5. **Delegation, not inheritance**: `FlowGraph` wraps a graphology `DirectedGraph`, exposing a curated API. The raw graphology instance is available via the `.graph` escape hatch.

## For AI Agents

When working with this library programmatically:

- **Use subpath imports**: `@alkdev/flowgraph/graph`, `@alkdev/flowgraph/analysis`, etc. The root export re-exports everything, but subpath imports make dependencies explicit.
- **Always call `dispose()` on `WorkflowReactiveRoot`**: signal subscriptions leak without it.
- **Function-valued props don't survive JSON serialization**: `Conditional.test` and `Map.over` with function values need runtime resolution. Use string references for stored templates.
- **`fromSpecs()` graphs are conventionally immutable**: don't mutate operation graphs after construction. If the registry changes, rebuild via `fromSpecs()`.
- **Call graph mutation uses event protocol**: use `updateFromEvent()` or `addCall()`/`updateStatus()`, not direct node mutation.
- **`typeCompat()` returns `undefined` for `unknown`/`any` schemas**: this means "no meaningful check possible", not "incompatible".
- **Architecture specs are in `docs/architecture/`**: detailed design decisions, ADRs, and open questions live there. This README is a surface-level guide. Consult the architecture docs for anything non-trivial.

## Dependencies

| Package | Relationship |
|---------|-------------|
| `graphology` | Direct: the underlying directed graph data structure |
| `graphology-dag` | Direct: topological sort, cycle detection, DAG traversal |
| `@alkdev/ujsx` | Direct: `UNode` trees and `HostConfig` for workflow template rendering |
| `@alkdev/typebox` | Direct: all schemas are TypeBox Modules |
| `@preact/signals-core` | Direct: reactive state management for `WorkflowReactiveRoot` |
| `@alkdev/operations` | **Peer**: provides `OperationSpec`, `OperationRegistry`, call event types |

## License

Dual-licensed under [MIT](LICENSE-MIT) or [Apache-2.0](LICENSE-APACHE) at your option.
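
## Illustrative Sketch: Call Status Transitions

The call status rule stated under Schema Enums (`pending → running → completed|failed|aborted`, with immutable terminal states) can be pictured as a small transition table. The sketch below is not flowgraph's implementation and does not import the library: `ALLOWED`, `transition`, and the local `InvalidTransitionError` class are hypothetical stand-ins that only mirror names used in this README. It also assumes a literal reading of the chain, in which `pending` may only move to `running`; the real library may permit more.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Hypothetical transition table (assumption: strict reading of the documented chain).
// Terminal states map to an empty list, which makes them immutable.
const ALLOWED: Record<CallStatus, readonly CallStatus[]> = {
  pending: ["running"],
  running: ["completed", "failed", "aborted"],
  completed: [],
  failed: [],
  aborted: [],
};

// Local stand-in for the library's error; carries requestId, from, and to.
class InvalidTransitionError extends Error {
  readonly requestId: string;
  readonly from: CallStatus;
  readonly to: CallStatus;

  constructor(requestId: string, from: CallStatus, to: CallStatus) {
    super(`Invalid transition for ${requestId}: ${from} -> ${to}`);
    this.name = "InvalidTransitionError";
    this.requestId = requestId;
    this.from = from;
    this.to = to;
  }
}

// Returns the new status, or throws if the move is not in the table.
function transition(requestId: string, from: CallStatus, to: CallStatus): CallStatus {
  if (!ALLOWED[from].includes(to)) {
    throw new InvalidTransitionError(requestId, from, to);
  }
  return to;
}

// Usage: a valid lifecycle, then an attempt to leave a terminal state.
let status: CallStatus = "pending";
status = transition("req_1", status, "running");
status = transition("req_1", status, "completed");

try {
  transition("req_1", status, "running");
} catch (error) {
  if (error instanceof InvalidTransitionError) {
    console.error(error.message);
  }
}
```

Keeping the rule in a lookup table rather than in branching logic makes the set of legal moves easy to audit against the documented chain.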
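
## Illustrative Sketch: Parallel Groups as Topological Generations

The Analysis Functions section annotates `parallelGroups(graph)` as "topological generations". As a rough illustration of that idea only, the sketch below layers a DAG with a Kahn-style pass over a plain edge list. It does not use flowgraph or graphology: `generations`, its input shape, and the `task.report` node are hypothetical, and it assumes every edge endpoint appears in `nodes`.

```typescript
// Hypothetical helper: groups DAG nodes into layers where every node's
// predecessors live in earlier layers. Throws if a cycle prevents layering.
function generations(nodes: string[], edges: Array<[string, string]>): string[][] {
  const indegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const node of nodes) {
    indegree.set(node, 0);
    successors.set(node, []);
  }
  for (const [source, target] of edges) {
    successors.get(source)!.push(target);
    indegree.set(target, indegree.get(target)! + 1);
  }

  const result: string[][] = [];
  let current = nodes.filter((node) => indegree.get(node) === 0);
  let visited = 0;

  while (current.length > 0) {
    result.push(current);
    visited += current.length;
    const next: string[] = [];
    for (const node of current) {
      for (const successor of successors.get(node)!) {
        const remaining = indegree.get(successor)! - 1;
        indegree.set(successor, remaining);
        // A node joins the next layer once all of its predecessors are placed.
        if (remaining === 0) next.push(successor);
      }
    }
    current = next;
  }

  // Nodes on a cycle never reach indegree 0, so they are never visited.
  if (visited !== nodes.length) {
    throw new Error("graph contains a cycle");
  }
  return result;
}

const groups = generations(
  ["task.classify", "task.enrich", "task.summarize", "task.report"],
  [
    ["task.classify", "task.enrich"],
    ["task.classify", "task.summarize"],
    ["task.enrich", "task.report"],
    ["task.summarize", "task.report"],
  ],
);
console.log(groups);
// → [["task.classify"], ["task.enrich", "task.summarize"], ["task.report"]]
```

Nodes in the same inner array have no dependency on each other in this sketch, which is what makes them candidates for concurrent execution.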