import { batch, computed, effect, signal } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core";
import type { DirectedGraph } from "graphology";
import type { NodeStatus } from "../schema/enums.js";
import type { CallResult } from "../schema/edge.js";

/** Policy selected at construction time for reacting to a failed/aborted predecessor. */
export type FailurePolicy = "continue-running" | "abort-dependents";

export interface CallRequestedEvent {
  type: "call.requested";
  requestId: string;
  operationId: string;
  input: unknown;
  timestamp: string;
}

export interface CallRespondedEvent {
  type: "call.responded";
  requestId: string;
  output: unknown;
  timestamp: string;
}

export interface CallErrorEvent {
  type: "call.error";
  requestId: string;
  error: { code: string; message: string; details?: unknown };
  timestamp: string;
}

export interface CallAbortedEvent {
  type: "call.aborted";
  requestId: string;
  timestamp: string;
}

export interface CallCompletedEvent {
  type: "call.completed";
  requestId: string;
  output: unknown;
  timestamp: string;
}

/** Union of every call lifecycle event accepted by {@link EventLogProjection.append}. */
export type CallEventMapValue =
  | CallRequestedEvent
  | CallRespondedEvent
  | CallErrorEvent
  | CallAbortedEvent
  | CallCompletedEvent;

/** Read/write view over an append-only log of call events, queried per node. */
export interface EventLogProjection {
  append(event: CallEventMapValue): void;
  getStatus(nodeId: string): NodeStatus;
  getResult(nodeId: string): CallResult | undefined;
  getEvents(nodeId: string): CallEventMapValue[];
}

/** Number of nodes currently in each status, plus the overall node count. */
export interface AggregateStatus {
  completed: number;
  failed: number;
  aborted: number;
  skipped: number;
  running: number;
  waiting: number;
  ready: number;
  idle: number;
  total: number;
}

/** Statuses that `abortAll`, `abortNode` and `isComplete` treat as final. */
const TERMINAL_STATUSES: Set<NodeStatus> = new Set<NodeStatus>([
  "completed",
  "failed",
  "aborted",
  "skipped",
]);

/** Status a node takes on when an event of the given type is appended for it. */
const EVENT_TO_STATUS: Record<string, NodeStatus> = {
  "call.requested": "running",
  "call.responded": "completed",
  "call.error": "failed",
  "call.aborted": "aborted",
  "call.completed": "completed",
};

/** Keys of {@link AggregateStatus} that count a single status (everything but `total`). */
type StatusBucket = Exclude<keyof AggregateStatus, "total">;

const STATUS_BUCKETS: ReadonlySet<string> = new Set<StatusBucket>([
  "completed",
  "failed",
  "aborted",
  "skipped",
  "running",
  "waiting",
  "ready",
  "idle",
]);

/** Narrows an arbitrary status string to one of the per-status counters. */
function isStatusBucket(key: string): key is StatusBucket {
  return STATUS_BUCKETS.has(key);
}

/**
 * Reactive projection of a workflow graph: one status signal per node plus
 * derived signals for "all predecessors done", "a predecessor failed" and the
 * node's latest call result.
 *
 * Nodes are linked to call events through `nodeKeyToRequestId`; this class
 * only reads that map, so callers populate it (node key -> request id) before
 * appending events for the node.
 */
export class WorkflowReactiveRoot implements EventLogProjection {
  /** Writable status signal per node key; every node starts as `"idle"`. */
  statusMap: Map<string, Signal<NodeStatus>>;
  /** True when every in-neighbour of the node is `"completed"` or `"skipped"`. */
  preconditions: Map<string, ReadonlySignal<boolean>>;
  /** True when at least one in-neighbour of the node is `"failed"` or `"aborted"`. */
  blockedByFailure: Map<string, ReadonlySignal<boolean>>;
  /** Latest terminal result per node, derived from the event log. */
  resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
  /** Node key -> request id; used to attribute events to nodes. */
  nodeKeyToRequestId: Map<string, string>;

  private graph: DirectedGraph;
  private effectDisposers: (() => void)[];
  private eventLog: CallEventMapValue[];
  private _failurePolicy: FailurePolicy;
  /**
   * Incremented on every `append`. `eventLog` is a plain array, so the result
   * computeds would otherwise read no signal at all and keep their first
   * value forever; reading this counter makes them re-evaluate after each
   * appended event.
   */
  private readonly eventLogVersion: Signal<number> = signal(0);

