31 Commits

Author SHA1 Message Date
058b437c3f fix: add ./ prefix to subpath export keys in package.json 2026-05-22 06:26:56 +00:00
6fb633c05b chore: add LICENSE, README, update package.json and AGENTS.md for npm release prep 2026-05-22 06:17:54 +00:00
e45b8c0cc0 chore: mark final review/complete-library task completed — all 38/38 tasks done 2026-05-21 22:56:22 +00:00
fab0b64f0b chore: update api/public-exports task status to completed 2026-05-21 22:55:56 +00:00
dd96ceb4f8 feat: wire all barrel exports, resolve conflicts, add 8 sub-path import tests 2026-05-21 22:55:37 +00:00
4fcd544261 chore: update review/reactive-and-hosts task status to completed 2026-05-21 22:40:57 +00:00
1233266ffd chore: update meta-reactive-layer task status to completed 2026-05-21 22:31:22 +00:00
b297203a5d chore: update max-concurrency task status, spawn next generation 2026-05-21 22:29:25 +00:00
4df8698a3d Merge branch 'feat/reactive/max-concurrency' 2026-05-21 22:29:05 +00:00
1a12410229 feat: implement maxConcurrency reactive counting semaphore with 20 unit tests 2026-05-21 22:29:02 +00:00
72356b9758 chore: update host-reactive task status to completed 2026-05-21 22:21:00 +00:00
793b6e513f Merge branch 'feat/host/reactive' 2026-05-21 22:20:33 +00:00
8b7f4f985a feat: implement ReactiveHostConfig with WorkflowNode, ReactiveContext, precondition computation and 34 integration tests 2026-05-21 22:20:30 +00:00
a72c3173e7 chore: update review-foundation task status 2026-05-21 22:18:37 +00:00
ee3da90b63 chore: update task statuses completed 2026-05-21 22:18:03 +00:00
270bd7cd69 chore: merge reactive-node-status, update retry-semantics task status 2026-05-21 22:17:41 +00:00
e8b62c0c58 Merge branch 'feat/reactive-node-status' 2026-05-21 22:17:29 +00:00
000a1e04c5 feat: add retry semantics tests with requestIdToNodeKey reverse map 2026-05-21 22:17:11 +00:00
e98204161d feat: implement node status signal management with computed preconditions and blockedByFailure
- Add computePreconditions and computeBlockedByFailure functions to node-status.ts
- Add registerStartEffect and registerAbortEffect for automatic state transitions
- Start effect: idle/waiting -> ready when preconditions met
- Abort effect: idle/waiting -> aborted when blockedByFailure true
- Refactor WorkflowReactiveRoot to use node-status.ts functions
- Root nodes auto-transition from idle to ready (no predecessors = preconditions true)
- Add AbortEffectOptions with abortDependents policy support
- Add comprehensive unit tests for all precondition and failure isolation scenarios
2026-05-21 22:16:46 +00:00
4cf644f734 Merge branch 'feat/reactive-retry-semantics' 2026-05-21 22:13:33 +00:00
d63a301cea Implement retry semantics: requestIdToNodeKey reverse map, setRequestId method, full-history getEvents 2026-05-21 22:10:31 +00:00
18999fb38e chore: update task statuses to completed 2026-05-21 22:09:10 +00:00
7a4e430aa9 Merge branch 'feat/analysis-build-type-edges'
# Conflicts:
#	src/analysis/index.ts
#	src/graph/construction.ts
#	src/index.ts
2026-05-21 22:08:42 +00:00
c7c6d13d6b Merge branch 'feat/analysis-template-validation' 2026-05-21 22:07:11 +00:00
c57a8558c7 Merge branch 'feat/graph-construction-call'
# Conflicts:
#	src/graph/construction.ts
#	test/graph/construction.test.ts
2026-05-21 22:06:51 +00:00
fa2223b90b feat: move buildTypeEdges to src/analysis/type-compat.ts as standalone function
Relocate buildTypeEdges from construction.ts to type-compat.ts per architecture spec.
construction.ts re-exports it for backward compatibility. Add 10 unit tests for
buildTypeEdges covering compatible edges, incompatible edges with mismatches,
unknown schema passthrough, incremental construction, and edge deduplication.
2026-05-21 22:06:26 +00:00
2e0350e87c feat: implement call graph construction methods (fromCallEvents, updateFromEvent, addCall, addDependency, updateStatus, updateCall, removeCall) 2026-05-21 22:04:28 +00:00
d7fea47214 Merge branch 'feat/analysis-ordering' 2026-05-21 22:03:19 +00:00
67907dc0f3 Implement validatePreconditions and validateTemplate for template validation 2026-05-21 22:02:31 +00:00
3b52998f20 chore: update task graph/construction-json status to completed 2026-05-21 21:56:43 +00:00
5cfc8882bd Implement fromJSON, export, toJSON, toString serialization for FlowGraph 2026-05-21 21:51:24 +00:00
43 changed files with 4682 additions and 266 deletions

114
AGENTS.md
View File

@@ -1,60 +1,86 @@
## Memory Tools (via @alkdev/open-memory plugin) # @alkdev/flowgraph — Agent Guide
You have access to two tools for managing your context and accessing session history: Project-specific context for AI agents working on this codebase.
### memory({tool: "...", args: {...}}) ## What This Is
Read-only tool for introspecting your session history and context state. Available operations: DAG-based workflow orchestration library. Wraps graphology `DirectedGraph`, enforces DAG invariants, provides ujsx template composition and reactive signal-driven execution. Sits between `@alkdev/operations` (what can be called) and `@alkdev/alkhub` (what was called) — defines *how calls are orchestrated*.
- `memory({tool: "help"})` — full reference with examples
- `memory({tool: "summary"})` — quick counts of projects, sessions, messages, todos
- `memory({tool: "sessions"})` — list recent sessions (useful for finding past work)
- `memory({tool: "children", args: {sessionId: "ses_..."}})` — list sub-agent sessions spawned from a parent
- `memory({tool: "messages", args: {sessionId: "..."}})` — read a session's conversation
- `memory({tool: "messages", args: {sessionId: "...", role: "assistant"}})` — read only assistant messages
- `memory({tool: "messages", args: {sessionId: "...", showTools: true}})` — include tool-call output
- `memory({tool: "message", args: {messageId: "msg_..."}})` — read a single message by ID
- `memory({tool: "search", args: {query: "..."}})` — search across all conversations
- `memory({tool: "compactions", args: {sessionId: "..."}})` — view compaction checkpoints
- `memory({tool: "context"})` — check your current context window usage
### memory_compact() ## Build & Test Commands
Trigger compaction on the current session. This summarizes the conversation so far to free context space. ```bash
npm run build # tsup (ESM + CJS + dts + sourcemaps)
npm run build:tsc # tsc type-check only
npm run lint # tsc --noEmit
npm run test # vitest run
npm run test:watch # vitest watch
npm run test:coverage # vitest run --coverage
```
**When to use memory_compact:** Always run `npm run lint && npm run test` after making changes.
- When context is above 80% (check with `memory({tool: "context"})`)
- When you notice you're losing track of earlier conversation details
- At natural breakpoints in multi-step tasks (after completing a subtask, before starting a new one)
- When the system prompt shows a yellow/red/critical context warning
- Proactively, rather than waiting for automatic compaction at 92%
**When NOT to use memory_compact:** ## Source Structure
- When context is below 50% (it wastes a compaction cycle)
- In the middle of a complex edit that you need immediate context for
- When the task is nearly complete (just finish the task instead)
Compaction preserves your most important context in a structured summary — you will continue the session with the summary as your starting point. ```
src/
index.ts # Barrel — re-exports all sub-modules
component/ # ujsx workflow components (Operation, Sequential, Parallel, Conditional, Map)
host/ # HostConfig implementations (GraphologyHostConfig, ReactiveHostConfig)
schema/ # TypeBox schemas, enums, node/edge attribute types
graph/ # FlowGraph class (construction, mutation, query, serialization)
reactive/ # WorkflowReactiveRoot, signal-driven status, event log projection
analysis/ # Pure functions: typeCompat, validate, topologicalOrder, parallelGroups, criticalPath
error/ # FlowgraphError hierarchy
```
## Worktree Tool (via @alkimiadev/open-coordinator plugin) ## Subpath Exports
You have access to the `worktree` tool for git worktree management and session coordination. Call with `{action, args}`: The package has 8 export subpaths. Root `@alkdev/flowgraph` re-exports everything. Subpath imports make dependencies explicit:
### Coordinator Operations (available when session is not spawned by another session) | Subpath | Key Exports |
|---------|-------------|
| `/graph` | `FlowGraph`, `FlowGraphOptions`, `OperationSpec`, `CallEventMapValue` |
| `/schema` | `CallStatus`, `NodeStatus`, `EdgeType`, `OperationType`, all node/edge attribute types |
| `/component` | `Operation`, `Sequential`, `Parallel`, `Conditional`, `Map` |
| `/host` | `GraphologyHostConfig`, `ReactiveHostConfig` |
| `/analysis` | `typeCompat`, `buildTypeEdges`, `validateGraph`, `validateSchema`, `validate`, `validateTemplate`, `topologicalOrder`, `parallelGroups`, `criticalPath` |
| `/reactive` | `WorkflowReactiveRoot`, `EventLogProjection`, `WorkflowNode`, `FailurePolicy` |
| `/error` | `FlowgraphError`, `ConstructionError`, `CycleError`, `InvalidInputError`, `InvalidTransitionError` |
- `worktree({action: "list"})` — List git worktrees ## Key Patterns
- `worktree({action: "dashboard"})` — Worktree dashboard with session info
- `worktree({action: "spawn", args: {tasks: [...], prompt: "..."}})` — Spawn parallel worktrees + sessions
- `worktree({action: "sessions"})` — Query spawned session status
- `worktree({action: "message", args: {sessionID: "...", message: "..."}})` — Message a session
- `worktree({action: "abort", args: {sessionID: "..."}})` — Abort a session
- `worktree({action: "cleanup", args: {action: "remove", pathOrBranch: "..."}})` — Remove worktree
- `worktree({action: "help"})` — Show all available operations
### Implementation Operations (available when session is spawned by a coordinator) - **TypeBox schema + Static type pairs**: Every schema is exported as both a const (runtime) and an inferred type. `const FooSchema = Type.Object({...}); type Foo = Static<typeof FooSchema>;`
- **Delegation model**: `FlowGraph` wraps a graphology `DirectedGraph` (does not extend it). `flowGraph.graph` is an escape hatch that bypasses validation.
- **DAG enforcement**: `addEdge()` throws `CycleError` if the edge would create a cycle. `fromJSON()` validates DAG invariants on deserialization.
- **Event log as source of truth**: Call protocol events (`call.requested`, `call.responded`, `call.error`, `call.aborted`, `call.completed`) are appended to `WorkflowReactiveRoot`. Status/results are derived projections.
- **Signal-driven execution**: `@preact/signals-core` powers `WorkflowReactiveRoot`. `preconditions`, `canStart`, `blockedByFailure` are `ReadonlySignal<boolean>` computed from predecessor status.
- **`dispose()` is mandatory**: `WorkflowReactiveRoot.dispose()` must be called to release signal subscriptions.
- `worktree({action: "current"})` — Show your worktree mapping ## Architecture Docs
- `worktree({action: "notify", args: {message: "...", level: "info|blocking"}})` — Report to coordinator
- `worktree({action: "status"})` — Show worktree git status
The plugin auto-injects `workdir` for bash commands when the session is mapped to a worktree. `docs/architecture/` contains detailed specs (all `status: reviewed`):
- `README.md` — overview, relationship to sibling packages, design decisions
- `flowgraph-api.md` — FlowGraph class full API
- `consumer-integration.md` — end-to-end integration walkthrough (5 phases)
- `schema.md` — TypeBox Module, all node/edge attribute schemas
- `operation-graph.md` — static graph from OperationSpecs
- `call-graph.md` — dynamic graph from call events
- `workflow-templates.md` — ujsx components, composition rules, template→DAG hydration
- `host-configs.md` — GraphologyHostConfig, ReactiveHostConfig
- `reactive-execution.md` — signal-driven status propagation, lifecycle, error boundaries
- `analysis.md` — type-compatibility checking, precondition validation, execution ordering
- `error-handling.md` — FlowgraphError hierarchy
- `build-distribution.md` — package structure, exports map
- `decisions/` — ADRs 001006
Consult these for anything non-trivial. The README is surface-level; architecture docs are the specification.
## Constraints for Agent Modifications
- **Never add cycles** — this is a DAG-only library. Any edge that would create a cycle must throw `CycleError`.
- **Never mutate operation graphs after construction** — `fromSpecs()` graphs are conventionally immutable.
- **Keep schema as pure data** — `src/schema/` contains TypeBox schemas and types only, no runtime logic.
- **Keep analysis as pure functions** — `src/analysis/` functions take a `FlowGraph` or `DirectedGraph` as input and return results without side effects.
- **Maintain the delegation model** — `FlowGraph` delegates to graphology. Don't expose raw graphology methods that could violate DAG invariants without explicit validation.
- **tsup builds all subpath entries**`tsup.config.ts` lists every subpath. If you add a new top-level module, add the entry there and update `package.json` exports.

6
LICENSE Normal file
View File

@@ -0,0 +1,6 @@
This project is dual-licensed under either of the following:
- MIT License (LICENSE-MIT)
- Apache License 2.0 (LICENSE-APACHE)
You may choose either license at your option.

190
LICENSE-APACHE Normal file
View File

@@ -0,0 +1,190 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to the Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by the Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable by
such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding any notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limitation damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2026 alkdev
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

21
LICENSE-MIT Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 alkdev
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

320
README.md Normal file
View File

@@ -0,0 +1,320 @@
# @alkdev/flowgraph
DAG-based workflow orchestration over graphology, with ujsx template composition and reactive signal-driven execution.
## What This Does
Flowgraph sits between `@alkdev/operations` (which defines *what can be called*) and `@alkdev/alkhub` (which records *what was called*). Flowgraph defines **how calls are orchestrated** — the structure, validation, and execution of workflows.
Three conceptual graphs, each for a different purpose:
1. **Operation Graph** — static graph built from `OperationSpec`s at startup. Nodes are operations, edges are type-compatibility relationships. Enables cycle detection, topological ordering, and validation.
2. **Call Graph** — dynamic graph built from call protocol events at runtime. Nodes are call invocations with status/timestamps, edges are parent-child relationships. Enables abort cascading and observability.
3. **Workflow Template** — declarative ujsx tree defining a reusable workflow structure. A validated path through the operation graph, instantiated as a call graph at runtime.
**The graph is the specification. The template is the authoring surface. The call graph is the execution record.**
## Installation
```bash
npm install @alkdev/flowgraph
```
Peer dependency: `@alkdev/operations ^0.1.0`
## Quick Start
### Build an Operation Graph
```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
import type { OperationSpec } from "@alkdev/flowgraph/graph";
const specs: OperationSpec[] = [
{ namespace: "task", name: "classify", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
{ namespace: "task", name: "enrich", type: "query", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
{ namespace: "task", name: "summarize", type: "mutation", version: "1.0.0", inputSchema: {...}, outputSchema: {...} },
];
const graph = FlowGraph.fromSpecs(specs);
// Type-compatibility edges added automatically
graph.hasEdge("task.classify", "task.enrich");
```
### Define a Workflow Template
```typescript
import { h } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional } from "@alkdev/flowgraph/component";
const template = h(Sequential, {},
h(Operation, { name: "task.classify" }),
h(Conditional, {
test: (results) => results["task.classify"].output.confidence > 0.8,
},
h(Parallel, {},
h(Operation, { name: "task.enrich" }),
h(Operation, { name: "task.summarize" }),
),
h(Operation, { name: "task.classify" }),
),
);
```
### Validate the Template
```typescript
import { validateTemplate } from "@alkdev/flowgraph/analysis";
const errors = validateTemplate(template, graph);
if (errors.length > 0) {
for (const error of errors) {
console.error(`[${error.type}]`, error);
}
}
```
### Populate a Call Graph from Events
```typescript
import { FlowGraph } from "@alkdev/flowgraph/graph";
const callGraph = FlowGraph.fromCallEvents(eventArray);
callGraph.filterByStatus("running");
callGraph.children("req_abc123");
callGraph.lineage("req_xyz789");
callGraph.duration("req_abc123");
```
### Drive Reactive Execution
```typescript
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive";
const workflow = new WorkflowReactiveRoot(dag, {
failurePolicy: "abort-dependents",
});
// Append call protocol events — status derives reactively
workflow.append({ type: "call.requested", requestId: "req_1", operationId: "task.classify", input: {}, timestamp: "..." });
workflow.append({ type: "call.responded", requestId: "req_1", output: { confidence: 0.95 }, timestamp: "..." });
// Read reactive state
workflow.getStatus("task.enrich");
workflow.getResult("task.classify");
// Abort cascading
workflow.abortAll();
workflow.dispose();
```
## Subpath Exports
| Subpath | Purpose | Key Exports |
|---------|---------|-------------|
| `@alkdev/flowgraph` | Root — re-exports everything | All public types and functions |
| `@alkdev/flowgraph/graph` | Core DAG class | `FlowGraph`, `FlowGraphOptions`, `OperationSpec`, `CallEventMapValue` |
| `@alkdev/flowgraph/schema` | TypeBox schemas and types | `CallStatus`, `NodeStatus`, `EdgeType`, `OperationType`, `CallNodeAttrs`, `OperationNodeAttrs`, `OperationEdgeAttrs`, `CallEdgeAttrs`, `TemplateEdgeAttrs`, `CallResult`, `FlowGraphSerialized` |
| `@alkdev/flowgraph/component` | ujsx workflow components | `Operation`, `Sequential`, `Parallel`, `Conditional`, `Map` |
| `@alkdev/flowgraph/host` | Rendering backends | `GraphologyHostConfig`, `ReactiveHostConfig` |
| `@alkdev/flowgraph/analysis` | Validation and analysis functions | `typeCompat`, `buildTypeEdges`, `validateGraph`, `validateSchema`, `validate`, `validateTemplate`, `validatePreconditions`, `topologicalOrder`, `parallelGroups`, `criticalPath`, `reachableFrom` |
| `@alkdev/flowgraph/reactive` | Reactive execution engine | `WorkflowReactiveRoot`, `EventLogProjection`, `WorkflowNode`, `ReactiveContext`, `FailurePolicy`, `AggregateStatus` |
| `@alkdev/flowgraph/error` | Error hierarchy | `FlowgraphError`, `ConstructionError`, `DuplicateNodeError`, `DuplicateEdgeError`, `NodeNotFoundError`, `CycleError`, `InvalidInputError`, `InvalidTransitionError` |
## Core API: FlowGraph Class
`FlowGraph<NodeAttrs, EdgeAttrs>` wraps a graphology `DirectedGraph` and enforces DAG invariants. It delegates graph operations to graphology while adding flowgraph-specific construction, mutation, and query methods.
### Factory Methods
```typescript
FlowGraph.fromSpecs(specs: OperationSpec[]): OperationGraph
FlowGraph.fromCallEvents(events: CallEventMapValue[]): CallGraph
FlowGraph.fromJSON(data: FlowGraphSerialized): FlowGraph
```
### Node Operations
```typescript
graph.addNode(key, attrs) // throws DuplicateNodeError
graph.removeNode(key) // throws NodeNotFoundError
graph.updateNode(key, partialAttrs) // throws NodeNotFoundError
graph.hasNode(key): boolean
graph.getNodeAttributes(key): NodeAttrs
graph.forEachNode(callback): void
```
### Edge Operations
```typescript
graph.addEdge(source, target, attrs?) // throws NodeNotFoundError, DuplicateEdgeError, CycleError
graph.removeEdge(source, target) // no-op if not found
graph.hasEdge(source, target): boolean
graph.getEdgeAttributes(source, target): EdgeAttrs
graph.forEachEdge(callback): void
```
### Traversal
```typescript
graph.topologicalOrder(): string[]
graph.ancestors(nodeId): string[]
graph.descendants(nodeId): string[]
graph.predecessors(nodeId): string[]
graph.successors(nodeId): string[]
graph.reachableFrom(nodeIds): Set<string>
graph.hasCycles(): boolean
graph.findCycles(): string[][]
```
### Call Graph Convenience
```typescript
graph.addCall(attrs: CallNodeAttrs): void
graph.addDependency(source, target): void
graph.updateStatus(requestId, status, extra?): void // throws InvalidTransitionError
graph.updateCall(requestId, partialAttrs): void
graph.removeCall(requestId): void
graph.updateFromEvent(event: CallEventMapValue): void
graph.filterByStatus(status: CallStatus): string[]
graph.getRoots(): string[]
graph.children(requestId): string[]
graph.duration(requestId): number
graph.lineage(requestId): string[]
```
### Serialization
```typescript
graph.export(): FlowGraphSerialized
graph.toJSON(): FlowGraphSerialized
graph.toString(): string
```
### Escape Hatch
```typescript
graph.graph // → DirectedGraph (raw graphology instance)
```
Direct mutation via `graph.graph` bypasses flowgraph validation. Use with caution.
## Schema Enums
| Enum | Values |
|------|--------|
| `CallStatus` | `pending`, `running`, `completed`, `failed`, `aborted` |
| `NodeStatus` | `idle`, `waiting`, `ready`, `running`, `completed`, `failed`, `skipped`, `aborted` |
| `EdgeType` | `triggered`, `depends_on`, `typed`, `sequential`, `conditional` |
| `OperationType` | `query`, `mutation`, `subscription` |
Call status transitions: `pending → running → completed|failed|aborted`. Terminal states are immutable. `InvalidTransitionError` is thrown on invalid transitions.
## Workflow Components
| Component | Props | Behavior |
|-----------|-------|----------|
| `<Operation>` | `name`, `input?`, `retries?`, `timeout?` | Declares an operation node in the workflow |
| `<Sequential>` | `id?` | Children execute in order; edges are `sequential` |
| `<Parallel>` | `id?`, `maxConcurrency?` | Children execute concurrently |
| `<Conditional>` | `test`, `else?` | Branches on `test(results)`. Children = then-branch, `else` prop = else-branch |
| `<Map>` | `over`, `as` | Iterates over `over` collection, binding each item as `as` variable |
## Analysis Functions
```typescript
import { typeCompat, validateTemplate, topologicalOrder, parallelGroups, criticalPath } from "@alkdev/flowgraph/analysis";
typeCompat(outputSchema, inputSchema): TypeCompatResult | undefined
validateTemplate(template, operationGraph): AnyValidationError[]
topologicalOrder(graph): string[]
parallelGroups(graph): string[][] // topological generations
criticalPath(graph): string[] // longest path
validateGraph(graph): GraphValidationError[]
validateSchema(graph, schema): ValidationError[]
validate(graph, schema): AnyValidationError[] // combined
```
## Reactive Execution
`WorkflowReactiveRoot` implements `EventLogProjection` — call protocol events are the source of truth, status/results are derived projections.
```typescript
const workflow = new WorkflowReactiveRoot(dag, {
failurePolicy: "abort-dependents", // or "continue-running"
parallelGroups: { group1: { siblings: ["a", "b"], maxConcurrency: 2 } },
});
// Per-node reactive signals
workflow.statusMap // Map<string, Signal<NodeStatus>>
workflow.preconditions // Map<string, ReadonlySignal<boolean>> — all predecessors completed/skipped
workflow.canStart // Map<string, ReadonlySignal<boolean>> — preconditions + concurrency
workflow.blockedByFailure // Map<string, ReadonlySignal<boolean>> — any predecessor failed/aborted
workflow.resultMap // Map<string, ReadonlySignal<CallResult | undefined>>
// Event-driven updates
workflow.append(event: CallEventMapValue): void
// Queries
workflow.getStatus(nodeId): NodeStatus
workflow.getResult(nodeId): CallResult | undefined
workflow.isComplete(): boolean
workflow.getAggregateStatus(): AggregateStatus
// Lifecycle
workflow.abortAll(): void
workflow.abortNode(nodeId): void
workflow.dispose(): void // mandatory cleanup — releases signal subscriptions
```
## Error Hierarchy
```
FlowgraphError (base)
├── ConstructionError
│ ├── DuplicateNodeError (readonly key)
│ ├── DuplicateEdgeError (readonly source, target)
│ ├── NodeNotFoundError (readonly key)
│ ├── CycleError (readonly cycles: string[][])
│ └── InvalidInputError (readonly errors: ValidationError[])
└── InvalidTransitionError (readonly requestId, from, to)
```
## Design Principles
1. **DAG-only, no cycles**`addEdge()` rejects cycle-creating edges at mutation time (ADR-002). This differs from taskgraph, which allows cycles and detects them after the fact.
2. **Storage is decoupled** — flowgraph handles in-memory graph construction, validation, and analysis. Persistence is the caller's concern. `export()`/`fromJSON()` provides the serialization boundary.
3. **Template → DAG → Execution is a pipeline** — each representation serves a different phase and can exist independently. Validate a template without executing it. Build a call graph from events without a template. Run reactive execution directly from a DAG.
4. **Event log as source of truth** — call protocol events (`call.requested`, `call.responded`, `call.error`, `call.aborted`, `call.completed`) are the ground truth. Status, results, and the call graph are projections derived from the event log (ADR-005).
5. **Delegation, not inheritance**`FlowGraph` wraps a graphology `DirectedGraph`, exposing a curated API. The raw graphology instance is available via the `.graph` escape hatch.
## For AI Agents
When working with this library programmatically:
- **Use subpath imports** — `@alkdev/flowgraph/graph`, `@alkdev/flowgraph/analysis`, etc. The root export re-exports everything but subpath imports make dependencies explicit.
- **Always call `dispose()` on `WorkflowReactiveRoot`** — signal subscriptions leak without it.
- **Function-valued props don't survive JSON serialization** — `Conditional.test` and `Map.over` with function values need runtime resolution. Use string references for stored templates.
- **`fromSpecs()` graphs are conventionally immutable** — don't mutate operation graphs after construction. If the registry changes, rebuild via `fromSpecs()`.
- **Call graph mutation uses event protocol** — use `updateFromEvent()` or `addCall()`/`updateStatus()`, not direct node mutation.
- **`typeCompat()` returns `undefined` for `unknown`/`any` schemas** — this means "no meaningful check possible", not "incompatible".
- **Architecture specs are in `docs/architecture/`** — detailed design decisions, ADRs, and open questions live there. This README is a surface-level guide. Consult the architecture docs for anything non-trivial.
## Dependencies
| Package | Relationship |
|---------|-------------|
| `graphology` | Direct — the underlying directed graph data structure |
| `graphology-dag` | Direct — topological sort, cycle detection, DAG traversal |
| `@alkdev/ujsx` | Direct — `UNode` trees and `HostConfig` for workflow template rendering |
| `@alkdev/typebox` | Direct — all schemas are TypeBox Modules |
| `@preact/signals-core` | Direct — reactive state management for `WorkflowReactiveRoot` |
| `@alkdev/operations` | **Peer** — provides `OperationSpec`, `OperationRegistry`, call event types |
## License
Dual-licensed under [MIT](LICENSE-MIT) or [Apache-2.0](LICENSE-APACHE) at your option.

