hub/docs/architecture/pubsub-redis.md

| status | last_updated |
| --- | --- |
| draft | 2026-05-18 |

PubSub with Redis EventTarget

Overview

The pubsub system is a standalone npm package @alkdev/pubsub, adapted from @graphql-yoga/subscription (MIT). The Repeater is inlined (no external dependency). The critical design feature remains: PubSubConfig.eventTarget allows swapping the underlying transport, enabling single-process operation, cross-process Redis, hub-spoke WebSocket, or Worker communication — all behind the same TypedEventTarget interface.

Package: @alkdev/pubsub (npm)

How It Works

createPubSub accepts a PubSubEventMap and optional eventTarget config:

const pubsub = createPubSub<MyEventMap>();

pubsub.publish("myEvent", id, payload);

for await (const event of pubsub.subscribe("myEvent", id)) {
  // event is EventEnvelope<MyEventMap["myEvent"]>
  // event.type === "myEvent", event.id === id, event.payload === payload
}

PubSubEventMap is a simple { [eventType: string]: payload } map. publish(type, id, payload) always takes three explicit arguments, and subscribe returns a Repeater<EventEnvelope>. Topics are scoped by id: publish("myEvent", id, payload) publishes to topic myEvent:id, and subscribe("myEvent", id) subscribes to that scoped topic only.

Default transport: in-process EventTarget — single-process only. Events are CustomEvent instances dispatched via addEventListener/dispatchEvent.
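The id scoping and the in-process dispatch described above can be sketched with nothing but the platform EventTarget. This is a minimal illustration, not the package's implementation: scopedTopic and DetailEvent are hypothetical names introduced here (DetailEvent stands in for CustomEvent), and the type:id topic format is taken from the paragraph above.

```typescript
// Hypothetical helper: builds the scoped topic name "type:id".
function scopedTopic(type: string, id: string): string {
  return `${type}:${id}`;
}

// Minimal event carrying a payload in `detail` (stand-in for CustomEvent).
class DetailEvent<T> extends Event {
  readonly detail: T;
  constructor(topic: string, detail: T) {
    super(topic);
    this.detail = detail;
  }
}

const target = new EventTarget();
const received: string[] = [];

// Listener scoped to project "p1" only.
target.addEventListener(scopedTopic("session.status", "p1"), (evt) => {
  received.push((evt as DetailEvent<string>).detail);
});

// Only the first dispatch matches the listener's scoped topic.
target.dispatchEvent(new DetailEvent(scopedTopic("session.status", "p1"), "idle"));
target.dispatchEvent(new DetailEvent(scopedTopic("session.status", "p2"), "busy"));

console.log(received); // [ 'idle' ]
```

Because dispatchEvent is synchronous, the listener has already run by the time the log line executes. The event for p2 is never delivered to the p1 listener.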

Operators

13 operators available for stream transformation:

filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join
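To give a feel for how such operators compose, here is a rough sketch of filter, map, and take written over plain async iterables. The signatures are assumptions made for illustration; the operators shipped in @alkdev/pubsub work on Repeaters and may differ in shape and behavior.

```typescript
// An operator turns one async stream into another (assumed shape).
type Op<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

// Keep only values that satisfy the predicate.
const filter = <T>(pred: (v: T) => boolean): Op<T, T> =>
  async function* (source) {
    for await (const v of source) if (pred(v)) yield v;
  };

// Transform each value.
const map = <A, B>(fn: (v: A) => B): Op<A, B> =>
  async function* (source) {
    for await (const v of source) yield fn(v);
  };

// Stop after n values.
const take = <T>(n: number): Op<T, T> =>
  async function* (source) {
    if (n <= 0) return;
    let seen = 0;
    for await (const v of source) {
      yield v;
      if (++seen >= n) return;
    }
  };

// Stand-in for a subscription stream.
async function* numbers(): AsyncGenerator<number> {
  for (let i = 1; i <= 10; i++) yield i;
}

async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const v of source) out.push(v);
  return out;
}

const evens = filter<number>((n) => n % 2 === 0);
const labelled = map<number, string>((n) => `n=${n}`);
const firstTwo = take<string>(2);

const result = collect(firstTwo(labelled(evens(numbers()))));
result.then((values) => console.log(values)); // [ 'n=2', 'n=4' ]
```

Each operator is a function from stream to stream, so a pipe helper only needs to apply them left to right.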

Transport Options

| Transport | EventTarget | Status | Use case |
| --- | --- | --- | --- |
| In-process | new EventTarget() (default) | Implemented | Single-process hub, testing |
| Redis | createRedisEventTarget(...) | Implemented | Cross-process events, multi-hub |
| WebSocket (client) | createWebSocketClientEventTarget(ws) | Implemented | Spoke-side transport |
| WebSocket (server) | createWebSocketServerEventTarget(...) | Implemented | Hub-side transport, connection management |
| Worker (host) | createWorkerHostEventTarget(worker) | Implemented | Host→thread communication |
| Worker (thread) | createWorkerThreadEventTarget() | Implemented | Thread→host communication |

Usage:

// In-process (default)
const localPubsub = createPubSub<MyEventMap>();

// Redis: keep a reference to the event target so it can be closed later
const redisET = createRedisEventTarget({
  publishClient,
  subscribeClient,
  prefix: "alk:events:"
});
const redisPubsub = createPubSub<MyEventMap>({
  eventTarget: redisET,
});

// Graceful shutdown
await redisET.close();

Redis EventTarget

Implemented in @alkdev/pubsub. Forked from @graphql-yoga/redis-event-target (MIT).

createRedisEventTarget

function createRedisEventTarget<TEvent extends TypedEvent>(
  args: CreateRedisEventTargetArgs
): TypedEventTarget<TEvent> & { close(): Promise<void> }

CreateRedisEventTargetArgs

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| publishClient | Redis \| Cluster | Yes | ioredis client for publishing. Can share a connection with other Redis operations. |
| subscribeClient | Redis \| Cluster | Yes | ioredis client for subscribing. Must be a dedicated connection: once a connection enters subscriber mode, Redis only accepts subscription-related commands on it. |
| serializer | { stringify, parse } | No | Custom serializer. Defaults to JSON. Use this for protocols that need different encoding (e.g., MessagePack). |
| prefix | string | No | Redis channel prefix. Default: "". Use "alk:events:" for namespace isolation. |

Channel Naming

Set prefix: "alk:events:" in createRedisEventTarget to namespace Redis channels. Events publish to channels like alk:events:session.status:projectId.

Serialization

Events must be JSON-serializable since Redis is a network service. CustomEvent.detail must not contain functions, circular references, or non-serializable values. This is already the case for call protocol event types (all are TypeBox-validated plain objects). The serializer option on CreateRedisEventTargetArgs allows overriding the default JSON serialization.
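The constraint is easy to see with a small experiment. The Serializer interface below mirrors the { stringify, parse } shape of the serializer option, but its exact parameter types are an assumption; the sample payloads are made up for illustration.

```typescript
// Assumed shape of the `serializer` option.
interface Serializer {
  stringify(value: unknown): string;
  parse(text: string): unknown;
}

// Equivalent of the default: plain JSON.
const jsonSerializer: Serializer = {
  stringify: (value) => JSON.stringify(value),
  parse: (text) => JSON.parse(text),
};

// Plain data survives the round trip unchanged.
const ok = jsonSerializer.parse(
  jsonSerializer.stringify({ status: "idle", retries: 2 }),
);

// Functions are silently dropped, so the receiver sees a different object.
const lossy = jsonSerializer.parse(
  jsonSerializer.stringify({ status: "idle", onDone: () => {} }),
);

// Circular references cannot be serialized at all.
const circular: Record<string, unknown> = { name: "loop" };
circular.self = circular;
let circularFailed = false;
try {
  jsonSerializer.stringify(circular);
} catch {
  circularFailed = true;
}

console.log(ok);             // { status: 'idle', retries: 2 }
console.log(lossy);          // { status: 'idle' }
console.log(circularFailed); // true
```

The silent loss in the second case is the more dangerous failure mode, which is one reason to validate payloads as plain objects before publishing.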

TypedEventTarget Interface

Canonical types at @alkdev/pubsub. Adapted from @graphql-yoga/typed-event-target (MIT).

| Export | Description |
| --- | --- |
| TypedEvent<TType, TDetail> | Event type with typed type and detail fields. Omits CustomEvent's untyped detail/type and replaces them. |
| TypedEventListener<TEvent> | (evt: TEvent) => void |
| TypedEventListenerObject<TEvent> | { handleEvent(object: TEvent): void } |
| TypedEventListenerOrEventListenerObject<TEvent> | Union of the above two |
| TypedEventTarget<TEvent> | Extends EventTarget. Typed addEventListener, dispatchEvent, and removeEventListener that constrain event types to TEvent. |

All transports (in-process, Redis, WebSocket, Worker) implement this same interface, making them interchangeable at the createPubSub config level.
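A toy example may help show why a shared interface makes transports swappable. LoopbackEventTarget below is a hypothetical transport invented for this sketch: it round-trips each event through JSON to mimic a network hop, then delivers it locally. DetailEvent and collectDetails are likewise illustrative names, not package exports.

```typescript
// Minimal event carrying a payload in `detail` (stand-in for CustomEvent).
class DetailEvent<T> extends Event {
  readonly detail: T;
  constructor(topic: string, detail: T) {
    super(topic);
    this.detail = detail;
  }
}

// Hypothetical transport: serializes and re-parses every event before delivery.
class LoopbackEventTarget extends EventTarget {
  dispatchEvent(event: Event): boolean {
    const detail = (event as DetailEvent<unknown>).detail;
    const wire = JSON.stringify({ type: event.type, detail });
    const parsed = JSON.parse(wire) as { type: string; detail: unknown };
    return super.dispatchEvent(new DetailEvent(parsed.type, parsed.detail));
  }
}

// Consumer code depends only on the EventTarget surface.
function collectDetails(target: EventTarget, topic: string): unknown[] {
  const seen: unknown[] = [];
  target.addEventListener(topic, (evt) => {
    seen.push((evt as DetailEvent<unknown>).detail);
  });
  return seen;
}

const results: unknown[][] = [];
for (const target of [new EventTarget(), new LoopbackEventTarget()]) {
  const seen = collectDetails(target, "session.status:p1");
  target.dispatchEvent(new DetailEvent("session.status:p1", { status: "idle" }));
  results.push(seen);
}

console.log(JSON.stringify(results));
// [[{"status":"idle"}],[{"status":"idle"}]]
```

The consumer function never learns which transport it was given, which is the property the createPubSub eventTarget option relies on.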

WebSocket Event Targets

Implemented in @alkdev/pubsub. Two adapters for bidirectional hub↔spoke communication:

Client-side (@alkdev/pubsub/event-target-websocket-client)

createWebSocketClientEventTarget(ws) — wraps a WebSocket. Sends __subscribe/__unsubscribe control messages (reserved __ prefix). Used by spokes to connect to the hub.

Server-side (@alkdev/pubsub/event-target-websocket-server)

createWebSocketServerEventTarget(args?) manages multiple WebSocket connections. Key methods and features:

  • addConnection(ws) / removeConnection(ws) — connection lifecycle
  • onConnection / onDisconnection callbacks
  • Per-connection SpokeEventTarget for individual spoke dispatch
  • Backpressure handling for slow consumers
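One way to picture the server side is as a registry that tracks which connection subscribed to which topic and fans events out accordingly. The sketch below is an assumption-heavy illustration: SubscriptionRegistry, ControlMessage, handleControl, and recipients are hypothetical, and the real adapter's control message layout is not specified here. Only the addConnection/removeConnection names and the __subscribe/__unsubscribe types come from the text above.

```typescript
// Assumed control message layout; the real wire format may differ.
interface ControlMessage {
  type: "__subscribe" | "__unsubscribe";
  topic: string;
}

// Hypothetical registry keyed by connection (any value works as a key).
class SubscriptionRegistry<Conn> {
  private readonly topicsByConn = new Map<Conn, Set<string>>();

  addConnection(conn: Conn): void {
    this.topicsByConn.set(conn, new Set());
  }

  removeConnection(conn: Conn): void {
    // Dropping the entry also drops all of its subscriptions.
    this.topicsByConn.delete(conn);
  }

  handleControl(conn: Conn, msg: ControlMessage): void {
    const topics = this.topicsByConn.get(conn);
    if (!topics) return; // ignore messages from unknown connections
    if (msg.type === "__subscribe") topics.add(msg.topic);
    else topics.delete(msg.topic);
  }

  // Connections that should receive an event for this topic.
  recipients(topic: string): Conn[] {
    const out: Conn[] = [];
    this.topicsByConn.forEach((topics, conn) => {
      if (topics.has(topic)) out.push(conn);
    });
    return out;
  }
}

const registry = new SubscriptionRegistry<string>();
registry.addConnection("spoke-a");
registry.addConnection("spoke-b");
registry.handleControl("spoke-a", { type: "__subscribe", topic: "session.status:p1" });

const beforeDisconnect = registry.recipients("session.status:p1");
registry.removeConnection("spoke-a");
const afterDisconnect = registry.recipients("session.status:p1");

console.log(beforeDisconnect, afterDisconnect); // [ 'spoke-a' ] []
```

Backpressure handling is deliberately left out of this sketch; it would sit between recipients() and the actual socket send.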

Worker Event Targets

For Web Worker (or Deno Worker) communication:

  • createWorkerHostEventTarget(worker) — host side, wraps a Worker
  • createWorkerThreadEventTarget() — thread side, uses globalThis.postMessage/onmessage

Both implement TypedEventTarget with close() for cleanup.

EventEnvelope

All cross-process events use EventEnvelope<T> as the wire format:

interface EventEnvelope<T> {
  readonly type: string   // event type
  readonly id: string     // topic/correlation ID
  readonly payload: T     // event data
}

Types starting with __ are reserved for adapter control messages (e.g., __subscribe, __unsubscribe for WebSocket adapter).
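A receiver typically needs to validate incoming data and separate control messages from regular events. The following is a minimal sketch under stated assumptions: isEventEnvelope, isControlType, and classify are hypothetical helpers, and it assumes control messages travel in the same envelope shape as ordinary events.

```typescript
interface EventEnvelope<T> {
  readonly type: string;
  readonly id: string;
  readonly payload: T;
}

// Types with the reserved "__" prefix belong to the adapter, not the application.
function isControlType(type: string): boolean {
  return type.startsWith("__");
}

// Hypothetical runtime guard for data arriving off the wire.
function isEventEnvelope(value: unknown): value is EventEnvelope<unknown> {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v.type === "string" && typeof v.id === "string" && "payload" in v;
}

type Route = "control" | "event" | "invalid";

function classify(wire: string): Route {
  let parsed: unknown;
  try {
    parsed = JSON.parse(wire);
  } catch {
    return "invalid"; // not JSON at all
  }
  if (!isEventEnvelope(parsed)) return "invalid";
  return isControlType(parsed.type) ? "control" : "event";
}

const routes: Route[] = [
  classify(JSON.stringify({ type: "session.status", id: "p1", payload: { status: "idle" } })),
  classify(JSON.stringify({ type: "__subscribe", id: "p1", payload: null })),
  classify("not json"),
  classify(JSON.stringify({ type: "session.status" })),
];

console.log(routes); // [ 'event', 'control', 'invalid', 'invalid' ]
```

Rejecting application events whose type starts with __ at publish time would keep the reserved namespace clean on the sending side as well.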

Filtering Strategy

OpenCode's problem: every SSE client receives ALL events for a project. With Redis, we scope channels:

alk:events:session.status:{projectId}     — only session status for one project
alk:events:message.updated:{sessionId}    — only message updates for one session
alk:events:runner.dispatch:{runnerId}     — only dispatch for one runner

The hub's SSE endpoint subscribes to the channels relevant to each connected client and relays events. No firehose.
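The channel selection for one client might look roughly like this. ClientScope, channelsFor, and shouldRelay are hypothetical names for this sketch; only the channel naming pattern and the alk:events: prefix come from the examples above.

```typescript
const PREFIX = "alk:events:";

// Hypothetical description of what a connected SSE client is allowed to see.
interface ClientScope {
  projectId: string;
  sessionIds: string[];
}

// Build the exact set of Redis channels for one client.
function channelsFor(scope: ClientScope): string[] {
  return [
    `${PREFIX}session.status:${scope.projectId}`,
    ...scope.sessionIds.map((id) => `${PREFIX}message.updated:${id}`),
  ];
}

// Relay only messages that arrive on a channel the client subscribed to.
function shouldRelay(channel: string, subscribed: ReadonlySet<string>): boolean {
  return subscribed.has(channel);
}

const channels = channelsFor({ projectId: "p1", sessionIds: ["s1", "s2"] });
const subscribed = new Set(channels);

console.log(channels);
// [
//   'alk:events:session.status:p1',
//   'alk:events:message.updated:s1',
//   'alk:events:message.updated:s2'
// ]
console.log(shouldRelay("alk:events:session.status:p1", subscribed)); // true
console.log(shouldRelay("alk:events:session.status:p2", subscribed)); // false
```

In practice Redis itself does the filtering, since the hub only subscribes to the listed channels; the shouldRelay check is shown to make the rule explicit.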

What This Replaces in OpenCode

| OpenCode | alk.dev |
| --- | --- |
| Effect PubSub per instance (in-memory) | createPubSub({ eventTarget: createRedisEventTarget(...) }) |
| GlobalBus (Node EventEmitter, single-process) | Redis channel alk:events:* |
| SSE /event (all events for one project) | Redis subscription filtered by project |
| SSE /global/event (all events for all projects) | Redis subscription optionally unfiltered |
| Bus.subscribeAll() (zero filtering) | pubsub.subscribe("eventType") with Redis scoping |

Prior Art

The pubsub system was originally adapted from @graphql-yoga/subscription and @graphql-yoga/typed-event-target. It has been extracted into @alkdev/pubsub as a standalone package with:

  • Simplified API (PubSubEventMap replacing PubSubPublishArgsByKey)
  • Inlined Repeater (no external dependency)
  • 4 new event target adapters (WebSocket client/server, Worker host/thread)
  • 10 new operators
  • EventEnvelope as universal cross-process message format
  • prefix and close() on Redis adapter