From 1a1241022933e305032b6e4571b83558914559e1 Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Thu, 21 May 2026 22:29:02 +0000 Subject: [PATCH] feat: implement maxConcurrency reactive counting semaphore with 20 unit tests --- src/reactive/index.ts | 2 + src/reactive/node-status.ts | 4 +- src/reactive/workflow.ts | 48 ++- test/reactive/max-concurrency.test.ts | 482 ++++++++++++++++++++++++++ 4 files changed, 531 insertions(+), 5 deletions(-) create mode 100644 test/reactive/max-concurrency.test.ts diff --git a/src/reactive/index.ts b/src/reactive/index.ts index a295403..70aca3d 100644 --- a/src/reactive/index.ts +++ b/src/reactive/index.ts @@ -9,6 +9,8 @@ export { type CallCompletedEvent, type EventLogProjection, type AggregateStatus, + type ParallelGroup, + type ParallelGroupConfig, } from "./workflow.js"; export { diff --git a/src/reactive/node-status.ts b/src/reactive/node-status.ts index f9b7cc8..67d649c 100644 --- a/src/reactive/node-status.ts +++ b/src/reactive/node-status.ts @@ -39,11 +39,11 @@ export function computeBlockedByFailure( export function registerStartEffect( status: Signal, - preconditions: ReadonlySignal, + canStart: ReadonlySignal, effectDisposers: (() => void)[], ): void { const disposer = effect(() => { - if (preconditions.value) { + if (canStart.value) { const current = status.value; if (current === "idle" || current === "waiting") { status.value = "ready"; diff --git a/src/reactive/workflow.ts b/src/reactive/workflow.ts index d7f5d49..6be9168 100644 --- a/src/reactive/workflow.ts +++ b/src/reactive/workflow.ts @@ -13,6 +13,15 @@ import type { NodeStatusContext } from "./node-status.js"; export type FailurePolicy = "continue-running" | "abort-dependents"; +export interface ParallelGroup { + siblings: string[]; + maxConcurrency?: number; +} + +export interface ParallelGroupConfig { + [groupKey: string]: ParallelGroup; +} + export interface CallRequestedEvent { type: "call.requested"; requestId: string; @@ -92,6 +101,7 @@ const EVENT_TO_STATUS: Record = { export class WorkflowReactiveRoot implements EventLogProjection { statusMap: Map>; preconditions: Map>; + canStart: Map>; blockedByFailure: Map>; resultMap: Map>; nodeKeyToRequestId: Map; @@ -101,14 +111,16 @@ export class WorkflowReactiveRoot implements EventLogProjection { private effectDisposers: (() => void)[]; private eventLog: CallEventMapValue[]; private _failurePolicy: FailurePolicy; + private _parallelGroups: ParallelGroupConfig; constructor( graph: DirectedGraph, - options?: { failurePolicy?: FailurePolicy }, + options?: { failurePolicy?: FailurePolicy; parallelGroups?: ParallelGroupConfig }, ) { this.graph = graph; this.statusMap = new Map(); this.preconditions = new Map(); + this.canStart = new Map(); this.blockedByFailure = new Map(); this.resultMap = new Map(); this.effectDisposers = []; @@ -116,6 +128,7 @@ export class WorkflowReactiveRoot implements EventLogProjection { this.nodeKeyToRequestId = new Map(); this.requestIdToNodeKey = new Map(); this._failurePolicy = options?.failurePolicy ?? "continue-running"; + this._parallelGroups = options?.parallelGroups ?? {}; this.initializeSignals(); } @@ -125,6 +138,13 @@ export class WorkflowReactiveRoot implements EventLogProjection { } private initializeSignals(): void { + const nodeToGroupKey = new Map(); + for (const [groupKey, group] of Object.entries(this._parallelGroups)) { + for (const sibling of group.siblings) { + nodeToGroupKey.set(sibling, groupKey); + } + } + for (const node of this.graph.nodes()) { const predecessors: string[] = this.graph.inNeighbors(node) ?? []; @@ -139,6 +159,26 @@ export class WorkflowReactiveRoot implements EventLogProjection { return computePreconditions(node, ctx); }); + const groupKey = nodeToGroupKey.get(node); + const parallelGroup = groupKey ? this._parallelGroups[groupKey] : undefined; + const maxConc = parallelGroup?.maxConcurrency; + const siblings = parallelGroup?.siblings ?? []; + + let canStartComputed: ReadonlySignal; + if (maxConc !== undefined && siblings.length > 0) { + const otherSiblings = siblings.filter((s) => s !== node); + canStartComputed = computed(() => { + if (!preconditionsComputed.value) return false; + const activeSiblingCount = otherSiblings.filter((sib) => { + const sibStatus = this.statusMap.get(sib); + return sibStatus && (sibStatus.value === "running" || sibStatus.value === "ready"); + }).length; + return activeSiblingCount < maxConc; + }); + } else { + canStartComputed = preconditionsComputed; + } + const blockedByFailureComputed = computed(() => { return computeBlockedByFailure(node, ctx); }); @@ -189,16 +229,17 @@ export class WorkflowReactiveRoot implements EventLogProjection { this.statusMap.set(node, status); this.preconditions.set(node, preconditionsComputed); + this.canStart.set(node, canStartComputed); this.blockedByFailure.set(node, blockedByFailureComputed); this.resultMap.set(node, resultComputed); } for (const node of this.graph.nodes()) { const status = this.statusMap.get(node)!; - const preconditions = this.preconditions.get(node)!; + const canStart = this.canStart.get(node)!; const blocked = this.blockedByFailure.get(node)!; - registerStartEffect(status, preconditions, this.effectDisposers); + registerStartEffect(status, canStart, this.effectDisposers); registerAbortEffect(status, blocked, this.effectDisposers, { abortDependents: this._failurePolicy === "abort-dependents", }); @@ -338,6 +379,7 @@ export class WorkflowReactiveRoot implements EventLogProjection { this.effectDisposers = []; this.statusMap.clear(); this.preconditions.clear(); + this.canStart.clear(); this.blockedByFailure.clear(); this.resultMap.clear(); this.nodeKeyToRequestId.clear(); diff --git a/test/reactive/max-concurrency.test.ts b/test/reactive/max-concurrency.test.ts new file mode 100644 index 0000000..fb57c33 --- /dev/null +++ b/test/reactive/max-concurrency.test.ts @@ -0,0 +1,482 @@ +import { describe, it, expect } from "vitest"; +import { DirectedGraph } from "graphology"; +import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js"; +import type { ParallelGroupConfig } from "../../src/reactive/workflow.js"; + +function makeParallelGroupGraph(): DirectedGraph { + const graph = new DirectedGraph(); + graph.addNode("entry", { name: "entry" }); + graph.addNode("a", { name: "a" }); + graph.addNode("b", { name: "b" }); + graph.addNode("c", { name: "c" }); + graph.addNode("d", { name: "d" }); + graph.addEdgeWithKey("entry->a", "entry", "a", { edgeType: "sequential" }); + graph.addEdgeWithKey("entry->b", "entry", "b", { edgeType: "sequential" }); + graph.addEdgeWithKey("entry->c", "entry", "c", { edgeType: "sequential" }); + graph.addEdgeWithKey("entry->d", "entry", "d", { edgeType: "sequential" }); + return graph; +} + +function makeParallelGroupNoEntry(): DirectedGraph { + const graph = new DirectedGraph(); + graph.addNode("a", { name: "a" }); + graph.addNode("b", { name: "b" }); + graph.addNode("c", { name: "c" }); + return graph; +} + +describe("maxConcurrency semaphore", () => { + describe("parallel group with maxConcurrency: 2", () => { + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + + it("limits running siblings to maxConcurrency", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + expect(root.canStart.has("a")).toBe(true); + expect(root.canStart.has("b")).toBe(true); + expect(root.canStart.has("c")).toBe(true); + expect(root.canStart.has("d")).toBe(true); + + expect(root.statusMap.get("entry")!.value).toBe("ready"); + root.statusMap.get("entry")!.value = "completed"; + + expect(root.preconditions.get("a")!.value).toBe(true); + expect(root.preconditions.get("b")!.value).toBe(true); + expect(root.preconditions.get("c")!.value).toBe(true); + expect(root.preconditions.get("d")!.value).toBe(true); + + expect(root.canStart.get("a")!.value).toBe(true); + expect(root.canStart.get("b")!.value).toBe(true); + + root.dispose(); + }); + + it("canStart becomes false when maxConcurrency siblings are running", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("a")!.value).toBe(true); + expect(root.canStart.get("b")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(false); + expect(root.canStart.get("d")!.value).toBe(false); + + root.dispose(); + }); + + it("blocked nodes by semaphore stay idle (not ready)", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.statusMap.get("c")!.value).toBe("idle"); + expect(root.statusMap.get("d")!.value).toBe("idle"); + + root.dispose(); + }); + + it("slot opens when a running sibling completes", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("c")!.value).toBe(false); + + root.statusMap.get("a")!.value = "completed"; + + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.canStart.get("d")!.value).toBe(false); + + expect(root.statusMap.get("c")!.value).toBe("ready"); + + root.dispose(); + }); + + it("all slots eventually open as siblings complete", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + root.statusMap.get("a")!.value = "completed"; + expect(root.canStart.get("c")!.value).toBe(true); + + root.statusMap.get("b")!.value = "completed"; + expect(root.canStart.get("d")!.value).toBe(true); + + root.dispose(); + }); + + it("canStart correctly limits when sibling transitions to ready", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + root.statusMap.get("a")!.value = "ready"; + root.statusMap.get("b")!.value = "ready"; + + expect(root.canStart.get("c")!.value).toBe(false); + expect(root.canStart.get("d")!.value).toBe(false); + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("c")!.value).toBe(false); + expect(root.canStart.get("d")!.value).toBe(false); + + root.dispose(); + }); + }); + + describe("parallel group without maxConcurrency", () => { + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + }, + }; + + it("all siblings start immediately when preconditions are met", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + expect(root.canStart.get("a")!.value).toBe(true); + expect(root.canStart.get("b")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.canStart.get("d")!.value).toBe(true); + + expect(root.statusMap.get("a")!.value).toBe("ready"); + expect(root.statusMap.get("b")!.value).toBe("ready"); + expect(root.statusMap.get("c")!.value).toBe("ready"); + expect(root.statusMap.get("d")!.value).toBe("ready"); + + root.dispose(); + }); + + it("no semaphore — running siblings do not block others", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.canStart.get("d")!.value).toBe(true); + + root.dispose(); + }); + }); + + describe("no parallelGroups config", () => { + it("all nodes start normally without semaphore", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("entry")!.value = "completed"; + + expect(root.canStart.get("a")!.value).toBe(true); + expect(root.canStart.get("b")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.canStart.get("d")!.value).toBe(true); + + expect(root.statusMap.get("a")!.value).toBe("ready"); + expect(root.statusMap.get("b")!.value).toBe("ready"); + expect(root.statusMap.get("c")!.value).toBe("ready"); + expect(root.statusMap.get("d")!.value).toBe("ready"); + + root.dispose(); + }); + }); + + describe("maxConcurrency: 1 (serial within parallel)", () => { + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c"], + maxConcurrency: 1, + }, + }; + + it("only one sibling can run at a time", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + const canStartA = root.canStart.get("a")!; + const canStartB = root.canStart.get("b")!; + const canStartC = root.canStart.get("c")!; + + expect(canStartA.value).toBe(true); + expect(canStartB.value).toBe(false); + expect(canStartC.value).toBe(false); + + root.statusMap.get("a")!.value = "running"; + + expect(canStartA.value).toBe(true); + expect(canStartB.value).toBe(false); + expect(canStartC.value).toBe(false); + + root.dispose(); + }); + + it("next sibling can start when current finishes", () => { + const graph = makeParallelGroupGraph(); + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("a")!.value = "completed"; + + expect(root.canStart.get("b")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(false); + + root.dispose(); + }); + }); + + describe("root-level parallel group with maxConcurrency", () => { + it("limits concurrent start for root nodes with no predecessors", () => { + const graph = makeParallelGroupNoEntry(); + const parallelGroups: ParallelGroupConfig = { + root: { + siblings: ["a", "b", "c"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + expect(root.canStart.get("a")!.value).toBe(true); + expect(root.canStart.get("b")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(false); + + root.dispose(); + }); + + it("third node becomes ready when a slot opens", () => { + const graph = makeParallelGroupNoEntry(); + const parallelGroups: ParallelGroupConfig = { + root: { + siblings: ["a", "b", "c"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("c")!.value).toBe(false); + expect(root.statusMap.get("c")!.value).toBe("idle"); + + root.statusMap.get("a")!.value = "completed"; + + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.statusMap.get("c")!.value).toBe("ready"); + + root.dispose(); + }); + }); + + describe("preconditions vs canStart", () => { + it("canStart is false when preconditions are not met even if semaphore has slots", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + expect(root.preconditions.get("a")!.value).toBe(false); + expect(root.canStart.get("a")!.value).toBe(false); + + root.dispose(); + }); + + it("canStart requires both preconditions AND semaphore slot", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + + expect(root.preconditions.get("a")!.value).toBe(true); + expect(root.canStart.get("a")!.value).toBe(true); + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.preconditions.get("c")!.value).toBe(true); + expect(root.canStart.get("c")!.value).toBe(false); + + root.dispose(); + }); + }); + + describe("failed sibling does not count as running", () => { + it("failed sibling frees a slot for semaphore", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.canStart.get("c")!.value).toBe(false); + + root.statusMap.get("b")!.value = "failed"; + + expect(root.canStart.get("c")!.value).toBe(true); + + root.dispose(); + }); + + it("completed sibling frees a slot for semaphore", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + root.statusMap.get("b")!.value = "completed"; + + expect(root.canStart.get("c")!.value).toBe(true); + expect(root.canStart.get("d")!.value).toBe(false); + + root.dispose(); + }); + + it("aborted sibling frees a slot for semaphore", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.statusMap.get("entry")!.value = "completed"; + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + root.statusMap.get("b")!.value = "aborted"; + + expect(root.canStart.get("c")!.value).toBe(true); + + root.dispose(); + }); + }); + + describe("dispose clears canStart", () => { + it("canStart map is cleared on dispose", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + expect(root.canStart.size).toBe(5); + + root.dispose(); + + expect(root.canStart.size).toBe(0); + }); + }); + + describe("full execution flow with maxConcurrency", () => { + it("simulates parallel execution with maxConcurrency: 2 — only 2 run at a time", () => { + const graph = makeParallelGroupGraph(); + const parallelGroups: ParallelGroupConfig = { + group1: { + siblings: ["a", "b", "c", "d"], + maxConcurrency: 2, + }, + }; + const root = new WorkflowReactiveRoot(graph, { parallelGroups }); + + root.setRequestId("entry", "req-entry"); + root.setRequestId("a", "req-a"); + root.setRequestId("b", "req-b"); + root.setRequestId("c", "req-c"); + root.setRequestId("d", "req-d"); + + root.statusMap.get("entry")!.value = "completed"; + + expect(root.statusMap.get("a")!.value).toBe("ready"); + expect(root.statusMap.get("b")!.value).toBe("ready"); + expect(root.statusMap.get("c")!.value).toBe("idle"); + expect(root.statusMap.get("d")!.value).toBe("idle"); + + root.statusMap.get("a")!.value = "running"; + root.statusMap.get("b")!.value = "running"; + + expect(root.statusMap.get("c")!.value).toBe("idle"); + expect(root.statusMap.get("d")!.value).toBe("idle"); + + root.statusMap.get("a")!.value = "completed"; + + expect(root.statusMap.get("c")!.value).toBe("ready"); + expect(root.statusMap.get("d")!.value).toBe("idle"); + + root.statusMap.get("c")!.value = "running"; + + root.statusMap.get("b")!.value = "completed"; + + expect(root.statusMap.get("d")!.value).toBe("ready"); + + root.statusMap.get("d")!.value = "running"; + root.statusMap.get("c")!.value = "completed"; + root.statusMap.get("d")!.value = "completed"; + + expect(root.isComplete()).toBe(true); + + root.dispose(); + }); + }); +}); \ No newline at end of file