View File

@@ -17,7 +17,7 @@
"default": "./dist/index.cjs" "default": "./dist/index.cjs"
} }
}, },
"/component": { "./component": {
"import": { "import": {
"types": "./dist/component/index.d.ts", "types": "./dist/component/index.d.ts",
"default": "./dist/component/index.js" "default": "./dist/component/index.js"
@@ -27,7 +27,7 @@
"default": "./dist/component/index.cjs" "default": "./dist/component/index.cjs"
} }
}, },
"/host": { "./host": {
"import": { "import": {
"types": "./dist/host/index.d.ts", "types": "./dist/host/index.d.ts",
"default": "./dist/host/index.js" "default": "./dist/host/index.js"
@@ -37,7 +37,7 @@
"default": "./dist/host/index.cjs" "default": "./dist/host/index.cjs"
} }
}, },
"/schema": { "./schema": {
"import": { "import": {
"types": "./dist/schema/index.d.ts", "types": "./dist/schema/index.d.ts",
"default": "./dist/schema/index.js" "default": "./dist/schema/index.js"
@@ -47,7 +47,7 @@
"default": "./dist/schema/index.cjs" "default": "./dist/schema/index.cjs"
} }
}, },
"/graph": { "./graph": {
"import": { "import": {
"types": "./dist/graph/index.d.ts", "types": "./dist/graph/index.d.ts",
"default": "./dist/graph/index.js" "default": "./dist/graph/index.js"
@@ -57,7 +57,7 @@
"default": "./dist/graph/index.cjs" "default": "./dist/graph/index.cjs"
} }
}, },
"/reactive": { "./reactive": {
"import": { "import": {
"types": "./dist/reactive/index.d.ts", "types": "./dist/reactive/index.d.ts",
"default": "./dist/reactive/index.js" "default": "./dist/reactive/index.js"
@@ -67,7 +67,7 @@
"default": "./dist/reactive/index.cjs" "default": "./dist/reactive/index.cjs"
} }
}, },
"/analysis": { "./analysis": {
"import": { "import": {
"types": "./dist/analysis/index.d.ts", "types": "./dist/analysis/index.d.ts",
"default": "./dist/analysis/index.js" "default": "./dist/analysis/index.js"
@@ -77,7 +77,7 @@
"default": "./dist/analysis/index.cjs" "default": "./dist/analysis/index.cjs"
} }
}, },
"/error": { "./error": {
"import": { "import": {
"types": "./dist/error/index.d.ts", "types": "./dist/error/index.d.ts",
"default": "./dist/error/index.js" "default": "./dist/error/index.js"
@@ -112,6 +112,15 @@
"operations" "operations"
], ],
"license": "MIT OR Apache-2.0", "license": "MIT OR Apache-2.0",
"repository": {
"type": "git",
"url": "git+https://git.alk.dev/alkdev/flowgraph.git"
},
"homepage": "https://git.alk.dev/alkdev/flowgraph",
"bugs": {
"url": "https://git.alk.dev/alkdev/flowgraph/issues"
},
"sideEffects": false,
"dependencies": { "dependencies": {
"@alkdev/typebox": "^0.34.49", "@alkdev/typebox": "^0.34.49",
"@alkdev/ujsx": "^0.1.0", "@alkdev/ujsx": "^0.1.0",

View File

@@ -4,7 +4,7 @@ export {
defaultEdgeType, defaultEdgeType,
resolveDefaultNodeAttrs, resolveDefaultNodeAttrs,
} from "./defaults.js"; } from "./defaults.js";
export { typeCompat, type TypeCompatResult, type TypeMismatch } from "./type-compat.js"; export { typeCompat, buildTypeEdges, type TypeCompatResult, type TypeMismatch } from "./type-compat.js";
export { export {
topologicalOrder, topologicalOrder,
parallelGroups, parallelGroups,
@@ -13,9 +13,9 @@ export {
ancestors, ancestors,
descendants, descendants,
} from "./ordering.js"; } from "./ordering.js";
export { buildTypeEdges } from "../graph/construction.js";
export { export {
validateSchema, validateSchema,
validateGraph, validateGraph,
validate, validate,
} from "../graph/validation.js"; } from "../graph/validation.js";
export { validatePreconditions, validateTemplate } from "./workflow.js";

View File

@@ -1,4 +1,11 @@
import { KindGuard, Kind, type TSchema } from "@alkdev/typebox"; import { KindGuard, Kind, type TSchema } from "@alkdev/typebox";
import { willCreateCycle } from "graphology-dag";
import type { FlowGraph } from "../graph/construction.js";
import {
OperationNodeAttrs as OperationNodeAttrsSchema,
OperationEdgeAttrs as OperationEdgeAttrsSchema,
} from "../schema/index.js";
import type { OperationNodeAttrs } from "../schema/index.js";
export interface TypeMismatch { export interface TypeMismatch {
path: string; path: string;
@@ -275,4 +282,25 @@ export function typeCompat(outputSchema: TSchema, inputSchema: TSchema): TypeCom
} }
return { compatible: false, mismatches }; return { compatible: false, mismatches };
}
export function buildTypeEdges(graph: FlowGraph<typeof OperationNodeAttrsSchema, typeof OperationEdgeAttrsSchema>): void {
const nodeKeys = graph.nodes();
for (const source of nodeKeys) {
for (const target of nodeKeys) {
if (source === target) continue;
const sourceAttrs = graph.getNodeAttributes(source as never) as unknown as OperationNodeAttrs;
const targetAttrs = graph.getNodeAttributes(target as never) as unknown as OperationNodeAttrs;
const result = typeCompat(sourceAttrs.outputSchema as TSchema, targetAttrs.inputSchema as TSchema);
if (result === undefined) continue;
if (graph.hasEdge(source, target)) continue;
if (willCreateCycle(graph.graph, source, target)) continue;
const detail = result.detail ?? `${sourceAttrs.namespace}.${sourceAttrs.name}.output → ${targetAttrs.namespace}.${targetAttrs.name}.input`;
graph.addTypedEdge(source, target, {
compatible: result.compatible,
detail,
...(result.mismatches !== undefined ? { mismatches: result.mismatches } : {}),
});
}
}
} }

View File

@@ -1 +1,197 @@
export {}; import type { TSchema } from "@alkdev/typebox";
import { KindGuard } from "@alkdev/typebox";
import type { UNode } from "@alkdev/ujsx";
import { createHostRoot } from "@alkdev/ujsx";
import { hasCycle } from "graphology-dag";
import { DirectedGraph } from "graphology";
import type { FlowGraph } from "../graph/construction.js";
import type { OperationNodeAttrs } from "../schema/node.js";
import type { OperationEdgeAttrs } from "../schema/edge.js";
import type { ValidationError, AnyValidationError } from "../error/index.js";
import { GraphologyHostConfig } from "../host/graphology.js";
import { reachableFrom } from "../graph/queries.js";
function getRequiredTopLevelFields(schema: unknown): Set<string> {
const fields = new Set<string>();
if (schema === null || schema === undefined || typeof schema !== "object") return fields;
const s = schema as TSchema;
if (!KindGuard.IsObject(s)) return fields;
const props = s.properties as Record<string, TSchema> | undefined;
const required = s.required as string[] | undefined;
if (props && required) {
for (const key of required) {
fields.add(key);
}
}
return fields;
}
function getProvidedFields(schema: unknown): Set<string> {
const fields = new Set<string>();
if (schema === null || schema === undefined || typeof schema !== "object") return fields;
const s = schema as TSchema;
if (!KindGuard.IsObject(s)) return fields;
const props = s.properties as Record<string, TSchema> | undefined;
if (props) {
for (const key of Object.keys(props)) {
fields.add(key);
}
}
return fields;
}
export function validatePreconditions(
graph: FlowGraph<typeof import("../schema/node.js").OperationNodeAttrs, typeof import("../schema/edge.js").OperationEdgeAttrs>,
): ValidationError[] {
const errors: ValidationError[] = [];
const nodeKeys = graph.nodes();
for (const nodeKey of nodeKeys) {
const attrs = graph.getNodeAttributes(nodeKey) as unknown as OperationNodeAttrs;
const inputSchema = attrs.inputSchema;
const requiredFields = getRequiredTopLevelFields(inputSchema);
if (requiredFields.size === 0) continue;
const predecessors = graph.predecessors(nodeKey);
if (predecessors.length === 0) {
for (const field of requiredFields) {
errors.push({
type: "schema",
nodeKey,
field,
message: `Required input field "${field}" has no predecessor providing it`,
});
}
continue;
}
const providedFields = new Set<string>();
for (const predKey of predecessors) {
const predAttrs = graph.getNodeAttributes(predKey) as unknown as OperationNodeAttrs;
const predProvided = getProvidedFields(predAttrs.outputSchema);
for (const field of predProvided) {
providedFields.add(field);
}
}
for (const field of requiredFields) {
if (!providedFields.has(field)) {
errors.push({
type: "schema",
nodeKey,
field,
message: `Required input field "${field}" is not provided by any predecessor`,
});
}
}
}
return errors;
}
function collectOperationNodeKeys(dag: DirectedGraph): string[] {
const names: string[] = [];
dag.forEachNode((key) => {
if (!key.startsWith("__")) {
names.push(key);
}
});
return names;
}
export function validateTemplate(
template: UNode,
operationGraph: FlowGraph<typeof import("../schema/node.js").OperationNodeAttrs, typeof import("../schema/edge.js").OperationEdgeAttrs>,
): AnyValidationError[] {
const errors: AnyValidationError[] = [];
let renderedDag: DirectedGraph;
try {
const root = createHostRoot(GraphologyHostConfig, null);
root.render(template);
renderedDag = root.ctx.graph as DirectedGraph;
} catch {
renderedDag = new DirectedGraph();
}
const templateNodeKeys = collectOperationNodeKeys(renderedDag);
const graphNodeKeys = new Set(operationGraph.nodes());
for (const opKey of templateNodeKeys) {
if (!graphNodeKeys.has(opKey)) {
errors.push({
type: "graph",
category: "orphan-node",
details: { operation: opKey, message: `Operation "${opKey}" not found in operation graph` },
});
}
}
if (hasCycle(renderedDag)) {
errors.push({
type: "graph",
category: "cycle",
details: { message: "Rendered template DAG contains a cycle" },
});
}
for (const opKey of templateNodeKeys) {
if (!graphNodeKeys.has(opKey)) continue;
const outEdges = renderedDag.outEdges(opKey) ?? [];
for (const edge of outEdges) {
const target = renderedDag.target(edge);
if (target.startsWith("__")) continue;
if (!graphNodeKeys.has(target)) continue;
if (operationGraph.hasEdge(opKey, target)) {
const edgeAttrs = operationGraph.getEdgeAttributes(opKey, target) as unknown as OperationEdgeAttrs;
if (!edgeAttrs.compatible) {
errors.push({
type: "type-compat",
sourceKey: opKey,
targetKey: target,
compatible: false,
mismatches: edgeAttrs.mismatches ?? [],
});
}
}
}
}
if (templateNodeKeys.length > 1) {
const roots: string[] = [];
for (const key of templateNodeKeys) {
const inDegree = renderedDag.inDegree(key);
if (inDegree === 0) {
roots.push(key);
}
}
if (roots.length > 0) {
const reachable = reachableFrom(renderedDag, roots);
for (const nodeKey of templateNodeKeys) {
if (!reachable.has(nodeKey)) {
errors.push({
type: "graph",
category: "orphan-node",
details: { nodeKey, message: `Operation "${nodeKey}" is not reachable from start` },
});
}
}
}
}
for (const nodeKey of templateNodeKeys) {
const inDegree = renderedDag.inDegree(nodeKey);
const outDegree = renderedDag.outDegree(nodeKey);
if (inDegree === 0 && outDegree === 0 && templateNodeKeys.length > 1) {
errors.push({
type: "graph",
category: "orphan-node",
details: { nodeKey, message: `Operation "${nodeKey}" has no edges (orphan node)` },
});
}
}
return errors;
}

View File

@@ -1,4 +1,10 @@
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted"; import type { CallStatus } from "../schema/enums.js";
interface TypeMismatch {
path: string;
expected: string;
actual: string;
}
class FlowgraphError extends Error { class FlowgraphError extends Error {
constructor(message: string) { constructor(message: string) {
@@ -88,12 +94,6 @@ interface GraphValidationError {
details: unknown; details: unknown;
} }
interface TypeMismatch {
path: string;
expected: string;
actual: string;
}
interface TypeIncompatError { interface TypeIncompatError {
type: "type-compat"; type: "type-compat";
sourceKey: string; sourceKey: string;
@@ -116,10 +116,8 @@ export {
}; };
export type { export type {
CallStatus,
ValidationError, ValidationError,
GraphValidationError, GraphValidationError,
TypeMismatch,
TypeIncompatError, TypeIncompatError,
AnyValidationError, AnyValidationError,
}; };

View File

@@ -1,13 +1,17 @@
import { DirectedGraph } from "graphology"; import { DirectedGraph } from "graphology";
import type { TSchema, Static } from "@alkdev/typebox"; import type { TSchema, Static } from "@alkdev/typebox";
import { Value } from "@alkdev/typebox/value";
import { willCreateCycle, topologicalSort, hasCycle } from "graphology-dag"; import { willCreateCycle, topologicalSort, hasCycle } from "graphology-dag";
import { import {
DuplicateNodeError, DuplicateNodeError,
DuplicateEdgeError, DuplicateEdgeError,
NodeNotFoundError, NodeNotFoundError,
CycleError, CycleError,
InvalidInputError,
InvalidTransitionError,
} from "../error/index.js"; } from "../error/index.js";
import type { CallStatus, AnyValidationError } from "../error/index.js"; import type { AnyValidationError, ValidationError } from "../error/index.js";
import type { CallStatus } from "../schema/enums.js";
import { import {
findCycles, findCycles,
reachableFrom as reachableFromFn, reachableFrom as reachableFromFn,
@@ -16,9 +20,15 @@ import { validate as _validate } from "./validation.js";
import { import {
OperationNodeAttrs as OperationNodeAttrsSchema, OperationNodeAttrs as OperationNodeAttrsSchema,
OperationEdgeAttrs as OperationEdgeAttrsSchema, OperationEdgeAttrs as OperationEdgeAttrsSchema,
OperationGraphSerialized,
CallGraphSerialized,
CallNodeAttrs as CallNodeAttrsSchema,
CallEdgeAttrs as CallEdgeAttrsSchema,
} from "../schema/index.js"; } from "../schema/index.js";
import type { OperationNodeAttrs } from "../schema/index.js"; import type { FlowGraphSerialized, CallNodeAttrs } from "../schema/index.js";
import { typeCompat, type TypeCompatResult } from "../analysis/type-compat.js"; import { buildTypeEdges, type TypeCompatResult } from "../analysis/type-compat.js";
export { buildTypeEdges } from "../analysis/type-compat.js";
export interface FlowGraphOptions { export interface FlowGraphOptions {
type?: "directed"; type?: "directed";
@@ -37,8 +47,55 @@ export interface OperationSpec {
tags?: string[]; tags?: string[];
} }
export interface CallRequestedEvent {
type: "call.requested";
requestId: string;
operationId: string;
input: unknown;
timestamp: string;
parentRequestId?: string;
identity?: { id: string; scopes: string[]; resources?: Record<string, string[]> };
startedAt?: string;
}
export interface CallRespondedEvent {
type: "call.responded";
requestId: string;
output: unknown;
timestamp: string;
}
export interface CallErrorEvent {
type: "call.error";
requestId: string;
error: { code: string; message: string; details?: unknown };
timestamp: string;
}
export interface CallAbortedEvent {
type: "call.aborted";
requestId: string;
timestamp: string;
}
export interface CallCompletedEvent {
type: "call.completed";
requestId: string;
output?: unknown;
timestamp: string;
}
export type CallEventMapValue =
| CallRequestedEvent
| CallRespondedEvent
| CallErrorEvent
| CallAbortedEvent
| CallCompletedEvent;
type OperationGraph = FlowGraph<typeof OperationNodeAttrsSchema, typeof OperationEdgeAttrsSchema>; type OperationGraph = FlowGraph<typeof OperationNodeAttrsSchema, typeof OperationEdgeAttrsSchema>;
type CallGraph = FlowGraph<typeof CallNodeAttrsSchema, typeof CallEdgeAttrsSchema>;
type TypedEdgeAttrs = { type TypedEdgeAttrs = {
edgeType: "typed"; edgeType: "typed";
compatible: boolean; compatible: boolean;
@@ -48,6 +105,14 @@ type TypedEdgeAttrs = {
type Attrs = Record<string, unknown>; type Attrs = Record<string, unknown>;
const VALID_TRANSITIONS: Record<CallStatus, CallStatus[]> = {
pending: ["running", "aborted"],
running: ["completed", "failed", "aborted"],
completed: [],
failed: [],
aborted: [],
};
export class FlowGraph< export class FlowGraph<
NodeAttrs extends TSchema = TSchema, NodeAttrs extends TSchema = TSchema,
EdgeAttrs extends TSchema = TSchema, EdgeAttrs extends TSchema = TSchema,
@@ -314,6 +379,145 @@ export class FlowGraph<
return chain; return chain;
} }
updateFromEvent(event: CallEventMapValue): void {
switch (event.type) {
case "call.requested": {
const attrs: CallNodeAttrs = {
requestId: event.requestId,
operationId: event.operationId,
status: "pending",
input: event.input,
...(event.parentRequestId !== undefined ? { parentRequestId: event.parentRequestId } : {}),
...(event.identity !== undefined ? { identity: event.identity } : {}),
...(event.startedAt !== undefined ? { startedAt: event.startedAt } : {}),
};
this.addCall(attrs);
break;
}
case "call.responded": {
if (!this._graph.hasNode(event.requestId)) return;
const current = this._graph.getNodeAttributes(event.requestId) as Record<string, unknown>;
const currentStatus = current.status as CallStatus;
if (currentStatus === "completed" || currentStatus === "failed" || currentStatus === "aborted") return;
this._graph.mergeNodeAttributes(event.requestId, {
status: "completed",
output: event.output,
completedAt: event.timestamp,
} as Attrs);
break;
}
case "call.error": {
if (!this._graph.hasNode(event.requestId)) return;
const current = this._graph.getNodeAttributes(event.requestId) as Record<string, unknown>;
const currentStatus = current.status as CallStatus;
if (currentStatus === "completed" || currentStatus === "failed" || currentStatus === "aborted") return;
this._graph.mergeNodeAttributes(event.requestId, {
status: "failed",
error: event.error,
completedAt: event.timestamp,
} as Attrs);
break;
}
case "call.aborted": {
if (!this._graph.hasNode(event.requestId)) return;
const current = this._graph.getNodeAttributes(event.requestId) as Record<string, unknown>;
const currentStatus = current.status as CallStatus;
if (currentStatus === "completed" || currentStatus === "failed" || currentStatus === "aborted") return;
this._graph.mergeNodeAttributes(event.requestId, {
status: "aborted",
completedAt: event.timestamp,
} as Attrs);
break;
}
case "call.completed": {
if (!this._graph.hasNode(event.requestId)) return;
const current = this._graph.getNodeAttributes(event.requestId) as Record<string, unknown>;
const currentStatus = current.status as CallStatus;
if (currentStatus === "completed") {
if (!current.completedAt) {
this._graph.mergeNodeAttributes(event.requestId, { completedAt: event.timestamp } as Attrs);
}
return;
}
if (currentStatus === "failed" || currentStatus === "aborted") return;
this._graph.mergeNodeAttributes(event.requestId, {
status: "completed",
...(event.output !== undefined ? { output: event.output } : {}),
completedAt: event.timestamp,
} as Attrs);
break;
}
}
}
addCall(attrs: CallNodeAttrs): void {
if (this._graph.hasNode(attrs.requestId)) return;
this._graph.addNode(attrs.requestId, attrs as Attrs);
if (attrs.parentRequestId !== undefined) {
if (this._graph.hasNode(attrs.parentRequestId)) {
if (willCreateCycle(this._graph, attrs.parentRequestId, attrs.requestId)) {
this._graph.dropNode(attrs.requestId);
const path = this._findPath(attrs.requestId, attrs.parentRequestId);
const cycle = [attrs.parentRequestId, ...path, attrs.parentRequestId];
throw new CycleError([cycle]);
}
const edgeKey = this._edgeKey(attrs.parentRequestId, attrs.requestId);
this._graph.addEdgeWithKey(edgeKey, attrs.parentRequestId, attrs.requestId, { edgeType: "triggered" } as Attrs);
}
}
}
addDependency(source: string, target: string): void {
if (!this._graph.hasNode(source)) {
throw new NodeNotFoundError(source);
}
if (!this._graph.hasNode(target)) {
throw new NodeNotFoundError(target);
}
const edgeKey = `${source}->${target}:depends_on`;
if (this._graph.hasEdge(edgeKey)) return;
if (willCreateCycle(this._graph, source, target)) {
const path = this._findPath(target, source);
const cycle = [source, ...path, source];
throw new CycleError([cycle]);
}
this._graph.addEdgeWithKey(edgeKey, source, target, { edgeType: "depends_on" } as Attrs);
}
updateStatus(requestId: string, status: CallStatus, extra?: Partial<CallNodeAttrs>): void {
if (!this._graph.hasNode(requestId)) {
throw new NodeNotFoundError(requestId);
}
const current = this._graph.getNodeAttributes(requestId) as Record<string, unknown>;
const currentStatus = current.status as CallStatus;
if (currentStatus === status) return;
const allowed = VALID_TRANSITIONS[currentStatus];
if (!allowed || !allowed.includes(status)) {
throw new InvalidTransitionError(requestId, currentStatus, status);
}
const update: Record<string, unknown> = { status };
if (extra) {
for (const [key, value] of Object.entries(extra)) {
if (value !== undefined) {
update[key] = value;
}
}
}
this._graph.mergeNodeAttributes(requestId, update as Attrs);
}
updateCall(requestId: string, attrs: Partial<CallNodeAttrs>): void {
if (!this._graph.hasNode(requestId)) {
throw new NodeNotFoundError(requestId);
}
this._graph.mergeNodeAttributes(requestId, attrs as Attrs);
}
removeCall(requestId: string): void {
if (!this._graph.hasNode(requestId)) return;
this._graph.dropNode(requestId);
}
validate(schema: TSchema): AnyValidationError[] { validate(schema: TSchema): AnyValidationError[] {
return _validate(this, schema as NodeAttrs); return _validate(this, schema as NodeAttrs);
} }
@@ -361,16 +565,72 @@ export class FlowGraph<
return graph; return graph;
} }
static fromCallEvents( static fromCallEvents(events: CallEventMapValue[]): CallGraph {
_events: unknown[], const graph = new FlowGraph<typeof CallNodeAttrsSchema, typeof CallEdgeAttrsSchema>();
): FlowGraph<TSchema, TSchema> { for (const event of events) {
throw new Error("not implemented"); graph.updateFromEvent(event);
}
return graph;
}
export(): FlowGraphSerialized {
return this._graph.export() as unknown as FlowGraphSerialized;
}
toJSON(): FlowGraphSerialized {
return this.export();
}
toString(): string {
return JSON.stringify(this.export());
} }
static fromJSON( static fromJSON(
_data: unknown, data: FlowGraphSerialized,
): FlowGraph<TSchema, TSchema> { ): FlowGraph<TSchema, TSchema> {
throw new Error("not implemented"); const opCheck = Value.Check(OperationGraphSerialized, data);
const callCheck = Value.Check(CallGraphSerialized, data);
if (!opCheck && !callCheck) {
const errors: ValidationError[] = [];
const opIter = Value.Errors(OperationGraphSerialized, data as Record<string, unknown>);
for (const err of opIter) {
errors.push({
type: "schema",
nodeKey: "",
field: err.path.replace(/^\//, "") || err.path,
message: err.message,
value: err.value,
});
}
if (errors.length === 0) {
const callIter = Value.Errors(CallGraphSerialized, data as Record<string, unknown>);
for (const err of callIter) {
errors.push({
type: "schema",
nodeKey: "",
field: err.path.replace(/^\//, "") || err.path,
message: err.message,
value: err.value,
});
}
}
throw new InvalidInputError(errors);
}
const fg = new FlowGraph<TSchema, TSchema>();
for (const node of data.nodes) {
fg._graph.addNode(node.key, node.attributes as Attrs);
}
for (const edge of data.edges) {
fg._graph.addEdgeWithKey(edge.key, edge.source, edge.target, edge.attributes as Attrs);
}
if (hasCycle(fg._graph)) {
const cycles = findCycles(fg._graph);
throw new CycleError(cycles);
}
return fg;
} }
private _findPath(from: string, to: string): string[] { private _findPath(from: string, to: string): string[] {
@@ -397,25 +657,4 @@ export class FlowGraph<
} }
return []; return [];
} }
}
export function buildTypeEdges(graph: OperationGraph): void {
const nodeKeys = graph.nodes();
for (const source of nodeKeys) {
for (const target of nodeKeys) {
if (source === target) continue;
const sourceAttrs = graph.getNodeAttributes(source as never) as unknown as OperationNodeAttrs;
const targetAttrs = graph.getNodeAttributes(target as never) as unknown as OperationNodeAttrs;
const result = typeCompat(sourceAttrs.outputSchema as TSchema, targetAttrs.inputSchema as TSchema);
if (result === undefined) continue;
if (graph.hasEdge(source, target)) continue;
if (willCreateCycle(graph.graph, source, target)) continue;
const detail = result.detail ?? `${sourceAttrs.namespace}.${sourceAttrs.name}.output → ${targetAttrs.namespace}.${targetAttrs.name}.input`;
graph.addTypedEdge(source, target, {
compatible: result.compatible,
detail,
...(result.mismatches !== undefined ? { mismatches: result.mismatches } : {}),
});
}
}
} }

View File

@@ -1,14 +1,5 @@
export { FlowGraph, buildTypeEdges, type FlowGraphOptions, type OperationSpec } from "./construction.js"; export { FlowGraph, buildTypeEdges, type FlowGraphOptions, type OperationSpec, type CallEventMapValue, type CallRequestedEvent, type CallRespondedEvent, type CallErrorEvent, type CallAbortedEvent, type CallCompletedEvent } from "./construction.js";
export { export {
topologicalOrder,
hasCycles, hasCycles,
findCycles, findCycles,
ancestors, } from "./queries.js";
descendants,
reachableFrom,
} from "./queries.js";
export {
validateSchema,
validateGraph,
validate,
} from "./validation.js";

View File

@@ -1,2 +1,4 @@
export { GraphologyHostConfig } from "./graphology.js"; export { GraphologyHostConfig } from "./graphology.js";
export type { WorkflowTag, GraphNode, GraphContext, OperationRegistry } from "./graphology.js"; export type { WorkflowTag, GraphNode, GraphContext, OperationRegistry as GraphOperationRegistry } from "./graphology.js";
export { ReactiveHostConfig } from "./reactive.js";
export type { WorkflowNode, ReactiveContext, OperationRegistry } from "./reactive.js";

View File

@@ -1 +1,250 @@
export {}; import { signal, computed } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core";
import type { HostConfig } from "@alkdev/ujsx";
import type { NodeStatus } from "../schema/enums.js";
import type { CallResult } from "../schema/edge.js";
import type { EventLogProjection } from "../reactive/workflow.js";
import type { WorkflowTag } from "./graphology.js";
export interface OperationRegistry {
resolve(name: string): unknown;
}
export interface WorkflowNode {
key: string;
type: WorkflowTag;
status: Signal<NodeStatus>;
preconditions: ReadonlySignal<boolean>;
blockedByFailure: ReadonlySignal<boolean>;
operationId?: string;
output?: Signal<unknown>;
children: WorkflowNode[];
}
export interface ReactiveContext {
operationRegistry: OperationRegistry;
nodes: Map<string, WorkflowNode>;
statusSignals: Map<string, Signal<NodeStatus>>;
preconditions: Map<string, ReadonlySignal<boolean>>;
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
resultProjection: EventLogProjection;
parentMap: Map<string, string>;
siblingMap: Map<string, string[]>;
results: Map<string, ReadonlySignal<CallResult | undefined>>;
_containerCounter: number;
}
function collectLeafPredecessors(
nodeKey: string,
ctx: ReactiveContext,
): string[] {
const parentKey = ctx.parentMap.get(nodeKey);
if (!parentKey) return [];
const parentNode = ctx.nodes.get(parentKey);
if (!parentNode) return [];
const siblings = ctx.siblingMap.get(parentKey);
if (!siblings) return [];
const idx = siblings.indexOf(nodeKey);
switch (parentNode.type) {
case "sequential": {
if (idx > 0) {
const prevKey = siblings[idx - 1]!;
const prevNode = ctx.nodes.get(prevKey);
if (prevNode && prevNode.type === "operation") {
return [prevKey];
}
return collectLeafPredecessors(prevKey, ctx);
}
return collectLeafPredecessors(parentKey, ctx);
}
case "parallel":
case "map":
case "conditional": {
return collectLeafPredecessors(parentKey, ctx);
}
default:
return [];
}
}
function computePreconditions(
node: WorkflowNode,
ctx: ReactiveContext,
): boolean {
const predecessors = collectLeafPredecessors(node.key, ctx);
if (predecessors.length === 0) return true;
return predecessors.every((predKey) => {
const predStatus = ctx.statusSignals.get(predKey);
if (!predStatus) return true;
return predStatus.value === "completed" || predStatus.value === "skipped";
});
}
function computeBlockedByFailure(
node: WorkflowNode,
ctx: ReactiveContext,
): boolean {
const predecessors = collectLeafPredecessors(node.key, ctx);
return predecessors.some((predKey) => {
const predStatus = ctx.statusSignals.get(predKey);
if (!predStatus) return false;
return predStatus.value === "failed" || predStatus.value === "aborted";
});
}
export const ReactiveHostConfig: HostConfig<WorkflowTag, WorkflowNode, ReactiveContext> = {
name: "reactive",
createRootContext(_container, options) {
const ctx: ReactiveContext = {
operationRegistry: options?.registry as OperationRegistry ?? { resolve: () => undefined },
nodes: new Map(),
statusSignals: new Map(),
preconditions: new Map(),
blockedByFailure: new Map(),
resultProjection: options?.resultProjection as EventLogProjection ?? {
append() {},
getStatus: () => "idle" as NodeStatus,
getResult: () => undefined,
getEvents: () => [],
},
parentMap: new Map(),
siblingMap: new Map(),
results: new Map(),
_containerCounter: 0,
};
return ctx;
},
createInstance(tag, props, ctx, parent) {
if (tag === "operation") {
const key = props.name as string;
const status = signal<NodeStatus>("idle");
const node: WorkflowNode = {
key,
type: "operation",
status,
preconditions: computed(() => computePreconditions(node, ctx)),
blockedByFailure: computed(() => computeBlockedByFailure(node, ctx)),
operationId: key,
output: signal<unknown>(undefined),
children: [],
};
ctx.nodes.set(key, node);
ctx.statusSignals.set(key, status);
ctx.preconditions.set(key, node.preconditions);
ctx.blockedByFailure.set(key, node.blockedByFailure);
if (parent) {
ctx.parentMap.set(key, parent.key);
const siblings = ctx.siblingMap.get(parent.key);
if (siblings) {
siblings.push(key);
} else {
ctx.siblingMap.set(parent.key, [key]);
}
}
return node;
}
const counter = ctx._containerCounter++;
const key = `__${tag}_${counter}`;
const status = signal<NodeStatus>("idle");
const node: WorkflowNode = {
key,
type: tag,
status,
preconditions: computed(() => computePreconditions(node, ctx)),
blockedByFailure: computed(() => computeBlockedByFailure(node, ctx)),
children: [],
};
ctx.nodes.set(key, node);
ctx.statusSignals.set(key, status);
ctx.preconditions.set(key, node.preconditions);
ctx.blockedByFailure.set(key, node.blockedByFailure);
if (parent) {
ctx.parentMap.set(key, parent.key);
const siblings = ctx.siblingMap.get(parent.key);
if (siblings) {
siblings.push(key);
} else {
ctx.siblingMap.set(parent.key, [key]);
}
}
if (!ctx.siblingMap.has(key)) {
ctx.siblingMap.set(key, []);
}
return node;
},
createTextInstance(_text, ctx, _parent) {
const counter = ctx._containerCounter++;
const key = `__text_${counter}`;
const status = signal<NodeStatus>("idle");
const node: WorkflowNode = {
key,
type: "sequential" as WorkflowTag,
status,
preconditions: computed(() => true),
blockedByFailure: computed(() => false),
children: [],
};
ctx.nodes.set(key, node);
ctx.statusSignals.set(key, status);
return node;
},
appendChild(parent, child, _ctx) {
if (!parent.children.includes(child)) {
parent.children.push(child);
}
},
insertBefore(parent, child, before, _ctx) {
const idx = parent.children.indexOf(before);
if (idx === -1) {
parent.children.push(child);
} else {
if (!parent.children.includes(child)) {
parent.children.splice(idx, 0, child);
}
}
},
removeChild(parent, child, ctx) {
parent.children = parent.children.filter((c) => c.key !== child.key);
const siblings = ctx.siblingMap.get(parent.key);
if (siblings) {
const idx = siblings.indexOf(child.key);
if (idx !== -1) {
siblings.splice(idx, 1);
}
}
},
prepareUpdate() {
return null;
},
commitUpdate() {
},
emit() {
},
finalizeInstance(_instance, _ctx) {
},
};

View File

@@ -1,8 +1,7 @@
export * from "./error/index.js"; export * from "./error/index.js";
export * from "./schema/index.js";
export { FlowGraph, buildTypeEdges, type FlowGraphOptions, type OperationSpec } from "./graph/index.js"; export * from "./component/index.js";
export { export * from "./host/index.js";
validateSchema, export * from "./reactive/index.js";
validateGraph, export * from "./analysis/index.js";
validate, export * from "./graph/index.js";
} from "./graph/validation.js";

View File

@@ -1,12 +1,22 @@
export { export {
WorkflowReactiveRoot, WorkflowReactiveRoot,
type FailurePolicy, type FailurePolicy,
type CallEventMapValue,
type CallRequestedEvent,
type CallRespondedEvent,
type CallErrorEvent,
type CallAbortedEvent,
type CallCompletedEvent,
type EventLogProjection, type EventLogProjection,
type AggregateStatus, type AggregateStatus,
type ParallelGroup,
type ParallelGroupConfig,
} from "./workflow.js"; } from "./workflow.js";
export type {
WorkflowNode,
ReactiveContext,
} from "../host/reactive.js";
export {
computePreconditions,
computeBlockedByFailure,
registerStartEffect,
registerAbortEffect,
type NodeStatusContext,
type AbortEffectOptions,
} from "./node-status.js";

View File

@@ -1 +1,81 @@
export {}; import { effect } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core";
import type { NodeStatus } from "../schema/enums.js";
const TERMINAL_STATUSES: Set<NodeStatus> = new Set([
"completed",
"failed",
"aborted",
"skipped",
]);
export interface NodeStatusContext {
statusMap: Map<string, Signal<NodeStatus>>;
predecessors: string[];
}
export function computePreconditions(
_nodeKey: string,
ctx: NodeStatusContext,
): boolean {
if (ctx.predecessors.length === 0) return true;
return ctx.predecessors.every((pred: string) => {
const predStatus = ctx.statusMap.get(pred);
if (!predStatus) return false;
return predStatus.value === "completed" || predStatus.value === "skipped";
});
}
export function computeBlockedByFailure(
_nodeKey: string,
ctx: NodeStatusContext,
): boolean {
return ctx.predecessors.some((pred: string) => {
const predStatus = ctx.statusMap.get(pred);
if (!predStatus) return false;
return predStatus.value === "failed" || predStatus.value === "aborted";
});
}
export function registerStartEffect(
status: Signal<NodeStatus>,
canStart: ReadonlySignal<boolean>,
effectDisposers: (() => void)[],
): void {
const disposer = effect(() => {
if (canStart.value) {
const current = status.value;
if (current === "idle" || current === "waiting") {
status.value = "ready";
}
}
});
effectDisposers.push(disposer);
}
export interface AbortEffectOptions {
abortDependents?: boolean;
}
export function registerAbortEffect(
status: Signal<NodeStatus>,
blockedByFailure: ReadonlySignal<boolean>,
effectDisposers: (() => void)[],
options?: AbortEffectOptions,
): void {
const disposer = effect(() => {
if (blockedByFailure.value) {
const current = status.value;
if (options?.abortDependents) {
if (!TERMINAL_STATUSES.has(current)) {
status.value = "aborted";
}
} else {
if (current === "idle" || current === "waiting") {
status.value = "aborted";
}
}
}
});
effectDisposers.push(disposer);
}

View File

@@ -1,53 +1,29 @@
import { signal, computed, effect } from "@preact/signals-core"; import { signal, computed } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core"; import type { Signal, ReadonlySignal } from "@preact/signals-core";
import type { DirectedGraph } from "graphology"; import type { DirectedGraph } from "graphology";
import type { NodeStatus } from "../schema/enums.js"; import type { NodeStatus } from "../schema/enums.js";
import type { CallResult } from "../schema/edge.js"; import type { CallResult } from "../schema/edge.js";
import type { CallEventMapValue } from "../graph/construction.js";
export type { CallEventMapValue } from "../graph/construction.js";
import {
computePreconditions,
computeBlockedByFailure,
registerStartEffect,
registerAbortEffect,
} from "./node-status.js";
import type { NodeStatusContext } from "./node-status.js";
export type FailurePolicy = "continue-running" | "abort-dependents"; export type FailurePolicy = "continue-running" | "abort-dependents";
export interface CallRequestedEvent { export interface ParallelGroup {
type: "call.requested"; siblings: string[];
requestId: string; maxConcurrency?: number;
operationId: string;
input: unknown;
timestamp: string;
} }
export interface CallRespondedEvent { export interface ParallelGroupConfig {
type: "call.responded"; [groupKey: string]: ParallelGroup;
requestId: string;
output: unknown;
timestamp: string;
} }
export interface CallErrorEvent {
type: "call.error";
requestId: string;
error: { code: string; message: string; details?: unknown };
timestamp: string;
}
export interface CallAbortedEvent {
type: "call.aborted";
requestId: string;
timestamp: string;
}
export interface CallCompletedEvent {
type: "call.completed";
requestId: string;
output: unknown;
timestamp: string;
}
export type CallEventMapValue =
| CallRequestedEvent
| CallRespondedEvent
| CallErrorEvent
| CallAbortedEvent
| CallCompletedEvent;
export interface EventLogProjection { export interface EventLogProjection {
append(event: CallEventMapValue): void; append(event: CallEventMapValue): void;
getStatus(nodeId: string): NodeStatus; getStatus(nodeId: string): NodeStatus;
@@ -85,55 +61,86 @@ const EVENT_TO_STATUS: Record<string, NodeStatus> = {
export class WorkflowReactiveRoot implements EventLogProjection { export class WorkflowReactiveRoot implements EventLogProjection {
statusMap: Map<string, Signal<NodeStatus>>; statusMap: Map<string, Signal<NodeStatus>>;
preconditions: Map<string, ReadonlySignal<boolean>>; preconditions: Map<string, ReadonlySignal<boolean>>;
canStart: Map<string, ReadonlySignal<boolean>>;
blockedByFailure: Map<string, ReadonlySignal<boolean>>; blockedByFailure: Map<string, ReadonlySignal<boolean>>;
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>; resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
nodeKeyToRequestId: Map<string, string>; nodeKeyToRequestId: Map<string, string>;
requestIdToNodeKey: Map<string, string>;
private graph: DirectedGraph; private graph: DirectedGraph;
private effectDisposers: (() => void)[]; private effectDisposers: (() => void)[];
private eventLog: CallEventMapValue[]; private eventLog: CallEventMapValue[];
private _failurePolicy: FailurePolicy; private _failurePolicy: FailurePolicy;
private _parallelGroups: ParallelGroupConfig;
constructor( constructor(
graph: DirectedGraph, graph: DirectedGraph,
options?: { failurePolicy?: FailurePolicy }, options?: { failurePolicy?: FailurePolicy; parallelGroups?: ParallelGroupConfig },
) { ) {
this.graph = graph; this.graph = graph;
this.statusMap = new Map(); this.statusMap = new Map();
this.preconditions = new Map(); this.preconditions = new Map();
this.canStart = new Map();
this.blockedByFailure = new Map(); this.blockedByFailure = new Map();
this.resultMap = new Map(); this.resultMap = new Map();
this.effectDisposers = []; this.effectDisposers = [];
this.eventLog = []; this.eventLog = [];
this.nodeKeyToRequestId = new Map(); this.nodeKeyToRequestId = new Map();
this.requestIdToNodeKey = new Map();
this._failurePolicy = options?.failurePolicy ?? "continue-running"; this._failurePolicy = options?.failurePolicy ?? "continue-running";
this._parallelGroups = options?.parallelGroups ?? {};
this.initializeSignals(); this.initializeSignals();
} }
setRequestId(nodeKey: string, requestId: string): void {
this.nodeKeyToRequestId.set(nodeKey, requestId);
this.requestIdToNodeKey.set(requestId, nodeKey);
}
private initializeSignals(): void { private initializeSignals(): void {
const nodeToGroupKey = new Map<string, string>();
for (const [groupKey, group] of Object.entries(this._parallelGroups)) {
for (const sibling of group.siblings) {
nodeToGroupKey.set(sibling, groupKey);
}
}
for (const node of this.graph.nodes()) { for (const node of this.graph.nodes()) {
const predecessors: string[] = this.graph.inNeighbors(node) ?? []; const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
const status = signal<NodeStatus>("idle"); const status = signal<NodeStatus>("idle");
const ctx: NodeStatusContext = {
statusMap: this.statusMap,
predecessors,
};
const preconditionsComputed = computed(() => { const preconditionsComputed = computed(() => {
return predecessors.every((pred: string) => { return computePreconditions(node, ctx);
const predStatus = this.statusMap.get(pred);
if (!predStatus) return false;
return (
predStatus.value === "completed" || predStatus.value === "skipped"
);
});
}); });
const blockedByFailureComputed = computed(() => { const groupKey = nodeToGroupKey.get(node);
return predecessors.some((pred: string) => { const parallelGroup = groupKey ? this._parallelGroups[groupKey] : undefined;
const predStatus = this.statusMap.get(pred); const maxConc = parallelGroup?.maxConcurrency;
if (!predStatus) return false; const siblings = parallelGroup?.siblings ?? [];
return (
predStatus.value === "failed" || predStatus.value === "aborted" let canStartComputed: ReadonlySignal<boolean>;
); if (maxConc !== undefined && siblings.length > 0) {
const otherSiblings = siblings.filter((s) => s !== node);
canStartComputed = computed(() => {
if (!preconditionsComputed.value) return false;
const activeSiblingCount = otherSiblings.filter((sib) => {
const sibStatus = this.statusMap.get(sib);
return sibStatus && (sibStatus.value === "running" || sibStatus.value === "ready");
}).length;
return activeSiblingCount < maxConc;
}); });
} else {
canStartComputed = preconditionsComputed;
}
const blockedByFailureComputed = computed(() => {
return computeBlockedByFailure(node, ctx);
}); });
const resultComputed = computed(() => { const resultComputed = computed(() => {
@@ -182,29 +189,20 @@ export class WorkflowReactiveRoot implements EventLogProjection {
this.statusMap.set(node, status); this.statusMap.set(node, status);
this.preconditions.set(node, preconditionsComputed); this.preconditions.set(node, preconditionsComputed);
this.canStart.set(node, canStartComputed);
this.blockedByFailure.set(node, blockedByFailureComputed); this.blockedByFailure.set(node, blockedByFailureComputed);
this.resultMap.set(node, resultComputed); this.resultMap.set(node, resultComputed);
} }
for (const node of this.graph.nodes()) { for (const node of this.graph.nodes()) {
const status = this.statusMap.get(node)!; const status = this.statusMap.get(node)!;
const canStart = this.canStart.get(node)!;
const blocked = this.blockedByFailure.get(node)!; const blocked = this.blockedByFailure.get(node)!;
const disposer = effect(() => { registerStartEffect(status, canStart, this.effectDisposers);
if (blocked.value) { registerAbortEffect(status, blocked, this.effectDisposers, {
const current = status.value; abortDependents: this._failurePolicy === "abort-dependents",
if (current === "idle" || current === "waiting" || current === "ready") {
if (this._failurePolicy === "abort-dependents") {
if (!TERMINAL_STATUSES.has(current)) {
status.value = "aborted";
}
} else {
status.value = "aborted";
}
}
}
}); });
this.effectDisposers.push(disposer);
} }
} }
@@ -213,15 +211,29 @@ export class WorkflowReactiveRoot implements EventLogProjection {
if (!("requestId" in event)) return; if (!("requestId" in event)) return;
const nodeId = this.findNodeByRequestId(event.requestId); let nodeId = this.requestIdToNodeKey.get(event.requestId);
if (nodeId === undefined) {
for (const [nId, rid] of this.nodeKeyToRequestId) {
if (rid === event.requestId) {
nodeId = nId;
this.requestIdToNodeKey.set(event.requestId, nId);
break;
}
}
}
if (nodeId === undefined) return; if (nodeId === undefined) return;
const statusSignal = this.statusMap.get(nodeId); const currentRequestId = this.nodeKeyToRequestId.get(nodeId);
if (!statusSignal) return; if (currentRequestId === event.requestId) {
const statusSignal = this.statusMap.get(nodeId);
if (!statusSignal) return;
const derived = EVENT_TO_STATUS[event.type]; const derived = EVENT_TO_STATUS[event.type];
if (derived !== undefined) { if (derived !== undefined) {
statusSignal.value = derived; statusSignal.value = derived;
}
} }
} }
@@ -254,12 +266,17 @@ export class WorkflowReactiveRoot implements EventLogProjection {
} }
getEvents(nodeId: string): CallEventMapValue[] { getEvents(nodeId: string): CallEventMapValue[] {
const requestId = this.nodeKeyToRequestId.get(nodeId); const requestIds = new Set<string>();
if (!requestId) return []; for (const [rid, nId] of this.requestIdToNodeKey) {
if (nId === nodeId) {
requestIds.add(rid);
}
}
if (requestIds.size === 0) return [];
const events: CallEventMapValue[] = []; const events: CallEventMapValue[] = [];
for (const e of this.eventLog) { for (const e of this.eventLog) {
if ("requestId" in e && e.requestId === requestId) { if ("requestId" in e && requestIds.has(e.requestId)) {
events.push(e); events.push(e);
} }
} }
@@ -322,16 +339,11 @@ export class WorkflowReactiveRoot implements EventLogProjection {
this.effectDisposers = []; this.effectDisposers = [];
this.statusMap.clear(); this.statusMap.clear();
this.preconditions.clear(); this.preconditions.clear();
this.canStart.clear();
this.blockedByFailure.clear(); this.blockedByFailure.clear();
this.resultMap.clear(); this.resultMap.clear();
this.nodeKeyToRequestId.clear(); this.nodeKeyToRequestId.clear();
this.requestIdToNodeKey.clear();
this.eventLog = []; this.eventLog = [];
} }
}
private findNodeByRequestId(requestId: string): string | undefined {
for (const [nodeId, rid] of this.nodeKeyToRequestId) {
if (rid === requestId) return nodeId;
}
return undefined;
}
}

View File

@@ -1,7 +1,7 @@
--- ---
id: analysis/build-type-edges id: analysis/build-type-edges
name: Implement buildTypeEdges — populate operation graph with type-compatibility edges name: Implement buildTypeEdges — populate operation graph with type-compatibility edges
status: pending status: completed
depends_on: depends_on:
- analysis/type-compat - analysis/type-compat
- graph/construction-operation - graph/construction-operation

View File

@@ -1,7 +1,7 @@
--- ---
id: analysis/ordering id: analysis/ordering
name: Implement execution ordering functions (topologicalOrder, parallelGroups, criticalPath, reachableFrom) name: Implement execution ordering functions (topologicalOrder, parallelGroups, criticalPath, reachableFrom)
status: pending status: completed
depends_on: depends_on:
- graph/flowgraph-class - graph/flowgraph-class
- graph/queries - graph/queries

View File

@@ -1,7 +1,7 @@
--- ---
id: analysis/template-validation id: analysis/template-validation
name: Implement validateTemplate and validatePreconditions name: Implement validateTemplate and validatePreconditions
status: pending status: completed
depends_on: depends_on:
- analysis/type-compat - analysis/type-compat
- graph/queries - graph/queries

View File

@@ -1,7 +1,7 @@
--- ---
id: api/public-exports id: api/public-exports
name: Wire barrel exports and verify exports map resolves correctly for all sub-paths name: Wire barrel exports and verify exports map resolves correctly for all sub-paths
status: pending status: completed
depends_on: depends_on:
- review/reactive-and-hosts - review/reactive-and-hosts
- analysis/type-compat - analysis/type-compat

View File

@@ -1,7 +1,7 @@
--- ---
id: graph/construction-call id: graph/construction-call
name: Implement call graph construction (fromCallEvents, updateFromEvent, addCall, addDependency, updateStatus) name: Implement call graph construction (fromCallEvents, updateFromEvent, addCall, addDependency, updateStatus)
status: pending status: completed
depends_on: depends_on:
- graph/flowgraph-class - graph/flowgraph-class
- schema/enums - schema/enums

View File

@@ -1,7 +1,7 @@
--- ---
id: graph/construction-json id: graph/construction-json
name: Implement fromJSON and export/toJSON serialization for FlowGraph name: Implement fromJSON and export/toJSON serialization for FlowGraph
status: pending status: completed
depends_on: depends_on:
- graph/flowgraph-class - graph/flowgraph-class
- schema/graph-schemas - schema/graph-schemas

View File

@@ -1,7 +1,7 @@
--- ---
id: host/reactive id: host/reactive
name: Implement ReactiveHostConfig — render ujsx template to reactive execution engine name: Implement ReactiveHostConfig — render ujsx template to reactive execution engine
status: pending status: completed
depends_on: depends_on:
- component/operation - component/operation
- component/sequential - component/sequential

View File

@@ -1,7 +1,7 @@
--- ---
id: meta/analysis-layer id: meta/analysis-layer
name: Complete analysis layer — type compatibility, ordering, template/graph validation, defaults name: Complete analysis layer — type compatibility, ordering, template/graph validation, defaults
status: pending status: completed
depends_on: depends_on:
- analysis/type-compat - analysis/type-compat
- analysis/build-type-edges - analysis/build-type-edges

View File

@@ -1,7 +1,7 @@
--- ---
id: meta/graph-layer id: meta/graph-layer
name: Complete graph layer — FlowGraph class, all construction paths, queries, validation name: Complete graph layer — FlowGraph class, all construction paths, queries, validation
status: pending status: completed
depends_on: depends_on:
- graph/flowgraph-class - graph/flowgraph-class
- graph/construction-operation - graph/construction-operation

View File

@@ -1,7 +1,7 @@
--- ---
id: meta/reactive-layer id: meta/reactive-layer
name: Complete reactive execution layer — WorkflowRoot, node-status, maxConcurrency, retries name: Complete reactive execution layer — WorkflowRoot, node-status, maxConcurrency, retries
status: pending status: completed
depends_on: depends_on:
- reactive/workflow-root - reactive/workflow-root
- reactive/node-status - reactive/node-status
@@ -19,11 +19,11 @@ Meta task that clusters all reactive execution tasks. Once complete, the reactiv
## Acceptance Criteria ## Acceptance Criteria
- [ ] All reactive tasks completed - [x] All reactive tasks completed
- [ ] WorkflowReactiveRoot implements EventLogProjection correctly - [x] WorkflowReactiveRoot implements EventLogProjection correctly
- [ ] Preconditions drive automatic state transitions - [x] Preconditions drive automatic state transitions
- [ ] Failure follows dependency edges, not structural scope - [x] Failure follows dependency edges, not structural scope
- [ ] dispose() prevents signal leaks - [x] dispose() prevents signal leaks
## References ## References
@@ -31,8 +31,8 @@ Meta task that clusters all reactive execution tasks. Once complete, the reactiv
## Notes ## Notes
> To be filled by implementation agent All four dependent tasks (reactive/workflow-root, reactive/node-status, reactive/max-concurrency, reactive/retry-semantics) were already completed. This meta-task verifies the integrated reactive execution layer works correctly end-to-end.
## Summary ## Summary
> To be filled on completion Verified the complete reactive execution layer: WorkflowReactiveRoot implements EventLogProjection correctly with signal-backed status, computed preconditions drive automatic state transitions, failure propagation follows dependency edges (not structural scope), maxConcurrency semaphore works for Parallel groups, retry semantics preserve full event history, and dispose() prevents signal leaks. All 665 tests pass (129 reactive-specific).

View File

@@ -1,7 +1,7 @@
--- ---
id: review/reactive-and-hosts id: review/reactive-and-hosts
name: Review reactive execution and host configs — signal graph, preconditions, HostConfig implementations name: Review reactive execution and host configs — signal graph, preconditions, HostConfig implementations
status: pending status: completed
depends_on: depends_on:
- reactive/workflow-root - reactive/workflow-root
- reactive/node-status - reactive/node-status

View File

@@ -1,7 +1,7 @@
--- ---
id: reactive/max-concurrency id: reactive/max-concurrency
name: Implement maxConcurrency semaphore for Parallel groups name: Implement maxConcurrency semaphore for Parallel groups
status: pending status: completed
depends_on: depends_on:
- reactive/node-status - reactive/node-status
- component/parallel - component/parallel

View File

@@ -1,7 +1,7 @@
--- ---
id: reactive/node-status id: reactive/node-status
name: Implement node status signal management and computed preconditions + blockedByFailure name: Implement node status signal management and computed preconditions + blockedByFailure
status: pending status: completed
depends_on: depends_on:
- reactive/workflow-root - reactive/workflow-root
- schema/enums - schema/enums

View File

@@ -1,7 +1,7 @@
--- ---
id: reactive/retry-semantics id: reactive/retry-semantics
name: Implement retry semantics — event log append with new requestId, status projection respects retries name: Implement retry semantics — event log append with new requestId, status projection respects retries
status: pending status: completed
depends_on: depends_on:
- reactive/workflow-root - reactive/workflow-root
scope: narrow scope: narrow

View File

@@ -1,7 +1,7 @@
--- ---
id: review/complete-library id: review/complete-library
name: Final review — validate full library against architecture docs, build, and exports name: Final review — validate full library against architecture docs, build, and exports
status: pending status: completed
depends_on: depends_on:
- api/public-exports - api/public-exports
- review/foundation - review/foundation

View File

@@ -1,7 +1,7 @@
--- ---
id: review/foundation id: review/foundation
name: Review foundation layer — schemas, errors, FlowGraph class, construction name: Review foundation layer — schemas, errors, FlowGraph class, construction
status: pending status: completed
depends_on: depends_on:
- schema/enums - schema/enums
- schema/node-attrs - schema/node-attrs

View File

@@ -1,6 +1,8 @@
import { describe, it, expect } from "vitest"; import { describe, it, expect } from "vitest";
import { Type, type TSchema } from "@alkdev/typebox"; import { Type, type TSchema } from "@alkdev/typebox";
import { typeCompat, type TypeCompatResult, type TypeMismatch } from "../../src/analysis/type-compat.js"; import { typeCompat, buildTypeEdges, type TypeCompatResult, type TypeMismatch } from "../../src/analysis/type-compat.js";
import { FlowGraph } from "../../src/graph/construction.js";
import type { OperationSpec } from "../../src/graph/construction.js";
describe("typeCompat", () => { describe("typeCompat", () => {
describe("exact match", () => { describe("exact match", () => {
@@ -411,4 +413,111 @@ describe("typeCompat", () => {
expect(typeof mismatch.actual).toBe("string"); expect(typeof mismatch.actual).toBe("string");
}); });
}); });
});
describe("buildTypeEdges", () => {
it("adds compatible edges for matching output→input schemas", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) });
fg.addOperation({ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String(), score: Type.Number() }) });
buildTypeEdges(fg);
expect(fg.hasEdge("task.extract", "task.classify")).toBe(true);
const attrs = fg.getEdgeAttributes("task.extract", "task.classify") as Record<string, unknown>;
expect(attrs.edgeType).toBe("typed");
expect(attrs.compatible).toBe(true);
});
it("adds incompatible edges when schemas mismatch", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String(), score: Type.Number() }) });
fg.addOperation({ name: "count", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ count: Type.Number() }), outputSchema: Type.Object({ result: Type.Number() }) });
buildTypeEdges(fg);
expect(fg.hasEdge("task.classify", "task.count")).toBe(true);
const attrs = fg.getEdgeAttributes("task.classify", "task.count") as Record<string, unknown>;
expect(attrs.edgeType).toBe("typed");
expect(attrs.compatible).toBe(false);
expect(attrs.mismatches).toBeDefined();
});
it("incompatible edges include mismatches array", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "a", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ x: Type.String() }), outputSchema: Type.Object({ value: Type.String() }) });
fg.addOperation({ name: "b", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ value: Type.Number() }), outputSchema: Type.Object({ z: Type.Boolean() }) });
buildTypeEdges(fg);
const attrs = fg.getEdgeAttributes("op.a", "op.b") as Record<string, unknown>;
expect(attrs.compatible).toBe(false);
expect(Array.isArray(attrs.mismatches)).toBe(true);
expect((attrs.mismatches as Array<unknown>).length).toBeGreaterThan(0);
});
it("does not add edges when either schema is Unknown", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "unk_out", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ x: Type.String() }), outputSchema: Type.Unknown() });
fg.addOperation({ name: "unk_in", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Unknown(), outputSchema: Type.Object({ y: Type.String() }) });
fg.addOperation({ name: "normal", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ y: Type.String() }), outputSchema: Type.Object({ x: Type.String() }) });
buildTypeEdges(fg);
expect(fg.hasEdge("op.unk_out", "op.unk_in")).toBe(false);
expect(fg.hasEdge("op.unk_out", "op.normal")).toBe(false);
expect(fg.hasEdge("op.normal", "op.unk_in")).toBe(false);
});
it("sets detail to namespace.name.output → namespace.name.input for compatible edges", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) });
fg.addOperation({ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) });
buildTypeEdges(fg);
const attrs = fg.getEdgeAttributes("task.extract", "task.classify") as Record<string, unknown>;
expect(attrs.detail).toContain("task.extract.output → task.classify.input");
});
it("is callable after incremental addOperation calls", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "extract", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) });
buildTypeEdges(fg);
expect(fg.size).toBe(0);
fg.addOperation({ name: "classify", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) });
buildTypeEdges(fg);
expect(fg.hasEdge("op.extract", "op.classify")).toBe(true);
});
it("produces edges for three operations in a pipeline", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) });
fg.addOperation({ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) });
fg.addOperation({ name: "enrich", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ label: Type.String() }), outputSchema: Type.Object({ enriched: Type.String() }) });
buildTypeEdges(fg);
expect(fg.hasEdge("task.extract", "task.classify")).toBe(true);
expect(fg.hasEdge("task.classify", "task.enrich")).toBe(true);
expect(fg.hasEdge("task.extract", "task.enrich")).toBe(true);
const e2c = fg.getEdgeAttributes("task.extract", "task.classify") as Record<string, unknown>;
const c2e = fg.getEdgeAttributes("task.classify", "task.enrich") as Record<string, unknown>;
const e2e = fg.getEdgeAttributes("task.extract", "task.enrich") as Record<string, unknown>;
expect(e2c.compatible).toBe(true);
expect(c2e.compatible).toBe(true);
expect(e2e.compatible).toBe(false);
});
it("does not add self-loops", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "a", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ x: Type.String() }), outputSchema: Type.Object({ x: Type.String() }) });
buildTypeEdges(fg);
expect(fg.size).toBe(0);
});
it("returns empty graph with no edges for empty graph", () => {
const fg = new FlowGraph();
buildTypeEdges(fg);
expect(fg.order).toBe(0);
expect(fg.size).toBe(0);
});
it("skips edges that would already exist", () => {
const fg = new FlowGraph();
fg.addOperation({ name: "a", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ x: Type.String() }), outputSchema: Type.Object({ y: Type.String() }) });
fg.addOperation({ name: "b", namespace: "op", version: "1.0.0", type: "query", inputSchema: Type.Object({ y: Type.String() }), outputSchema: Type.Object({ z: Type.String() }) });
buildTypeEdges(fg);
const sizeAfterFirst = fg.size;
buildTypeEdges(fg);
expect(fg.size).toBe(sizeAfterFirst);
});
}); });

