Merge branch 'feat/reactive/max-concurrency'
This commit is contained in:
@@ -9,6 +9,8 @@ export {
|
|||||||
type CallCompletedEvent,
|
type CallCompletedEvent,
|
||||||
type EventLogProjection,
|
type EventLogProjection,
|
||||||
type AggregateStatus,
|
type AggregateStatus,
|
||||||
|
type ParallelGroup,
|
||||||
|
type ParallelGroupConfig,
|
||||||
} from "./workflow.js";
|
} from "./workflow.js";
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
|||||||
@@ -39,11 +39,11 @@ export function computeBlockedByFailure(
|
|||||||
|
|
||||||
export function registerStartEffect(
|
export function registerStartEffect(
|
||||||
status: Signal<NodeStatus>,
|
status: Signal<NodeStatus>,
|
||||||
preconditions: ReadonlySignal<boolean>,
|
canStart: ReadonlySignal<boolean>,
|
||||||
effectDisposers: (() => void)[],
|
effectDisposers: (() => void)[],
|
||||||
): void {
|
): void {
|
||||||
const disposer = effect(() => {
|
const disposer = effect(() => {
|
||||||
if (preconditions.value) {
|
if (canStart.value) {
|
||||||
const current = status.value;
|
const current = status.value;
|
||||||
if (current === "idle" || current === "waiting") {
|
if (current === "idle" || current === "waiting") {
|
||||||
status.value = "ready";
|
status.value = "ready";
|
||||||
|
|||||||
@@ -13,6 +13,15 @@ import type { NodeStatusContext } from "./node-status.js";
|
|||||||
|
|
||||||
export type FailurePolicy = "continue-running" | "abort-dependents";
|
export type FailurePolicy = "continue-running" | "abort-dependents";
|
||||||
|
|
||||||
|
export interface ParallelGroup {
|
||||||
|
siblings: string[];
|
||||||
|
maxConcurrency?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ParallelGroupConfig {
|
||||||
|
[groupKey: string]: ParallelGroup;
|
||||||
|
}
|
||||||
|
|
||||||
export interface CallRequestedEvent {
|
export interface CallRequestedEvent {
|
||||||
type: "call.requested";
|
type: "call.requested";
|
||||||
requestId: string;
|
requestId: string;
|
||||||
@@ -92,6 +101,7 @@ const EVENT_TO_STATUS: Record<string, NodeStatus> = {
|
|||||||
export class WorkflowReactiveRoot implements EventLogProjection {
|
export class WorkflowReactiveRoot implements EventLogProjection {
|
||||||
statusMap: Map<string, Signal<NodeStatus>>;
|
statusMap: Map<string, Signal<NodeStatus>>;
|
||||||
preconditions: Map<string, ReadonlySignal<boolean>>;
|
preconditions: Map<string, ReadonlySignal<boolean>>;
|
||||||
|
canStart: Map<string, ReadonlySignal<boolean>>;
|
||||||
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
|
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
|
||||||
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
|
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
|
||||||
nodeKeyToRequestId: Map<string, string>;
|
nodeKeyToRequestId: Map<string, string>;
|
||||||
@@ -101,14 +111,16 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
private effectDisposers: (() => void)[];
|
private effectDisposers: (() => void)[];
|
||||||
private eventLog: CallEventMapValue[];
|
private eventLog: CallEventMapValue[];
|
||||||
private _failurePolicy: FailurePolicy;
|
private _failurePolicy: FailurePolicy;
|
||||||
|
private _parallelGroups: ParallelGroupConfig;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
graph: DirectedGraph,
|
graph: DirectedGraph,
|
||||||
options?: { failurePolicy?: FailurePolicy },
|
options?: { failurePolicy?: FailurePolicy; parallelGroups?: ParallelGroupConfig },
|
||||||
) {
|
) {
|
||||||
this.graph = graph;
|
this.graph = graph;
|
||||||
this.statusMap = new Map();
|
this.statusMap = new Map();
|
||||||
this.preconditions = new Map();
|
this.preconditions = new Map();
|
||||||
|
this.canStart = new Map();
|
||||||
this.blockedByFailure = new Map();
|
this.blockedByFailure = new Map();
|
||||||
this.resultMap = new Map();
|
this.resultMap = new Map();
|
||||||
this.effectDisposers = [];
|
this.effectDisposers = [];
|
||||||
@@ -116,6 +128,7 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
this.nodeKeyToRequestId = new Map();
|
this.nodeKeyToRequestId = new Map();
|
||||||
this.requestIdToNodeKey = new Map();
|
this.requestIdToNodeKey = new Map();
|
||||||
this._failurePolicy = options?.failurePolicy ?? "continue-running";
|
this._failurePolicy = options?.failurePolicy ?? "continue-running";
|
||||||
|
this._parallelGroups = options?.parallelGroups ?? {};
|
||||||
this.initializeSignals();
|
this.initializeSignals();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,6 +138,13 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private initializeSignals(): void {
|
private initializeSignals(): void {
|
||||||
|
const nodeToGroupKey = new Map<string, string>();
|
||||||
|
for (const [groupKey, group] of Object.entries(this._parallelGroups)) {
|
||||||
|
for (const sibling of group.siblings) {
|
||||||
|
nodeToGroupKey.set(sibling, groupKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (const node of this.graph.nodes()) {
|
for (const node of this.graph.nodes()) {
|
||||||
const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
|
const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
|
||||||
|
|
||||||
@@ -139,6 +159,26 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
return computePreconditions(node, ctx);
|
return computePreconditions(node, ctx);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const groupKey = nodeToGroupKey.get(node);
|
||||||
|
const parallelGroup = groupKey ? this._parallelGroups[groupKey] : undefined;
|
||||||
|
const maxConc = parallelGroup?.maxConcurrency;
|
||||||
|
const siblings = parallelGroup?.siblings ?? [];
|
||||||
|
|
||||||
|
let canStartComputed: ReadonlySignal<boolean>;
|
||||||
|
if (maxConc !== undefined && siblings.length > 0) {
|
||||||
|
const otherSiblings = siblings.filter((s) => s !== node);
|
||||||
|
canStartComputed = computed(() => {
|
||||||
|
if (!preconditionsComputed.value) return false;
|
||||||
|
const activeSiblingCount = otherSiblings.filter((sib) => {
|
||||||
|
const sibStatus = this.statusMap.get(sib);
|
||||||
|
return sibStatus && (sibStatus.value === "running" || sibStatus.value === "ready");
|
||||||
|
}).length;
|
||||||
|
return activeSiblingCount < maxConc;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
canStartComputed = preconditionsComputed;
|
||||||
|
}
|
||||||
|
|
||||||
const blockedByFailureComputed = computed(() => {
|
const blockedByFailureComputed = computed(() => {
|
||||||
return computeBlockedByFailure(node, ctx);
|
return computeBlockedByFailure(node, ctx);
|
||||||
});
|
});
|
||||||
@@ -189,16 +229,17 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
|
|
||||||
this.statusMap.set(node, status);
|
this.statusMap.set(node, status);
|
||||||
this.preconditions.set(node, preconditionsComputed);
|
this.preconditions.set(node, preconditionsComputed);
|
||||||
|
this.canStart.set(node, canStartComputed);
|
||||||
this.blockedByFailure.set(node, blockedByFailureComputed);
|
this.blockedByFailure.set(node, blockedByFailureComputed);
|
||||||
this.resultMap.set(node, resultComputed);
|
this.resultMap.set(node, resultComputed);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const node of this.graph.nodes()) {
|
for (const node of this.graph.nodes()) {
|
||||||
const status = this.statusMap.get(node)!;
|
const status = this.statusMap.get(node)!;
|
||||||
const preconditions = this.preconditions.get(node)!;
|
const canStart = this.canStart.get(node)!;
|
||||||
const blocked = this.blockedByFailure.get(node)!;
|
const blocked = this.blockedByFailure.get(node)!;
|
||||||
|
|
||||||
registerStartEffect(status, preconditions, this.effectDisposers);
|
registerStartEffect(status, canStart, this.effectDisposers);
|
||||||
registerAbortEffect(status, blocked, this.effectDisposers, {
|
registerAbortEffect(status, blocked, this.effectDisposers, {
|
||||||
abortDependents: this._failurePolicy === "abort-dependents",
|
abortDependents: this._failurePolicy === "abort-dependents",
|
||||||
});
|
});
|
||||||
@@ -338,6 +379,7 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
|||||||
this.effectDisposers = [];
|
this.effectDisposers = [];
|
||||||
this.statusMap.clear();
|
this.statusMap.clear();
|
||||||
this.preconditions.clear();
|
this.preconditions.clear();
|
||||||
|
this.canStart.clear();
|
||||||
this.blockedByFailure.clear();
|
this.blockedByFailure.clear();
|
||||||
this.resultMap.clear();
|
this.resultMap.clear();
|
||||||
this.nodeKeyToRequestId.clear();
|
this.nodeKeyToRequestId.clear();
|
||||||
|
|||||||
482
test/reactive/max-concurrency.test.ts
Normal file
482
test/reactive/max-concurrency.test.ts
Normal file
@@ -0,0 +1,482 @@
|
|||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import { DirectedGraph } from "graphology";
|
||||||
|
import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js";
|
||||||
|
import type { ParallelGroupConfig } from "../../src/reactive/workflow.js";
|
||||||
|
|
||||||
|
function makeParallelGroupGraph(): DirectedGraph {
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNode("entry", { name: "entry" });
|
||||||
|
graph.addNode("a", { name: "a" });
|
||||||
|
graph.addNode("b", { name: "b" });
|
||||||
|
graph.addNode("c", { name: "c" });
|
||||||
|
graph.addNode("d", { name: "d" });
|
||||||
|
graph.addEdgeWithKey("entry->a", "entry", "a", { edgeType: "sequential" });
|
||||||
|
graph.addEdgeWithKey("entry->b", "entry", "b", { edgeType: "sequential" });
|
||||||
|
graph.addEdgeWithKey("entry->c", "entry", "c", { edgeType: "sequential" });
|
||||||
|
graph.addEdgeWithKey("entry->d", "entry", "d", { edgeType: "sequential" });
|
||||||
|
return graph;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeParallelGroupNoEntry(): DirectedGraph {
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNode("a", { name: "a" });
|
||||||
|
graph.addNode("b", { name: "b" });
|
||||||
|
graph.addNode("c", { name: "c" });
|
||||||
|
return graph;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("maxConcurrency semaphore", () => {
|
||||||
|
describe("parallel group with maxConcurrency: 2", () => {
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
it("limits running siblings to maxConcurrency", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
expect(root.canStart.has("a")).toBe(true);
|
||||||
|
expect(root.canStart.has("b")).toBe(true);
|
||||||
|
expect(root.canStart.has("c")).toBe(true);
|
||||||
|
expect(root.canStart.has("d")).toBe(true);
|
||||||
|
|
||||||
|
expect(root.statusMap.get("entry")!.value).toBe("ready");
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.preconditions.get("a")!.value).toBe(true);
|
||||||
|
expect(root.preconditions.get("b")!.value).toBe(true);
|
||||||
|
expect(root.preconditions.get("c")!.value).toBe(true);
|
||||||
|
expect(root.preconditions.get("d")!.value).toBe(true);
|
||||||
|
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("canStart becomes false when maxConcurrency siblings are running", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("blocked nodes by semaphore stay idle (not ready)", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("idle");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("idle");
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("slot opens when a running sibling completes", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(false);
|
||||||
|
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("ready");
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("all slots eventually open as siblings complete", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "completed";
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.statusMap.get("b")!.value = "completed";
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("canStart correctly limits when sibling transitions to ready", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "ready";
|
||||||
|
root.statusMap.get("b")!.value = "ready";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("parallel group without maxConcurrency", () => {
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
it("all siblings start immediately when preconditions are met", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(true);
|
||||||
|
|
||||||
|
expect(root.statusMap.get("a")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("b")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("ready");
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("no semaphore — running siblings do not block others", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("no parallelGroups config", () => {
|
||||||
|
it("all nodes start normally without semaphore", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph);
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(true);
|
||||||
|
|
||||||
|
expect(root.statusMap.get("a")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("b")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("ready");
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("maxConcurrency: 1 (serial within parallel)", () => {
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c"],
|
||||||
|
maxConcurrency: 1,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
it("only one sibling can run at a time", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
const canStartA = root.canStart.get("a")!;
|
||||||
|
const canStartB = root.canStart.get("b")!;
|
||||||
|
const canStartC = root.canStart.get("c")!;
|
||||||
|
|
||||||
|
expect(canStartA.value).toBe(true);
|
||||||
|
expect(canStartB.value).toBe(false);
|
||||||
|
expect(canStartC.value).toBe(false);
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
|
||||||
|
expect(canStartA.value).toBe(true);
|
||||||
|
expect(canStartB.value).toBe(false);
|
||||||
|
expect(canStartC.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("next sibling can start when current finishes", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("a")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("root-level parallel group with maxConcurrency", () => {
|
||||||
|
it("limits concurrent start for root nodes with no predecessors", () => {
|
||||||
|
const graph = makeParallelGroupNoEntry();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
root: {
|
||||||
|
siblings: ["a", "b", "c"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("b")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("third node becomes ready when a slot opens", () => {
|
||||||
|
const graph = makeParallelGroupNoEntry();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
root: {
|
||||||
|
siblings: ["a", "b", "c"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("idle");
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("ready");
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("preconditions vs canStart", () => {
|
||||||
|
it("canStart is false when preconditions are not met even if semaphore has slots", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
expect(root.preconditions.get("a")!.value).toBe(false);
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("canStart requires both preconditions AND semaphore slot", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.preconditions.get("a")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("a")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.preconditions.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("failed sibling does not count as running", () => {
|
||||||
|
it("failed sibling frees a slot for semaphore", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.statusMap.get("b")!.value = "failed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("completed sibling frees a slot for semaphore", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
root.statusMap.get("b")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
expect(root.canStart.get("d")!.value).toBe(false);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("aborted sibling frees a slot for semaphore", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
root.statusMap.get("b")!.value = "aborted";
|
||||||
|
|
||||||
|
expect(root.canStart.get("c")!.value).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("dispose clears canStart", () => {
|
||||||
|
it("canStart map is cleared on dispose", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
expect(root.canStart.size).toBe(5);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
|
||||||
|
expect(root.canStart.size).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("full execution flow with maxConcurrency", () => {
|
||||||
|
it("simulates parallel execution with maxConcurrency: 2 — only 2 run at a time", () => {
|
||||||
|
const graph = makeParallelGroupGraph();
|
||||||
|
const parallelGroups: ParallelGroupConfig = {
|
||||||
|
group1: {
|
||||||
|
siblings: ["a", "b", "c", "d"],
|
||||||
|
maxConcurrency: 2,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const root = new WorkflowReactiveRoot(graph, { parallelGroups });
|
||||||
|
|
||||||
|
root.setRequestId("entry", "req-entry");
|
||||||
|
root.setRequestId("a", "req-a");
|
||||||
|
root.setRequestId("b", "req-b");
|
||||||
|
root.setRequestId("c", "req-c");
|
||||||
|
root.setRequestId("d", "req-d");
|
||||||
|
|
||||||
|
root.statusMap.get("entry")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.statusMap.get("a")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("b")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("idle");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("idle");
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "running";
|
||||||
|
root.statusMap.get("b")!.value = "running";
|
||||||
|
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("idle");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("idle");
|
||||||
|
|
||||||
|
root.statusMap.get("a")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.statusMap.get("c")!.value).toBe("ready");
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("idle");
|
||||||
|
|
||||||
|
root.statusMap.get("c")!.value = "running";
|
||||||
|
|
||||||
|
root.statusMap.get("b")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.statusMap.get("d")!.value).toBe("ready");
|
||||||
|
|
||||||
|
root.statusMap.get("d")!.value = "running";
|
||||||
|
root.statusMap.get("c")!.value = "completed";
|
||||||
|
root.statusMap.get("d")!.value = "completed";
|
||||||
|
|
||||||
|
expect(root.isComplete()).toBe(true);
|
||||||
|
|
||||||
|
root.dispose();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user