  /**
   * @param graph Workflow graph; one set of signals is created per node.
   * @param options.failurePolicy Defaults to `"continue-running"`.
   */
  constructor(
    graph: DirectedGraph,
    options?: { failurePolicy?: FailurePolicy },
  ) {
    this.graph = graph;
    this.statusMap = new Map();
    this.preconditions = new Map();
    this.blockedByFailure = new Map();
    this.resultMap = new Map();
    this.effectDisposers = [];
    this.eventLog = [];
    this.nodeKeyToRequestId = new Map();
    this._failurePolicy = options?.failurePolicy ?? "continue-running";
    this.initializeSignals();
  }

  /** Creates the per-node signals and the effects that propagate failures downstream. */
  private initializeSignals(): void {
    for (const node of this.graph.nodes()) {
      const predecessors: string[] = this.graph.inNeighbors(node) ?? [];

      this.statusMap.set(node, signal<NodeStatus>("idle"));

      this.preconditions.set(
        node,
        computed<boolean>(() =>
          predecessors.every((pred: string) => {
            // A predecessor without a status signal counts as "not done".
            const predStatus = this.statusMap.get(pred)?.value;
            return predStatus === "completed" || predStatus === "skipped";
          }),
        ),
      );

      this.blockedByFailure.set(
        node,
        computed<boolean>(() =>
          predecessors.some((pred: string) => {
            const predStatus = this.statusMap.get(pred)?.value;
            return predStatus === "failed" || predStatus === "aborted";
          }),
        ),
      );

      this.resultMap.set(
        node,
        computed<CallResult | undefined>(() => {
          // Reading the counter subscribes this computed to `append`.
          // Version 0 means nothing has been appended yet, so there is no result.
          if (this.eventLogVersion.value === 0) return undefined;
          return this.computeResult(node);
        }),
      );
    }

    // Second pass, so every node's signals exist before any effect first runs.
    for (const node of this.graph.nodes()) {
      const status = this.statusMap.get(node)!;
      const blocked = this.blockedByFailure.get(node)!;

      const disposer = effect(() => {
        if (!blocked.value) return;

        const current = status.value;
        // Only nodes that have not started are touched; running and terminal
        // nodes keep their status.
        if (current === "idle" || current === "waiting" || current === "ready") {
          // NOTE(review): both policies currently abort the not-yet-started
          // dependent (the terminal check can never fail for these three
          // statuses). Confirm what "continue-running" is meant to do here.
          if (this._failurePolicy === "abort-dependents") {
            if (!TERMINAL_STATUSES.has(current)) {
              status.value = "aborted";
            }
          } else {
            status.value = "aborted";
          }
        }
      });
      this.effectDisposers.push(disposer);
    }
  }

  /**
   * Scans the event log backwards for the newest `call.responded`,
   * `call.error` or `call.aborted` event of the node's request and converts it
   * into a result.
   *
   * NOTE(review): `call.completed` carries an `output` and maps to the
   * `"completed"` status, but is skipped here, so a node whose only terminal
   * event is `call.completed` has no result. Confirm whether that is intended.
   *
   * @returns `undefined` when the node has no request id or no such event.
   */
  private computeResult(nodeId: string): CallResult | undefined {
    const requestId = this.nodeKeyToRequestId.get(nodeId);
    if (!requestId) return undefined;

    for (let i = this.eventLog.length - 1; i >= 0; i--) {
      const e = this.eventLog[i];
      if (e === undefined || !("requestId" in e) || e.requestId !== requestId) {
        continue;
      }
      switch (e.type) {
        case "call.error":
          return {
            status: "failed" as NodeStatus,
            output: undefined,
            error: e.error,
          } as CallResult;
        case "call.responded":
          return {
            status: "completed" as NodeStatus,
            output: e.output,
          } as CallResult;
        case "call.aborted":
          return {
            status: "aborted" as NodeStatus,
            output: undefined,
          } as CallResult;
        default:
          // Not a result-bearing event; keep scanning older entries.
          break;
      }
    }
    return undefined;
  }

  /** Returns the most recently appended event carrying `requestId`, if any. */
  private findLastEvent(requestId: string): CallEventMapValue | undefined {
    for (let i = this.eventLog.length - 1; i >= 0; i--) {
      const e = this.eventLog[i];
      if (e !== undefined && "requestId" in e && e.requestId === requestId) {
        return e;
      }
    }
    return undefined;
  }

