feat: implement maxConcurrency reactive counting semaphore with 20 unit tests

This commit is contained in:
2026-05-21 22:29:02 +00:00
parent ee3da90b63
commit 1a12410229
4 changed files with 531 additions and 5 deletions

View File

@@ -9,6 +9,8 @@ export {
type CallCompletedEvent,
type EventLogProjection,
type AggregateStatus,
type ParallelGroup,
type ParallelGroupConfig,
} from "./workflow.js";
export {

View File

@@ -39,11 +39,11 @@ export function computeBlockedByFailure(
export function registerStartEffect(
status: Signal<NodeStatus>,
preconditions: ReadonlySignal<boolean>,
canStart: ReadonlySignal<boolean>,
effectDisposers: (() => void)[],
): void {
const disposer = effect(() => {
if (preconditions.value) {
if (canStart.value) {
const current = status.value;
if (current === "idle" || current === "waiting") {
status.value = "ready";

View File

@@ -13,6 +13,15 @@ import type { NodeStatusContext } from "./node-status.js";
export type FailurePolicy = "continue-running" | "abort-dependents";
export interface ParallelGroup {
siblings: string[];
maxConcurrency?: number;
}
export interface ParallelGroupConfig {
[groupKey: string]: ParallelGroup;
}
export interface CallRequestedEvent {
type: "call.requested";
requestId: string;
@@ -92,6 +101,7 @@ const EVENT_TO_STATUS: Record<string, NodeStatus> = {
export class WorkflowReactiveRoot implements EventLogProjection {
statusMap: Map<string, Signal<NodeStatus>>;
preconditions: Map<string, ReadonlySignal<boolean>>;
canStart: Map<string, ReadonlySignal<boolean>>;
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
nodeKeyToRequestId: Map<string, string>;
@@ -101,14 +111,16 @@ export class WorkflowReactiveRoot implements EventLogProjection {
private effectDisposers: (() => void)[];
private eventLog: CallEventMapValue[];
private _failurePolicy: FailurePolicy;
private _parallelGroups: ParallelGroupConfig;
constructor(
graph: DirectedGraph,
options?: { failurePolicy?: FailurePolicy },
options?: { failurePolicy?: FailurePolicy; parallelGroups?: ParallelGroupConfig },
) {
this.graph = graph;
this.statusMap = new Map();
this.preconditions = new Map();
this.canStart = new Map();
this.blockedByFailure = new Map();
this.resultMap = new Map();
this.effectDisposers = [];
@@ -116,6 +128,7 @@ export class WorkflowReactiveRoot implements EventLogProjection {
this.nodeKeyToRequestId = new Map();
this.requestIdToNodeKey = new Map();
this._failurePolicy = options?.failurePolicy ?? "continue-running";
this._parallelGroups = options?.parallelGroups ?? {};
this.initializeSignals();
}
@@ -125,6 +138,13 @@ export class WorkflowReactiveRoot implements EventLogProjection {
}
private initializeSignals(): void {
const nodeToGroupKey = new Map<string, string>();
for (const [groupKey, group] of Object.entries(this._parallelGroups)) {
for (const sibling of group.siblings) {
nodeToGroupKey.set(sibling, groupKey);
}
}
for (const node of this.graph.nodes()) {
const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
@@ -139,6 +159,26 @@ export class WorkflowReactiveRoot implements EventLogProjection {
return computePreconditions(node, ctx);
});
const groupKey = nodeToGroupKey.get(node);
const parallelGroup = groupKey ? this._parallelGroups[groupKey] : undefined;
const maxConc = parallelGroup?.maxConcurrency;
const siblings = parallelGroup?.siblings ?? [];
let canStartComputed: ReadonlySignal<boolean>;
if (maxConc !== undefined && siblings.length > 0) {
const otherSiblings = siblings.filter((s) => s !== node);
canStartComputed = computed(() => {
if (!preconditionsComputed.value) return false;
const activeSiblingCount = otherSiblings.filter((sib) => {
const sibStatus = this.statusMap.get(sib);
return sibStatus && (sibStatus.value === "running" || sibStatus.value === "ready");
}).length;
return activeSiblingCount < maxConc;
});
} else {
canStartComputed = preconditionsComputed;
}
const blockedByFailureComputed = computed(() => {
return computeBlockedByFailure(node, ctx);
});
@@ -189,16 +229,17 @@ export class WorkflowReactiveRoot implements EventLogProjection {
this.statusMap.set(node, status);
this.preconditions.set(node, preconditionsComputed);
this.canStart.set(node, canStartComputed);
this.blockedByFailure.set(node, blockedByFailureComputed);
this.resultMap.set(node, resultComputed);
}
for (const node of this.graph.nodes()) {
const status = this.statusMap.get(node)!;
const preconditions = this.preconditions.get(node)!;
const canStart = this.canStart.get(node)!;
const blocked = this.blockedByFailure.get(node)!;
registerStartEffect(status, preconditions, this.effectDisposers);
registerStartEffect(status, canStart, this.effectDisposers);
registerAbortEffect(status, blocked, this.effectDisposers, {
abortDependents: this._failurePolicy === "abort-dependents",
});
@@ -338,6 +379,7 @@ export class WorkflowReactiveRoot implements EventLogProjection {
this.effectDisposers = [];
this.statusMap.clear();
this.preconditions.clear();
this.canStart.clear();
this.blockedByFailure.clear();
this.resultMap.clear();
this.nodeKeyToRequestId.clear();