View File

@@ -1,7 +1,251 @@
import { describe, it, expect } from 'vitest'; import { describe, it, expect } from "vitest";
import { Type } from "@alkdev/typebox";
import { h, createHostRoot } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional } from "../../src/component/index.js";
import { validatePreconditions, validateTemplate } from "../../src/analysis/workflow.js";
import { FlowGraph } from "../../src/graph/construction.js";
import type { OperationNodeAttrs, OperationEdgeAttrs } from "../../src/schema/index.js";
describe('analysis workflow', () => { type OpGraph = FlowGraph<typeof import("../../src/schema/node.js").OperationNodeAttrs, typeof import("../../src/schema/edge.js").OperationEdgeAttrs>;
it('placeholder', () => {
expect(true).toBe(true); function createOperationGraph(
specs: Array<{
name: string;
namespace?: string;
inputSchema?: Record<string, unknown>;
outputSchema?: Record<string, unknown>;
}>,
): OpGraph {
const graph = new FlowGraph() as OpGraph;
for (const spec of specs) {
const ns = spec.namespace ?? "test";
const key = `${ns}.${spec.name}`;
graph.addNode(key, {
name: spec.name,
namespace: ns,
version: "1.0.0",
type: "query",
inputSchema: spec.inputSchema ?? Type.Object({}),
outputSchema: spec.outputSchema ?? Type.Object({}),
} as OperationNodeAttrs);
}
return graph;
}
function createOperationGraphWithEdges(
specs: Array<{
name: string;
namespace?: string;
inputSchema?: Record<string, unknown>;
outputSchema?: Record<string, unknown>;
}>,
edges?: Array<{ source: string; target: string; compatible: boolean; mismatches?: Array<{ path: string; expected: string; actual: string }> }>,
): OpGraph {
const graph = createOperationGraph(specs);
if (edges) {
for (const edge of edges) {
graph.addTypedEdge(edge.source, edge.target, {
compatible: edge.compatible,
mismatches: edge.mismatches,
});
}
}
return graph;
}
describe("validatePreconditions", () => {
it("returns empty for valid graph with no required fields", () => {
const graph = createOperationGraph([
{ name: "a", outputSchema: Type.Object({ x: Type.Number() }) },
{ name: "b", inputSchema: Type.Object({}) },
]);
graph.addEdge("test.a", "test.b");
const errors = validatePreconditions(graph);
expect(errors).toEqual([]);
});
it("returns empty when all required input fields are provided by predecessors", () => {
const graph = createOperationGraph([
{ name: "a", outputSchema: Type.Object({ x: Type.Number(), y: Type.String() }) },
{ name: "b", inputSchema: Type.Object({ x: Type.Number() }) },
]);
graph.addEdge("test.a", "test.b");
const errors = validatePreconditions(graph);
expect(errors).toEqual([]);
});
it("returns errors when required input field is not provided by any predecessor", () => {
const graph = createOperationGraph([
{ name: "a", outputSchema: Type.Object({ x: Type.Number() }) },
{ name: "b", inputSchema: Type.Object({ x: Type.Number(), y: Type.String() }) },
]);
graph.addEdge("test.a", "test.b");
const errors = validatePreconditions(graph);
expect(errors.length).toBeGreaterThan(0);
const fieldNames = errors.map((e) => e.field);
expect(fieldNames).toContain("y");
});
it("returns errors when node with required fields has no predecessors", () => {
const graph = createOperationGraph([
{ name: "a", inputSchema: Type.Object({ x: Type.Number() }), outputSchema: Type.Object({}) },
]);
const errors = validatePreconditions(graph);
expect(errors.length).toBeGreaterThan(0);
expect(errors[0]!.message).toContain("no predecessor");
});
it("collects provided fields from multiple predecessors", () => {
const graph = createOperationGraph([
{ name: "a", outputSchema: Type.Object({ x: Type.Number() }) },
{ name: "c", outputSchema: Type.Object({ y: Type.String() }) },
{ name: "b", inputSchema: Type.Object({ x: Type.Number(), y: Type.String() }) },
]);
graph.addEdge("test.a", "test.b");
graph.addEdge("test.c", "test.b");
const errors = validatePreconditions(graph);
expect(errors).toEqual([]);
});
it("returns empty for graph with no nodes", () => {
const graph = new FlowGraph() as OpGraph;
const errors = validatePreconditions(graph);
expect(errors).toEqual([]);
});
});
describe("validateTemplate", () => {
it("returns empty for valid template with all operations in graph", () => {
const graph = createOperationGraphWithEdges([
{ name: "a", outputSchema: Type.Object({ x: Type.Number() }) },
{ name: "b", inputSchema: Type.Object({ x: Type.Number() }) },
], [
{ source: "test.a", target: "test.b", compatible: true },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Operation, { name: "test.b" }),
);
const errors = validateTemplate(template, graph);
expect(errors).toEqual([]);
});
it("returns error when operation name is not in operation graph", () => {
const graph = createOperationGraph([
{ name: "a" },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Operation, { name: "test.missing" }),
);
const errors = validateTemplate(template, graph);
expect(errors.length).toBeGreaterThan(0);
const missingErrors = errors.filter(
(e) => e.type === "graph" && (e as { type: string; category: string; details: unknown }).category === "orphan-node"
&& JSON.stringify((e as { details: unknown }).details).includes("missing"),
);
expect(missingErrors.length).toBeGreaterThan(0);
});
it("returns error for type incompatibility between sequential operations", () => {
const graph = createOperationGraphWithEdges([
{ name: "a", outputSchema: Type.Object({ x: Type.String() }) },
{ name: "b", inputSchema: Type.Object({ x: Type.Number() }) },
], [
{ source: "test.a", target: "test.b", compatible: false, mismatches: [{ path: "/x", expected: "number", actual: "string" }] },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Operation, { name: "test.b" }),
);
const errors = validateTemplate(template, graph);
const typeErrors = errors.filter((e) => e.type === "type-compat");
expect(typeErrors.length).toBeGreaterThan(0);
});
it("returns empty for single-operation template", () => {
const graph = createOperationGraph([
{ name: "a" },
]);
const template = h(Operation, { name: "test.a" });
const errors = validateTemplate(template, graph);
expect(errors).toEqual([]);
});
it("detects unreachable nodes", () => {
const graph = createOperationGraphWithEdges([
{ name: "a" },
{ name: "b" },
{ name: "c" },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Operation, { name: "test.b" }),
);
const errors = validateTemplate(template, graph);
const reachableErrors = errors.filter(
(e) => e.type === "graph" && JSON.stringify((e as { details: unknown }).details).includes("not reachable"),
);
expect(reachableErrors.length).toBe(0);
});
it("returns empty for valid parallel template", () => {
const graph = createOperationGraphWithEdges([
{ name: "a" },
{ name: "b" },
{ name: "c" },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Parallel, {},
h(Operation, { name: "test.b" }),
h(Operation, { name: "test.c" }),
),
);
const errors = validateTemplate(template, graph);
expect(errors).toEqual([]);
});
it("handles template with conditional", () => {
const graph = createOperationGraphWithEdges([
{ name: "a", outputSchema: Type.Object({ result: Type.Boolean() }) },
{ name: "b" },
{ name: "c" },
]);
const template = h(Sequential, {},
h(Operation, { name: "test.a" }),
h(Conditional, { test: "test.a" },
h(Operation, { name: "test.b" }),
),
h(Operation, { name: "test.c" }),
);
const errors = validateTemplate(template, graph);
expect(errors).toEqual([]);
});
it("template validation is advisory - never throws", () => {
const graph = new FlowGraph() as OpGraph;
const template = h(Sequential, {},
h(Operation, { name: "nonexistent" }),
);
const errors = validateTemplate(template, graph);
expect(Array.isArray(errors)).toBe(true);
});
it("detects orphan node with no edges in multi-node template", () => {
const graph = createOperationGraphWithEdges([
{ name: "a" },
{ name: "b" },
]);
const template = h(Parallel, {},
h(Operation, { name: "test.a" }),
h(Operation, { name: "test.b" }),
);
const errors = validateTemplate(template, graph);
const orphanErrors = errors.filter(
(e) => e.type === "graph" && (e as { type: string; category: string }).category === "orphan-node"
&& JSON.stringify((e as { details: unknown }).details).includes("no edges"),
);
expect(orphanErrors.length).toBeGreaterThan(0);
}); });
}); });

