| status | last_updated |
|---|---|
| draft | 2026-05-18 |
PubSub with Redis EventTarget
Overview
The pubsub system is a standalone npm package @alkdev/pubsub, adapted from @graphql-yoga/subscription (MIT). The Repeater is inlined (no external dependency). The critical design feature remains: PubSubConfig.eventTarget allows swapping the underlying transport, enabling single-process operation, cross-process Redis, hub-spoke WebSocket, or Worker communication — all behind the same TypedEventTarget interface.
Package: @alkdev/pubsub (npm)
How It Works
createPubSub accepts a PubSubEventMap and optional eventTarget config:
const pubsub = createPubSub<MyEventMap>();
pubsub.publish("myEvent", id, payload);
for await (const event of pubsub.subscribe("myEvent")) {
  // event is EventEnvelope<MyEventMap["myEvent"]>
  // event.type === "myEvent", event.id === id, event.payload === payload
}
PubSubEventMap is a simple { [eventType: string]: payload } map. publish(type, id, payload) always takes 3 explicit args. Subscribe returns Repeater<EventEnvelope>. Topics are scoped by id — publish("myEvent", id, payload) publishes to topic myEvent:id, and subscribe("myEvent", id) subscribes to that scoped topic only.
Default transport: in-process EventTarget — single-process only. Events are CustomEvent instances dispatched via addEventListener/dispatchEvent.
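The scoping rule can be illustrated without the package itself. The sketch below is not the `@alkdev/pubsub` implementation: `topicFor`, `publish`, and `DetailEvent` are hypothetical names, and `DetailEvent` is a small `Event` subclass standing in for `CustomEvent` so the example runs in runtimes where `CustomEvent` is not a global. It only shows how a `type:id` topic on a plain `EventTarget` keeps one subscriber from seeing another id's events.

```typescript
// Stand-in for CustomEvent (assumption: used here only for portability).
class DetailEvent<T> extends Event {
  readonly detail: T;
  constructor(type: string, detail: T) {
    super(type);
    this.detail = detail;
  }
}

interface Envelope<T> {
  readonly type: string;
  readonly id: string;
  readonly payload: T;
}

// Hypothetical helper: the scoped topic is "<type>:<id>", as described above.
function topicFor(type: string, id: string): string {
  return `${type}:${id}`;
}

const target = new EventTarget();
const received: Envelope<{ status: string }>[] = [];

// Listen for "session.status" events scoped to id "p1" only.
target.addEventListener(topicFor("session.status", "p1"), (evt) => {
  received.push((evt as DetailEvent<Envelope<{ status: string }>>).detail);
});

// Hypothetical publish: wrap the payload in an envelope and dispatch it
// on the scoped topic.
function publish<T>(type: string, id: string, payload: T): void {
  const envelope: Envelope<T> = { type, id, payload };
  target.dispatchEvent(new DetailEvent(topicFor(type, id), envelope));
}

publish("session.status", "p1", { status: "idle" }); // reaches the listener
publish("session.status", "p2", { status: "busy" }); // different topic, ignored
```

Because `dispatchEvent` is synchronous, `received` holds exactly the one `p1` envelope once both `publish` calls return.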
Operators
13 operators available for stream transformation:
filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join
Transport Options
| Transport | EventTarget | Status | Use case |
|---|---|---|---|
| In-process | `new EventTarget()` (default) | Implemented | Single-process hub, testing |
| Redis | `createRedisEventTarget(...)` | Implemented | Cross-process events, multi-hub |
| WebSocket (client) | `createWebSocketClientEventTarget(ws)` | Implemented | Spoke-side transport |
| WebSocket (server) | `createWebSocketServerEventTarget(...)` | Implemented | Hub-side transport, connection management |
| Worker (host) | `createWorkerHostEventTarget(worker)` | Implemented | Host→thread communication |
| Worker (thread) | `createWorkerThreadEventTarget()` | Implemented | Thread→host communication |
Usage:
// In-process (default)
const pubsub = createPubSub<MyEventMap>();
// Redis: keep a reference to the event target so it can be closed later
const redisET = createRedisEventTarget({
  publishClient,
  subscribeClient,
  prefix: "alk:events:",
});
const redisPubsub = createPubSub<MyEventMap>({ eventTarget: redisET });
// Graceful shutdown
await redisET.close();
Redis EventTarget
Implemented in @alkdev/pubsub. Forked from @graphql-yoga/redis-event-target (MIT).
createRedisEventTarget
function createRedisEventTarget<TEvent extends TypedEvent>(
  args: CreateRedisEventTargetArgs
): TypedEventTarget<TEvent> & { close(): Promise<void> }
CreateRedisEventTargetArgs
| Field | Type | Required | Description |
|---|---|---|---|
| `publishClient` | `Redis \| Cluster` | Yes | ioredis client for publishing. Can share a connection with other Redis operations. |
| `subscribeClient` | `Redis \| Cluster` | Yes | ioredis client for subscribing. Must be a dedicated connection: once a Redis connection enters subscriber mode it can no longer issue regular commands. |
| `serializer` | `{ stringify, parse }` | No | Custom serializer. Defaults to JSON. Use this for protocols that need a different encoding (e.g., MessagePack). |
| `prefix` | `string` | No | Redis channel prefix. Default: `""`. Use `"alk:events:"` for namespace isolation. |
Channel Naming
Set prefix: "alk:events:" in createRedisEventTarget to namespace Redis channels. Events publish to channels like alk:events:session.status:projectId.
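As a rough illustration of the naming scheme, the sketch below builds a channel name from a prefix and a scoped topic, and strips the prefix again on the receiving side. `redisChannel` and `topicFromChannel` are hypothetical helpers, not exports of `@alkdev/pubsub`; the assumption is simply that the channel is the prefix followed by `type:id`.

```typescript
// Hypothetical helper: channel = prefix + "<type>:<id>".
function redisChannel(prefix: string, type: string, id: string): string {
  return `${prefix}${type}:${id}`;
}

// Hypothetical inverse: recover the scoped topic from a channel name,
// or null if the channel belongs to a different namespace.
function topicFromChannel(prefix: string, channel: string): string | null {
  return channel.startsWith(prefix) ? channel.slice(prefix.length) : null;
}

const channel = redisChannel("alk:events:", "session.status", "project-123");
// channel === "alk:events:session.status:project-123"

// With the default prefix "" the channel is just the scoped topic.
const bare = redisChannel("", "session.status", "project-123");
// bare === "session.status:project-123"

// A channel from another namespace is not mapped to a topic.
const foreign = topicFromChannel("alk:events:", "other:session.status:project-123");
// foreign === null
```

A non-empty prefix is what lets several applications share one Redis instance without receiving each other's events.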
Serialization
Events must be JSON-serializable since Redis is a network service. CustomEvent.detail must not contain functions, circular references, or non-serializable values. This is already the case for call protocol event types (all are TypeBox-validated plain objects). The serializer option on CreateRedisEventTargetArgs allows overriding the default JSON serialization.
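The following sketch shows why those restrictions matter, using only the built-in `JSON` object. The `Serializer` interface mirrors the `{ stringify, parse }` shape from the table above, but its exact typing in `@alkdev/pubsub` is an assumption, and `roundTrip` is a hypothetical helper that imitates what a value goes through between publisher and subscriber.

```typescript
// Assumed shape of the serializer option: a stringify/parse pair.
interface Serializer {
  stringify(value: unknown): string;
  parse(text: string): unknown;
}

const jsonSerializer: Serializer = {
  stringify: (value) => JSON.stringify(value),
  parse: (text) => JSON.parse(text),
};

// Hypothetical helper: encode and decode a detail value the way a
// network transport would.
function roundTrip(detail: unknown, serializer: Serializer = jsonSerializer): unknown {
  return serializer.parse(serializer.stringify(detail));
}

// Plain objects survive unchanged.
const plain = roundTrip({ type: "session.status", id: "p1", payload: { status: "idle" } });

// Function-valued properties are silently dropped by JSON.stringify.
const lossy = roundTrip({ id: "p1", onDone: () => {} }) as Record<string, unknown>;

// Circular references make JSON.stringify throw.
const circular: Record<string, unknown> = {};
circular["self"] = circular;
let circularFailed = false;
try {
  roundTrip(circular);
} catch {
  circularFailed = true;
}
```

The silent loss in the second case is the more dangerous one, since the subscriber receives a structurally different object with no error on either side.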
TypedEventTarget Interface
Canonical types at @alkdev/pubsub. Adapted from @graphql-yoga/typed-event-target (MIT).
| Export | Description |
|---|---|
| `TypedEvent<TType, TDetail>` | Event type with typed `type` and `detail` fields. Omits `CustomEvent`'s untyped `detail`/`type` and replaces them. |
| `TypedEventListener<TEvent>` | `(evt: TEvent) => void` |
| `TypedEventListenerObject<TEvent>` | `{ handleEvent(object: TEvent): void }` |
| `TypedEventListenerOrEventListenerObject<TEvent>` | Union of the above two |
| `TypedEventTarget<TEvent>` | Extends `EventTarget`. Typed `addEventListener`, `dispatchEvent`, and `removeEventListener` that constrain event types to `TEvent`. |
All transports (in-process, Redis, WebSocket, Worker) implement this same interface, making them interchangeable at the createPubSub config level.
WebSocket Event Targets
Implemented in @alkdev/pubsub. Two adapters for bidirectional hub↔spoke communication:
Client-side (@alkdev/pubsub/event-target-websocket-client)
createWebSocketClientEventTarget(ws) — wraps a WebSocket. Sends __subscribe/__unsubscribe control messages (reserved __ prefix). Used by spokes to connect to the hub.
Server-side (@alkdev/pubsub/event-target-websocket-server)
createWebSocketServerEventTarget(args?) — manages multiple WebSocket connections. Key methods:
- `addConnection(ws)` / `removeConnection(ws)`: connection lifecycle
- `onConnection` / `onDisconnection` callbacks
- Per-connection `SpokeEventTarget` for individual spoke dispatch
- Backpressure handling for slow consumers
Worker Event Targets
For Web Worker (or Deno Worker) communication:
- `createWorkerHostEventTarget(worker)`: host side, wraps a `Worker`
- `createWorkerThreadEventTarget()`: thread side, uses `globalThis.postMessage`/`onmessage`
Both implement TypedEventTarget with close() for cleanup.
EventEnvelope
All cross-process events use EventEnvelope<T> as the wire format:
interface EventEnvelope<T> {
  readonly type: string;  // event type
  readonly id: string;    // topic/correlation ID
  readonly payload: T;    // event data
}
Types starting with __ are reserved for adapter control messages (e.g., __subscribe, __unsubscribe for WebSocket adapter).
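One way an adapter could separate control traffic from application events is a simple prefix check, sketched below. `isControlMessage` and `assertPublishable` are hypothetical helpers, and the payload shown for `__subscribe` is a placeholder; this document does not specify the control message payloads.

```typescript
interface EventEnvelope<T> {
  readonly type: string;
  readonly id: string;
  readonly payload: T;
}

const RESERVED_PREFIX = "__";

// Hypothetical check: true for adapter control messages such as
// "__subscribe" and "__unsubscribe".
function isControlMessage(envelope: EventEnvelope<unknown>): boolean {
  return envelope.type.startsWith(RESERVED_PREFIX);
}

// Hypothetical guard for application code: refuse reserved event types.
function assertPublishable(type: string): void {
  if (type.startsWith(RESERVED_PREFIX)) {
    throw new Error(`Event type "${type}" uses the reserved "__" prefix`);
  }
}

// Placeholder payload: the real control message shape is not documented here.
const control: EventEnvelope<unknown> = { type: "__subscribe", id: "p1", payload: null };
const app: EventEnvelope<{ status: string }> = {
  type: "session.status",
  id: "p1",
  payload: { status: "idle" },
};

const controlFlag = isControlMessage(control); // true
const appFlag = isControlMessage(app); // false
```

Rejecting reserved types at publish time keeps application events from being mistaken for adapter control messages on the wire.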
Filtering Strategy
OpenCode's problem: every SSE client receives ALL events for a project. With Redis, we scope channels:
alk:events:session.status:{projectId} — only session status for one project
alk:events:message.updated:{sessionId} — only message updates for one session
alk:events:runner.dispatch:{runnerId} — only dispatch for one runner
The hub's SSE endpoint subscribes to the channels relevant to each connected client and relays events. No firehose.
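A minimal sketch of that selection step follows, assuming each connected client is described by one project and a set of open sessions. `ClientScope`, `channelsForClient`, and `shouldRelay` are hypothetical names for illustration; the actual hub endpoint may track client interest differently.

```typescript
// Hypothetical description of what one SSE client is allowed to see.
interface ClientScope {
  projectId: string;
  sessionIds: string[];
}

const PREFIX = "alk:events:";

// Derive the scoped channels for one client from the patterns listed above.
function channelsForClient(scope: ClientScope): string[] {
  return [
    `${PREFIX}session.status:${scope.projectId}`,
    ...scope.sessionIds.map((sessionId) => `${PREFIX}message.updated:${sessionId}`),
  ];
}

// Relay an incoming message only if the client subscribed to its channel.
function shouldRelay(channel: string, subscribed: ReadonlySet<string>): boolean {
  return subscribed.has(channel);
}

const channels = channelsForClient({ projectId: "p1", sessionIds: ["s1", "s2"] });
const subscribed = new Set(channels);

const relayOwn = shouldRelay("alk:events:message.updated:s2", subscribed); // true
const relayOther = shouldRelay("alk:events:message.updated:s9", subscribed); // false
```

The client here ends up with three channels, so events for other projects and sessions never reach its stream.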
What This Replaces in OpenCode
| OpenCode | alk.dev |
|---|---|
| Effect PubSub per instance (in-memory) | `createPubSub({ eventTarget: createRedisEventTarget(...) })` |
| GlobalBus (Node EventEmitter, single-process) | Redis channel `alk:events:*` |
| SSE `/event` (all events for one project) | Redis subscription filtered by project |
| SSE `/global/event` (all events for all projects) | Redis subscription optionally unfiltered |
| `Bus.subscribeAll()` (zero filtering) | `pubsub.subscribe("eventType")` with Redis scoping |
Prior Art
The pubsub system was originally adapted from @graphql-yoga/subscription and @graphql-yoga/typed-event-target. It has been extracted into @alkdev/pubsub as a standalone package with:
- Simplified API (`PubSubEventMap` replacing `PubSubPublishArgsByKey`)
- Inlined Repeater (no external dependency)
- 4 new event target adapters (WebSocket client/server, Worker host/thread)
- 10 new operators
- `EventEnvelope` as universal cross-process message format
- `prefix` and `close()` on Redis adapter