From de7fc88f9991edbe0370258840caff5361c9e873 Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Fri, 1 May 2026 19:40:25 +0000 Subject: [PATCH] Simplify to transport-only: remove call protocol, add EventEnvelope, expand stream operators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove src/call.ts (PendingRequestMap, CallEventSchema, CallError) — call protocol belongs in @alkdev/operations - Add EventEnvelope type ({ type, id, payload }) as the cross-platform serialization contract - Simplify createPubSub: replace PubSubPublishArgsByKey tuple model with PubSubEventMap; publish(type, id, payload) and subscribe(type, id) use explicit id for topic scoping - Update Redis adapter to serialize/deserialize full EventEnvelope - Expand operators: add take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join - Remove @alkdev/typebox runtime dependency (was only used by call.ts) - Remove ./call sub-path export from package.json and tsup config - Update all architecture docs to reflect transport-only scope, add Worker adapter, remove call protocol references - Remove docs/architecture/call-protocol.md - Update AGENTS.md with new source layout and transport-only principle --- AGENTS.md | 12 +- docs/architecture.md | 4 +- docs/architecture/README.md | 58 +++-- docs/architecture/api-surface.md | 177 +++++++++++--- docs/architecture/build-distribution.md | 42 ++-- docs/architecture/call-protocol.md | 285 ---------------------- docs/architecture/event-targets.md | 66 ++++- docs/architecture/iroh-transport.md | 10 +- docs/research/migration.md | 2 + package.json | 14 +- src/call.ts | 307 ------------------------ src/create_pubsub.ts | 85 +++---- src/event-target-redis.ts | 8 +- src/index.ts | 6 +- src/operators.ts | 127 ++++++++++ src/types.ts | 6 + tsup.config.ts | 1 - 17 files changed, 446 insertions(+), 764 deletions(-) delete mode 100644 docs/architecture/call-protocol.md delete mode 100644 src/call.ts diff --git 
a/AGENTS.md b/AGENTS.md index 6f91785..4fdb938 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -60,7 +60,7 @@ The plugin auto-injects `workdir` for bash commands when the session is mapped t ## Project: @alkdev/pubsub -Type-safe publish/subscribe with pluggable event target adapters (in-process, Redis, WebSocket, Iroh). Core is adapted from graphql-yoga (MIT). Dual-licensed MIT / Apache-2.0. +Type-safe publish/subscribe with pluggable event target adapters (in-process, Redis, WebSocket, Worker, Iroh). Transport layer only — no call protocol or coordination semantics. Core is adapted from graphql-yoga (MIT). Dual-licensed MIT / Apache-2.0. ### Commands @@ -74,9 +74,11 @@ Type-safe publish/subscribe with pluggable event target adapters (in-process, Re See `docs/architecture/` for full spec. Key points: +- **EventEnvelope**: Every event is `{ type, id, payload }`. This is the cross-platform serialization contract. - **Barrel + sub-path exports**: `src/index.ts` re-exports core + operators. Each adapter has its own sub-path entry (`@alkdev/pubsub/event-target-redis`, etc.). - **Peer dep isolation**: Redis and Iroh adapters are optional peer deps. Consumers only install the ones they need. - **TypedEventTarget contract**: All adapters implement the same `addEventListener`/`dispatchEvent`/`removeEventListener` interface. `createPubSub` is transport-agnostic. +- **Transport only**: No call protocol, no PendingRequestMap, no coordination. Those belong in `@alkdev/operations`. - **No comments in source**: Do not add comments to code unless explicitly asked. - **License headers**: Files adapted from graphql-yoga must preserve their MIT attribution headers. @@ -85,18 +87,22 @@ See `docs/architecture/` for full spec. Key points: ``` src/ index.ts — Barrel: re-exports core API + operators - types.ts — TypedEvent, TypedEventTarget, etc. 
(adapted from graphql-yoga) + types.ts — TypedEvent, TypedEventTarget, EventEnvelope (adapted from graphql-yoga) create_pubsub.ts — createPubSub factory (adapted from graphql-yoga) operators.ts — filter, map, pipe (adapted from graphql-yoga) + take, reduce, toArray, batch, dedupe, window, + flat, groupBy, chain, join (from async-utility reference) + repeater.ts — Inlined from @repeaterjs/repeater (MIT) event-target-redis.ts — createRedisEventTarget (peer dep: ioredis) # Future adapters: # event-target-websocket.ts — (peer dep: none, web standard) + # event-target-worker.ts — (peer dep: none, web standard) # event-target-iroh.ts — (peer dep: @rayhanadev/iroh) ``` ### Dependencies -Runtime: `@repeaterjs/repeater` (direct, ~3KB). +Runtime: none (Repeater is inlined, TypeBox removed). Peer (optional): `ioredis@^5.0.0` (Redis adapter), `@rayhanadev/iroh` (Iroh adapter, future). Dev: `tsup`, `typescript`, `vitest`, `@vitest/coverage-v8`, `ioredis` (for type resolution). diff --git a/docs/architecture.md b/docs/architecture.md index 2384fd7..5f7ad5f 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -5,8 +5,8 @@ | Document | Content | |----------|---------| | [architecture/README.md](architecture/README.md) | Overview, why this exists, interface contract, consumer context | -| [architecture/api-surface.md](architecture/api-surface.md) | createPubSub, PubSub types, operators | -| [architecture/event-targets.md](architecture/event-targets.md) | In-process, Redis, WebSocket adapters | +| [architecture/api-surface.md](architecture/api-surface.md) | createPubSub, EventEnvelope, PubSub types, operators | +| [architecture/event-targets.md](architecture/event-targets.md) | In-process, Redis, WebSocket, Worker adapters | | [architecture/iroh-transport.md](architecture/iroh-transport.md) | Iroh P2P QUIC transport, framing, identity, hub/spoke | | [architecture/build-distribution.md](architecture/build-distribution.md) | Dependencies, project structure, tree-shaking, 
sub-path exports | diff --git a/docs/architecture/README.md b/docs/architecture/README.md index d13a87e..ba246a0 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -1,11 +1,27 @@ --- status: draft -last_updated: 2026-04-30 +last_updated: 2026-05-01 --- # @alkdev/pubsub Architecture -Type-safe publish/subscribe with pluggable event target adapters. The core (`createPubSub` + `TypedEventTarget` + operators) has no transport dependency. Each adapter (Redis, WebSocket, Iroh) is an isolated module that only imports its own peer dependency. +Type-safe publish/subscribe with pluggable event target adapters. The core (`createPubSub` + `TypedEventTarget` + `EventEnvelope` + operators) has no transport dependency. Each adapter (Redis, WebSocket, Worker, Iroh) is an isolated module that only imports its own peer dependency. + +This package is a **transport layer only**. It carries events between processes and does not prescribe what those events mean or how downstream systems coordinate. Higher-level protocols (call/response, operation invocation, workflow coordination) belong in downstream packages like `@alkdev/operations`. + +## Core Principle + +**The TypedEventTarget interface is the contract.** All transports implement the same `addEventListener` / `dispatchEvent` / `removeEventListener` surface. `createPubSub` doesn't know or care which transport is in use — it just dispatches events to whatever `TypedEventTarget` it was given. + +**The EventEnvelope is the cross-platform format.** Every event dispatched through pubsub is `{ type, id, payload }`. This is a minimal, JSON-serializable envelope that any transport adapter can route and any downstream consumer can interpret. Domain-specific data lives in `payload`. Correlation lives in `id`. The event type lives in `type`. No `parent` field — causal relationships are managed by downstream coordination layers, not the transport. 
+ +Swapping transports is a one-line config change: + +```ts +const pubsub = createPubSub({ + eventTarget: createRedisEventTarget({ publishClient, subscribeClient }), +}); +``` ## Why This Exists @@ -16,33 +32,27 @@ Extracted from `@alkdev/alkhub_ts/packages/core/pubsub/`, which itself was adapt 3. **Isolates peer deps** — Redis and Iroh are heavy native dependencies; consumers that don't need them shouldn't carry them 4. **Matches established pattern** — `@alkdev/taskgraph` and `@alkdev/typemap` already use the standalone-package pattern -## Core Principle - -**The TypedEventTarget interface is the contract.** All transports implement the same `addEventListener` / `dispatchEvent` / `removeEventListener` surface. `createPubSub` doesn't know or care which transport is in use — it just dispatches events to whatever `TypedEventTarget` it was given. - -This means swapping from in-process to Redis to WebSocket to Iroh is a one-line config change: - -```ts -const pubsub = createPubSub({ - eventTarget: createRedisEventTarget({ publishClient, subscribeClient }), -}); -``` - ## What This Package Provides -- **Core** — `createPubSub`, `TypedEventTarget`, `TypedEvent`, topic scoping, `filter`/`map`/`pipe` operators, `Repeater` (inlined from @repeaterjs/repeater) -- **Call protocol** (`@alkdev/pubsub/call`) — `PendingRequestMap`, `CallEventSchema`, `CallError`, event types for request/response and streaming operations +- **Core** — `createPubSub`, `TypedEventTarget`, `TypedEvent`, `EventEnvelope`, stream operators (`filter`, `map`, `pipe`, `take`, `reduce`, `toArray`, `batch`, `dedupe`, `window`, `flat`, `groupBy`, `chain`, `join`), `Repeater` (inlined from @repeaterjs/repeater) - **Adapters** (each is a peer-dep island, importable via sub-path export): - In-process (default `EventTarget`, no adapter needed) - Redis (`@alkdev/pubsub/event-target-redis`, peer dep: `ioredis`) - WebSocket (future: `@alkdev/pubsub/event-target-websocket`) + - Worker (future: 
`@alkdev/pubsub/event-target-worker`) - Iroh (future: `@alkdev/pubsub/event-target-iroh`, peer dep: `@rayhanadev/iroh`) +## What This Package Does NOT Provide + +- **Call protocol** — request/response coordination, `PendingRequestMap`, `CallEventSchema`, and `CallError` have been moved to `@alkdev/operations`. The pubsub transport is substrate-agnostic. +- **Workflow coordination** — causal chains, parent/child relationships, and abort cascading are domain-level concerns managed by downstream packages. +- **Abort/cancellation primitives** — these belong in the coordination layer, not the transport. The `EventEnvelope` intentionally omits a `parent` field to avoid conflating transport with coordination semantics. + ## Consumer Context ### alkhub (hub-spoke coordinator) -The hub uses pubsub for event routing between operations, runners, and the SSE interface. The event map is the call protocol — typed JSON events (`call.requested`, `call.responded`, `session.status`, etc.). Transport choice depends on deployment: +The hub uses pubsub for event routing between operations, runners, and the SSE interface. Transport choice depends on deployment: | Deployment | Transport | |------------|-----------| @@ -50,23 +60,24 @@ The hub uses pubsub for event routing between operations, runners, and the SSE i | Hub + worker processes | Redis | | Hub + remote spokes | WebSocket or Iroh | -### Future: standalone spoke SDK +### Downstream packages -Spokes will import `@alkdev/pubsub` directly to create their event target (WebSocket or Iroh) and wire it into `createPubSub`. Call protocol types and `PendingRequestMap` are available from `@alkdev/pubsub/call`. +- `@alkdev/operations` uses `createPubSub` with its own event maps for call/response coordination. It defines its own event schemas and `PendingRequestMap` on top of the pubsub transport. +- `@alkdev/taskgraph` will use pubsub events for task lifecycle notifications and workflow coordination. 
## Threat Model - **Fork provenance** — core pubsub and typed event target are adapted from graphql-yoga (MIT). All original copyright notices are preserved in file headers. See [ADR-001](decisions/001-graphql-yoga-fork.md). - **Peer dep isolation** — Redis and Iroh are optional peer dependencies. A consumer that only needs in-process transport installs zero extra packages. A consumer using Redis but not Iroh installs `ioredis` only. - **Type-only imports** — `event-target-redis.ts` imports `ioredis` types only at compile time. At runtime, the consumer must provide the actual `Redis`/`Cluster` instances. +- **Minimal envelope** — the `EventEnvelope` format (`{ type, id, payload }`) is intentionally minimal and JSON-serializable. Any platform that supports JSON can produce or consume these events (Rust, Python, etc.). ## Architecture Documents | Document | Content | |----------|---------| -| [api-surface.md](api-surface.md) | createPubSub factory, PubSub types, operators, TypedEventTarget types | -| [call-protocol.md](call-protocol.md) | Call/subscribe protocol — event types, PendingRequestMap, streaming, error model, transport mapping | -| [event-targets.md](event-targets.md) | In-process, Redis, WebSocket adapters — interface, configuration, limitations | +| [api-surface.md](api-surface.md) | createPubSub factory, EventEnvelope, PubSub types, operators, TypedEventTarget types | +| [event-targets.md](event-targets.md) | In-process, Redis, WebSocket, Worker adapters — interface, configuration, limitations | | [iroh-transport.md](iroh-transport.md) | Iroh P2P QUIC transport — protocol, framing, identity, hub/spoke sides, reconnection | | [build-distribution.md](build-distribution.md) | Dependencies, project structure, tree-shaking, sub-path exports, targets | @@ -93,4 +104,5 @@ last_updated: YYYY-MM-DD - Upstream: `@graphql-yoga/subscription` and `@graphql-yoga/typed-event-target` (MIT) - alkhub pubsub-redis doc: `@alkdev/alkhub_ts/docs/architecture/pubsub-redis.md` 
- alkhub spoke-runner doc: `@alkdev/alkhub_ts/docs/architecture/spoke-runner.md` -- Migration research: `docs/research/migration.md` \ No newline at end of file +- Migration research: `docs/research/migration.md` +- Research: Event sourcing types — `docs/research/event_sourcing/` (not in this repo, in global workspace) \ No newline at end of file diff --git a/docs/architecture/api-surface.md b/docs/architecture/api-surface.md index b16dd55..345456a 100644 --- a/docs/architecture/api-surface.md +++ b/docs/architecture/api-surface.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-04-30 +last_updated: 2026-05-01 --- # API Surface @@ -10,45 +10,60 @@ Core pubsub creation, types, and operators. No transport dependencies. ## `createPubSub` ```ts -function createPubSub( - config?: PubSubConfig, -): PubSub; +function createPubSub( + config?: PubSubConfig, +): PubSub; ``` Factory function. Accepts an optional `eventTarget` config. If none is provided, uses `new EventTarget()` (in-process). -### Topic Scoping +### Event Envelope -Topics can be scoped with an id: - -- `pubsub.publish("session.status", projectId, payload)` → dispatches to topic `session.status:{projectId}` -- `pubsub.subscribe("session.status", projectId)` → subscribes to topic `session.status:{projectId}` only -- `pubsub.publish("session.status", payload)` → dispatches to topic `session.status` (unscoped) -- `pubsub.subscribe("session.status")` → subscribes to topic `session.status` (unscoped) - -The topic string is either the routing key directly (unscoped) or `{routingKey}:{id}` (scoped). This maps naturally to Redis channel naming and WebSocket message routing. 
- -### `PubSubPublishArgsByKey` - -The type parameter that defines the event map: +Every event dispatched through pubsub uses the `EventEnvelope` format: ```ts -type PubSubPublishArgsByKey = { - [key: string]: [] | [unknown] | [number | string, unknown]; +interface EventEnvelope { + readonly type: TType; + readonly id: string; + readonly payload: TPayload; +} +``` + +The envelope is the cross-platform serialization contract. All transport adapters serialize/deserialize this format. Domain-specific data goes in `payload`. + +### Topic Scoping + +Topics are scoped by `id` using the `type:id` convention: + +```ts +pubsub.publish("call.responded", requestId, { output }); +// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload } + +const stream = pubsub.subscribe("call.responded", requestId); +// → subscribes to topic "call.responded:{requestId}" +``` + +Unlike the previous tuple-based model, `id` is always required. This simplifies the type system and makes correlation explicit. + +### `PubSubEventMap` + +The type parameter that defines the event map. Maps event type strings to their payload types: + +```ts +type PubSubEventMap = { + [eventType: string]: unknown; }; ``` -- `[]` — event with no payload (trigger only) -- `[payload]` — unscoped event with payload -- `[id, payload]` — scoped event with id and payload - ### `PubSub.subscribe()` -Returns a `Repeater` (async iterable). Consumers iterate with `for await`: +Returns a `Repeater>` (async iterable). 
Consumers iterate with `for await`: ```ts -for await (const payload of pubsub.subscribe("session.status")) { - // handle payload +for await (const envelope of pubsub.subscribe("session.status", sessionId)) { + // envelope.type === "session.status" + // envelope.id === sessionId + // envelope.payload === the typed payload } ``` @@ -58,21 +73,26 @@ The `Repeater` automatically cleans up its `addEventListener` when the consumer | Export | Source | Description | |--------|--------|-------------| +| `EventEnvelope` | `types.ts` | Cross-platform envelope: `{ type, id, payload }`. JSON-serializable. | | `TypedEvent` | `types.ts` | Event with typed `type` and `detail`. Omits `CustomEvent`'s untyped fields. | | `TypedEventTarget` | `types.ts` | Extends `EventTarget` with typed `addEventListener`, `dispatchEvent`, `removeEventListener`. | | `TypedEventListener` | `types.ts` | `(evt: TEvent) => void` | | `TypedEventListenerObject` | `types.ts` | `{ handleEvent(object: TEvent): void }` | | `TypedEventListenerOrEventListenerObject` | `types.ts` | Union of the above | -| `PubSub` | `create_pubsub.ts` | `{ publish, subscribe }` | -| `PubSubConfig` | `create_pubsub.ts` | `{ eventTarget?: PubSubEventTarget }` | -| `PubSubEvent` | `create_pubsub.ts` | Derived `TypedEvent` for a specific event key | -| `PubSubEventTarget` | `create_pubsub.ts` | `TypedEventTarget>` | +| `PubSub` | `create_pubsub.ts` | `{ publish, subscribe }` — publish takes `(type, id, payload)`, subscribe takes `(type, id)` and returns `Repeater` | +| `PubSubConfig` | `create_pubsub.ts` | `{ eventTarget?: PubSubEventTarget }` | +| `PubSubEvent` | `create_pubsub.ts` | Derived `TypedEvent` for a specific event type, with `detail` as `EventEnvelope` | +| `PubSubEventTarget` | `create_pubsub.ts` | `TypedEventTarget>` | ## Operators -All operators return `Repeater` instances and work with any async iterable. +All operators work with any `AsyncIterable`. 
Operators that return `Repeater` provide backpressure-aware push semantics. -### `filter` +### Repeater-returning operators + +These wrap source iterables in a `Repeater` with explicit push/stop control: + +#### `filter` ```ts function filter<T>(filterFn: (value: T) => Promise<boolean> | boolean): (source: AsyncIterable<T>) => Repeater<T>; @@ -80,22 +100,105 @@ Type-narrowing overload available: `filter(fn: (input: T) => input is U)`. -### `map` +#### `map` ```ts function map<T, O>(mapper: (input: T) => Promise<O> | O): (source: AsyncIterable<T>) => Repeater<O>; ``` -### `pipe` +#### `pipe` ```ts function pipe<A, B>(a: A, ab: (a: A) => B): B; -function pipe<A, B, C>(a: A, ab: (a: A) => B, bc: (b: B) => C): C; // up to 5 arguments ``` -Compose operators: `pipe(pubsub.subscribe("myEvent"), filter(isRelevant), map(transform))` +Compose operators: `pipe(pubsub.subscribe("myEvent", id), filter(isRelevant), map(transform))` + +### AsyncGenerator operators + +These use native `async function*` generators for simpler stream transformations: + +#### `take` + +Yields only the first `count` items from the source. + +```ts +async function* take<T>(source: AsyncIterable<T>, count: number): AsyncIterable<T> +``` + +#### `reduce` + +Reduces the stream to a single value. + +```ts +async function reduce<T, U>(source: AsyncIterable<T>, reducer: (acc: U, value: T) => Promise<U> | U, initialValue: U): Promise<U> +``` + +#### `toArray` + +Collects all items into an array. + +```ts +async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> +``` + +#### `batch` + +Groups items into arrays of `size`. + +```ts +async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncIterable<T[]> +``` + +#### `dedupe` + +Yields only unique items (uses `Set` for deduplication). + +```ts +async function* dedupe<T>(source: AsyncIterable<T>): AsyncIterable<T> +``` + +#### `window` + +Sliding window of `size` items, advancing by `step` (default 1). 
+ +```ts +async function* window<T>(source: AsyncIterable<T>, size: number, step?: number): AsyncIterable<T[]> +``` + +#### `flat` + +Flattens an `AsyncIterable<AsyncIterable<T>>` into `AsyncIterable<T>`. + +```ts +async function* flat<T>(source: AsyncIterable<AsyncIterable<T>>): AsyncIterable<T> +``` + +#### `groupBy` + +Groups items by key into a `Map`. Terminal operation (consumes entire stream). + +```ts +async function groupBy<T, K>(source: AsyncIterable<T>, keyFn: (value: T) => K): Promise<Map<K, T[]>> +``` + +#### `chain` + +Concatenates multiple async iterables into one. + +```ts +async function* chain<T>(...sources: AsyncIterable<T>[]): AsyncIterable<T> +``` + +#### `join` + +Streaming join between two sources on matching keys. + +```ts +async function* join<T, U, K>(source1: AsyncIterable<T>, source2: AsyncIterable<U>, keyFn1: (value: T) => K, keyFn2: (value: U) => K): AsyncIterable<[T, U]> +``` ## Attribution -`createPubSub` and operators are adapted from `@graphql-yoga/subscription` (MIT). `TypedEventTarget` types are adapted from `@graphql-yoga/typed-event-target` (MIT). See file headers for full license text. \ No newline at end of file +`createPubSub`, `filter`, `map`, and `pipe` are adapted from `@graphql-yoga/subscription` (MIT). `TypedEventTarget` types are adapted from `@graphql-yoga/typed-event-target` (MIT). `Repeater` is inlined from `@repeaterjs/repeater` (MIT). See file headers for full license text. \ No newline at end of file diff --git a/docs/architecture/build-distribution.md b/docs/architecture/build-distribution.md index b3d18c9..48f3030 100644 --- a/docs/architecture/build-distribution.md +++ b/docs/architecture/build-distribution.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-04-30 +last_updated: 2026-05-01 --- # Build & Distribution @@ -9,32 +9,38 @@ Dependencies, project structure, tree-shaking, sub-path exports, and build targe ## Dependencies +No runtime dependencies. The `Repeater` class is inlined from `@repeaterjs/repeater` (MIT) — no external package required. 
+ | Package | Type | Purpose | |---------|------|---------| -| `@repeaterjs/repeater` | direct | Small (~3KB). Core async iterable primitive for `subscribe()`. | +| (none) | runtime | — | | `ioredis` | peer (optional) | Redis client. Only imported by `event-target-redis.ts`. Type-only import at compile time. | | `@rayhanadev/iroh` | peer (optional, future) | Iroh NAPI-RS binding. Only imported by `event-target-iroh.ts`. | -No other external dependencies. No logger dependency. +No logger dependency. No TypeBox dependency (call protocol and schemas moved to `@alkdev/operations`). ## Project Structure ``` @alkdev/pubsub/ src/ - index.ts # Barrel: re-exports core API - types.ts # TypedEvent, TypedEventTarget, etc. - create_pubsub.ts # createPubSub factory - operators.ts # filter, map, pipe - event-target-redis.ts # createRedisEventTarget (peer dep: ioredis) + index.ts # Barrel: re-exports core API + operators + types.ts # TypedEvent, TypedEventTarget, EventEnvelope + create_pubsub.ts # createPubSub factory (adapted from graphql-yoga) + operators.ts # filter, map, pipe, take, reduce, toArray, + # batch, dedupe, window, flat, groupBy, chain, join + repeater.ts # Inlined from @repeaterjs/repeater (MIT) + event-target-redis.ts # createRedisEventTarget (peer dep: ioredis) # Future adapters (each is its own entry point + peer dep island): - # event-target-websocket.ts # peer dep: none (web standard) - # event-target-iroh.ts # peer dep: @rayhanadev/iroh + # event-target-websocket.ts # (peer dep: none, web standard) + # event-target-worker.ts # (peer dep: none, web standard) + # event-target-iroh.ts # (peer dep: @rayhanadev/iroh) test/ create_pubsub.test.ts operators.test.ts event-target-redis.test.ts # event-target-websocket.test.ts + # event-target-worker.test.ts # event-target-iroh.test.ts docs/ architecture.md @@ -56,6 +62,7 @@ We use explicit sub-path exports rather than barrel-only + tree-shaking. Each ad ".": { ... }, "./event-target-redis": { ... 
}, "./event-target-websocket": { ... }, + "./event-target-worker": { ... }, "./event-target-iroh": { ... } } } @@ -86,21 +93,6 @@ Optional peer deps means `npm install @alkdev/pubsub` does NOT install ioredis o - **Target**: `es2022` - **Splitting**: enabled (tsup code splitting for shared chunks) -```ts -// tsup.config.ts -import { defineConfig } from 'tsup'; - -export default defineConfig({ - entry: ['src/index.ts', 'src/event-target-redis.ts'], - format: ['esm', 'cjs'], - dts: true, - sourcemap: true, - clean: true, - splitting: true, - target: 'es2022', -}); -``` - ## Testing - **Runner**: `vitest` — matches taskgraph, natural fit with tsup/Node build pipeline diff --git a/docs/architecture/call-protocol.md b/docs/architecture/call-protocol.md deleted file mode 100644 index f66556d..0000000 --- a/docs/architecture/call-protocol.md +++ /dev/null @@ -1,285 +0,0 @@ ---- -status: draft -last_updated: 2026-04-30 ---- - -# Call Protocol - -Unified event-based protocol for request/response and streaming operations. Built on `@alkdev/pubsub`'s `TypedEventTarget` and `Repeater` primitives. - -## Overview - -The call protocol provides a single event-based mechanism that works identically whether the operation is local (in-process), remote (hub/spoke over WebSocket or Iroh), or streamed (subscription). It is transport-agnostic — the same event shapes, same `requestId` correlation, same `PendingRequestMap`. Only the `EventTarget` changes. - -Two consumption patterns share the same protocol: - -- **`call()`**: Publish `call.requested`, subscribe to response events scoped by `requestId`, resolve on first response → `Promise` -- **`subscribe()`**: Publish `call.requested`, subscribe to `call.part` events scoped by `requestId`, yield each part until `call.completed` or `call.error` → `Repeater` - -Both use `call.requested` as the trigger. The `operationId` and `operation.type` on the handler side determine which pattern applies. 
The protocol itself doesn't distinguish — it's the handler that decides whether to respond once (`respond()`) or stream (`part()` + `complete()`). - -## Event Types - -All events use TypeBox schemas, compatible with `@alkdev/pubsub`'s `PubSubPublishArgsByKey`. Schemas are exported as `CallEventSchema` for runtime validation. - -### `CallEventSchema` - -```ts -const CallEventSchema = { - "call.requested": Type.Object({ - requestId: Type.String(), - operationId: Type.String(), - input: Type.Unknown(), - parentRequestId: Type.Optional(Type.String()), - deadline: Type.Optional(Type.Number()), - identity: Type.Optional(Type.Object({ - id: Type.String(), - scopes: Type.Array(Type.String()), - resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), - })), - }), - "call.responded": Type.Object({ - requestId: Type.String(), - output: Type.Unknown(), - }), - "call.part": Type.Object({ - requestId: Type.String(), - output: Type.Unknown(), - index: Type.Optional(Type.Number()), - }), - "call.completed": Type.Object({ - requestId: Type.String(), - }), - "call.aborted": Type.Object({ - requestId: Type.String(), - }), - "call.error": Type.Object({ - requestId: Type.String(), - code: Type.String(), - message: Type.String(), - details: Type.Optional(Type.Unknown()), - }), -} -``` - -### Topic Scoping - -Response events are scoped by `requestId` using pubsub's built-in topic scoping: - -| Event | Publish | Subscribe (caller) | Subscribe (handler) | -|-------|---------|--------------------|--------------------| -| `call.requested` | `pubsub.publish("call.requested", event)` | — | Unscoped: `pubsub.subscribe("call.requested")` | -| `call.responded` | `pubsub.publish("call.responded", requestId, event)` | Scoped: `pubsub.subscribe("call.responded", requestId)` | — | -| `call.part` | `pubsub.publish("call.part", requestId, event)` | Scoped: `pubsub.subscribe("call.part", requestId)` | — | -| `call.completed` | `pubsub.publish("call.completed", requestId, event)` 
| Scoped: `pubsub.subscribe("call.completed", requestId)` | — | -| `call.aborted` | `pubsub.publish("call.aborted", requestId, event)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | -| `call.error` | `pubsub.publish("call.error", requestId, event)` | Scoped: `pubsub.subscribe("call.error", requestId)` | — | - -This gives every `requestId` its own event channel. On Redis, this maps to `call.responded:{uuid}` channels. On WebSocket or Iroh, the topic string is a routing key. In-process, it's a `CustomEvent` with `type: "call.responded:{uuid}"`. - -**Why scoped instead of unscoped + manual matching?** Scoped topics avoid O(n) fanout. A caller only receives events for its own request. This matters especially on Redis (pub/sub channels) and Iroh (topic订阅), where unscoped subscriptions would deliver every response to every listener. - -## Event Flow - -### Call (request/response) - -``` -Caller Handler - │ │ - │─── call.requested ───────────────>│ - │ {requestId, operationId, │ - │ input, identity, deadline} │ - │ │ - │<── call.responded:{requestId} ────│ - │ {requestId, output} │ -``` - -On error: - -``` - │<── call.error:{requestId} ───────│ - │ {requestId, code, message, │ - │ details} │ -``` - -On timeout or caller cancellation: - -``` - │─── call.aborted:{requestId} ────>│ - │ {requestId} │ -``` - -### Subscribe (request/stream) - -``` -Caller Handler - │ │ - │─── call.requested ───────────────>│ - │ {requestId, operationId, │ - │ input, identity} │ - │ │ - │<── call.part:{requestId} ────────│ - │ {requestId, output, index?} │ - │ │ - │<── call.part:{requestId} ────────│ - │ {requestId, output, index?} │ - │ │ - │<── call.completed:{requestId} ────│ ← stream ends normally - │ {requestId} │ -``` - -On stream error: - -``` - │<── call.error:{requestId} ───────│ - │ {requestId, code, message} │ -``` - -On caller cancellation (consumer breaks out of `for await`): - -``` - │─── call.aborted:{requestId} 
────>│ - │ {requestId} │ -``` - -### Nesting - -Nested calls include `parentRequestId` to track the call chain: - -``` - │─── call.requested ───────────────>│ {requestId: A, parentRequestId: P} -``` - -This enables call graph reconstruction and abort cascading — every nested call includes its parent's `requestId`. - -## `PendingRequestMap` - -The primary consumer interface. Wraps `createPubSub` internally and manages the full call/subscribe lifecycle. - -### Construction - -```ts -const callMap = new PendingRequestMap(eventTarget?) -``` - -- Creates an internal `PubSub` -- If `eventTarget` is provided, passes it to `createPubSub` for transport-level event routing - -### `call(operationId, input, options?)` → `Promise` - -1. Generate `requestId` via `crypto.randomUUID()` -2. Subscribe to `call.responded:{requestId}`, `call.error:{requestId}`, `call.aborted:{requestId}` (scoped) -3. If `deadline` is set, start a timeout timer that publishes `call.aborted` on expiry -4. Publish `call.requested` -5. Return a Promise — resolves on `call.responded`, rejects on `call.error` or `call.aborted` -6. Cleanup: close all scoped subscriptions on settlement - -### `subscribe(operationId, input, options?)` → `Repeater` - -1. Generate `requestId` via `crypto.randomUUID()` -2. Publish `call.requested` -3. Create scoped subscriptions: `call.part:{requestId}`, `call.completed:{requestId}`, `call.error:{requestId}` -4. 
Return a `Repeater` that: - - Yields `output` from each `call.part` event - - Completes on `call.completed` - - Rejects on `call.error` - - On consumer break (Repeater `stop`), publishes `call.aborted:{requestId}` and closes all subscriptions - -This means consumers can use operators: - -```ts -const stream = callMap.subscribe("events.live", { topic: "sensors" }); -const filtered = pipe(stream, filter(isRelevant), map(transform)); -for await (const value of filtered) { - // handle each filtered/mapped stream value -} -``` - -### Handler-side methods - -| Method | Description | -|--------|-------------| -| `respond(requestId, output)` | Publish `call.responded:{requestId}` — single response for call | -| `part(requestId, output, index?)` | Publish `call.part:{requestId}` — next chunk in subscription stream | -| `complete(requestId)` | Publish `call.completed:{requestId}` — stream ended normally | -| `emitError(requestId, code, message, details?)` | Publish `call.error:{requestId}` — error response | -| `abort(requestId)` | Publish `call.aborted:{requestId}` — caller cancellation | - -## Transport Mapping - -Same protocol, same event shapes, same `PendingRequestMap` — different `EventTarget`: - -| Transport | Use Case | EventTarget impl | -|-----------|----------|-----------------| -| In-process | Local operations | Browser `EventTarget` (default) | -| Redis | Cross-process events | `RedisEventTarget` from `@alkdev/pubsub/event-target-redis` | -| WebSocket | Hub ↔ spoke bidirectional | `WebSocketEventTarget` (future) | -| Iroh | P2P QUIC | `IrohEventTarget` (future) | -| SSE | Server → client streaming | `SSEEventTarget` (future) | - -## Error Model - -### `CallError` - -```ts -class CallError extends Error { - readonly code: string; - readonly details?: unknown; -} -``` - -### Infrastructure Error Codes - -| Code | When | Details | -|------|------|---------| -| `OPERATION_NOT_FOUND` | No operation matches `operationId` | `{ operationId: string }` | -| 
`ACCESS_DENIED` | Missing scopes | `{ requiredScopes?: string[] }` | -| `VALIDATION_ERROR` | Input fails schema check | Wrapped from `Value.Errors` | -| `TIMEOUT` | Deadline exceeded | `{ deadline: number }` | -| `ABORTED` | Call/stream cancelled | — | -| `EXECUTION_ERROR` | Handler threw, no `errorSchemas` match | `{ message: string }` | -| `UNKNOWN_ERROR` | Non-Error thrown | `{ raw: string }` | - -## TypeBox Schemas and Validation - -All event shapes are defined as TypeBox schemas in `CallEventSchema`. Consumers can use `Value.Check()` or `Value.Errors()` from `@alkdev/typebox` for runtime validation: - -```ts -import { Value } from "@alkdev/typebox"; -import { CallEventSchema } from "@alkdev/pubsub/call"; - -if (!Value.Check(CallEventSchema["call.requested"], incoming)) { - const errors = [...Value.Errors(CallEventSchema["call.requested"], incoming)]; - // reject with VALIDATION_ERROR -} -``` - -This enables validation on the Iroh and SSE transports where incoming data is untrusted JSON. - -## Relationship to `@alkdev/operations` - -`@alkdev/operations` provides the `OperationRegistry`, access control, and handler dispatch. It uses `@alkdev/pubsub/call` for: - -- `PendingRequestMap` — call/subscribe client interface -- `CallEventSchema` — runtime validation of incoming events -- `CallError` and `CallErrorCode` — error construction and matching -- Type exports — `CallRequestedEvent`, etc. for handler signatures - -The `CallHandler` in operations receives `call.requested` events, looks up the operation, validates input, checks access, and dispatches to the handler. For query/mutation handlers, it calls `respond()`. For subscription handlers, it calls `part()` and `complete()`. 
- -## Operators and Stream Composition - -Since `subscribe()` returns a `Repeater` (which implements `AsyncIterable`), all pubsub operators work on streams: - -```ts -import { pipe, filter, map } from "@alkdev/pubsub"; - -const stream = callMap.subscribe("events.live", { topic: "sensors" }); -const filtered = pipe( - stream, - filter((e) => e.priority > 5), - map((e) => ({ ...e, enriched: true })), -); -``` - -This works the same regardless of whether the stream source is in-process, remote via Redis, or remote via Iroh/SSE. \ No newline at end of file diff --git a/docs/architecture/event-targets.md b/docs/architecture/event-targets.md index 1c57ba5..7f973f9 100644 --- a/docs/architecture/event-targets.md +++ b/docs/architecture/event-targets.md @@ -1,11 +1,11 @@ --- status: draft -last_updated: 2026-04-30 +last_updated: 2026-05-01 --- # Event Target Adapters -In-process, Redis, and WebSocket event targets. All implement `TypedEventTarget`. +In-process, Redis, WebSocket, and Worker event targets. All implement `TypedEventTarget`. ## Interface Contract @@ -13,10 +13,12 @@ Every adapter must implement: | Method | Behavior | |--------|----------| -| `addEventListener(type, callback)` | Register listener for event type. Callback receives `CustomEvent` with typed `detail`. | +| `addEventListener(type, callback)` | Register listener for event type. Callback receives `CustomEvent` with typed `detail` (an `EventEnvelope`). | | `dispatchEvent(event)` | Send/dispatch event. Returns `boolean` (always `true` for non-cancelable events). | | `removeEventListener(type, callback)` | Unregister listener. Clean up underlying subscription when no listeners remain for a topic. | +All adapters use the `EventEnvelope` format (`{ type, id, payload }`) as the serialization contract. Adapters that cross process boundaries (Redis, WebSocket, Iroh) serialize/deserialize the full envelope as JSON. + ## In-Process (Default) No adapter needed. `createPubSub` uses `new EventTarget()` by default. 
This works for single-process deployments where all pubsub participants share the same memory. @@ -49,10 +51,13 @@ function createRedisEventTarget( - `dispatchEvent` → `publishClient.publish(event.type, serializer.stringify(event.detail))` - `addEventListener` → `subscribeClient.subscribe(topic)`, track callbacks per topic - `removeEventListener` → remove callback; if no callbacks remain for topic, `subscribeClient.unsubscribe(topic)` +- On message: deserializes with `serializer.parse`, reconstructs `CustomEvent(channel, { detail: envelope })` + +The `detail` of the `CustomEvent` dispatched to local listeners is the full `EventEnvelope` object (`{ type, id, payload }`). ### Channel Naming -Currently uses raw event type as Redis channel name (e.g., `session.status:proj_123`). Architecture recommends `alk:events:{eventType}` prefix but this is not yet implemented. Should be configurable: `createRedisEventTarget({ ..., prefix: "alk:events:" })`. +Currently uses the topic string directly as the Redis channel name (e.g., `session.status:proj_123`). Architecture recommends `alk:events:{eventType}` prefix but this is not yet implemented. Should be configurable: `createRedisEventTarget({ ..., prefix: "alk:events:" })`. ### Limitations (Current) @@ -62,7 +67,7 @@ Currently uses raw event type as Redis channel name (e.g., `session.status:proj_ ### Test Coverage -5 tests in alkhub (publish path only, mocked ioredis). No tests for subscription-receive path, unsubscribe cleanup, or error handling. +No tests yet (test directory is empty). Previous alkhub had 5 Redis tests (publish path only, mocked ioredis). ## WebSocket @@ -77,16 +82,17 @@ class WebSocketEventTarget implements TypedEventTarget { constructor(private ws: WebSocket) { ws.onmessage = (msg) => { - const { type, payload } = JSON.parse(msg.data as string) - const event = new CustomEvent(type, { detail: payload }) - for (const listener of this.listeners.get(type) ?? 
[]) { + const envelope = JSON.parse(msg.data as string) // { type, id, payload } + const topic = `${envelope.type}:${envelope.id}` + const event = new CustomEvent(topic, { detail: envelope }) + for (const listener of this.listeners.get(topic) ?? []) { listener(event) } } } dispatchEvent(event: CustomEvent): boolean { - this.ws.send(JSON.stringify({ type: event.type, payload: event.detail })) + this.ws.send(JSON.stringify(event.detail)) // sends { type, id, payload } return true } @@ -101,11 +107,49 @@ class WebSocketEventTarget implements TypedEventTarget { - **Per-connection** — hub creates one per spoke connection - **JSON framing** — WebSocket provides native message boundaries (no length-prefix needed) - **No native deps** — works in browsers and Node +- **Envelope serialization** — sends/receives the full `EventEnvelope` JSON (`{ type, id, payload }`) ### Gap: Reconnection WebSocket connections drop. On reconnect, the spoke must re-register with the hub (same `hub.register` flow). The `WebSocketEventTarget` itself is per-connection — a new connection means a new event target instance. Reconnection logic belongs to the spoke lifecycle, not the event target. -### Gap: Hub-Side Architecture +## Worker -The hub needs per-connection event target + `PendingRequestMap` creation on accept, cleanup on disconnect. This is a hub architectural concern, not a pubsub concern. See `@alkdev/alkhub_ts/docs/architecture/spoke-runner.md`. \ No newline at end of file +**Import**: `@alkdev/pubsub/event-target-worker` (not yet implemented) +**Peer dep**: none (Web Worker API is standard) + +### Design + +A `WorkerEventTarget` implementing `TypedEventTarget` over `postMessage`/`onmessage`. This enables `createPubSub` to work across Web Worker boundaries. 
+ +The worker message protocol uses the `EventEnvelope` format: + +```json +{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } +``` + +### Two-Sided Design + +- **Main thread** (`WorkerPoolManager` side): dispatches typed messages to workers via `worker.postMessage()`, receives responses via `worker.onmessage` +- **Worker thread**: dispatches to main thread via `self.postMessage()` (or `parentPort.postMessage()` under Node `worker_threads`), receives from main thread via `self.onmessage` (or `parentPort.on("message")` under Node) + +Both sides wrap `postMessage`/`onmessage` to implement the `TypedEventTarget` interface: + +```ts +// Main thread side +const workerEventTarget = createWorkerEventTarget(worker); + +// Worker thread side +const mainEventTarget = createMainThreadEventTarget(); +``` + +### Key Properties + +- **Bidirectional** — both sides can publish and subscribe +- **Per-worker** — each worker gets its own event target +- **Structured clone** — Web Workers use structured clone algorithm for serialization, but JSON-serializable `EventEnvelope` ensures cross-platform compatibility +- **No native deps** — works in any environment with Web Worker support + +### Relationship to Taskgraph / Operations + +The worker event target enables distributed operation execution. Workers can subscribe to `call.requested` events and publish `call.responded` events through the event target, allowing `@alkdev/operations` to dispatch work to worker threads via the same pubsub transport. \ No newline at end of file diff --git a/docs/architecture/iroh-transport.md b/docs/architecture/iroh-transport.md index 8fc549b..fe7c3de 100644 --- a/docs/architecture/iroh-transport.md +++ b/docs/architecture/iroh-transport.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-04-30 +last_updated: 2026-05-01 --- # Iroh Transport @@ -36,7 +36,7 @@ Using `@rayhanadev/iroh` (v0.1.1) as the NAPI-RS binding.
Community binding, one | `Connection.remoteNodeId()` | Get peer's public key | | `Connection.sendDatagram()` / `readDatagram()` | Unreliable datagrams | -Not exposed (not critical): `Endpoint.watch_addr()`, `Connection.close_reason()`, `Connection.stats()`. +Not exposed (not critical): `Endpoint.watchAddr()`, `Connection.close_reason()`, `Connection.stats()`. ## Protocol @@ -54,13 +54,13 @@ QUIC streams are byte streams (no message boundaries). We use 4-byte big-endian ### Message Format -Same `type` + `detail` shape as all other transports: +All transports use the `EventEnvelope` format: ```json -{ "type": "call.requested", "detail": { ... } } +{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } ``` -Maps directly to `new CustomEvent(type, { detail })`. +On the wire, this serializes as the JSON payload after the length prefix. When received, it maps to `new CustomEvent("call.responded:uuid-123", { detail: envelope })`. ## Two-Sided Design diff --git a/docs/research/migration.md b/docs/research/migration.md index 2b6aef8..29c336c 100644 --- a/docs/research/migration.md +++ b/docs/research/migration.md @@ -1,5 +1,7 @@ # Research: `@alkdev/pubsub` Package Extraction +> **Note (2026-05-01):** This document reflects the original migration from alkhub_ts. Since extraction, the architecture has been simplified — `call.ts` (PendingRequestMap, CallEventSchema, CallError) has been removed from this package and moved to `@alkdev/operations`. The `EventEnvelope` type (`{ type, id, payload }`) is now the cross-platform serialization contract. The `PubSubPublishArgsByKey` tuple-based model has been replaced with a simpler `PubSubEventMap`. Stream operators have been expanded beyond `filter`/`map`/`pipe`. See `docs/architecture/` for current spec. + ## Goal Extract `packages/core/pubsub/` into a standalone `@alkdev/pubsub` package, following the same peer-dependency tree-shaking pattern as `@alkdev/typemap`. 
Each event target adapter (Redis, WebSocket, Iroh) is an isolated module that only imports its own peer dependency. The core `createPubSub + TypedEventTarget + operators` has no peer deps beyond `@repeaterjs/repeater`. diff --git a/package.json b/package.json index 5b48d36..8700c50 100644 --- a/package.json +++ b/package.json @@ -26,16 +26,6 @@ "types": "./dist/event-target-redis.d.cts", "default": "./dist/event-target-redis.cjs" } - }, - "./call": { - "import": { - "types": "./dist/call.d.ts", - "default": "./dist/call.js" - }, - "require": { - "types": "./dist/call.d.cts", - "default": "./dist/call.cjs" - } } }, "publishConfig": { @@ -62,9 +52,7 @@ "quic" ], "license": "MIT OR Apache-2.0", - "dependencies": { - "@alkdev/typebox": "^0.34.49" - }, + "dependencies": {}, "peerDependencies": { "ioredis": "^5.0.0" }, diff --git a/src/call.ts b/src/call.ts deleted file mode 100644 index 3206bd5..0000000 --- a/src/call.ts +++ /dev/null @@ -1,307 +0,0 @@ -import { Type, type Static } from "@alkdev/typebox"; -import { createPubSub, type PubSub } from "./create_pubsub.js"; -import { Repeater, type Push, type Stop } from "./repeater.js"; - -export const CallEventSchema = { - "call.requested": Type.Object({ - requestId: Type.String(), - operationId: Type.String(), - input: Type.Unknown(), - parentRequestId: Type.Optional(Type.String()), - deadline: Type.Optional(Type.Number()), - identity: Type.Optional(Type.Object({ - id: Type.String(), - scopes: Type.Array(Type.String()), - resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), - })), - }), - "call.responded": Type.Object({ - requestId: Type.String(), - output: Type.Unknown(), - }), - "call.part": Type.Object({ - requestId: Type.String(), - output: Type.Unknown(), - index: Type.Optional(Type.Number()), - }), - "call.completed": Type.Object({ - requestId: Type.String(), - }), - "call.aborted": Type.Object({ - requestId: Type.String(), - }), - "call.error": Type.Object({ - requestId: Type.String(), - 
code: Type.String(), - message: Type.String(), - details: Type.Optional(Type.Unknown()), - }), -} as const; - -export type CallRequestedEvent = Static; -export type CallRespondedEvent = Static; -export type CallPartEvent = Static; -export type CallCompletedEvent = Static; -export type CallAbortedEvent = Static; -export type CallErrorEvent = Static; - -type CallPubSubMap = { - "call.requested": [CallRequestedEvent]; - "call.responded": [string, CallRespondedEvent]; - "call.part": [string, CallPartEvent]; - "call.completed": [string, CallCompletedEvent]; - "call.aborted": [string, CallAbortedEvent]; - "call.error": [string, CallErrorEvent]; -}; - -export const CallErrorCode = { - OPERATION_NOT_FOUND: "OPERATION_NOT_FOUND", - ACCESS_DENIED: "ACCESS_DENIED", - VALIDATION_ERROR: "VALIDATION_ERROR", - TIMEOUT: "TIMEOUT", - ABORTED: "ABORTED", - EXECUTION_ERROR: "EXECUTION_ERROR", - UNKNOWN_ERROR: "UNKNOWN_ERROR", -} as const; - -export type CallErrorCodeType = (typeof CallErrorCode)[keyof typeof CallErrorCode]; - -export class CallError extends Error { - readonly code: string; - readonly details?: unknown; - - constructor(code: string, message: string, details?: unknown) { - super(message); - this.name = "CallError"; - this.code = code; - this.details = details; - } -} - -interface PendingRequest { - resolve: (value: unknown) => void; - reject: (reason: unknown) => void; - deadline?: number; - timer?: ReturnType; - unsubscribe: () => void; -} - -export class PendingRequestMap { - private requests = new Map(); - private pubsub: PubSub; - - constructor(eventTarget?: EventTarget) { - this.pubsub = createPubSub( - eventTarget ? 
{ eventTarget: eventTarget as any } : undefined, - ); - } - - async call( - operationId: string, - input: unknown, - options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] }, - ): Promise { - const requestId = crypto.randomUUID(); - - const respondedIter = this.pubsub.subscribe("call.responded", requestId); - const errorIter = this.pubsub.subscribe("call.error", requestId); - const abortedIter = this.pubsub.subscribe("call.aborted", requestId); - - const cleanup = (): void => { - respondedIter.return?.(); - errorIter.return?.(); - abortedIter.return?.(); - }; - - let timer: ReturnType | undefined; - if (options?.deadline) { - timer = setTimeout(() => { - cleanup(); - this.pubsub.publish("call.aborted", requestId, { requestId }); - }, options.deadline - Date.now()); - } - - this.pubsub.publish("call.requested", { - requestId, - operationId, - input, - parentRequestId: options?.parentRequestId, - deadline: options?.deadline, - identity: options?.identity, - }); - - try { - const result = await new Promise((resolve, reject) => { - const pending: PendingRequest = { - resolve: (value: unknown) => { - if (timer) clearTimeout(timer); - cleanup(); - resolve(value); - }, - reject: (reason: unknown) => { - if (timer) clearTimeout(timer); - cleanup(); - reject(reason); - }, - deadline: options?.deadline, - timer, - unsubscribe: cleanup, - }; - - this.requests.set(requestId, pending); - - (async () => { - for await (const event of respondedIter) { - const responded = event as CallRespondedEvent; - const p = this.requests.get(responded.requestId); - if (p) { - this.requests.delete(responded.requestId); - p.resolve(responded.output); - } - return; - } - })(); - - (async () => { - for await (const event of errorIter) { - const err = event as CallErrorEvent; - const p = this.requests.get(err.requestId); - if (p) { - this.requests.delete(err.requestId); - p.reject(new CallError(err.code, err.message, err.details)); - } - return; - } - 
})(); - - (async () => { - for await (const event of abortedIter) { - const aborted = event as CallAbortedEvent; - const p = this.requests.get(aborted.requestId); - if (p) { - this.requests.delete(aborted.requestId); - p.reject(new CallError(CallErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`)); - } - return; - } - })(); - }); - - return result; - } finally { - if (timer) clearTimeout(timer); - } - } - - subscribe( - operationId: string, - input: unknown, - options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] }, - ): Repeater { - const requestId = crypto.randomUUID(); - const map = this; - - return new Repeater(async function (push: Push, stop: Stop) { - map.pubsub.publish("call.requested", { - requestId, - operationId, - input, - parentRequestId: options?.parentRequestId, - deadline: options?.deadline, - identity: options?.identity, - }); - - const partIter = map.pubsub.subscribe("call.part", requestId); - const completedIter = map.pubsub.subscribe("call.completed", requestId); - const errorIter = map.pubsub.subscribe("call.error", requestId); - - let settled = false; - - const cleanup = (): void => { - if (!settled) { - settled = true; - map.pubsub.publish("call.aborted", requestId, { requestId }); - } - partIter.return?.(); - completedIter.return?.(); - errorIter.return?.(); - }; - - stop.then(cleanup); - - try { - const partPromise = (async (): Promise => { - for await (const event of partIter) { - const part = event as CallPartEvent; - await push(part.output); - } - throw new Error("part stream ended unexpectedly"); - })(); - - const completedPromise = (async () => { - for await (const _ of completedIter) { - return; - } - })(); - - const errorPromise = (async (): Promise => { - for await (const event of errorIter) { - const err = event as CallErrorEvent; - throw new CallError(err.code, err.message, err.details); - } - throw new Error("error stream ended unexpectedly"); - })(); - - await 
Promise.race([completedPromise, errorPromise, partPromise]); - } finally { - cleanup(); - stop(); - } - }); - } - - respond(requestId: string, output: unknown): void { - this.pubsub.publish("call.responded", requestId, { - requestId, - output, - }); - } - - part(requestId: string, output: unknown, index?: number): void { - this.pubsub.publish("call.part", requestId, { - requestId, - output, - index, - }); - } - - complete(requestId: string): void { - this.pubsub.publish("call.completed", requestId, { requestId }); - } - - emitError(requestId: string, code: string, message: string, details?: unknown): void { - this.pubsub.publish("call.error", requestId, { - requestId, - code, - message, - details, - }); - } - - abort(requestId: string): void { - const pending = this.requests.get(requestId); - if (pending) { - if (pending.timer) clearTimeout(pending.timer); - this.requests.delete(requestId); - pending.unsubscribe(); - this.pubsub.publish("call.aborted", requestId, { requestId }); - pending.reject(new CallError(CallErrorCode.ABORTED, `Request ${requestId} was aborted`)); - } else { - this.pubsub.publish("call.aborted", requestId, { requestId }); - } - } - - getPendingCount(): number { - return this.requests.size; - } -} \ No newline at end of file diff --git a/src/create_pubsub.ts b/src/create_pubsub.ts index 70868a8..3a52536 100644 --- a/src/create_pubsub.ts +++ b/src/create_pubsub.ts @@ -25,69 +25,62 @@ */ import { Repeater } from "./repeater.js"; -import type { TypedEventTarget, TypedEvent } from "./types.js"; +import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; -export type PubSubPublishArgsByKey = { - [key: string]: [] | [unknown] | [number | string, unknown]; +export type PubSubEventMap = { + [eventType: string]: unknown; }; export type PubSubEvent< - TPubSubPublishArgsByKey extends PubSubPublishArgsByKey, - TKey extends Extract, -> = TypedEvent< - TKey, - TPubSubPublishArgsByKey[TKey][1] extends undefined - ? 
TPubSubPublishArgsByKey[TKey][0] - : TPubSubPublishArgsByKey[TKey][1] ->; + TEventMap extends PubSubEventMap, + TType extends Extract = Extract, +> = TypedEvent>; -export type PubSubEventTarget = +export type PubSubEventTarget = TypedEventTarget< - PubSubEvent> + PubSubEvent >; -export type PubSubConfig = { - eventTarget?: PubSubEventTarget; +export type PubSubConfig = { + eventTarget?: PubSubEventTarget; }; -export type PubSub = { - publish>( - routingKey: TKey, - ...args: TPubSubPublishArgsByKey[TKey] +export type PubSub = { + publish>( + type: TType, + id: string, + payload: TEventMap[TType], ): void; - subscribe>( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] - ): Repeater; + subscribe>( + type: TType, + id: string, + ): Repeater>; }; -export function createPubSub( - config?: PubSubConfig, -): PubSub { +export function createPubSub( + config?: PubSubConfig, +): PubSub { const target = - config?.eventTarget ?? (new EventTarget() as PubSubEventTarget); + config?.eventTarget ?? (new EventTarget() as PubSubEventTarget); return { - publish>( - routingKey: TKey, - ...args: TPubSubPublishArgsByKey[TKey] + publish>( + type: TType, + id: string, + payload: TEventMap[TType], ) { - const payload = args[1] ?? args[0] ?? null; - const topic = args[1] === undefined ? routingKey : `${routingKey}:${args[0] as number}`; - - const event = new CustomEvent(topic, { detail: payload }) as PubSubEvent< - TPubSubPublishArgsByKey, - TKey + const envelope: EventEnvelope = { type, id, payload }; + const event = new CustomEvent(`${type}:${id}`, { detail: envelope }) as PubSubEvent< + TEventMap, + TType >; target.dispatchEvent(event); }, - subscribe>( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] - ): Repeater { - const topic: TKey = (id === undefined ?
routingKey : `${routingKey}:${id as number}`) as TKey; + subscribe>( + type: TType, + id: string, + ): Repeater> { + const topic = `${type}:${id}`; return new Repeater(function subscriptionRepeater( next: (value: unknown) => Promise, @@ -98,11 +91,11 @@ export function createPubSub>; }, }; } \ No newline at end of file diff --git a/src/event-target-redis.ts b/src/event-target-redis.ts index de841a8..6f6b303 100644 --- a/src/event-target-redis.ts +++ b/src/event-target-redis.ts @@ -28,10 +28,11 @@ * - Uses our TypedEventTarget/TypedEvent types from types.ts * - Removed tslib dependency * - Uses ioredis types directly (already a dependency) + * - Serializes full EventEnvelope for cross-process transport */ import type { Cluster, Redis } from "ioredis"; -import type { TypedEventTarget, TypedEvent } from "./types.js"; +import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; export type CreateRedisEventTargetArgs = { publishClient: Redis | Cluster; @@ -57,8 +58,9 @@ export function createRedisEventTarget( return; } + const envelope = serializer.parse(message) as EventEnvelope; const event = new CustomEvent(channel, { - detail: message === "" ? null : serializer.parse(message), + detail: envelope, }) as TEvent; for (const callback of callbacks) { callback(event); @@ -102,7 +104,7 @@ export function createRedisEventTarget( dispatchEvent(event: TEvent) { publishClient.publish( event.type, - event.detail === undefined ? 
"" : serializer.stringify(event.detail), + serializer.stringify(event.detail), ); return true; }, diff --git a/src/index.ts b/src/index.ts index 9eddd8d..88596c1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubPublishArgsByKey } from "./create_pubsub.js"; -export { type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; -export { filter, map, pipe } from "./operators.js"; +export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubEventMap } from "./create_pubsub.js"; +export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; +export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; \ No newline at end of file diff --git a/src/operators.ts b/src/operators.ts index aa60676..68a8376 100644 --- a/src/operators.ts +++ b/src/operators.ts @@ -64,4 +64,131 @@ export function pipe( ...fns: ((arg: unknown) => unknown)[] ): unknown { return fns.reduce((acc, fn) => fn(acc), a); +} + +export async function* take( + source: AsyncIterable, + count: number, +): AsyncIterable { + let i = 0; + for await (const value of source) { + if (i++ >= count) return; + yield value; + } +} + +export async function reduce( + source: AsyncIterable, + reducer: (accumulator: U, value: T) => Promise | U, + initialValue: U, +): Promise { + let accumulator = initialValue; + for await (const value of source) { + accumulator = await reducer(accumulator, value); + } + return accumulator; +} + +export 
async function toArray(source: AsyncIterable): Promise { + const result: T[] = []; + for await (const value of source) { + result.push(value); + } + return result; +} + +export async function* batch( + source: AsyncIterable, + size: number, +): AsyncIterable { + let current: T[] = []; + for await (const value of source) { + current.push(value); + if (current.length === size) { + yield current; + current = []; + } + } + if (current.length > 0) yield current; +} + +export async function* dedupe( + source: AsyncIterable, +): AsyncIterable { + const seen = new Set(); + for await (const value of source) { + if (!seen.has(value)) { + seen.add(value); + yield value; + } + } +} + +export async function* window( + source: AsyncIterable, + size: number, + step: number = 1, +): AsyncIterable { + const buffer: T[] = []; + for await (const value of source) { + buffer.push(value); + if (buffer.length === size) { + yield [...buffer]; + buffer.splice(0, step); + } + } +} + +export async function* flat( + source: AsyncIterable, +): AsyncIterable { + for await (const array of source) { + for (const value of array) { + yield value; + } + } +} + +export async function groupBy( + source: AsyncIterable, + keyFn: (value: T) => K, +): Promise> { + const groups = new Map(); + for await (const value of source) { + const key = keyFn(value); + if (!groups.has(key)) { + groups.set(key, []); + } + groups.get(key)!.push(value); + } + return groups; +} + +export async function* chain( + ...sources: AsyncIterable[] +): AsyncIterable { + for (const source of sources) { + for await (const value of source) { + yield value; + } + } +} + +export async function* join( + source1: AsyncIterable, + source2: AsyncIterable, + keyFn1: (value: T) => K, + keyFn2: (value: U) => K, +): AsyncIterable<[T, U]> { + const map2 = new Map(); + for await (const value of source2) { + const key = keyFn2(value); + map2.set(key, value); + } + for await (const value of source1) { + const key = keyFn1(value); + if 
(map2.has(key)) { + yield [value, map2.get(key)!]; + } + } } \ No newline at end of file diff --git a/src/types.ts b/src/types.ts index 5ec9010..a96412a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -24,6 +24,12 @@ * SOFTWARE. */ +export interface EventEnvelope { + readonly type: TType; + readonly id: string; + readonly payload: TPayload; +} + export type TypedEvent = Omit< CustomEvent, "detail" | "type" diff --git a/tsup.config.ts b/tsup.config.ts index 2a75871..fd4ae46 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -4,7 +4,6 @@ export default defineConfig({ entry: [ 'src/index.ts', 'src/event-target-redis.ts', - 'src/call.ts', ], format: ['esm', 'cjs'], dts: true,