77
test/exports.test.ts Normal file
View File

@@ -0,0 +1,77 @@
import { describe, it, expect } from "vitest";
import { FlowGraph, typeCompat, CycleError, OperationNodeAttrs, Operation, GraphologyHostConfig, WorkflowReactiveRoot, topologicalOrder } from "../src/index.js";
import { buildTypeEdges as buildTypeEdgesGraph } from "../src/graph/index.js";
import { CallStatusEnum, NodeStatusEnum, EdgeTypeEnum, CallResultSchema, OperationEdgeAttrs, CallEdgeAttrs, TemplateEdgeAttrs, CallNodeAttrs } from "../src/schema/index.js";
import { Sequential, Parallel, Conditional, Map } from "../src/component/index.js";
import { ReactiveHostConfig } from "../src/host/index.js";
import { WorkflowReactiveRoot as ReactiveRootFromReactive, computePreconditions, computeBlockedByFailure } from "../src/reactive/index.js";
import { typeCompat as typeCompatAnalysis, buildTypeEdges, validateGraph, validateTemplate, topologicalOrder as topologicalOrderAnalysis, parallelGroups, criticalPath, reachableFrom } from "../src/analysis/index.js";
import { FlowgraphError, ConstructionError, CycleError as CycleErrorFromError, InvalidTransitionError } from "../src/error/index.js";
describe("Sub-path imports", () => {
it("@alkdev/flowgraph → all public types", () => {
expect(FlowGraph).toBeDefined();
expect(typeCompat).toBeDefined();
expect(CycleError).toBeDefined();
expect(OperationNodeAttrs).toBeDefined();
expect(Operation).toBeDefined();
expect(GraphologyHostConfig).toBeDefined();
expect(WorkflowReactiveRoot).toBeDefined();
expect(topologicalOrder).toBeDefined();
});
it("@alkdev/flowgraph/graph → FlowGraph, FlowGraphOptions", () => {
expect(FlowGraph).toBeDefined();
expect(buildTypeEdgesGraph).toBeDefined();
});
it("@alkdev/flowgraph/schema → all schemas, enums, types, CallResult", () => {
expect(OperationNodeAttrs).toBeDefined();
expect(CallNodeAttrs).toBeDefined();
expect(OperationEdgeAttrs).toBeDefined();
expect(CallEdgeAttrs).toBeDefined();
expect(TemplateEdgeAttrs).toBeDefined();
expect(CallResultSchema).toBeDefined();
expect(CallStatusEnum).toBeDefined();
expect(NodeStatusEnum).toBeDefined();
expect(EdgeTypeEnum).toBeDefined();
});
it("@alkdev/flowgraph/component → Operation, Sequential, Parallel, Conditional, Map", () => {
expect(Operation).toBeDefined();
expect(Sequential).toBeDefined();
expect(Parallel).toBeDefined();
expect(Conditional).toBeDefined();
expect(Map).toBeDefined();
});
it("@alkdev/flowgraph/host → GraphologyHostConfig, ReactiveHostConfig", () => {
expect(GraphologyHostConfig).toBeDefined();
expect(ReactiveHostConfig).toBeDefined();
});
it("@alkdev/flowgraph/reactive → WorkflowReactiveRoot, EventLogProjection", () => {
expect(ReactiveRootFromReactive).toBeDefined();
expect(computePreconditions).toBeDefined();
expect(computeBlockedByFailure).toBeDefined();
});
it("@alkdev/flowgraph/analysis → typeCompat, buildTypeEdges, validateGraph, validateTemplate, topologicalOrder, parallelGroups, criticalPath, reachableFrom", () => {
expect(typeCompatAnalysis).toBeDefined();
expect(buildTypeEdges).toBeDefined();
expect(validateGraph).toBeDefined();
expect(validateTemplate).toBeDefined();
expect(topologicalOrderAnalysis).toBeDefined();
expect(parallelGroups).toBeDefined();
expect(criticalPath).toBeDefined();
expect(reachableFrom).toBeDefined();
});
it("@alkdev/flowgraph/error → all error classes", () => {
expect(FlowgraphError).toBeDefined();
expect(ConstructionError).toBeDefined();
expect(CycleErrorFromError).toBeDefined();
expect(InvalidTransitionError).toBeDefined();
});
});

