# @alkdev/pubsub Type-safe publish/subscribe with pluggable event target adapters. Transport layer only — no call protocol or coordination semantics. Every event is an `EventEnvelope` with `{ type, id, payload }`. Adapters implement the `TypedEventTarget` interface so you can swap transports without changing your subscribe logic. ## Install ```bash npm install @alkdev/pubsub ``` For Redis transport: ```bash npm install ioredis ``` WebSocket and Worker adapters use built-in APIs — no additional dependencies. ## Quick Start ### In-Process (default) ```ts import { createPubSub } from "@alkdev/pubsub"; type EventMap = { "user.created": { name: string }; "order.placed": { orderId: string }; }; const pubsub = createPubSub(); pubsub.subscribe("user.created", (_, payload) => { console.log(`New user: ${payload.name}`); }); pubsub.publish("user.created", "id-1", { name: "Alice" }); ``` ### Redis ```ts import { createPubSub } from "@alkdev/pubsub"; import { createRedisEventTarget } from "@alkdev/pubsub/event-target-redis"; import Redis from "ioredis"; const publishClient = new Redis(); const subscribeClient = new Redis(); const eventTarget = createRedisEventTarget({ publishClient, subscribeClient, }); const pubsub = createPubSub({ eventTarget }); ``` ### WebSocket Client (browser/Node) ```ts import { createPubSub } from "@alkdev/pubsub"; import { createWebSocketClientEventTarget } from "@alkdev/pubsub/event-target-websocket-client"; const ws = new WebSocket("ws://localhost:8080"); const eventTarget = createWebSocketClientEventTarget(ws); const pubsub = createPubSub({ eventTarget }); ``` ### WebSocket Server (Node) ```ts import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server"; const server = createWebSocketServerEventTarget({ onConnection(spoke, ws) { /* new client connected */ }, onDisconnection(spoke, ws) { /* client disconnected */ }, maxBufferedAmount: 1_048_576, onBackpressure(ws, bufferedAmount) { /* optional backpressure signal */ }, }); // When a new WebSocket connects: server.addConnection(ws); // When it disconnects: server.removeConnection(ws); // Subscribe local handlers: server.addEventListener("user.created:id-1", (event) => { // event.detail is the EventEnvelope }); // Publish to subscribed connections: server.dispatchEvent(new CustomEvent("user.created:id-1", { detail: envelope })); ``` ### Worker (Host ↔ Thread) ```ts // Host (main thread) import { createWorkerHostEventTarget } from "@alkdev/pubsub/event-target-worker"; const worker = new Worker("./worker.js"); const eventTarget = createWorkerHostEventTarget(worker); ``` ```ts // Worker thread import { createWorkerThreadEventTarget } from "@alkdev/pubsub/event-target-worker"; const eventTarget = createWorkerThreadEventTarget(); // Must be called inside a Worker context — throws if globalThis.postMessage is unavailable ``` ## Lifecycle All transport adapters provide a `close()` method for graceful teardown: ```ts const eventTarget = createRedisEventTarget({ publishClient, subscribeClient }); // ... subscribe and publish ... eventTarget.close(); // unsubscribes all channels, removes listener, clears state ``` After `close()`: - `addEventListener`, `removeEventListener`, and `dispatchEvent` are no-ops - Intercepted handlers (`onmessage`, `onclose`) are restored to their originals - Subscriptions are cleaned up (Redis channels unsubscribed, WebSocket `__unsubscribe` sent) - The underlying transport (Redis connection, WebSocket, Worker) is **not** destroyed — the caller owns it `close()` is idempotent. Calling it multiple times is safe. ## Operators Operators transform `AsyncIterable` streams from `subscribe()`: ```ts import { pipe, filter, map, take, batch } from "@alkdev/pubsub"; const pubsub = createPubSub(); const stream = pubsub.subscribe("user.created"); for await (const event of pipe( stream, filter((e) => e.payload.name.startsWith("A")), map((e) => e.payload.name), take(5), )) { console.log(event); } ``` Available operators: `filter`, `map`, `pipe`, `take`, `reduce`, `toArray`, `batch`, `dedupe`, `window`, `flat`, `groupBy`, `chain`, `join`. ## EventEnvelope All events are serialized as `EventEnvelope`: ```ts interface EventEnvelope { type: TType; id: string; payload: TPayload; } ``` This is the cross-platform wire format. Adapters serialize/deserialize this automatically (JSON for Redis and WebSocket, structured clone for Worker). ## Subscription Control Protocol Event types starting with `__` are reserved for internal use. Adapters use `__subscribe` and `__unsubscribe` control events to manage topic subscriptions across connections. User code must not define event types with the `__` prefix. ## TypeScript Full type inference through `EventMap`: ```ts type EventMap = { "user.created": { name: string; role: string }; "order.placed": { orderId: string; total: number }; }; const pubsub = createPubSub(); pubsub.publish("user.created", "id-1", { name: "Alice", role: "admin" }); // ^ full type checking on payload ``` ## Exports | Import | Description | |--------|-------------| | `@alkdev/pubsub` | Core: `createPubSub`, `EventEnvelope`, `Repeater`, operators | | `@alkdev/pubsub/event-target-redis` | Redis adapter (peer dep: `ioredis`) | | `@alkdev/pubsub/event-target-websocket-client` | WebSocket client adapter | | `@alkdev/pubsub/event-target-websocket-server` | WebSocket server adapter | | `@alkdev/pubsub/event-target-worker` | Worker host + thread adapters | ## Upstream Attribution Core `createPubSub`, `TypedEventTarget`, and operators are adapted from [graphql-yoga](https://github.com/graphql-hive/graphql-yoga) (MIT). The `Repeater` class is inlined from [@repeaterjs/repeater](https://github.com/repeaterjs/repeater) (MIT). ## License Dual-licensed under [MIT](LICENSE-MIT) or [Apache-2.0](LICENSE-APACHE). Portions adapted from upstream projects retain their MIT attribution.