From d63a301ceacfe44214c8c380c4194bb8c7b245ed Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Thu, 21 May 2026 22:10:31 +0000 Subject: [PATCH] Implement retry semantics: requestIdToNodeKey reverse map, setRequestId method, full-history getEvents --- src/reactive/workflow.ts | 52 +++++++++++++++++++++++----------- test/reactive/workflow.test.ts | 31 ++++++++++---------- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/reactive/workflow.ts b/src/reactive/workflow.ts index dfd0ad3..d10a5a2 100644 --- a/src/reactive/workflow.ts +++ b/src/reactive/workflow.ts @@ -88,6 +88,7 @@ export class WorkflowReactiveRoot implements EventLogProjection { blockedByFailure: Map>; resultMap: Map>; nodeKeyToRequestId: Map; + requestIdToNodeKey: Map; private graph: DirectedGraph; private effectDisposers: (() => void)[]; @@ -106,10 +107,16 @@ export class WorkflowReactiveRoot implements EventLogProjection { this.effectDisposers = []; this.eventLog = []; this.nodeKeyToRequestId = new Map(); + this.requestIdToNodeKey = new Map(); this._failurePolicy = options?.failurePolicy ?? "continue-running"; this.initializeSignals(); } + setRequestId(nodeKey: string, requestId: string): void { + this.nodeKeyToRequestId.set(nodeKey, requestId); + this.requestIdToNodeKey.set(requestId, nodeKey); + } + private initializeSignals(): void { for (const node of this.graph.nodes()) { const predecessors: string[] = this.graph.inNeighbors(node) ?? []; @@ -213,15 +220,29 @@ export class WorkflowReactiveRoot implements EventLogProjection { if (!("requestId" in event)) return; - const nodeId = this.findNodeByRequestId(event.requestId); + let nodeId = this.requestIdToNodeKey.get(event.requestId); + + if (nodeId === undefined) { + for (const [nId, rid] of this.nodeKeyToRequestId) { + if (rid === event.requestId) { + nodeId = nId; + this.requestIdToNodeKey.set(event.requestId, nId); + break; + } + } + } + if (nodeId === undefined) return; - const statusSignal = this.statusMap.get(nodeId); - if (!statusSignal) return; + const currentRequestId = this.nodeKeyToRequestId.get(nodeId); + if (currentRequestId === event.requestId) { + const statusSignal = this.statusMap.get(nodeId); + if (!statusSignal) return; - const derived = EVENT_TO_STATUS[event.type]; - if (derived !== undefined) { - statusSignal.value = derived; + const derived = EVENT_TO_STATUS[event.type]; + if (derived !== undefined) { + statusSignal.value = derived; + } } } @@ -254,12 +275,17 @@ export class WorkflowReactiveRoot implements EventLogProjection { } getEvents(nodeId: string): CallEventMapValue[] { - const requestId = this.nodeKeyToRequestId.get(nodeId); - if (!requestId) return []; + const requestIds = new Set(); + for (const [rid, nId] of this.requestIdToNodeKey) { + if (nId === nodeId) { + requestIds.add(rid); + } + } + if (requestIds.size === 0) return []; const events: CallEventMapValue[] = []; for (const e of this.eventLog) { - if ("requestId" in e && e.requestId === requestId) { + if ("requestId" in e && requestIds.has(e.requestId)) { events.push(e); } } @@ -325,13 +351,7 @@ export class WorkflowReactiveRoot implements EventLogProjection { this.blockedByFailure.clear(); this.resultMap.clear(); this.nodeKeyToRequestId.clear(); + this.requestIdToNodeKey.clear(); this.eventLog = []; } - - private findNodeByRequestId(requestId: string): string | undefined { - for (const [nodeId, rid] of this.nodeKeyToRequestId) { - if (rid === requestId) return nodeId; - } - return undefined; - } } diff --git a/test/reactive/workflow.test.ts b/test/reactive/workflow.test.ts index 1975831..f02ab08 100644 --- a/test/reactive/workflow.test.ts +++ b/test/reactive/workflow.test.ts @@ -159,7 +159,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -177,7 +177,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -201,7 +201,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -225,7 +225,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -265,7 +265,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); const respondedEvent: CallEventMapValue = { type: "call.responded", @@ -304,7 +304,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -353,7 +353,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -380,7 +380,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -408,7 +408,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -434,7 +434,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -452,7 +452,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-2"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -466,6 +466,7 @@ describe("WorkflowReactiveRoot", () => { error: { code: "ERR", message: "first attempt failed" }, timestamp: "2026-01-01T00:00:01Z", }); + root.setRequestId("a", "req-2"); root.append({ type: "call.requested", requestId: "req-2", @@ -503,7 +504,7 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-1"); + root.setRequestId("a", "req-1"); root.append({ type: "call.requested", requestId: "req-1", @@ -840,9 +841,9 @@ describe("WorkflowReactiveRoot", () => { const graph = makeSimpleGraph(); const root = new WorkflowReactiveRoot(graph); - root.nodeKeyToRequestId.set("a", "req-a"); - root.nodeKeyToRequestId.set("b", "req-b"); - root.nodeKeyToRequestId.set("c", "req-c"); + root.setRequestId("a", "req-a"); + root.setRequestId("b", "req-b"); + root.setRequestId("c", "req-c"); expect(root.getStatus("a")).toBe("idle"); expect(root.getStatus("b")).toBe("idle");