View File

@@ -1,13 +1,15 @@
import { describe, it, expect } from "vitest"; import { describe, it, expect } from "vitest";
import { Type } from "@alkdev/typebox"; import { Type } from "@alkdev/typebox";
import { FlowGraph, buildTypeEdges } from "../../src/graph/construction.js"; import { FlowGraph, buildTypeEdges } from "../../src/graph/construction.js";
import type { OperationSpec } from "../../src/graph/construction.js"; import type { OperationSpec, CallEventMapValue } from "../../src/graph/construction.js";
import { import {
DuplicateNodeError, DuplicateNodeError,
DuplicateEdgeError, DuplicateEdgeError,
NodeNotFoundError, NodeNotFoundError,
CycleError, CycleError,
InvalidTransitionError,
} from "../../src/error/index.js"; } from "../../src/error/index.js";
import type { CallStatus } from "../../src/schema/enums.js";
describe("FlowGraph constructor", () => { describe("FlowGraph constructor", () => {
it("creates an empty graph", () => { it("creates an empty graph", () => {
@@ -300,12 +302,13 @@ describe("FlowGraph query methods", () => {
}); });
describe("FlowGraph static stubs", () => { describe("FlowGraph static stubs", () => {
it("fromCallEvents throws not implemented", () => { it("fromCallEvents returns empty graph for empty events", () => {
expect(() => FlowGraph.fromCallEvents([])).toThrow("not implemented"); const graph = FlowGraph.fromCallEvents([]);
expect(graph.order).toBe(0);
expect(graph.size).toBe(0);
}); });
it("fromJSON throws not implemented", () => { it("fromJSON throws not implemented", () => {
expect(() => FlowGraph.fromJSON({})).toThrow("not implemented"); expect(() => FlowGraph.fromJSON({} as never)).toThrow();
}); });
}); });
@@ -638,4 +641,520 @@ describe("FlowGraph cycle detection", () => {
fg.addEdge("a", "c"); fg.addEdge("a", "c");
expect(() => fg.addEdge("b", "c")).not.toThrow(); expect(() => fg.addEdge("b", "c")).not.toThrow();
}); });
});
describe("FlowGraph.fromCallEvents", () => {
const requestedEvent: CallEventMapValue = {
type: "call.requested",
requestId: "req-1",
operationId: "task.classify",
input: { text: "hello" },
timestamp: "2026-01-01T00:00:00Z",
};
const requestedWithParent: CallEventMapValue = {
type: "call.requested",
requestId: "req-2",
operationId: "task.enrich",
input: { label: "greeting" },
timestamp: "2026-01-01T00:00:01Z",
parentRequestId: "req-1",
};
const respondedEvent: CallEventMapValue = {
type: "call.responded",
requestId: "req-1",
output: { label: "greeting" },
timestamp: "2026-01-01T00:00:02Z",
};
const errorEvent: CallEventMapValue = {
type: "call.error",
requestId: "req-1",
error: { code: "INTERNAL", message: "Something went wrong" },
timestamp: "2026-01-01T00:00:03Z",
};
const abortedEvent: CallEventMapValue = {
type: "call.aborted",
requestId: "req-1",
timestamp: "2026-01-01T00:00:04Z",
};
const completedEvent: CallEventMapValue = {
type: "call.completed",
requestId: "req-1",
output: { label: "greeting" },
timestamp: "2026-01-01T00:00:05Z",
};
it("adds node from call.requested event", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent]);
expect(graph.order).toBe(1);
expect(graph.hasNode("req-1")).toBe(true);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("pending");
expect(attrs.operationId).toBe("task.classify");
});
it("creates triggered edge from parentRequestId", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, requestedWithParent]);
expect(graph.order).toBe(2);
expect(graph.hasNode("req-2")).toBe(true);
expect(graph.hasEdge("req-1", "req-2")).toBe(true);
const edgeAttrs = graph.getEdgeAttributes("req-1", "req-2") as Record<string, unknown>;
expect(edgeAttrs.edgeType).toBe("triggered");
});
it("updates status to completed on call.responded", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, respondedEvent]);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
expect(attrs.output).toEqual({ label: "greeting" });
expect(attrs.completedAt).toBe("2026-01-01T00:00:02Z");
});
it("updates status to failed on call.error", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, errorEvent]);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("failed");
expect(attrs.error).toEqual({ code: "INTERNAL", message: "Something went wrong" });
expect(attrs.completedAt).toBe("2026-01-01T00:00:03Z");
});
it("updates status to aborted on call.aborted", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, abortedEvent]);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("aborted");
expect(attrs.completedAt).toBe("2026-01-01T00:00:04Z");
});
it("updates status to completed on call.completed", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, completedEvent]);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
expect(attrs.completedAt).toBe("2026-01-01T00:00:05Z");
});
it("is idempotent — duplicate events have no effect", () => {
const graph = FlowGraph.fromCallEvents([requestedEvent, requestedEvent, respondedEvent, respondedEvent]);
expect(graph.order).toBe(1);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
});
it("ignores responded/error/aborted for unknown requestId", () => {
const graph = FlowGraph.fromCallEvents([respondedEvent, errorEvent, abortedEvent]);
expect(graph.order).toBe(0);
});
it("creates node for unknown operationId", () => {
const unknownOpEvent: CallEventMapValue = {
type: "call.requested",
requestId: "req-unknown",
operationId: "unknown.op",
input: {},
timestamp: "2026-01-01T00:00:00Z",
};
const graph = FlowGraph.fromCallEvents([unknownOpEvent]);
expect(graph.order).toBe(1);
const attrs = graph.getNodeAttributes("req-unknown") as Record<string, unknown>;
expect(attrs.status).toBe("pending");
expect(attrs.operationId).toBe("unknown.op");
});
it("processes full event sequence", () => {
const req1: CallEventMapValue = {
type: "call.requested",
requestId: "req-parent",
operationId: "task.parent",
input: {},
timestamp: "2026-01-01T00:00:00Z",
};
const req2: CallEventMapValue = {
type: "call.requested",
requestId: "req-child",
operationId: "task.child",
input: {},
timestamp: "2026-01-01T00:00:01Z",
parentRequestId: "req-parent",
};
const resp: CallEventMapValue = {
type: "call.responded",
requestId: "req-parent",
output: "done",
timestamp: "2026-01-01T00:00:02Z",
};
const graph = FlowGraph.fromCallEvents([req1, req2, resp]);
expect(graph.order).toBe(2);
expect(graph.hasEdge("req-parent", "req-child")).toBe(true);
const parentAttrs = graph.getNodeAttributes("req-parent") as Record<string, unknown>;
expect(parentAttrs.status).toBe("completed");
const childAttrs = graph.getNodeAttributes("req-child") as Record<string, unknown>;
expect(childAttrs.status).toBe("pending");
});
it("stores identity and startedAt from call.requested", () => {
const event: CallEventMapValue = {
type: "call.requested",
requestId: "req-id",
operationId: "task.op",
input: {},
timestamp: "2026-01-01T00:00:00Z",
identity: { id: "user-1", scopes: ["read"] },
startedAt: "2026-01-01T00:00:01Z",
};
const graph = FlowGraph.fromCallEvents([event]);
const attrs = graph.getNodeAttributes("req-id") as Record<string, unknown>;
expect(attrs.identity).toEqual({ id: "user-1", scopes: ["read"] });
expect(attrs.startedAt).toBe("2026-01-01T00:00:01Z");
});
it("skips triggered edge if parent node does not exist", () => {
const orphanEvent: CallEventMapValue = {
type: "call.requested",
requestId: "req-orphan",
operationId: "task.child",
input: {},
timestamp: "2026-01-01T00:00:00Z",
parentRequestId: "req-nonexistent",
};
const graph = FlowGraph.fromCallEvents([orphanEvent]);
expect(graph.order).toBe(1);
expect(graph.hasNode("req-orphan")).toBe(true);
expect(graph.size).toBe(0);
});
});
describe("FlowGraph.updateFromEvent", () => {
it("processes single event for real-time pattern", () => {
const graph = new FlowGraph();
graph.updateFromEvent({
type: "call.requested",
requestId: "req-1",
operationId: "task.classify",
input: { text: "hello" },
timestamp: "2026-01-01T00:00:00Z",
});
expect(graph.hasNode("req-1")).toBe(true);
graph.updateFromEvent({
type: "call.responded",
requestId: "req-1",
output: { label: "hi" },
timestamp: "2026-01-01T00:00:02Z",
});
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
expect(attrs.output).toEqual({ label: "hi" });
});
it("ignores events for unknown requestId", () => {
const graph = new FlowGraph();
graph.updateFromEvent({
type: "call.responded",
requestId: "unknown",
output: "x",
timestamp: "2026-01-01T00:00:00Z",
});
expect(graph.order).toBe(0);
});
it("ignores terminal event re-processing", () => {
const graph = new FlowGraph();
graph.updateFromEvent({
type: "call.requested",
requestId: "req-1",
operationId: "task.op",
input: {},
timestamp: "2026-01-01T00:00:00Z",
});
graph.updateFromEvent({
type: "call.responded",
requestId: "req-1",
output: "done",
timestamp: "2026-01-01T00:00:01Z",
});
graph.updateFromEvent({
type: "call.error",
requestId: "req-1",
error: { code: "X", message: "Y" },
timestamp: "2026-01-01T00:00:02Z",
});
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
});
});
describe("FlowGraph.addCall", () => {
it("adds a call node", () => {
const graph = new FlowGraph();
graph.addCall({
requestId: "req-1",
operationId: "task.classify",
status: "pending",
input: { text: "hello" },
});
expect(graph.hasNode("req-1")).toBe(true);
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("pending");
});
it("adds triggered edge when parentRequestId is present", () => {
const graph = new FlowGraph();
graph.addCall({
requestId: "req-parent",
operationId: "task.parent",
status: "pending",
input: {},
});
graph.addCall({
requestId: "req-child",
operationId: "task.child",
status: "pending",
input: {},
parentRequestId: "req-parent",
});
expect(graph.hasEdge("req-parent", "req-child")).toBe(true);
const edgeAttrs = graph.getEdgeAttributes("req-parent", "req-child") as Record<string, unknown>;
expect(edgeAttrs.edgeType).toBe("triggered");
});
it("is idempotent — duplicate addCall is ignored", () => {
const graph = new FlowGraph();
graph.addCall({
requestId: "req-1",
operationId: "task.op",
status: "pending",
input: {},
});
graph.addCall({
requestId: "req-1",
operationId: "task.op",
status: "pending",
input: {},
});
expect(graph.order).toBe(1);
});
it("does not throw if parentRequestId node does not exist", () => {
const graph = new FlowGraph();
graph.addCall({
requestId: "req-child",
operationId: "task.child",
status: "pending",
input: {},
parentRequestId: "nonexistent",
});
expect(graph.hasNode("req-child")).toBe(true);
expect(graph.size).toBe(0);
});
});
describe("FlowGraph.addDependency", () => {
it("creates depends_on edge", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.a", status: "pending", input: {} });
graph.addCall({ requestId: "req-2", operationId: "task.b", status: "pending", input: {} });
graph.addDependency("req-1", "req-2");
const edgeKey = "req-1->req-2:depends_on";
expect(graph.graph.hasEdge(edgeKey)).toBe(true);
const attrs = graph.graph.getEdgeAttributes(edgeKey) as Record<string, unknown>;
expect(attrs.edgeType).toBe("depends_on");
});
it("is idempotent — duplicate addDependency is ignored", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.a", status: "pending", input: {} });
graph.addCall({ requestId: "req-2", operationId: "task.b", status: "pending", input: {} });
graph.addDependency("req-1", "req-2");
graph.addDependency("req-1", "req-2");
expect(graph.graph.hasEdge("req-1->req-2:depends_on")).toBe(true);
});
it("throws NodeNotFoundError if source doesn't exist", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-2", operationId: "task.b", status: "pending", input: {} });
expect(() => graph.addDependency("missing", "req-2")).toThrow(NodeNotFoundError);
});
it("throws NodeNotFoundError if target doesn't exist", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.a", status: "pending", input: {} });
expect(() => graph.addDependency("req-1", "missing")).toThrow(NodeNotFoundError);
});
it("throws CycleError if adding would create cycle", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.a", status: "pending", input: {} });
graph.addCall({ requestId: "req-2", operationId: "task.b", status: "pending", input: {} });
graph.addCall({ requestId: "req-3", operationId: "task.c", status: "pending", input: {} });
graph.addEdge("req-1", "req-2");
graph.addEdge("req-2", "req-3");
expect(() => graph.addDependency("req-3", "req-1")).toThrow(CycleError);
});
});
describe("FlowGraph.updateStatus", () => {
it("transitions pending to running", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("running");
});
it("transitions running to completed", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "completed", { completedAt: "2026-01-01T00:00:01Z" });
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
expect(attrs.completedAt).toBe("2026-01-01T00:00:01Z");
});
it("transitions running to failed", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "failed");
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("failed");
});
it("transitions pending to aborted", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "aborted");
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("aborted");
});
it("transitions running to aborted", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "aborted");
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("aborted");
});
it("is no-op if status is already the target", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "pending");
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("pending");
});
it("throws InvalidTransitionError for completed to running", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "completed", { completedAt: "2026-01-01T00:00:01Z" });
expect(() => graph.updateStatus("req-1", "running")).toThrow(InvalidTransitionError);
});
it("throws InvalidTransitionError for failed to running", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "failed");
expect(() => graph.updateStatus("req-1", "running")).toThrow(InvalidTransitionError);
});
it("throws InvalidTransitionError for aborted to running", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "aborted");
expect(() => graph.updateStatus("req-1", "running")).toThrow(InvalidTransitionError);
});
it("throws InvalidTransitionError for pending to completed", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
expect(() => graph.updateStatus("req-1", "completed")).toThrow(InvalidTransitionError);
});
it("throws InvalidTransitionError for pending to failed", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
expect(() => graph.updateStatus("req-1", "failed")).toThrow(InvalidTransitionError);
});
it("throws NodeNotFoundError for unknown requestId", () => {
const graph = new FlowGraph();
expect(() => graph.updateStatus("missing", "running")).toThrow(NodeNotFoundError);
});
it("InvalidTransitionError contains from/to info", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "completed", { completedAt: "2026-01-01T00:00:01Z" });
try {
graph.updateStatus("req-1", "running");
expect.unreachable("should throw");
} catch (e) {
expect(e).toBeInstanceOf(InvalidTransitionError);
const ite = e as InvalidTransitionError;
expect(ite.requestId).toBe("req-1");
expect(ite.from).toBe("completed" as CallStatus);
expect(ite.to).toBe("running" as CallStatus);
}
});
it("merges extra attributes on transition", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.op", status: "pending", input: {} });
graph.updateStatus("req-1", "running");
graph.updateStatus("req-1", "completed", {
output: { result: 42 },
completedAt: "2026-01-01T00:00:01Z",
});
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.status).toBe("completed");
expect(attrs.output).toEqual({ result: 42 });
expect(attrs.completedAt).toBe("2026-01-01T00:00:01Z");
});
});
describe("FlowGraph.updateCall", () => {
it("partially merges call attributes", () => {
const graph = new FlowGraph();
graph.addCall({
requestId: "req-1",
operationId: "task.op",
status: "pending",
input: {},
});
graph.updateCall("req-1", { output: "some result" });
const attrs = graph.getNodeAttributes("req-1") as Record<string, unknown>;
expect(attrs.output).toBe("some result");
expect(attrs.status).toBe("pending");
});
it("throws NodeNotFoundError for unknown requestId", () => {
const graph = new FlowGraph();
expect(() => graph.updateCall("missing", { output: "x" })).toThrow(NodeNotFoundError);
});
});
describe("FlowGraph.removeCall", () => {
it("removes node and attached edges", () => {
const graph = new FlowGraph();
graph.addCall({ requestId: "req-1", operationId: "task.parent", status: "pending", input: {} });
graph.addCall({ requestId: "req-2", operationId: "task.child", status: "pending", input: {}, parentRequestId: "req-1" });
expect(graph.size).toBe(1);
graph.removeCall("req-2");
expect(graph.hasNode("req-2")).toBe(false);
expect(graph.size).toBe(0);
expect(graph.hasNode("req-1")).toBe(true);
});
it("is a no-op if requestId doesn't exist", () => {
const graph = new FlowGraph();
expect(() => graph.removeCall("missing")).not.toThrow();
});
}); });

