Simplify to transport-only: remove call protocol, add EventEnvelope, expand stream operators

- Remove src/call.ts (PendingRequestMap, CallEventSchema, CallError) — call protocol belongs in @alkdev/operations
- Add EventEnvelope type ({ type, id, payload }) as the cross-platform serialization contract
- Simplify createPubSub: replace PubSubPublishArgsByKey tuple model with PubSubEventMap; publish(type, id, payload) and subscribe(type, id) use explicit id for topic scoping
- Update Redis adapter to serialize/deserialize full EventEnvelope
- Expand operators: add take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join
- Remove @alkdev/typebox runtime dependency (was only used by call.ts)
- Remove ./call sub-path export from package.json and tsup config
- Update all architecture docs to reflect transport-only scope, add Worker adapter, remove call protocol references
- Remove docs/architecture/call-protocol.md
- Update AGENTS.md with new source layout and transport-only principle
This commit is contained in:
2026-05-01 19:40:25 +00:00
parent 04b3464c36
commit de7fc88f99
17 changed files with 446 additions and 764 deletions

View File

@@ -1,307 +0,0 @@
import { Type, type Static } from "@alkdev/typebox";
import { createPubSub, type PubSub } from "./create_pubsub.js";
import { Repeater, type Push, type Stop } from "./repeater.js";
export const CallEventSchema = {
"call.requested": Type.Object({
requestId: Type.String(),
operationId: Type.String(),
input: Type.Unknown(),
parentRequestId: Type.Optional(Type.String()),
deadline: Type.Optional(Type.Number()),
identity: Type.Optional(Type.Object({
id: Type.String(),
scopes: Type.Array(Type.String()),
resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))),
})),
}),
"call.responded": Type.Object({
requestId: Type.String(),
output: Type.Unknown(),
}),
"call.part": Type.Object({
requestId: Type.String(),
output: Type.Unknown(),
index: Type.Optional(Type.Number()),
}),
"call.completed": Type.Object({
requestId: Type.String(),
}),
"call.aborted": Type.Object({
requestId: Type.String(),
}),
"call.error": Type.Object({
requestId: Type.String(),
code: Type.String(),
message: Type.String(),
details: Type.Optional(Type.Unknown()),
}),
} as const;
export type CallRequestedEvent = Static<typeof CallEventSchema["call.requested"]>;
export type CallRespondedEvent = Static<typeof CallEventSchema["call.responded"]>;
export type CallPartEvent = Static<typeof CallEventSchema["call.part"]>;
export type CallCompletedEvent = Static<typeof CallEventSchema["call.completed"]>;
export type CallAbortedEvent = Static<typeof CallEventSchema["call.aborted"]>;
export type CallErrorEvent = Static<typeof CallEventSchema["call.error"]>;
type CallPubSubMap = {
"call.requested": [CallRequestedEvent];
"call.responded": [string, CallRespondedEvent];
"call.part": [string, CallPartEvent];
"call.completed": [string, CallCompletedEvent];
"call.aborted": [string, CallAbortedEvent];
"call.error": [string, CallErrorEvent];
};
export const CallErrorCode = {
OPERATION_NOT_FOUND: "OPERATION_NOT_FOUND",
ACCESS_DENIED: "ACCESS_DENIED",
VALIDATION_ERROR: "VALIDATION_ERROR",
TIMEOUT: "TIMEOUT",
ABORTED: "ABORTED",
EXECUTION_ERROR: "EXECUTION_ERROR",
UNKNOWN_ERROR: "UNKNOWN_ERROR",
} as const;
export type CallErrorCodeType = (typeof CallErrorCode)[keyof typeof CallErrorCode];
export class CallError extends Error {
readonly code: string;
readonly details?: unknown;
constructor(code: string, message: string, details?: unknown) {
super(message);
this.name = "CallError";
this.code = code;
this.details = details;
}
}
interface PendingRequest {
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
deadline?: number;
timer?: ReturnType<typeof setTimeout>;
unsubscribe: () => void;
}
export class PendingRequestMap {
private requests = new Map<string, PendingRequest>();
private pubsub: PubSub<CallPubSubMap>;
constructor(eventTarget?: EventTarget) {
this.pubsub = createPubSub<CallPubSubMap>(
eventTarget ? { eventTarget: eventTarget as any } : undefined,
);
}
async call(
operationId: string,
input: unknown,
options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] },
): Promise<unknown> {
const requestId = crypto.randomUUID();
const respondedIter = this.pubsub.subscribe("call.responded", requestId);
const errorIter = this.pubsub.subscribe("call.error", requestId);
const abortedIter = this.pubsub.subscribe("call.aborted", requestId);
const cleanup = (): void => {
respondedIter.return?.();
errorIter.return?.();
abortedIter.return?.();
};
let timer: ReturnType<typeof setTimeout> | undefined;
if (options?.deadline) {
timer = setTimeout(() => {
cleanup();
this.pubsub.publish("call.aborted", requestId, { requestId });
}, options.deadline - Date.now());
}
this.pubsub.publish("call.requested", {
requestId,
operationId,
input,
parentRequestId: options?.parentRequestId,
deadline: options?.deadline,
identity: options?.identity,
});
try {
const result = await new Promise((resolve, reject) => {
const pending: PendingRequest = {
resolve: (value: unknown) => {
if (timer) clearTimeout(timer);
cleanup();
resolve(value);
},
reject: (reason: unknown) => {
if (timer) clearTimeout(timer);
cleanup();
reject(reason);
},
deadline: options?.deadline,
timer,
unsubscribe: cleanup,
};
this.requests.set(requestId, pending);
(async () => {
for await (const event of respondedIter) {
const responded = event as CallRespondedEvent;
const p = this.requests.get(responded.requestId);
if (p) {
this.requests.delete(responded.requestId);
p.resolve(responded.output);
}
return;
}
})();
(async () => {
for await (const event of errorIter) {
const err = event as CallErrorEvent;
const p = this.requests.get(err.requestId);
if (p) {
this.requests.delete(err.requestId);
p.reject(new CallError(err.code, err.message, err.details));
}
return;
}
})();
(async () => {
for await (const event of abortedIter) {
const aborted = event as CallAbortedEvent;
const p = this.requests.get(aborted.requestId);
if (p) {
this.requests.delete(aborted.requestId);
p.reject(new CallError(CallErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`));
}
return;
}
})();
});
return result;
} finally {
if (timer) clearTimeout(timer);
}
}
subscribe(
operationId: string,
input: unknown,
options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] },
): Repeater<unknown> {
const requestId = crypto.randomUUID();
const map = this;
return new Repeater<unknown>(async function (push: Push<unknown>, stop: Stop) {
map.pubsub.publish("call.requested", {
requestId,
operationId,
input,
parentRequestId: options?.parentRequestId,
deadline: options?.deadline,
identity: options?.identity,
});
const partIter = map.pubsub.subscribe("call.part", requestId);
const completedIter = map.pubsub.subscribe("call.completed", requestId);
const errorIter = map.pubsub.subscribe("call.error", requestId);
let settled = false;
const cleanup = (): void => {
if (!settled) {
settled = true;
map.pubsub.publish("call.aborted", requestId, { requestId });
}
partIter.return?.();
completedIter.return?.();
errorIter.return?.();
};
stop.then(cleanup);
try {
const partPromise = (async (): Promise<never> => {
for await (const event of partIter) {
const part = event as CallPartEvent;
await push(part.output);
}
throw new Error("part stream ended unexpectedly");
})();
const completedPromise = (async () => {
for await (const _ of completedIter) {
return;
}
})();
const errorPromise = (async (): Promise<never> => {
for await (const event of errorIter) {
const err = event as CallErrorEvent;
throw new CallError(err.code, err.message, err.details);
}
throw new Error("error stream ended unexpectedly");
})();
await Promise.race([completedPromise, errorPromise, partPromise]);
} finally {
cleanup();
stop();
}
});
}
respond(requestId: string, output: unknown): void {
this.pubsub.publish("call.responded", requestId, {
requestId,
output,
});
}
part(requestId: string, output: unknown, index?: number): void {
this.pubsub.publish("call.part", requestId, {
requestId,
output,
index,
});
}
complete(requestId: string): void {
this.pubsub.publish("call.completed", requestId, { requestId });
}
emitError(requestId: string, code: string, message: string, details?: unknown): void {
this.pubsub.publish("call.error", requestId, {
requestId,
code,
message,
details,
});
}
abort(requestId: string): void {
const pending = this.requests.get(requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(requestId);
pending.unsubscribe();
this.pubsub.publish("call.aborted", requestId, { requestId });
pending.reject(new CallError(CallErrorCode.ABORTED, `Request ${requestId} was aborted`));
} else {
this.pubsub.publish("call.aborted", requestId, { requestId });
}
}
getPendingCount(): number {
return this.requests.size;
}
}

View File

@@ -25,69 +25,62 @@
*/
import { Repeater } from "./repeater.js";
import type { TypedEventTarget, TypedEvent } from "./types.js";
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";
export type PubSubPublishArgsByKey = {
[key: string]: [] | [unknown] | [number | string, unknown];
export type PubSubEventMap = {
[eventType: string]: unknown;
};
export type PubSubEvent<
TPubSubPublishArgsByKey extends PubSubPublishArgsByKey,
TKey extends Extract<keyof TPubSubPublishArgsByKey, string>,
> = TypedEvent<
TKey,
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
>;
TEventMap extends PubSubEventMap,
TType extends Extract<keyof TEventMap, string> = Extract<keyof TEventMap, string>,
> = TypedEvent<TType, EventEnvelope<TType, TEventMap[TType]>>;
export type PubSubEventTarget<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> =
export type PubSubEventTarget<TEventMap extends PubSubEventMap> =
TypedEventTarget<
PubSubEvent<TPubSubPublishArgsByKey, Extract<keyof TPubSubPublishArgsByKey, string>>
PubSubEvent<TEventMap>
>;
export type PubSubConfig<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
eventTarget?: PubSubEventTarget<TPubSubPublishArgsByKey>;
export type PubSubConfig<TEventMap extends PubSubEventMap> = {
eventTarget?: PubSubEventTarget<TEventMap>;
};
export type PubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
routingKey: TKey,
...args: TPubSubPublishArgsByKey[TKey]
export type PubSub<TEventMap extends PubSubEventMap> = {
publish<TType extends Extract<keyof TEventMap, string>>(
type: TType,
id: string,
payload: TEventMap[TType],
): void;
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
): Repeater<unknown>;
subscribe<TType extends Extract<keyof TEventMap, string>>(
type: TType,
id: string,
): Repeater<EventEnvelope<TType, TEventMap[TType]>>;
};
export function createPubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey>(
config?: PubSubConfig<TPubSubPublishArgsByKey>,
): PubSub<TPubSubPublishArgsByKey> {
export function createPubSub<TEventMap extends PubSubEventMap>(
config?: PubSubConfig<TEventMap>,
): PubSub<TEventMap> {
const target =
config?.eventTarget ?? (new EventTarget() as PubSubEventTarget<TPubSubPublishArgsByKey>);
config?.eventTarget ?? (new EventTarget() as PubSubEventTarget<TEventMap>);
return {
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
routingKey: TKey,
...args: TPubSubPublishArgsByKey[TKey]
publish<TType extends Extract<keyof TEventMap, string>>(
type: TType,
id: string,
payload: TEventMap[TType],
) {
const payload = args[1] ?? args[0] ?? null;
const topic = args[1] === undefined ? routingKey : `${routingKey}:${args[0] as number}`;
const event = new CustomEvent(topic, { detail: payload }) as PubSubEvent<
TPubSubPublishArgsByKey,
TKey
const envelope: EventEnvelope<TType, TEventMap[TType]> = { type, id, payload };
const event = new CustomEvent(type, { detail: envelope }) as PubSubEvent<
TEventMap,
TType
>;
target.dispatchEvent(event);
},
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
): Repeater<unknown> {
const topic: TKey = (id === undefined ? routingKey : `${routingKey}:${id as number}`) as TKey;
subscribe<TType extends Extract<keyof TEventMap, string>>(
type: TType,
id: string,
): Repeater<EventEnvelope<TType, TEventMap[TType]>> {
const topic = `${type}:${id}`;
return new Repeater(function subscriptionRepeater(
next: (value: unknown) => Promise<void>,
@@ -98,11 +91,11 @@ export function createPubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsBy
}
stop.then(function subscriptionRepeaterStopHandler() {
target.removeEventListener(topic, pubsubEventListener as EventListener);
target.removeEventListener(topic as TType, pubsubEventListener as EventListener);
});
target.addEventListener(topic, pubsubEventListener as EventListener, undefined);
});
target.addEventListener(topic as TType, pubsubEventListener as EventListener, undefined);
}) as Repeater<EventEnvelope<TType, TEventMap[TType]>>;
},
};
}

View File

@@ -28,10 +28,11 @@
* - Uses our TypedEventTarget/TypedEvent types from types.ts
* - Removed tslib dependency
* - Uses ioredis types directly (already a dependency)
* - Serializes full EventEnvelope for cross-process transport
*/
import type { Cluster, Redis } from "ioredis";
import type { TypedEventTarget, TypedEvent } from "./types.js";
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";
export type CreateRedisEventTargetArgs = {
publishClient: Redis | Cluster;
@@ -57,8 +58,9 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
return;
}
const envelope = serializer.parse(message) as EventEnvelope;
const event = new CustomEvent(channel, {
detail: message === "" ? null : serializer.parse(message),
detail: envelope,
}) as TEvent;
for (const callback of callbacks) {
callback(event);
@@ -102,7 +104,7 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
dispatchEvent(event: TEvent) {
publishClient.publish(
event.type,
event.detail === undefined ? "" : serializer.stringify(event.detail),
serializer.stringify(event.detail),
);
return true;
},

View File

@@ -1,4 +1,4 @@
export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubPublishArgsByKey } from "./create_pubsub.js";
export { type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js";
export { filter, map, pipe } from "./operators.js";
export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubEventMap } from "./create_pubsub.js";
export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js";
export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js";
export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js";

View File

@@ -64,4 +64,131 @@ export function pipe(
...fns: ((arg: unknown) => unknown)[]
): unknown {
return fns.reduce((acc, fn) => fn(acc), a);
}
export async function* take<T>(
source: AsyncIterable<T>,
count: number,
): AsyncIterable<T> {
let i = 0;
for await (const value of source) {
if (i++ >= count) return;
yield value;
}
}
export async function reduce<T, U>(
source: AsyncIterable<T>,
reducer: (accumulator: U, value: T) => Promise<U> | U,
initialValue: U,
): Promise<U> {
let accumulator = initialValue;
for await (const value of source) {
accumulator = await reducer(accumulator, value);
}
return accumulator;
}
export async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
const result: T[] = [];
for await (const value of source) {
result.push(value);
}
return result;
}
export async function* batch<T>(
source: AsyncIterable<T>,
size: number,
): AsyncIterable<T[]> {
let current: T[] = [];
for await (const value of source) {
current.push(value);
if (current.length === size) {
yield current;
current = [];
}
}
if (current.length > 0) yield current;
}
export async function* dedupe<T>(
source: AsyncIterable<T>,
): AsyncIterable<T> {
const seen = new Set<T>();
for await (const value of source) {
if (!seen.has(value)) {
seen.add(value);
yield value;
}
}
}
export async function* window<T>(
source: AsyncIterable<T>,
size: number,
step: number = 1,
): AsyncIterable<T[]> {
const buffer: T[] = [];
for await (const value of source) {
buffer.push(value);
if (buffer.length === size) {
yield [...buffer];
buffer.splice(0, step);
}
}
}
export async function* flat<T>(
source: AsyncIterable<T[]>,
): AsyncIterable<T> {
for await (const array of source) {
for (const value of array) {
yield value;
}
}
}
export async function groupBy<T, K>(
source: AsyncIterable<T>,
keyFn: (value: T) => K,
): Promise<Map<K, T[]>> {
const groups = new Map<K, T[]>();
for await (const value of source) {
const key = keyFn(value);
if (!groups.has(key)) {
groups.set(key, []);
}
groups.get(key)!.push(value);
}
return groups;
}
export async function* chain<T>(
...sources: AsyncIterable<T>[]
): AsyncIterable<T> {
for (const source of sources) {
for await (const value of source) {
yield value;
}
}
}
export async function* join<T, U, K>(
source1: AsyncIterable<T>,
source2: AsyncIterable<U>,
keyFn1: (value: T) => K,
keyFn2: (value: U) => K,
): AsyncIterable<[T, U]> {
const map2 = new Map<K, U>();
for await (const value of source2) {
const key = keyFn2(value);
map2.set(key, value);
}
for await (const value of source1) {
const key = keyFn1(value);
if (map2.has(key)) {
yield [value, map2.get(key)!];
}
}
}

View File

@@ -24,6 +24,12 @@
* SOFTWARE.
*/
export interface EventEnvelope<TType extends string = string, TPayload = unknown> {
readonly type: TType;
readonly id: string;
readonly payload: TPayload;
}
export type TypedEvent<TType extends string = string, TDetail = unknown> = Omit<
CustomEvent<TDetail>,
"detail" | "type"