  /**
   * Records an event and, when its request id belongs to a known node, moves
   * that node to the status mapped from the event type.
   *
   * The event is always stored, even if no node matches. The version bump and
   * the status write share one batch so subscribers observe both together.
   */
  append(event: CallEventMapValue): void {
    this.eventLog.push(event);

    batch(() => {
      // peek() avoids subscribing a surrounding effect to the counter it writes.
      this.eventLogVersion.value = this.eventLogVersion.peek() + 1;

      if (!("requestId" in event)) return;

      const nodeId = this.findNodeByRequestId(event.requestId);
      if (nodeId === undefined) return;

      const statusSignal = this.statusMap.get(nodeId);
      if (!statusSignal) return;

      const derived = EVENT_TO_STATUS[event.type];
      if (derived !== undefined) {
        statusSignal.value = derived;
      }
    });
  }

  /**
   * Current status of a node.
   *
   * When the node has a request id and its latest logged event maps to a
   * status, that status is returned; otherwise the status signal's value is.
   * Unknown nodes report `"idle"`.
   *
   * NOTE(review): because the log wins, a node aborted through `abortNode` /
   * `abortAll` whose last event is `call.requested` still reports `"running"`
   * here, while `isComplete` reads the signal. Confirm this is intended.
   */
  getStatus(nodeId: string): NodeStatus {
    const statusSignal = this.statusMap.get(nodeId);
    if (!statusSignal) return "idle";

    const requestId = this.nodeKeyToRequestId.get(nodeId);
    if (requestId) {
      const lastEvent = this.findLastEvent(requestId);
      const fromLog =
        lastEvent === undefined ? undefined : EVENT_TO_STATUS[lastEvent.type];
      if (fromLog !== undefined) return fromLog;
    }
    return statusSignal.value;
  }

  /** Latest terminal result for the node, or `undefined` if unknown or not yet available. */
  getResult(nodeId: string): CallResult | undefined {
    return this.resultMap.get(nodeId)?.value;
  }

  /** All logged events for the node's request id, in append order. */
  getEvents(nodeId: string): CallEventMapValue[] {
    const requestId = this.nodeKeyToRequestId.get(nodeId);
    if (!requestId) return [];
    return this.eventLog.filter(
      (e) => "requestId" in e && e.requestId === requestId,
    );
  }

  /** Marks every node that is not in a terminal status as `"aborted"`. */
  abortAll(): void {
    for (const status of this.statusMap.values()) {
      if (!TERMINAL_STATUSES.has(status.value)) {
        status.value = "aborted";
      }
    }
  }

  /** Marks one node as `"aborted"` unless it is unknown or already terminal. */
  abortNode(nodeId: string): void {
    const status = this.statusMap.get(nodeId);
    if (!status) return;
    if (!TERMINAL_STATUSES.has(status.value)) {
      status.value = "aborted";
    }
  }

  /** True when every node's status signal holds a terminal status (also true for zero nodes). */
  isComplete(): boolean {
    for (const status of this.statusMap.values()) {
      if (!TERMINAL_STATUSES.has(status.value)) {
        return false;
      }
    }
    return true;
  }

  /** Counts nodes per status; `total` counts every node, including unlisted statuses. */
  getAggregateStatus(): AggregateStatus {
    const counts: AggregateStatus = {
      completed: 0,
      failed: 0,
      aborted: 0,
      skipped: 0,
      running: 0,
      waiting: 0,
      ready: 0,
      idle: 0,
      total: 0,
    };
    for (const status of this.statusMap.values()) {
      counts.total++;
      const key: string = status.value;
      if (isStatusBucket(key)) {
        counts[key]++;
      }
    }
    return counts;
  }

  /** Stops all effects and clears every map and the event log. */
  dispose(): void {
    for (const disposer of this.effectDisposers) {
      disposer();
    }
    this.effectDisposers = [];
    this.statusMap.clear();
    this.preconditions.clear();
    this.blockedByFailure.clear();
    this.resultMap.clear();
    this.nodeKeyToRequestId.clear();
    this.eventLog = [];
  }

  /** Reverse lookup in `nodeKeyToRequestId`: first node key mapped to `requestId`. */
  private findNodeByRequestId(requestId: string): string | undefined {
    for (const [nodeId, rid] of this.nodeKeyToRequestId) {
      if (rid === requestId) return nodeId;
    }
    return undefined;
  }
}