View File

@@ -0,0 +1,232 @@
import { describe, it, expect } from "vitest";
import { Type } from "@alkdev/typebox";
import { FlowGraph } from "../../src/graph/construction.js";
import type { OperationSpec } from "../../src/graph/construction.js";
import { InvalidInputError, CycleError } from "../../src/error/index.js";
describe("FlowGraph.export", () => {
it("returns graphology native JSON format for empty graph", () => {
const fg = new FlowGraph();
const data = fg.export();
expect(data.options).toEqual({ type: "directed", multi: false, allowSelfLoops: false });
expect(data.attributes).toEqual({});
expect(data.nodes).toEqual([]);
expect(data.edges).toEqual([]);
});
it("returns graphology native JSON format for operation graph", () => {
const specs: OperationSpec[] = [
{ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) },
{ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) },
];
const graph = FlowGraph.fromSpecs(specs);
const data = graph.export();
expect(data.options.type).toBe("directed");
expect(data.nodes.length).toBe(2);
expect(data.edges.length).toBeGreaterThan(0);
});
});
describe("FlowGraph.toJSON", () => {
it("is an alias for export", () => {
const fg = new FlowGraph();
fg.addNode("a", { name: "a" } as never);
const exported = fg.export();
const jsoned = fg.toJSON();
expect(jsoned).toEqual(exported);
});
});
describe("FlowGraph.toString", () => {
it("returns JSON.stringify of export()", () => {
const fg = new FlowGraph();
fg.addNode("a", { name: "a" } as never);
expect(fg.toString()).toBe(JSON.stringify(fg.export()));
});
it("round-trips through JSON.parse", () => {
const fg = new FlowGraph();
fg.addNode("a", { name: "a" } as never);
const parsed = JSON.parse(fg.toString());
expect(parsed.options).toEqual({ type: "directed", multi: false, allowSelfLoops: false });
});
});
describe("FlowGraph.fromJSON", () => {
it("round-trips fromSpecs -> export -> fromJSON", () => {
const specs: OperationSpec[] = [
{ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) },
{ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) },
];
const original = FlowGraph.fromSpecs(specs);
const data = original.export();
const restored = FlowGraph.fromJSON(data);
expect(restored.order).toBe(original.order);
expect(restored.size).toBe(original.size);
for (const node of original.nodes()) {
expect(restored.hasNode(node)).toBe(true);
const origAttrs = original.getNodeAttributes(node as never) as Record<string, unknown>;
const restAttrs = restored.getNodeAttributes(node as never) as Record<string, unknown>;
expect(restAttrs.name).toBe(origAttrs.name);
expect(restAttrs.namespace).toBe(origAttrs.namespace);
}
for (const edge of original.edges()) {
const source = edge.split("->")[0]!;
const target = edge.split("->")[1]!;
expect(restored.hasEdge(source, target)).toBe(true);
}
});
it("round-trips empty graph", () => {
const fg = new FlowGraph();
const data = fg.export();
const restored = FlowGraph.fromJSON(data);
expect(restored.order).toBe(0);
expect(restored.size).toBe(0);
expect(restored.export()).toEqual(data);
});
it("throws InvalidInputError on invalid input", () => {
expect(() => FlowGraph.fromJSON({})).toThrow(InvalidInputError);
});
it("InvalidInputError contains errors array", () => {
try {
FlowGraph.fromJSON({});
expect.unreachable("should throw");
} catch (e) {
expect(e).toBeInstanceOf(InvalidInputError);
const err = e as InvalidInputError;
expect(err.errors.length).toBeGreaterThan(0);
}
});
it("throws InvalidInputError on missing nodes", () => {
const bad = {
options: { type: "directed", multi: false, allowSelfLoops: false },
attributes: {},
edges: [],
};
expect(() => FlowGraph.fromJSON(bad as never)).toThrow(InvalidInputError);
});
it("throws CycleError on cyclic input", () => {
const cyclicData = {
options: { type: "directed", multi: false, allowSelfLoops: false },
attributes: {},
nodes: [
{ key: "a", attributes: { requestId: "a", operationId: "op.a", status: "completed", input: {} } },
{ key: "b", attributes: { requestId: "b", operationId: "op.b", status: "completed", input: {} } },
],
edges: [
{ key: "a->b", source: "a", target: "b", attributes: {} },
{ key: "b->a", source: "b", target: "a", attributes: {} },
],
};
expect(() => FlowGraph.fromJSON(cyclicData as never)).toThrow(CycleError);
});
it("CycleError contains cycle paths on cyclic input", () => {
const cyclicData = {
options: { type: "directed", multi: false, allowSelfLoops: false },
attributes: {},
nodes: [
{ key: "a", attributes: { requestId: "a", operationId: "op.a", status: "completed", input: {} } },
{ key: "b", attributes: { requestId: "b", operationId: "op.b", status: "completed", input: {} } },
],
edges: [
{ key: "a->b", source: "a", target: "b", attributes: {} },
{ key: "b->a", source: "b", target: "a", attributes: {} },
],
};
try {
FlowGraph.fromJSON(cyclicData as never);
expect.unreachable("should throw");
} catch (e) {
expect(e).toBeInstanceOf(CycleError);
const ce = e as CycleError;
expect(ce.cycles.length).toBeGreaterThan(0);
}
});
it("preserves node attributes through round-trip", () => {
const specs: OperationSpec[] = [
{ name: "classify", namespace: "task", version: "2.0.0", type: "mutation", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }), description: "Classifies text", tags: ["nlp"] },
];
const original = FlowGraph.fromSpecs(specs);
const data = original.export();
const restored = FlowGraph.fromJSON(data);
const origAttrs = original.getNodeAttributes("task.classify" as never) as Record<string, unknown>;
const restAttrs = restored.getNodeAttributes("task.classify" as never) as Record<string, unknown>;
expect(restAttrs.name).toBe(origAttrs.name);
expect(restAttrs.namespace).toBe(origAttrs.namespace);
expect(restAttrs.version).toBe(origAttrs.version);
expect(restAttrs.type).toBe(origAttrs.type);
expect(restAttrs.description).toBe(origAttrs.description);
expect(restAttrs.tags).toEqual(origAttrs.tags);
});
it("preserves edge attributes through round-trip", () => {
const specs: OperationSpec[] = [
{ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) },
{ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) },
];
const original = FlowGraph.fromSpecs(specs);
const data = original.export();
const restored = FlowGraph.fromJSON(data);
const origEdgeAttrs = original.getEdgeAttributes("task.extract", "task.classify") as Record<string, unknown>;
const restEdgeAttrs = restored.getEdgeAttributes("task.extract", "task.classify") as Record<string, unknown>;
expect(restEdgeAttrs.edgeType).toBe(origEdgeAttrs.edgeType);
expect(restEdgeAttrs.compatible).toBe(origEdgeAttrs.compatible);
});
it("double round-trip is lossless", () => {
const specs: OperationSpec[] = [
{ name: "extract", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ raw: Type.String() }), outputSchema: Type.Object({ text: Type.String() }) },
{ name: "classify", namespace: "task", version: "1.0.0", type: "query", inputSchema: Type.Object({ text: Type.String() }), outputSchema: Type.Object({ label: Type.String() }) },
];
const original = FlowGraph.fromSpecs(specs);
const first = original.export();
const restored1 = FlowGraph.fromJSON(first);
const second = restored1.export();
const restored2 = FlowGraph.fromJSON(second);
const third = restored2.export();
expect(third).toEqual(first);
expect(third).toEqual(second);
});
it("accepts valid call graph serialized data", () => {
const callData = {
options: { type: "directed", multi: false, allowSelfLoops: false },
attributes: {},
nodes: [
{
key: "req_1",
attributes: {
requestId: "req_1",
operationId: "task.classify",
status: "completed",
input: { text: "hello" },
output: { label: "greeting" },
},
},
],
edges: [],
};
const fg = FlowGraph.fromJSON(callData as never);
expect(fg.order).toBe(1);
expect(fg.hasNode("req_1")).toBe(true);
});
it("throws InvalidInputError for invalid node attributes", () => {
const bad = {
options: { type: "directed", multi: false, allowSelfLoops: false },
attributes: {},
nodes: [
{ key: "a", attributes: { invalid: true } },
],
edges: [],
};
expect(() => FlowGraph.fromJSON(bad as never)).toThrow(InvalidInputError);
});
});

View File

