glm-5.1 b3f598dffd docs: add LICENSE, README, CHANGELOG, and license headers for npm publish
- Add combined MIT + Apache-2.0 LICENSE file (Copyright 2026 alk.dev)
- Add README.md with quick start for each adapter, lifecycle/close docs,
  operators, EventEnvelope, TypeScript, exports table, upstream attribution
- Add CHANGELOG.md with 0.1.0 entry
- Add SPDX license headers to original source files (WS client, WS server,
  Worker, index barrel)
- Update package.json: add author, repository, homepage, bugs; include
  CHANGELOG.md in files; update description

@alkdev/pubsub

Type-safe publish/subscribe with pluggable event target adapters. Transport layer only — no call protocol or coordination semantics.

Every event is an EventEnvelope<TType, TPayload> with { type, id, payload }. Adapters implement the TypedEventTarget interface so you can swap transports without changing your subscribe logic.

Install

npm install @alkdev/pubsub

For Redis transport:

npm install ioredis

WebSocket and Worker adapters use built-in APIs — no additional dependencies.

Quick Start

In-Process (default)

import { createPubSub } from "@alkdev/pubsub";

type EventMap = {
  "user.created": { name: string };
  "order.placed": { orderId: string };
};

const pubsub = createPubSub<EventMap>();

pubsub.subscribe("user.created", (_, payload) => {
  console.log(`New user: ${payload.name}`);
});

pubsub.publish("user.created", "id-1", { name: "Alice" });

Redis

import { createPubSub } from "@alkdev/pubsub";
import { createRedisEventTarget } from "@alkdev/pubsub/event-target-redis";
import Redis from "ioredis";

const publishClient = new Redis();
const subscribeClient = new Redis();

const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient,
});

const pubsub = createPubSub({ eventTarget });

WebSocket Client (browser/Node)

import { createPubSub } from "@alkdev/pubsub";
import { createWebSocketClientEventTarget } from "@alkdev/pubsub/event-target-websocket-client";

const ws = new WebSocket("ws://localhost:8080");
const eventTarget = createWebSocketClientEventTarget(ws);

const pubsub = createPubSub({ eventTarget });

WebSocket Server (Node)

import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";

const server = createWebSocketServerEventTarget({
  onConnection(spoke, ws) { /* new client connected */ },
  onDisconnection(spoke, ws) { /* client disconnected */ },
  maxBufferedAmount: 1_048_576,
  onBackpressure(ws, bufferedAmount) { /* optional backpressure signal */ },
});

// When a new WebSocket connects:
server.addConnection(ws);

// When it disconnects:
server.removeConnection(ws);

// Subscribe local handlers:
server.addEventListener("user.created:id-1", (event) => {
  // event.detail is the EventEnvelope
});

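// Publish to subscribed connections (the detail is an EventEnvelope):
const envelope = { type: "user.created", id: "id-1", payload: { name: "Alice" } };
server.dispatchEvent(new CustomEvent("user.created:id-1", { detail: envelope }));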

Worker (Host ↔ Thread)

// Host (main thread)
import { createWorkerHostEventTarget } from "@alkdev/pubsub/event-target-worker";

const worker = new Worker("./worker.js");
const eventTarget = createWorkerHostEventTarget(worker);

// Worker thread
import { createWorkerThreadEventTarget } from "@alkdev/pubsub/event-target-worker";

// Must be called inside a Worker context; throws if globalThis.postMessage is unavailable
const eventTarget = createWorkerThreadEventTarget();

Lifecycle

All transport adapters provide a close() method for graceful teardown:

const eventTarget = createRedisEventTarget({ publishClient, subscribeClient });
// ... subscribe and publish ...

eventTarget.close(); // unsubscribes all channels, removes listener, clears state

After close():

  • addEventListener, removeEventListener, and dispatchEvent are no-ops
  • Intercepted handlers (onmessage, onclose) are restored to their originals
  • Subscriptions are cleaned up (Redis channels unsubscribed, WebSocket __unsubscribe sent)
  • The underlying transport (Redis connection, WebSocket, Worker) is not destroyed — the caller owns it

close() is idempotent. Calling it multiple times is safe.
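
The close() contract described above (no-op methods after close, idempotent teardown, caller-owned transport) can be illustrated with a minimal sketch. This is not the library's implementation: createClosableTarget, ClosableTarget, and the onTeardown callback are hypothetical names, and the listener registry is a plain Map rather than a real EventTarget.

```typescript
// Hypothetical stand-in for an adapter; it only illustrates the close() contract.
type Listener = (detail: unknown) => void;

interface ClosableTarget {
  addEventListener(topic: string, listener: Listener): void;
  dispatchEvent(topic: string, detail: unknown): boolean;
  close(): void;
}

// onTeardown represents adapter-specific cleanup (for example unsubscribing
// channels). It does not destroy the transport, which the caller still owns.
function createClosableTarget(onTeardown: () => void): ClosableTarget {
  const listeners = new Map<string, Set<Listener>>();
  let closed = false;

  return {
    addEventListener(topic, listener) {
      if (closed) return; // no-op after close()
      let set = listeners.get(topic);
      if (!set) {
        set = new Set<Listener>();
        listeners.set(topic, set);
      }
      set.add(listener);
    },
    dispatchEvent(topic, detail) {
      if (closed) return false; // no-op after close()
      const set = listeners.get(topic);
      if (!set) return false;
      set.forEach((listener) => listener(detail));
      return true;
    },
    close() {
      if (closed) return; // idempotent: later calls do nothing
      closed = true;
      listeners.clear();
      onTeardown();
    },
  };
}
```

Guarding every method with a single closed flag keeps the teardown callback from running twice, which is what makes repeated close() calls safe in this sketch.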

Operators

Operators transform AsyncIterable streams from subscribe():

import { createPubSub, pipe, filter, map, take } from "@alkdev/pubsub";

const pubsub = createPubSub<EventMap>();

const stream = pubsub.subscribe("user.created");

for await (const event of pipe(
  stream,
  filter((e) => e.payload.name.startsWith("A")),
  map((e) => e.payload.name),
  take(5),
)) {
  console.log(event);
}

Available operators: filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join.
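
To make the idea of operators over AsyncIterable streams concrete, here is a rough sketch of how filter, map, and take could be written as async generators. The names filterSketch, mapSketch, takeSketch, fakeStream, and firstNamesStartingWithA are hypothetical and exist only for this illustration; the library's own operators may have different signatures and behavior.

```typescript
// Hypothetical re-implementations for illustration only; the library's own
// operators may differ in signature and behavior.
type Operator<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

function filterSketch<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return async function* (source) {
    for await (const value of source) {
      if (predicate(value)) yield value;
    }
  };
}

function mapSketch<A, B>(fn: (value: A) => B): Operator<A, B> {
  return async function* (source) {
    for await (const value of source) yield fn(value);
  };
}

function takeSketch<T>(count: number): Operator<T, T> {
  return async function* (source) {
    if (count <= 0) return;
    let seen = 0;
    for await (const value of source) {
      yield value;
      seen += 1;
      // Returning from inside for-await also calls return() on the source.
      if (seen >= count) return;
    }
  };
}

type UserCreated = { type: "user.created"; id: string; payload: { name: string } };

// Stand-in for the stream returned by subscribe(): a finite async generator.
async function* fakeStream(names: string[]): AsyncGenerator<UserCreated> {
  let n = 0;
  for (const name of names) {
    n += 1;
    yield { type: "user.created", id: `id-${n}`, payload: { name } };
  }
}

// Applies filter, then map, then take, and collects the result into an array.
async function firstNamesStartingWithA(names: string[], limit: number): Promise<string[]> {
  const filtered = filterSketch<UserCreated>((e) => e.payload.name.startsWith("A"))(fakeStream(names));
  const mapped = mapSketch<UserCreated, string>((e) => e.payload.name)(filtered);
  const result: string[] = [];
  for await (const name of takeSketch<string>(limit)(mapped)) result.push(name);
  return result;
}
```

Because each stage pulls from the previous one lazily, takeSketch ends the whole chain as soon as it has yielded enough values, so nothing upstream keeps consuming events.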

EventEnvelope

All events are serialized as EventEnvelope:

interface EventEnvelope<TType = string, TPayload = unknown> {
  type: TType;
  id: string;
  payload: TPayload;
}

This is the cross-platform wire format. Adapters serialize/deserialize this automatically (JSON for Redis and WebSocket, structured clone for Worker).
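
As an illustration of the JSON wire format, the following sketch round-trips an envelope through JSON.stringify and JSON.parse and validates the decoded shape. The helpers isEnvelope, encodeEnvelope, and decodeEnvelope are hypothetical and are not part of the package's API; the adapters' real serialization code may perform different checks.

```typescript
interface EventEnvelope<TType = string, TPayload = unknown> {
  type: TType;
  id: string;
  payload: TPayload;
}

// Hypothetical runtime check: type and id must be strings, payload must be present.
function isEnvelope(value: unknown): value is EventEnvelope {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.type === "string" &&
    typeof candidate.id === "string" &&
    "payload" in candidate
  );
}

// Hypothetical encoder for a text-based transport.
function encodeEnvelope(envelope: EventEnvelope): string {
  return JSON.stringify(envelope);
}

// Hypothetical decoder: returns undefined for malformed input instead of throwing.
function decodeEnvelope(raw: string): EventEnvelope | undefined {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isEnvelope(parsed) ? parsed : undefined;
  } catch {
    return undefined;
  }
}
```

One consequence of JSON encoding worth keeping in mind: values that JSON cannot represent (such as undefined, functions, or Map instances) do not survive the round trip, whereas structured clone preserves more types.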

Subscription Control Protocol

Event types starting with __ are reserved for internal use. Adapters use __subscribe and __unsubscribe control events to manage topic subscriptions across connections. User code must not define event types with the __ prefix.
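
Application code that accepts event type names from configuration or user input may want to reject the reserved prefix early. The guard below is a minimal sketch of that idea; isReservedType and assertUserType are hypothetical helpers and are not exported by the package.

```typescript
// The "__" prefix is reserved for adapter control events such as
// __subscribe and __unsubscribe.
const RESERVED_PREFIX = "__";

// Hypothetical helper: true for any event type in the reserved namespace.
function isReservedType(type: string): boolean {
  return type.startsWith(RESERVED_PREFIX);
}

// Hypothetical helper: throws before a reserved name reaches publish or subscribe.
function assertUserType(type: string): void {
  if (isReservedType(type)) {
    throw new Error(`Event type "${type}" uses the reserved "__" prefix`);
  }
}
```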

TypeScript

Full type inference through EventMap:

type EventMap = {
  "user.created": { name: string; role: string };
  "order.placed": { orderId: string; total: number };
};

const pubsub = createPubSub<EventMap>();

pubsub.publish("user.created", "id-1", { name: "Alice", role: "admin" });
//                                               ^ full type checking on payload
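
The inference shown above relies on tying the payload parameter to the event type through the map. The sketch below shows one way such a signature can be typed; TypedPublisher and createRecordingPublisher are hypothetical names used only for illustration, and the library's actual type definitions may be structured differently.

```typescript
type EventMap = {
  "user.created": { name: string; role: string };
  "order.placed": { orderId: string; total: number };
};

// Hypothetical interface: the payload type is looked up from the map by event type.
interface TypedPublisher<TMap extends Record<string, unknown>> {
  publish<TType extends keyof TMap & string>(
    type: TType,
    id: string,
    payload: TMap[TType],
  ): void;
}

type LoggedEvent = { type: string; id: string; payload: unknown };

// Hypothetical publisher that records calls instead of sending them anywhere.
function createRecordingPublisher<TMap extends Record<string, unknown>>(): {
  publisher: TypedPublisher<TMap>;
  log: LoggedEvent[];
} {
  const log: LoggedEvent[] = [];
  const publisher: TypedPublisher<TMap> = {
    publish(type, id, payload) {
      log.push({ type, id, payload });
    },
  };
  return { publisher, log };
}

const { publisher, log } = createRecordingPublisher<EventMap>();
publisher.publish("user.created", "id-1", { name: "Alice", role: "admin" });
// publisher.publish("order.placed", "id-2", { orderId: 42 });
// The line above is rejected by the compiler: orderId must be a string and total is missing.
```

Because TType is inferred from the first argument, the compiler checks the payload against the matching entry of EventMap without any explicit type annotation at the call site.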

Exports

| Import | Description |
| --- | --- |
| @alkdev/pubsub | Core: createPubSub, EventEnvelope, Repeater, operators |
| @alkdev/pubsub/event-target-redis | Redis adapter (peer dep: ioredis) |
| @alkdev/pubsub/event-target-websocket-client | WebSocket client adapter |
| @alkdev/pubsub/event-target-websocket-server | WebSocket server adapter |
| @alkdev/pubsub/event-target-worker | Worker host + thread adapters |

Upstream Attribution

Core createPubSub, TypedEventTarget, and operators are adapted from graphql-yoga (MIT). The Repeater class is inlined from @repeaterjs/repeater (MIT).

License

Dual-licensed under MIT or Apache-2.0. Portions adapted from upstream projects retain their MIT attribution.
