import { describe, it, expect } from "vitest"; import { DirectedGraph } from "graphology"; import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js"; import type { ParallelGroupConfig } from "../../src/reactive/workflow.js"; function makeParallelGroupGraph(): DirectedGraph { const graph = new DirectedGraph(); graph.addNode("entry", { name: "entry" }); graph.addNode("a", { name: "a" }); graph.addNode("b", { name: "b" }); graph.addNode("c", { name: "c" }); graph.addNode("d", { name: "d" }); graph.addEdgeWithKey("entry->a", "entry", "a", { edgeType: "sequential" }); graph.addEdgeWithKey("entry->b", "entry", "b", { edgeType: "sequential" }); graph.addEdgeWithKey("entry->c", "entry", "c", { edgeType: "sequential" }); graph.addEdgeWithKey("entry->d", "entry", "d", { edgeType: "sequential" }); return graph; } function makeParallelGroupNoEntry(): DirectedGraph { const graph = new DirectedGraph(); graph.addNode("a", { name: "a" }); graph.addNode("b", { name: "b" }); graph.addNode("c", { name: "c" }); return graph; } describe("maxConcurrency semaphore", () => { describe("parallel group with maxConcurrency: 2", () => { const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; it("limits running siblings to maxConcurrency", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); expect(root.canStart.has("a")).toBe(true); expect(root.canStart.has("b")).toBe(true); expect(root.canStart.has("c")).toBe(true); expect(root.canStart.has("d")).toBe(true); expect(root.statusMap.get("entry")!.value).toBe("ready"); root.statusMap.get("entry")!.value = "completed"; expect(root.preconditions.get("a")!.value).toBe(true); expect(root.preconditions.get("b")!.value).toBe(true); expect(root.preconditions.get("c")!.value).toBe(true); expect(root.preconditions.get("d")!.value).toBe(true); expect(root.canStart.get("a")!.value).toBe(true); expect(root.canStart.get("b")!.value).toBe(true); root.dispose(); }); it("canStart becomes false when maxConcurrency siblings are running", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("a")!.value).toBe(true); expect(root.canStart.get("b")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(false); expect(root.canStart.get("d")!.value).toBe(false); root.dispose(); }); it("blocked nodes by semaphore stay idle (not ready)", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.statusMap.get("c")!.value).toBe("idle"); expect(root.statusMap.get("d")!.value).toBe("idle"); root.dispose(); }); it("slot opens when a running sibling completes", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("c")!.value).toBe(false); root.statusMap.get("a")!.value = "completed"; expect(root.canStart.get("c")!.value).toBe(true); expect(root.canStart.get("d")!.value).toBe(false); expect(root.statusMap.get("c")!.value).toBe("ready"); root.dispose(); }); it("all slots eventually open as siblings complete", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; root.statusMap.get("a")!.value = "completed"; expect(root.canStart.get("c")!.value).toBe(true); root.statusMap.get("b")!.value = "completed"; expect(root.canStart.get("d")!.value).toBe(true); root.dispose(); }); it("canStart correctly limits when sibling transitions to ready", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "ready"; root.statusMap.get("b")!.value = "ready"; expect(root.canStart.get("c")!.value).toBe(false); expect(root.canStart.get("d")!.value).toBe(false); root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("c")!.value).toBe(false); expect(root.canStart.get("d")!.value).toBe(false); root.dispose(); }); }); describe("parallel group without maxConcurrency", () => { const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], }, }; it("all siblings start immediately when preconditions are met", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; expect(root.canStart.get("a")!.value).toBe(true); expect(root.canStart.get("b")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(true); expect(root.canStart.get("d")!.value).toBe(true); expect(root.statusMap.get("a")!.value).toBe("ready"); expect(root.statusMap.get("b")!.value).toBe("ready"); expect(root.statusMap.get("c")!.value).toBe("ready"); expect(root.statusMap.get("d")!.value).toBe("ready"); root.dispose(); }); it("no semaphore — running siblings do not block others", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("c")!.value).toBe(true); expect(root.canStart.get("d")!.value).toBe(true); root.dispose(); }); }); describe("no parallelGroups config", () => { it("all nodes start normally without semaphore", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph); root.statusMap.get("entry")!.value = "completed"; expect(root.canStart.get("a")!.value).toBe(true); expect(root.canStart.get("b")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(true); expect(root.canStart.get("d")!.value).toBe(true); expect(root.statusMap.get("a")!.value).toBe("ready"); expect(root.statusMap.get("b")!.value).toBe("ready"); expect(root.statusMap.get("c")!.value).toBe("ready"); expect(root.statusMap.get("d")!.value).toBe("ready"); root.dispose(); }); }); describe("maxConcurrency: 1 (serial within parallel)", () => { const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c"], maxConcurrency: 1, }, }; it("only one sibling can run at a time", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; const canStartA = root.canStart.get("a")!; const canStartB = root.canStart.get("b")!; const canStartC = root.canStart.get("c")!; expect(canStartA.value).toBe(true); expect(canStartB.value).toBe(false); expect(canStartC.value).toBe(false); root.statusMap.get("a")!.value = "running"; expect(canStartA.value).toBe(true); expect(canStartB.value).toBe(false); expect(canStartC.value).toBe(false); root.dispose(); }); it("next sibling can start when current finishes", () => { const graph = makeParallelGroupGraph(); const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("a")!.value = "completed"; expect(root.canStart.get("b")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(false); root.dispose(); }); }); describe("root-level parallel group with maxConcurrency", () => { it("limits concurrent start for root nodes with no predecessors", () => { const graph = makeParallelGroupNoEntry(); const parallelGroups: ParallelGroupConfig = { root: { siblings: ["a", "b", "c"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); expect(root.canStart.get("a")!.value).toBe(true); expect(root.canStart.get("b")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(false); root.dispose(); }); it("third node becomes ready when a slot opens", () => { const graph = makeParallelGroupNoEntry(); const parallelGroups: ParallelGroupConfig = { root: { siblings: ["a", "b", "c"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("c")!.value).toBe(false); expect(root.statusMap.get("c")!.value).toBe("idle"); root.statusMap.get("a")!.value = "completed"; expect(root.canStart.get("c")!.value).toBe(true); expect(root.statusMap.get("c")!.value).toBe("ready"); root.dispose(); }); }); describe("preconditions vs canStart", () => { it("canStart is false when preconditions are not met even if semaphore has slots", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); expect(root.preconditions.get("a")!.value).toBe(false); expect(root.canStart.get("a")!.value).toBe(false); root.dispose(); }); it("canStart requires both preconditions AND semaphore slot", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; expect(root.preconditions.get("a")!.value).toBe(true); expect(root.canStart.get("a")!.value).toBe(true); root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.preconditions.get("c")!.value).toBe(true); expect(root.canStart.get("c")!.value).toBe(false); root.dispose(); }); }); describe("failed sibling does not count as running", () => { it("failed sibling frees a slot for semaphore", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.canStart.get("c")!.value).toBe(false); root.statusMap.get("b")!.value = "failed"; expect(root.canStart.get("c")!.value).toBe(true); root.dispose(); }); it("completed sibling frees a slot for semaphore", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; root.statusMap.get("b")!.value = "completed"; expect(root.canStart.get("c")!.value).toBe(true); expect(root.canStart.get("d")!.value).toBe(false); root.dispose(); }); it("aborted sibling frees a slot for semaphore", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.statusMap.get("entry")!.value = "completed"; root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; root.statusMap.get("b")!.value = "aborted"; expect(root.canStart.get("c")!.value).toBe(true); root.dispose(); }); }); describe("dispose clears canStart", () => { it("canStart map is cleared on dispose", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); expect(root.canStart.size).toBe(5); root.dispose(); expect(root.canStart.size).toBe(0); }); }); describe("full execution flow with maxConcurrency", () => { it("simulates parallel execution with maxConcurrency: 2 — only 2 run at a time", () => { const graph = makeParallelGroupGraph(); const parallelGroups: ParallelGroupConfig = { group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2, }, }; const root = new WorkflowReactiveRoot(graph, { parallelGroups }); root.setRequestId("entry", "req-entry"); root.setRequestId("a", "req-a"); root.setRequestId("b", "req-b"); root.setRequestId("c", "req-c"); root.setRequestId("d", "req-d"); root.statusMap.get("entry")!.value = "completed"; expect(root.statusMap.get("a")!.value).toBe("ready"); expect(root.statusMap.get("b")!.value).toBe("ready"); expect(root.statusMap.get("c")!.value).toBe("idle"); expect(root.statusMap.get("d")!.value).toBe("idle"); root.statusMap.get("a")!.value = "running"; root.statusMap.get("b")!.value = "running"; expect(root.statusMap.get("c")!.value).toBe("idle"); expect(root.statusMap.get("d")!.value).toBe("idle"); root.statusMap.get("a")!.value = "completed"; expect(root.statusMap.get("c")!.value).toBe("ready"); expect(root.statusMap.get("d")!.value).toBe("idle"); root.statusMap.get("c")!.value = "running"; root.statusMap.get("b")!.value = "completed"; expect(root.statusMap.get("d")!.value).toBe("ready"); root.statusMap.get("d")!.value = "running"; root.statusMap.get("c")!.value = "completed"; root.statusMap.get("d")!.value = "completed"; expect(root.isComplete()).toBe(true); root.dispose(); }); }); });