@@ -1,7 +1,507 @@
import { describe, it, expect } from 'vitest'; import { describe, it, expect } from "vitest";
import { h, createHostRoot } from "@alkdev/ujsx";
import { Operation, Sequential, Parallel, Conditional, Map } from "../../src/component/index.js";
import { ReactiveHostConfig } from "../../src/host/reactive.js";
import type { WorkflowNode, ReactiveContext } from "../../src/host/reactive.js";
describe('reactive host', () => { function renderTemplate(template: ReturnType<typeof h>): ReactiveContext {
it('placeholder', () => { const root = createHostRoot(ReactiveHostConfig, null);
expect(true).toBe(true); root.render(template);
return root.ctx;
}
describe("ReactiveHostConfig", () => {
describe("createRootContext", () => {
it("creates fresh ReactiveContext with empty maps", () => {
const ctx = ReactiveHostConfig.createRootContext(null);
expect(ctx.nodes.size).toBe(0);
expect(ctx.statusSignals.size).toBe(0);
expect(ctx.parentMap.size).toBe(0);
expect(ctx.siblingMap.size).toBe(0);
});
it("accepts options with registry", () => {
const registry = { resolve: (name: string) => name };
const ctx = ReactiveHostConfig.createRootContext(null, { registry });
expect(ctx.operationRegistry).toBe(registry);
});
});
describe("createInstance for operation", () => {
it("creates WorkflowNode with correct key and type", () => {
const template = h(Sequential, {},
h(Operation, { name: "classify" }),
);
const ctx = renderTemplate(template);
const node = ctx.nodes.get("classify");
expect(node).toBeDefined();
expect(node!.key).toBe("classify");
expect(node!.type).toBe("operation");
expect(node!.operationId).toBe("classify");
});
it("creates WorkflowNode with idle status signal", () => {
const template = h(Sequential, {},
h(Operation, { name: "classify" }),
);
const ctx = renderTemplate(template);
const node = ctx.nodes.get("classify")!;
expect(node.status.value).toBe("idle");
});
it("registers node in context maps", () => {
const template = h(Sequential, {},
h(Operation, { name: "classify" }),
);
const ctx = renderTemplate(template);
expect(ctx.statusSignals.has("classify")).toBe(true);
expect(ctx.preconditions.has("classify")).toBe(true);
expect(ctx.blockedByFailure.has("classify")).toBe(true);
});
it("creates output signal for operation nodes", () => {
const template = h(Sequential, {},
h(Operation, { name: "classify" }),
);
const ctx = renderTemplate(template);
const node = ctx.nodes.get("classify")!;
expect(node.output).toBeDefined();
expect(node.output!.value).toBeUndefined();
});
});
describe("createInstance for structural containers", () => {
it("creates WorkflowNode tracking children for Sequential", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0");
expect(seqNode).toBeDefined();
expect(seqNode!.type).toBe("sequential");
expect(seqNode!.children.length).toBe(2);
});
it("creates WorkflowNode for Parallel with children", () => {
const template = h(Parallel, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const parNode = ctx.nodes.get("__parallel_0");
expect(parNode).toBeDefined();
expect(parNode!.type).toBe("parallel");
expect(parNode!.children.length).toBe(2);
});
it("creates WorkflowNode for Conditional with children", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Conditional, { test: "A" },
h(Operation, { name: "B" }),
),
);
const ctx = renderTemplate(template);
const condNode = ctx.nodes.get("__conditional_1");
expect(condNode).toBeDefined();
expect(condNode!.type).toBe("conditional");
});
it("structural containers do not have operationId", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0")!;
expect(seqNode.operationId).toBeUndefined();
});
});
describe("signal initial states", () => {
it("root operation has preconditions met (no predecessors)", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const nodeA = ctx.nodes.get("A")!;
expect(nodeA.preconditions.value).toBe(true);
});
it("second operation in sequential has unmet preconditions", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const nodeB = ctx.nodes.get("B")!;
expect(nodeB.preconditions.value).toBe(false);
});
it("parallel children have preconditions met (no inter-child deps)", () => {
const template = h(Parallel, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("A")!.preconditions.value).toBe(true);
expect(ctx.nodes.get("B")!.preconditions.value).toBe(true);
});
it("all initial blockedByFailure are false", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("A")!.blockedByFailure.value).toBe(false);
expect(ctx.nodes.get("B")!.blockedByFailure.value).toBe(false);
});
});
describe("precondition transition on predecessor completion", () => {
it("sequential: completing predecessor updates dependent preconditions", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const nodeA = ctx.nodes.get("A")!;
const nodeB = ctx.nodes.get("B")!;
expect(nodeB.preconditions.value).toBe(false);
nodeA.status.value = "completed";
expect(nodeB.preconditions.value).toBe(true);
});
it("sequential: skipped predecessor satisfies preconditions", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const nodeA = ctx.nodes.get("A")!;
nodeA.status.value = "skipped";
expect(ctx.nodes.get("B")!.preconditions.value).toBe(true);
});
it("sequential chain: A→B→C, completing A then B", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
h(Operation, { name: "C" }),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("B")!.preconditions.value).toBe(false);
expect(ctx.nodes.get("C")!.preconditions.value).toBe(false);
ctx.nodes.get("A")!.status.value = "completed";
expect(ctx.nodes.get("B")!.preconditions.value).toBe(true);
expect(ctx.nodes.get("C")!.preconditions.value).toBe(false);
ctx.nodes.get("B")!.status.value = "completed";
expect(ctx.nodes.get("C")!.preconditions.value).toBe(true);
});
it("parallel children preconditions independent of each other", () => {
const template = h(Sequential, {},
h(Operation, { name: "pre" }),
h(Parallel, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("A")!.preconditions.value).toBe(false);
expect(ctx.nodes.get("B")!.preconditions.value).toBe(false);
ctx.nodes.get("pre")!.status.value = "completed";
expect(ctx.nodes.get("A")!.preconditions.value).toBe(true);
expect(ctx.nodes.get("B")!.preconditions.value).toBe(true);
});
});
describe("failure propagation", () => {
it("sequential: failed predecessor causes blockedByFailure to be true", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
ctx.nodes.get("A")!.status.value = "failed";
expect(ctx.nodes.get("B")!.blockedByFailure.value).toBe(true);
});
it("sequential: aborted predecessor causes blockedByFailure to be true", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
ctx.nodes.get("A")!.status.value = "aborted";
expect(ctx.nodes.get("B")!.blockedByFailure.value).toBe(true);
});
it("parallel siblings are independent for failure propagation", () => {
const template = h(Parallel, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
ctx.nodes.get("A")!.status.value = "failed";
expect(ctx.nodes.get("B")!.blockedByFailure.value).toBe(false);
});
it("failed predecessor does not satisfy preconditions", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
ctx.nodes.get("A")!.status.value = "failed";
expect(ctx.nodes.get("B")!.preconditions.value).toBe(false);
});
});
describe("appendChild", () => {
it("appends children to parent's children array", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0")!;
expect(seqNode.children.length).toBe(2);
expect(seqNode.children[0]!.key).toBe("A");
expect(seqNode.children[1]!.key).toBe("B");
});
it("does not duplicate children", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0")!;
const childNode = ctx.nodes.get("A")!;
ReactiveHostConfig.appendChild(seqNode, childNode, ctx);
expect(seqNode.children.length).toBe(1);
});
});
describe("removeChild", () => {
it("removes child from parent's children array", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0")!;
const childB = ctx.nodes.get("B")!;
ReactiveHostConfig.removeChild(seqNode, childB, ctx);
expect(seqNode.children.length).toBe(1);
expect(seqNode.children[0]!.key).toBe("A");
});
it("preconditions auto-reevaluate after removeChild", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
h(Operation, { name: "C" }),
);
const ctx = renderTemplate(template);
const seqNode = ctx.nodes.get("__sequential_0")!;
const childB = ctx.nodes.get("B")!;
ctx.nodes.get("A")!.status.value = "completed";
expect(childB.preconditions.value).toBe(true);
ReactiveHostConfig.removeChild(seqNode, childB, ctx);
const childC = ctx.nodes.get("C")!;
expect(childC.preconditions.value).toBe(true);
});
});
describe("parentMap and siblingMap", () => {
it("registers parent-child relationships in parentMap", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
expect(ctx.parentMap.get("A")).toBe("__sequential_0");
expect(ctx.parentMap.get("B")).toBe("__sequential_0");
});
it("registers siblings in siblingMap", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
const siblings = ctx.siblingMap.get("__sequential_0");
expect(siblings).toEqual(["A", "B"]);
});
it("nested structures register correct parent", () => {
const template = h(Sequential, {},
h(Operation, { name: "pre" }),
h(Parallel, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
),
);
const ctx = renderTemplate(template);
expect(ctx.parentMap.get("pre")).toBe("__sequential_0");
expect(ctx.parentMap.get("A")).toBe("__parallel_1");
expect(ctx.parentMap.get("B")).toBe("__parallel_1");
const seqSiblings = ctx.siblingMap.get("__sequential_0");
expect(seqSiblings).toEqual(["pre", "__parallel_1"]);
const parSiblings = ctx.siblingMap.get("__parallel_1");
expect(parSiblings).toEqual(["A", "B"]);
});
});
describe("Conditional as error boundary", () => {
it("conditional node is created with correct type", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
h(Conditional, { test: "A" },
h(Operation, { name: "B" }),
),
);
const ctx = renderTemplate(template);
const condNode = ctx.nodes.get("__conditional_1");
expect(condNode).toBeDefined();
expect(condNode!.type).toBe("conditional");
});
it("conditional child has preconditions met (no inter-child ordering)", () => {
const template = h(Conditional, { test: "A" },
h(Operation, { name: "B" }),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("B")!.preconditions.value).toBe(true);
});
});
describe("full template rendering", () => {
it("renders a complete pipeline template", () => {
const template = h(Sequential, {},
h(Operation, { name: "architect" }),
h(Operation, { name: "reviewer" }),
h(Parallel, {},
h(Operation, { name: "decomposer" }),
h(Operation, { name: "specialist" }),
),
h(Operation, { name: "synthesizer" }),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.get("architect")).toBeDefined();
expect(ctx.nodes.get("reviewer")).toBeDefined();
expect(ctx.nodes.get("decomposer")).toBeDefined();
expect(ctx.nodes.get("specialist")).toBeDefined();
expect(ctx.nodes.get("synthesizer")).toBeDefined();
expect(ctx.nodes.get("architect")!.preconditions.value).toBe(true);
expect(ctx.nodes.get("reviewer")!.preconditions.value).toBe(false);
ctx.nodes.get("architect")!.status.value = "completed";
expect(ctx.nodes.get("reviewer")!.preconditions.value).toBe(true);
});
it("deeply nested template registers all nodes", () => {
const template = h(Sequential, {},
h(Parallel, {},
h(Sequential, {},
h(Operation, { name: "A" }),
h(Operation, { name: "B" }),
),
h(Operation, { name: "C" }),
),
);
const ctx = renderTemplate(template);
expect(ctx.nodes.has("A")).toBe(true);
expect(ctx.nodes.has("B")).toBe(true);
expect(ctx.nodes.has("C")).toBe(true);
});
});
describe("shared signal references", () => {
it("WorkflowNode.status and statusSignals point to same signal", () => {
const template = h(Sequential, {},
h(Operation, { name: "A" }),
);
const ctx = renderTemplate(template);
const node = ctx.nodes.get("A")!;
const statusFromMap = ctx.statusSignals.get("A")!;
expect(node.status).toBe(statusFromMap);
statusFromMap.value = "running";
expect(node.status.value).toBe("running");
node.status.value = "completed";
expect(statusFromMap.value).toBe("completed");
});
}); });
}); });

View File

@@ -0,0 +1,482 @@
import { describe, it, expect } from "vitest";
import { DirectedGraph } from "graphology";
import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js";
import type { ParallelGroupConfig } from "../../src/reactive/workflow.js";
function makeParallelGroupGraph(): DirectedGraph {
const graph = new DirectedGraph();
graph.addNode("entry", { name: "entry" });
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addNode("c", { name: "c" });
graph.addNode("d", { name: "d" });
graph.addEdgeWithKey("entry->a", "entry", "a", { edgeType: "sequential" });
graph.addEdgeWithKey("entry->b", "entry", "b", { edgeType: "sequential" });
graph.addEdgeWithKey("entry->c", "entry", "c", { edgeType: "sequential" });
graph.addEdgeWithKey("entry->d", "entry", "d", { edgeType: "sequential" });
return graph;
}
function makeParallelGroupNoEntry(): DirectedGraph {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addNode("c", { name: "c" });
return graph;
}
describe("maxConcurrency semaphore", () => {
describe("parallel group with maxConcurrency: 2", () => {
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
it("limits running siblings to maxConcurrency", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
expect(root.canStart.has("a")).toBe(true);
expect(root.canStart.has("b")).toBe(true);
expect(root.canStart.has("c")).toBe(true);
expect(root.canStart.has("d")).toBe(true);
expect(root.statusMap.get("entry")!.value).toBe("ready");
root.statusMap.get("entry")!.value = "completed";
expect(root.preconditions.get("a")!.value).toBe(true);
expect(root.preconditions.get("b")!.value).toBe(true);
expect(root.preconditions.get("c")!.value).toBe(true);
expect(root.preconditions.get("d")!.value).toBe(true);
expect(root.canStart.get("a")!.value).toBe(true);
expect(root.canStart.get("b")!.value).toBe(true);
root.dispose();
});
it("canStart becomes false when maxConcurrency siblings are running", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("a")!.value).toBe(true);
expect(root.canStart.get("b")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(false);
expect(root.canStart.get("d")!.value).toBe(false);
root.dispose();
});
it("blocked nodes by semaphore stay idle (not ready)", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.statusMap.get("c")!.value).toBe("idle");
expect(root.statusMap.get("d")!.value).toBe("idle");
root.dispose();
});
it("slot opens when a running sibling completes", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("c")!.value).toBe(false);
root.statusMap.get("a")!.value = "completed";
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.canStart.get("d")!.value).toBe(false);
expect(root.statusMap.get("c")!.value).toBe("ready");
root.dispose();
});
it("all slots eventually open as siblings complete", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
root.statusMap.get("a")!.value = "completed";
expect(root.canStart.get("c")!.value).toBe(true);
root.statusMap.get("b")!.value = "completed";
expect(root.canStart.get("d")!.value).toBe(true);
root.dispose();
});
it("canStart correctly limits when sibling transitions to ready", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "ready";
root.statusMap.get("b")!.value = "ready";
expect(root.canStart.get("c")!.value).toBe(false);
expect(root.canStart.get("d")!.value).toBe(false);
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("c")!.value).toBe(false);
expect(root.canStart.get("d")!.value).toBe(false);
root.dispose();
});
});
describe("parallel group without maxConcurrency", () => {
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
},
};
it("all siblings start immediately when preconditions are met", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
expect(root.canStart.get("a")!.value).toBe(true);
expect(root.canStart.get("b")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.canStart.get("d")!.value).toBe(true);
expect(root.statusMap.get("a")!.value).toBe("ready");
expect(root.statusMap.get("b")!.value).toBe("ready");
expect(root.statusMap.get("c")!.value).toBe("ready");
expect(root.statusMap.get("d")!.value).toBe("ready");
root.dispose();
});
it("no semaphore — running siblings do not block others", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.canStart.get("d")!.value).toBe(true);
root.dispose();
});
});
describe("no parallelGroups config", () => {
it("all nodes start normally without semaphore", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("entry")!.value = "completed";
expect(root.canStart.get("a")!.value).toBe(true);
expect(root.canStart.get("b")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.canStart.get("d")!.value).toBe(true);
expect(root.statusMap.get("a")!.value).toBe("ready");
expect(root.statusMap.get("b")!.value).toBe("ready");
expect(root.statusMap.get("c")!.value).toBe("ready");
expect(root.statusMap.get("d")!.value).toBe("ready");
root.dispose();
});
});
describe("maxConcurrency: 1 (serial within parallel)", () => {
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c"],
maxConcurrency: 1,
},
};
it("only one sibling can run at a time", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
const canStartA = root.canStart.get("a")!;
const canStartB = root.canStart.get("b")!;
const canStartC = root.canStart.get("c")!;
expect(canStartA.value).toBe(true);
expect(canStartB.value).toBe(false);
expect(canStartC.value).toBe(false);
root.statusMap.get("a")!.value = "running";
expect(canStartA.value).toBe(true);
expect(canStartB.value).toBe(false);
expect(canStartC.value).toBe(false);
root.dispose();
});
it("next sibling can start when current finishes", () => {
const graph = makeParallelGroupGraph();
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("a")!.value = "completed";
expect(root.canStart.get("b")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(false);
root.dispose();
});
});
describe("root-level parallel group with maxConcurrency", () => {
it("limits concurrent start for root nodes with no predecessors", () => {
const graph = makeParallelGroupNoEntry();
const parallelGroups: ParallelGroupConfig = {
root: {
siblings: ["a", "b", "c"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
expect(root.canStart.get("a")!.value).toBe(true);
expect(root.canStart.get("b")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(false);
root.dispose();
});
it("third node becomes ready when a slot opens", () => {
const graph = makeParallelGroupNoEntry();
const parallelGroups: ParallelGroupConfig = {
root: {
siblings: ["a", "b", "c"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("c")!.value).toBe(false);
expect(root.statusMap.get("c")!.value).toBe("idle");
root.statusMap.get("a")!.value = "completed";
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.statusMap.get("c")!.value).toBe("ready");
root.dispose();
});
});
describe("preconditions vs canStart", () => {
it("canStart is false when preconditions are not met even if semaphore has slots", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
expect(root.preconditions.get("a")!.value).toBe(false);
expect(root.canStart.get("a")!.value).toBe(false);
root.dispose();
});
it("canStart requires both preconditions AND semaphore slot", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
expect(root.preconditions.get("a")!.value).toBe(true);
expect(root.canStart.get("a")!.value).toBe(true);
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.preconditions.get("c")!.value).toBe(true);
expect(root.canStart.get("c")!.value).toBe(false);
root.dispose();
});
});
describe("failed sibling does not count as running", () => {
it("failed sibling frees a slot for semaphore", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.canStart.get("c")!.value).toBe(false);
root.statusMap.get("b")!.value = "failed";
expect(root.canStart.get("c")!.value).toBe(true);
root.dispose();
});
it("completed sibling frees a slot for semaphore", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
root.statusMap.get("b")!.value = "completed";
expect(root.canStart.get("c")!.value).toBe(true);
expect(root.canStart.get("d")!.value).toBe(false);
root.dispose();
});
it("aborted sibling frees a slot for semaphore", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.statusMap.get("entry")!.value = "completed";
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
root.statusMap.get("b")!.value = "aborted";
expect(root.canStart.get("c")!.value).toBe(true);
root.dispose();
});
});
describe("dispose clears canStart", () => {
it("canStart map is cleared on dispose", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
expect(root.canStart.size).toBe(5);
root.dispose();
expect(root.canStart.size).toBe(0);
});
});
describe("full execution flow with maxConcurrency", () => {
it("simulates parallel execution with maxConcurrency: 2 — only 2 run at a time", () => {
const graph = makeParallelGroupGraph();
const parallelGroups: ParallelGroupConfig = {
group1: {
siblings: ["a", "b", "c", "d"],
maxConcurrency: 2,
},
};
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
root.setRequestId("entry", "req-entry");
root.setRequestId("a", "req-a");
root.setRequestId("b", "req-b");
root.setRequestId("c", "req-c");
root.setRequestId("d", "req-d");
root.statusMap.get("entry")!.value = "completed";
expect(root.statusMap.get("a")!.value).toBe("ready");
expect(root.statusMap.get("b")!.value).toBe("ready");
expect(root.statusMap.get("c")!.value).toBe("idle");
expect(root.statusMap.get("d")!.value).toBe("idle");
root.statusMap.get("a")!.value = "running";
root.statusMap.get("b")!.value = "running";
expect(root.statusMap.get("c")!.value).toBe("idle");
expect(root.statusMap.get("d")!.value).toBe("idle");
root.statusMap.get("a")!.value = "completed";
expect(root.statusMap.get("c")!.value).toBe("ready");
expect(root.statusMap.get("d")!.value).toBe("idle");
root.statusMap.get("c")!.value = "running";
root.statusMap.get("b")!.value = "completed";
expect(root.statusMap.get("d")!.value).toBe("ready");
root.statusMap.get("d")!.value = "running";
root.statusMap.get("c")!.value = "completed";
root.statusMap.get("d")!.value = "completed";
expect(root.isComplete()).toBe(true);
root.dispose();
});
});
});

View File

