---
status: draft
last_updated: 2026-05-18
---

# PubSub with Redis EventTarget

## Overview

The pubsub system is a standalone npm package, `@alkdev/pubsub`, adapted from `@graphql-yoga/subscription` (MIT). The Repeater is inlined (no external dependency). The critical design feature remains: `PubSubConfig.eventTarget` allows swapping the underlying transport, enabling single-process operation, cross-process Redis, hub-spoke WebSocket, or Worker communication, all behind the same `TypedEventTarget` interface.

**Package**: `@alkdev/pubsub` (npm)

## How It Works

`createPubSub` accepts a `PubSubEventMap` and optional `eventTarget` config:

```ts
const pubsub = createPubSub();

pubsub.publish("myEvent", id, payload);

for await (const event of pubsub.subscribe("myEvent")) {
  // event is EventEnvelope
  // event.type === "myEvent", event.id === id, event.payload === payload
}
```

`PubSubEventMap` is a simple `{ [eventType: string]: payload }` map. `publish(type, id, payload)` always takes 3 explicit args. `subscribe` returns a `Repeater`.

Topics are scoped by `id`: `publish("myEvent", id, payload)` publishes to topic `myEvent:id`, and `subscribe("myEvent", id)` subscribes to that scoped topic only.

Default transport: in-process `EventTarget` (single-process only). Events are `CustomEvent` instances dispatched via `addEventListener`/`dispatchEvent`.
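
The scoped-topic dispatch described above can be sketched with nothing but the standard `EventTarget` and `CustomEvent` APIs. This is an illustration under stated assumptions, not the package's implementation: `scopedTopic`, `publishScoped`, and `listenScoped` are hypothetical helpers, and the `type:id` topic format is taken from the `myEvent:id` example in this section.

```typescript
// Hypothetical envelope shape, mirroring the type/id/payload fields used in this document.
interface ScopedEnvelope<T> {
  readonly type: string;
  readonly id: string;
  readonly payload: T;
}

// Assumption: a scoped topic is "<type>:<id>", as in the `myEvent:id` example.
function scopedTopic(type: string, id: string): string {
  return `${type}:${id}`;
}

// Dispatch a CustomEvent whose event name is the scoped topic and whose detail is the envelope.
function publishScoped<T>(target: EventTarget, type: string, id: string, payload: T): void {
  const detail: ScopedEnvelope<T> = { type, id, payload };
  target.dispatchEvent(new CustomEvent(scopedTopic(type, id), { detail }));
}

// Listen on one scoped topic only; returns a function that removes the listener.
function listenScoped<T>(
  target: EventTarget,
  type: string,
  id: string,
  handler: (envelope: ScopedEnvelope<T>) => void,
): () => void {
  const topic = scopedTopic(type, id);
  const listener = (evt: Event): void => {
    handler((evt as CustomEvent<ScopedEnvelope<T>>).detail);
  };
  target.addEventListener(topic, listener);
  return () => target.removeEventListener(topic, listener);
}

const demoBus = new EventTarget();
const receivedEvents: ScopedEnvelope<string>[] = [];

const unsubscribe = listenScoped<string>(demoBus, "myEvent", "a", (envelope) => {
  receivedEvents.push(envelope);
});

publishScoped(demoBus, "myEvent", "a", "hello");   // matches the scoped topic
publishScoped(demoBus, "myEvent", "b", "ignored"); // different id, different topic
unsubscribe();
publishScoped(demoBus, "myEvent", "a", "late");    // listener already removed
```

Because `dispatchEvent` is synchronous, only the first publish reaches the handler. Swapping `demoBus` for another `EventTarget`-compatible object is the same idea that `PubSubConfig.eventTarget` relies on.
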
## Operators

13 operators are available for stream transformation:

`filter`, `map`, `pipe`, `take`, `reduce`, `toArray`, `batch`, `dedupe`, `window`, `flat`, `groupBy`, `chain`, `join`

## Transport Options

| Transport | EventTarget | Status | Use case |
|-----------|------------|--------|----------|
| In-process | `new EventTarget()` (default) | Implemented | Single-process hub, testing |
| Redis | `createRedisEventTarget(...)` | Implemented | Cross-process events, multi-hub |
| WebSocket (client) | `createWebSocketClientEventTarget(ws)` | Implemented | Spoke-side transport |
| WebSocket (server) | `createWebSocketServerEventTarget(...)` | Implemented | Hub-side transport, connection management |
| Worker (host) | `createWorkerHostEventTarget(worker)` | Implemented | Host→thread communication |
| Worker (thread) | `createWorkerThreadEventTarget()` | Implemented | Thread→host communication |

Usage:

```ts
// In-process (default)
const pubsub = createPubSub();

// Redis (publishClient and subscribeClient are ioredis clients created elsewhere)
const redisET = createRedisEventTarget({ publishClient, subscribeClient, prefix: "alk:events:" });
const redisPubsub = createPubSub({ eventTarget: redisET });

// Graceful shutdown
await redisET.close();
```

## Redis EventTarget

Implemented in `@alkdev/pubsub`. Forked from `@graphql-yoga/redis-event-target` (MIT).

### `createRedisEventTarget`

```ts
function createRedisEventTarget(
  args: CreateRedisEventTargetArgs
): TypedEventTarget & { close(): Promise<void> }
```

### `CreateRedisEventTargetArgs`

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `publishClient` | `Redis \| Cluster` | Yes | ioredis client for publishing. Can be shared with other Redis operations. |
| `subscribeClient` | `Redis \| Cluster` | Yes | ioredis client for subscribing. Must be a dedicated connection: once a connection enters subscriber mode, Redis only accepts subscription-related commands on it. |
| `serializer` | `{ stringify, parse }` | No | Custom serializer. Defaults to `JSON`. Use this for protocols that need a different encoding (e.g., MessagePack). |
| `prefix` | `string` | No | Redis channel prefix. Default: `""`. Use `"alk:events:"` for namespace isolation. |

### Channel Naming

Set `prefix: "alk:events:"` in `createRedisEventTarget` to namespace Redis channels. Events publish to channels like `alk:events:session.status:projectId`.

### Serialization

Events must be JSON-serializable because they cross a network boundary through Redis. `CustomEvent.detail` must not contain functions, circular references, or other non-serializable values. This is already the case for call protocol event types (all are TypeBox-validated plain objects).

The `serializer` option on `CreateRedisEventTargetArgs` allows overriding the default `JSON` serialization.

## TypedEventTarget Interface

Canonical types live in `@alkdev/pubsub`. Adapted from `@graphql-yoga/typed-event-target` (MIT).

| Export | Description |
|--------|-------------|
| `TypedEvent` | Event type with typed `type` and `detail` fields. Omits `CustomEvent`'s untyped `detail`/`type` and replaces them. |
| `TypedEventListener` | `(evt: TEvent) => void` |
| `TypedEventListenerObject` | `{ handleEvent(object: TEvent): void }` |
| `TypedEventListenerOrEventListenerObject` | Union of the above two |
| `TypedEventTarget` | Extends `EventTarget`. Typed `addEventListener`, `dispatchEvent`, and `removeEventListener` that constrain event types to `TEvent`. |

All transports (in-process, Redis, WebSocket, Worker) implement this same interface, making them interchangeable at the `createPubSub` config level.

## WebSocket Event Targets

Implemented in `@alkdev/pubsub`. Two adapters for bidirectional hub↔spoke communication:

### Client-side (`@alkdev/pubsub/event-target-websocket-client`)

`createWebSocketClientEventTarget(ws)` wraps a `WebSocket`. It sends `__subscribe`/`__unsubscribe` control messages (reserved `__` prefix). Used by spokes to connect to the hub.

### Server-side (`@alkdev/pubsub/event-target-websocket-server`)

`createWebSocketServerEventTarget(args?)` manages multiple WebSocket connections.
Key methods:

- `addConnection(ws)` / `removeConnection(ws)`: connection lifecycle
- `onConnection` / `onDisconnection` callbacks
- Per-connection `SpokeEventTarget` for individual spoke dispatch
- Backpressure handling for slow consumers

## Worker Event Targets

For Web Worker (or Deno Worker) communication:

- `createWorkerHostEventTarget(worker)`: host side, wraps a `Worker`
- `createWorkerThreadEventTarget()`: thread side, uses `globalThis.postMessage`/`onmessage`

Both implement `TypedEventTarget` with `close()` for cleanup.

## EventEnvelope

All cross-process events use `EventEnvelope` as the wire format:

```ts
interface EventEnvelope<T> {
  readonly type: string   // event type
  readonly id: string     // topic/correlation ID
  readonly payload: T     // event data
}
```

Types starting with `__` are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe` for the WebSocket adapter).

## Filtering Strategy

OpenCode's problem: every SSE client receives ALL events for a project. With Redis, we scope channels:

```
alk:events:session.status:{projectId}   - only session status for one project
alk:events:message.updated:{sessionId}  - only message updates for one session
alk:events:runner.dispatch:{runnerId}   - only dispatch for one runner
```

The hub's SSE endpoint subscribes to the channels relevant to each connected client and relays events. No firehose.

## What This Replaces in OpenCode

| OpenCode                                          | alk.dev                                                        |
| ------------------------------------------------- | -------------------------------------------------------------- |
| Effect `PubSub` per instance (in-memory)          | `createPubSub({ eventTarget: createRedisEventTarget(...) })`   |
| `GlobalBus` (Node EventEmitter, single-process)   | Redis channel `alk:events:*`                                   |
| SSE `/event` (all events for one project)         | Redis subscription filtered by project                         |
| SSE `/global/event` (all events for all projects) | Redis subscription optionally unfiltered                       |
| `Bus.subscribeAll()` (zero filtering)             | `pubsub.subscribe("eventType")` with Redis scoping             |

## Prior Art

The pubsub system was originally adapted from `@graphql-yoga/subscription` and `@graphql-yoga/typed-event-target`. It has been extracted into `@alkdev/pubsub` as a standalone package with:

- Simplified API (`PubSubEventMap` replacing `PubSubPublishArgsByKey`)
- Inlined Repeater (no external dependency)
- 4 new event target adapters (WebSocket client/server, Worker host/thread)
- 10 new operators
- `EventEnvelope` as universal cross-process message format
- `prefix` and `close()` on Redis adapter
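
To make the `prefix` and `EventEnvelope` items above concrete, here is a minimal sketch of how a prefixed channel name and a serialized envelope might fit together. It is a sketch under assumptions, not the adapter's code: `channelFor`, `encodeEnvelope`, `decodeEnvelope`, and `isControlType` are hypothetical helpers, the `prefix + type + ":" + id` layout is inferred from the channel examples in this document, and the `WireSerializer` shape is assumed from the `{ stringify, parse }` option.

```typescript
// Envelope shape as described in the EventEnvelope section.
interface EventEnvelope<T> {
  readonly type: string;
  readonly id: string;
  readonly payload: T;
}

// Assumed shape of the `serializer` option: anything with stringify/parse, such as JSON.
interface WireSerializer {
  stringify(value: unknown): string;
  parse(text: string): unknown;
}

// Assumption: channel = prefix + type + ":" + id, e.g. "alk:events:session.status:project-1".
function channelFor(prefix: string, type: string, id: string): string {
  return `${prefix}${type}:${id}`;
}

// Serialize an envelope for the wire; defaults to JSON like the documented serializer default.
function encodeEnvelope<T>(envelope: EventEnvelope<T>, serializer: WireSerializer = JSON): string {
  return serializer.stringify(envelope);
}

// Parse a wire message back into an envelope. No validation is done in this sketch.
function decodeEnvelope<T>(wire: string, serializer: WireSerializer = JSON): EventEnvelope<T> {
  return serializer.parse(wire) as EventEnvelope<T>;
}

// Types starting with "__" are reserved for adapter control messages.
function isControlType(type: string): boolean {
  return type.startsWith("__");
}

const demoEnvelope: EventEnvelope<{ status: string }> = {
  type: "session.status",
  id: "project-1",
  payload: { status: "idle" },
};

const demoChannel = channelFor("alk:events:", demoEnvelope.type, demoEnvelope.id);
const roundTripped = decodeEnvelope<{ status: string }>(encodeEnvelope(demoEnvelope));
```

A real adapter would likely validate the decoded value (for example with the TypeBox schemas mentioned earlier) before dispatching it, since a type assertion alone does not check the payload.
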