@@ -0,0 +1,663 @@
import { describe, it, expect } from "vitest";
import { signal, computed } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core";
import { DirectedGraph } from "graphology";
import {
computePreconditions,
computeBlockedByFailure,
registerStartEffect,
registerAbortEffect,
} from "../../src/reactive/node-status.js";
import type { NodeStatusContext } from "../../src/reactive/node-status.js";
import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js";
import type { NodeStatus } from "../../src/schema/enums.js";
function makeContext(
statusMap: Map<string, Signal<NodeStatus>>,
predecessors: string[],
): NodeStatusContext {
return { statusMap, predecessors };
}
function makeStatusMap(
entries: [string, NodeStatus][],
): Map<string, Signal<NodeStatus>> {
const map = new Map<string, Signal<NodeStatus>>();
for (const [key, value] of entries) {
map.set(key, signal<NodeStatus>(value));
}
return map;
}
describe("computePreconditions", () => {
it("returns true for root node with no predecessors", () => {
const statusMap = makeStatusMap([]);
const ctx = makeContext(statusMap, []);
expect(computePreconditions("a", ctx)).toBe(true);
});
it("returns false when predecessor is idle", () => {
const statusMap = makeStatusMap([["a", "idle"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
it("returns true when all predecessors are completed", () => {
const statusMap = makeStatusMap([["a", "completed"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(true);
});
it("returns true when all predecessors are skipped", () => {
const statusMap = makeStatusMap([["a", "skipped"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(true);
});
it("returns true when predecessors are mix of completed and skipped", () => {
const statusMap = makeStatusMap([
["a", "completed"],
["b", "skipped"],
]);
const ctx = makeContext(statusMap, ["a", "b"]);
expect(computePreconditions("c", ctx)).toBe(true);
});
it("returns false when predecessor is failed", () => {
const statusMap = makeStatusMap([["a", "failed"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
it("returns false when predecessor is aborted", () => {
const statusMap = makeStatusMap([["a", "aborted"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
it("returns false when one of multiple predecessors is failed", () => {
const statusMap = makeStatusMap([
["a", "completed"],
["b", "failed"],
]);
const ctx = makeContext(statusMap, ["a", "b"]);
expect(computePreconditions("c", ctx)).toBe(false);
});
it("returns false when predecessor is running", () => {
const statusMap = makeStatusMap([["a", "running"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
it("returns false when predecessor is waiting", () => {
const statusMap = makeStatusMap([["a", "waiting"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
it("returns false for missing predecessor in statusMap", () => {
const statusMap = makeStatusMap([]);
const ctx = makeContext(statusMap, ["unknown"]);
expect(computePreconditions("b", ctx)).toBe(false);
});
});
describe("computeBlockedByFailure", () => {
it("returns false for root node with no predecessors", () => {
const statusMap = makeStatusMap([]);
const ctx = makeContext(statusMap, []);
expect(computeBlockedByFailure("a", ctx)).toBe(false);
});
it("returns true when predecessor is failed", () => {
const statusMap = makeStatusMap([["a", "failed"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computeBlockedByFailure("b", ctx)).toBe(true);
});
it("returns true when predecessor is aborted", () => {
const statusMap = makeStatusMap([["a", "aborted"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computeBlockedByFailure("b", ctx)).toBe(true);
});
it("returns false when predecessor is completed", () => {
const statusMap = makeStatusMap([["a", "completed"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computeBlockedByFailure("b", ctx)).toBe(false);
});
it("returns false when predecessor is skipped", () => {
const statusMap = makeStatusMap([["a", "skipped"]]);
const ctx = makeContext(statusMap, ["a"]);
expect(computeBlockedByFailure("b", ctx)).toBe(false);
});
it("returns true when any of multiple predecessors is failed", () => {
const statusMap = makeStatusMap([
["a", "completed"],
["b", "failed"],
]);
const ctx = makeContext(statusMap, ["a", "b"]);
expect(computeBlockedByFailure("c", ctx)).toBe(true);
});
it("returns false when all predecessors are completed", () => {
const statusMap = makeStatusMap([
["a", "completed"],
["b", "completed"],
]);
const ctx = makeContext(statusMap, ["a", "b"]);
expect(computeBlockedByFailure("c", ctx)).toBe(false);
});
it("returns false for missing predecessor in statusMap", () => {
const statusMap = makeStatusMap([]);
const ctx = makeContext(statusMap, ["unknown"]);
expect(computeBlockedByFailure("b", ctx)).toBe(false);
});
});
describe("registerStartEffect", () => {
it("transitions idle node to ready when preconditions are true", () => {
const status = signal<NodeStatus>("idle");
const preconditions = computed(() => true);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("ready");
for (const d of disposers) d();
});
it("transitions waiting node to ready when preconditions are true", () => {
const status = signal<NodeStatus>("waiting");
const preconditions = computed(() => true);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("ready");
for (const d of disposers) d();
});
it("does not transition running node when preconditions become true", () => {
const status = signal<NodeStatus>("running");
const preconditions = computed(() => true);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("running");
for (const d of disposers) d();
});
it("does not transition completed node when preconditions become true", () => {
const status = signal<NodeStatus>("completed");
const preconditions = computed(() => true);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("completed");
for (const d of disposers) d();
});
it("does not transition idle node when preconditions are false", () => {
const status = signal<NodeStatus>("idle");
const preconditions = computed(() => false);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("idle");
for (const d of disposers) d();
});
it("reactively transitions to ready when preconditions change from false to true", () => {
const trigger = signal<NodeStatus>("idle");
const preconditions = computed(() => trigger.value === "completed");
const status = signal<NodeStatus>("idle");
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(status.value).toBe("idle");
trigger.value = "completed";
expect(status.value).toBe("ready");
for (const d of disposers) d();
});
});
describe("registerAbortEffect", () => {
it("transitions idle node to aborted when blockedByFailure is true", () => {
const status = signal<NodeStatus>("idle");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers);
expect(status.value).toBe("aborted");
for (const d of disposers) d();
});
it("transitions waiting node to aborted when blockedByFailure is true", () => {
const status = signal<NodeStatus>("waiting");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers);
expect(status.value).toBe("aborted");
for (const d of disposers) d();
});
it("does not transition ready node with default policy", () => {
const status = signal<NodeStatus>("ready");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers);
expect(status.value).toBe("ready");
for (const d of disposers) d();
});
it("transitions ready node to aborted with abortDependents option", () => {
const status = signal<NodeStatus>("ready");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers, {
abortDependents: true,
});
expect(status.value).toBe("aborted");
for (const d of disposers) d();
});
it("transitions running node to aborted with abortDependents option", () => {
const status = signal<NodeStatus>("running");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers, {
abortDependents: true,
});
expect(status.value).toBe("aborted");
for (const d of disposers) d();
});
it("does not transition completed node even with abortDependents", () => {
const status = signal<NodeStatus>("completed");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers, {
abortDependents: true,
});
expect(status.value).toBe("completed");
for (const d of disposers) d();
});
it("does not transition failed node even with abortDependents", () => {
const status = signal<NodeStatus>("failed");
const blockedByFailure = computed(() => true);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers, {
abortDependents: true,
});
expect(status.value).toBe("failed");
for (const d of disposers) d();
});
it("reactively transitions to aborted when blockedByFailure changes", () => {
const trigger = signal<NodeStatus>("idle");
const blockedByFailure = computed(
() => trigger.value === "failed" || trigger.value === "aborted",
);
const status = signal<NodeStatus>("idle");
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers);
expect(status.value).toBe("idle");
trigger.value = "failed";
expect(status.value).toBe("aborted");
for (const d of disposers) d();
});
});
describe("sequential preconditions via WorkflowReactiveRoot", () => {
function makeSeqGraph(): DirectedGraph {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addNode("c", { name: "c" });
graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" });
graph.addEdgeWithKey("b->c", "b", "c", { edgeType: "sequential" });
return graph;
}
it("root node transitions to ready (no predecessors)", () => {
const graph = makeSeqGraph();
const root = new WorkflowReactiveRoot(graph);
expect(root.statusMap.get("a")!.value).toBe("ready");
root.dispose();
});
it("downstream nodes stay idle until predecessor completes", () => {
const graph = makeSeqGraph();
const root = new WorkflowReactiveRoot(graph);
expect(root.statusMap.get("b")!.value).toBe("idle");
root.dispose();
});
it("downstream node transitions to ready after predecessor completes", () => {
const graph = makeSeqGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "completed";
expect(root.statusMap.get("b")!.value).toBe("ready");
root.dispose();
});
it("full sequential chain transitions", () => {
const graph = makeSeqGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "completed";
expect(root.statusMap.get("b")!.value).toBe("ready");
root.statusMap.get("b")!.value = "completed";
expect(root.statusMap.get("c")!.value).toBe("ready");
root.dispose();
});
});
describe("parallel preconditions via WorkflowReactiveRoot", () => {
function makeParallelGraph(): DirectedGraph {
const graph = new DirectedGraph();
graph.addNode("top", { name: "top" });
graph.addNode("left", { name: "left" });
graph.addNode("right", { name: "right" });
graph.addEdgeWithKey("top->left", "top", "left", {
edgeType: "sequential",
});
graph.addEdgeWithKey("top->right", "top", "right", {
edgeType: "sequential",
});
return graph;
}
it("parallel siblings both become ready when parent completes", () => {
const graph = makeParallelGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
expect(root.statusMap.get("left")!.value).toBe("ready");
expect(root.statusMap.get("right")!.value).toBe("ready");
root.dispose();
});
it("parallel siblings are independent - failure on one does not abort the other", () => {
const graph = makeParallelGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "ready";
root.statusMap.get("right")!.value = "ready";
root.statusMap.get("left")!.value = "failed";
expect(root.statusMap.get("left")!.value).toBe("failed");
expect(root.statusMap.get("right")!.value).toBe("ready");
root.dispose();
});
});
describe("join (fork-join) preconditions via WorkflowReactiveRoot", () => {
function makeDiamondGraph(): DirectedGraph {
const graph = new DirectedGraph();
graph.addNode("top", { name: "top" });
graph.addNode("left", { name: "left" });
graph.addNode("right", { name: "right" });
graph.addNode("bottom", { name: "bottom" });
graph.addEdgeWithKey("top->left", "top", "left", {
edgeType: "sequential",
});
graph.addEdgeWithKey("top->right", "top", "right", {
edgeType: "sequential",
});
graph.addEdgeWithKey("left->bottom", "left", "bottom", {
edgeType: "sequential",
});
graph.addEdgeWithKey("right->bottom", "right", "bottom", {
edgeType: "sequential",
});
return graph;
}
it("join node stays idle until all predecessors complete", () => {
const graph = makeDiamondGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "completed";
expect(root.statusMap.get("bottom")!.value).toBe("idle");
root.dispose();
});
it("join node becomes ready when all predecessors are completed", () => {
const graph = makeDiamondGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "completed";
root.statusMap.get("right")!.value = "completed";
expect(root.statusMap.get("bottom")!.value).toBe("ready");
root.dispose();
});
it("join node blocked when one predecessor fails", () => {
const graph = makeDiamondGraph();
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "completed";
root.statusMap.get("right")!.value = "failed";
expect(root.preconditions.get("bottom")!.value).toBe(false);
expect(root.statusMap.get("bottom")!.value).toBe("aborted");
root.dispose();
});
});
describe("blockedByFailure cascade via WorkflowReactiveRoot", () => {
it("failure cascades through multiple downstream nodes", () => {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addNode("c", { name: "c" });
graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" });
graph.addEdgeWithKey("b->c", "b", "c", { edgeType: "sequential" });
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "failed";
expect(root.statusMap.get("b")!.value).toBe("aborted");
expect(root.statusMap.get("c")!.value).toBe("aborted");
root.dispose();
});
it("aborted predecessor causes cascade just like failed", () => {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" });
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "aborted";
expect(root.statusMap.get("b")!.value).toBe("aborted");
root.dispose();
});
});
describe("skipped satisfies preconditions via WorkflowReactiveRoot", () => {
it("skipped predecessor satisfies preconditions for downstream node", () => {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" });
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "skipped";
expect(root.preconditions.get("b")!.value).toBe(true);
expect(root.statusMap.get("b")!.value).toBe("ready");
root.dispose();
});
});
describe("failure isolation in parallel branches via WorkflowReactiveRoot", () => {
it("failure in one branch does not abort sibling branch", () => {
const graph = new DirectedGraph();
graph.addNode("top", { name: "top" });
graph.addNode("left", { name: "left" });
graph.addNode("right", { name: "right" });
graph.addNode("bottom", { name: "bottom" });
graph.addEdgeWithKey("top->left", "top", "left", {
edgeType: "sequential",
});
graph.addEdgeWithKey("top->right", "top", "right", {
edgeType: "sequential",
});
graph.addEdgeWithKey("left->bottom", "left", "bottom", {
edgeType: "sequential",
});
graph.addEdgeWithKey("right->bottom", "right", "bottom", {
edgeType: "sequential",
});
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "running";
root.statusMap.get("right")!.value = "failed";
expect(root.statusMap.get("left")!.value).toBe("running");
expect(root.statusMap.get("bottom")!.value).toBe("aborted");
root.dispose();
});
it("completed branch is not affected by failure in other branch", () => {
const graph = new DirectedGraph();
graph.addNode("top", { name: "top" });
graph.addNode("left", { name: "left" });
graph.addNode("right", { name: "right" });
graph.addNode("bottom", { name: "bottom" });
graph.addEdgeWithKey("top->left", "top", "left", {
edgeType: "sequential",
});
graph.addEdgeWithKey("top->right", "top", "right", {
edgeType: "sequential",
});
graph.addEdgeWithKey("left->bottom", "left", "bottom", {
edgeType: "sequential",
});
graph.addEdgeWithKey("right->bottom", "right", "bottom", {
edgeType: "sequential",
});
const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("top")!.value = "completed";
root.statusMap.get("left")!.value = "completed";
root.statusMap.get("right")!.value = "failed";
expect(root.statusMap.get("left")!.value).toBe("completed");
root.dispose();
});
});
describe("effect disposal", () => {
it("start effect is tracked by effectDisposers and cleaned up", () => {
const status = signal<NodeStatus>("idle");
const preconditions = computed(() => false);
const disposers: (() => void)[] = [];
registerStartEffect(status, preconditions, disposers);
expect(disposers.length).toBe(1);
disposers[0]!();
});
it("abort effect is tracked by effectDisposer and cleaned up", () => {
const status = signal<NodeStatus>("idle");
const blockedByFailure = computed(() => false);
const disposers: (() => void)[] = [];
registerAbortEffect(status, blockedByFailure, disposers);
expect(disposers.length).toBe(1);
disposers[0]!();
});
it("WorkflowReactiveRoot dispose clears all effects", () => {
const graph = new DirectedGraph();
graph.addNode("a", { name: "a" });
graph.addNode("b", { name: "b" });
graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" });
const root = new WorkflowReactiveRoot(graph);
expect(root.statusMap.get("a")!.value).toBe("ready");
root.dispose();
expect(root.statusMap.size).toBe(0);
});
});

View File

@@ -48,11 +48,11 @@ function makeForkJoinGraph(): DirectedGraph {
describe("WorkflowReactiveRoot", () => { describe("WorkflowReactiveRoot", () => {
describe("constructor and initializeSignals", () => { describe("constructor and initializeSignals", () => {
it("initializes all nodes with idle status", () => { it("root node starts as ready, downstream nodes start as idle", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
expect(root.statusMap.get("a")!.value).toBe("idle"); expect(root.statusMap.get("a")!.value).toBe("ready");
expect(root.statusMap.get("b")!.value).toBe("idle"); expect(root.statusMap.get("b")!.value).toBe("idle");
expect(root.statusMap.get("c")!.value).toBe("idle"); expect(root.statusMap.get("c")!.value).toBe("idle");
@@ -159,7 +159,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -177,7 +177,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -201,7 +201,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -225,7 +225,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -256,7 +256,7 @@ describe("WorkflowReactiveRoot", () => {
timestamp: "2026-01-01T00:00:00Z", timestamp: "2026-01-01T00:00:00Z",
}); });
expect(root.statusMap.get("a")!.value).toBe("idle"); expect(root.statusMap.get("a")!.value).toBe("ready");
root.dispose(); root.dispose();
}); });
@@ -265,7 +265,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
const respondedEvent: CallEventMapValue = { const respondedEvent: CallEventMapValue = {
type: "call.responded", type: "call.responded",
@@ -291,11 +291,11 @@ describe("WorkflowReactiveRoot", () => {
}); });
describe("getStatus", () => { describe("getStatus", () => {
it("returns idle for nodes with no events", () => { it("returns ready for root nodes with no events", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
expect(root.getStatus("a")).toBe("idle"); expect(root.getStatus("a")).toBe("ready");
root.dispose(); root.dispose();
}); });
@@ -304,7 +304,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -322,9 +322,9 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.statusMap.get("a")!.value = "waiting"; root.statusMap.get("b")!.value = "waiting";
expect(root.getStatus("a")).toBe("waiting"); expect(root.getStatus("b")).toBe("waiting");
root.dispose(); root.dispose();
}); });
@@ -353,7 +353,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -380,7 +380,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -408,7 +408,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -434,7 +434,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -452,7 +452,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-2"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -466,6 +466,7 @@ describe("WorkflowReactiveRoot", () => {
error: { code: "ERR", message: "first attempt failed" }, error: { code: "ERR", message: "first attempt failed" },
timestamp: "2026-01-01T00:00:01Z", timestamp: "2026-01-01T00:00:01Z",
}); });
root.setRequestId("a", "req-2");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-2", requestId: "req-2",
@@ -503,7 +504,7 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-1"); root.setRequestId("a", "req-1");
root.append({ root.append({
type: "call.requested", type: "call.requested",
requestId: "req-1", requestId: "req-1",
@@ -527,6 +528,219 @@ describe("WorkflowReactiveRoot", () => {
}); });
}); });
describe("retry semantics", () => {
it("nodeKeyToRequestId overwrites previous requestId — projection tracks latest attempt", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
expect(root.nodeKeyToRequestId.get("a")).toBe("req-1");
root.setRequestId("a", "req-2");
expect(root.nodeKeyToRequestId.get("a")).toBe("req-2");
expect(root.requestIdToNodeKey.get("req-2")).toBe("a");
root.dispose();
});
it("retry sequence: error then requested then responded → status is completed", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.append({
type: "call.requested",
requestId: "req-1",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:00Z",
});
root.append({
type: "call.error",
requestId: "req-1",
error: { code: "ERR", message: "failed" },
timestamp: "2026-01-01T00:00:01Z",
});
expect(root.getStatus("a")).toBe("failed");
root.setRequestId("a", "req-2");
root.append({
type: "call.requested",
requestId: "req-2",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:02Z",
});
expect(root.getStatus("a")).toBe("running");
root.append({
type: "call.responded",
requestId: "req-2",
output: "ok",
timestamp: "2026-01-01T00:00:03Z",
});
expect(root.getStatus("a")).toBe("completed");
root.dispose();
});
it("getResult reflects latest attempt after retry", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.append({
type: "call.requested",
requestId: "req-1",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:00Z",
});
root.append({
type: "call.error",
requestId: "req-1",
error: { code: "ERR", message: "failed" },
timestamp: "2026-01-01T00:00:01Z",
});
root.setRequestId("a", "req-2");
root.append({
type: "call.requested",
requestId: "req-2",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:02Z",
});
root.append({
type: "call.responded",
requestId: "req-2",
output: { retry: true },
timestamp: "2026-01-01T00:00:03Z",
});
const result = root.getResult("a");
expect(result).toBeDefined();
expect(result!.status).toBe("completed");
expect(result!.output).toEqual({ retry: true });
root.dispose();
});
it("event log preserves full history across retries", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.append({
type: "call.requested",
requestId: "req-1",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:00Z",
});
root.append({
type: "call.error",
requestId: "req-1",
error: { code: "ERR", message: "failed" },
timestamp: "2026-01-01T00:00:01Z",
});
root.setRequestId("a", "req-2");
root.append({
type: "call.requested",
requestId: "req-2",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:02Z",
});
root.append({
type: "call.responded",
requestId: "req-2",
output: "ok",
timestamp: "2026-01-01T00:00:03Z",
});
const events = root.getEvents("a");
expect(events.length).toBe(4);
expect(events[0]!.type).toBe("call.requested");
expect(events[0]!.requestId).toBe("req-1");
expect(events[1]!.type).toBe("call.error");
expect(events[1]!.requestId).toBe("req-1");
expect(events[2]!.type).toBe("call.requested");
expect(events[2]!.requestId).toBe("req-2");
expect(events[3]!.type).toBe("call.responded");
expect(events[3]!.requestId).toBe("req-2");
root.dispose();
});
it("retry clears failed status so downstream preconditions can be met", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.setRequestId("b", "req-b");
root.append({
type: "call.requested",
requestId: "req-1",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:00Z",
});
root.append({
type: "call.error",
requestId: "req-1",
error: { code: "ERR", message: "failed" },
timestamp: "2026-01-01T00:00:01Z",
});
expect(root.blockedByFailure.get("b")!.value).toBe(true);
root.setRequestId("a", "req-2");
root.append({
type: "call.requested",
requestId: "req-2",
operationId: "a",
input: null,
timestamp: "2026-01-01T00:00:02Z",
});
root.append({
type: "call.responded",
requestId: "req-2",
output: "ok",
timestamp: "2026-01-01T00:00:03Z",
});
expect(root.getStatus("a")).toBe("completed");
expect(root.blockedByFailure.get("b")!.value).toBe(false);
root.dispose();
});
it("requestIdToNodeKey preserves all requestIds including previous attempts", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.setRequestId("a", "req-2");
expect(root.requestIdToNodeKey.get("req-1")).toBe("a");
expect(root.requestIdToNodeKey.get("req-2")).toBe("a");
root.dispose();
});
it("dispose clears requestIdToNodeKey", () => {
const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph);
root.setRequestId("a", "req-1");
root.setRequestId("a", "req-2");
expect(root.requestIdToNodeKey.size).toBe(2);
root.dispose();
expect(root.requestIdToNodeKey.size).toBe(0);
});
});
describe("abort cascade", () => { describe("abort cascade", () => {
it("failed node causes downstream dependents to abort (continue-running default)", () => { it("failed node causes downstream dependents to abort (continue-running default)", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
@@ -840,11 +1054,11 @@ describe("WorkflowReactiveRoot", () => {
const graph = makeSimpleGraph(); const graph = makeSimpleGraph();
const root = new WorkflowReactiveRoot(graph); const root = new WorkflowReactiveRoot(graph);
root.nodeKeyToRequestId.set("a", "req-a"); root.setRequestId("a", "req-a");
root.nodeKeyToRequestId.set("b", "req-b"); root.setRequestId("b", "req-b");
root.nodeKeyToRequestId.set("c", "req-c"); root.setRequestId("c", "req-c");
expect(root.getStatus("a")).toBe("idle"); expect(root.getStatus("a")).toBe("ready");
expect(root.getStatus("b")).toBe("idle"); expect(root.getStatus("b")).toBe("idle");
expect(root.getStatus("c")).toBe("idle"); expect(root.getStatus("c")).toBe("idle");