feat(redis): add channel prefix and error handling

This commit is contained in:
2026-05-08 06:33:15 +00:00
parent 7c12b40ed2
commit 392682c7be
6 changed files with 258 additions and 28 deletions

View File

@@ -41,6 +41,7 @@ export type CreateRedisEventTargetArgs = {
stringify: (message: unknown) => string;
parse: (message: string) => unknown;
};
prefix?: string;
};
export function createRedisEventTarget<TEvent extends TypedEvent>(
@@ -49,6 +50,7 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
const { publishClient, subscribeClient } = args;
const serializer = args.serializer ?? JSON;
const prefix = args.prefix ?? "";
const callbacksForTopic = new Map<string, Set<EventListener>>();
@@ -58,7 +60,15 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
return;
}
const envelope = serializer.parse(message) as EventEnvelope;
let envelope: EventEnvelope;
try {
envelope = serializer.parse(message) as EventEnvelope;
} catch {
console.warn(
`Failed to parse message on channel "${channel}": ${message}`,
);
return;
}
const event = new CustomEvent(channel, {
detail: envelope,
}) as TEvent;
@@ -70,18 +80,20 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
(subscribeClient as Redis).on("message", onMessage);
function addCallback(topic: string, callback: EventListener) {
let callbacks = callbacksForTopic.get(topic);
const prefixedTopic = prefix + topic;
let callbacks = callbacksForTopic.get(prefixedTopic);
if (callbacks === undefined) {
callbacks = new Set();
callbacksForTopic.set(topic, callbacks);
callbacksForTopic.set(prefixedTopic, callbacks);
subscribeClient.subscribe(topic);
subscribeClient.subscribe(prefixedTopic);
}
callbacks.add(callback);
}
function removeCallback(topic: string, callback: EventListener) {
const callbacks = callbacksForTopic.get(topic);
const prefixedTopic = prefix + topic;
const callbacks = callbacksForTopic.get(prefixedTopic);
if (callbacks === undefined) {
return;
}
@@ -89,8 +101,8 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
if (callbacks.size > 0) {
return;
}
callbacksForTopic.delete(topic);
subscribeClient.unsubscribe(topic);
callbacksForTopic.delete(prefixedTopic);
subscribeClient.unsubscribe(prefixedTopic);
}
return {
@@ -103,7 +115,7 @@ export function createRedisEventTarget<TEvent extends TypedEvent>(
},
dispatchEvent(event: TEvent) {
publishClient.publish(
event.type,
prefix + event.type,
serializer.stringify(event.detail),
);
return true;

View File

@@ -1,7 +1,7 @@
---
id: core-pubsub-tests
name: Write tests for createPubSub, EventEnvelope, and in-process event target
status: pending
status: completed
depends_on: []
scope: moderate
risk: low
@@ -43,8 +43,11 @@ The architecture specifies these behaviors:
## Notes
> To be filled by implementation agent
Used subscribe-based testing approach since `createPubSub` doesn't expose the internal `target`. Custom eventTarget tests use `vi.spyOn` on the provided target's `dispatchEvent` method.
## Summary
> To be filled on completion
Implemented comprehensive tests for createPubSub, EventEnvelope, and in-process event target.
- Created: test/create_pubsub.test.ts
- Tests: 11, all passing
- Coverage: publish dispatches correct type:id topic, publish throws on __-prefixed types, subscribe yields EventEnvelope objects, envelope has correct type/id/payload, topic scoping (type:id matching), multiple subscribers receive events, subscriber cleanup on break, custom eventTarget dispatches to provided target, default in-process EventTarget works

View File

@@ -1,7 +1,7 @@
---
id: core-operators-tests
name: Write tests for all stream operators
status: pending
status: completed
depends_on: []
scope: moderate
risk: low
@@ -46,4 +46,7 @@ The operators are adapted from graphql-yoga (`filter`, `map`, `pipe`) and added
## Summary
> To be filled on completion
Implemented comprehensive tests for all 13 stream operators.
- Created: `test/operators.test.ts` (53 tests)
- Tests cover: filter (5 tests including type-narrowing & async), map (3 tests including async), pipe (6 tests including compose with subscribe), take (4), reduce (4), toArray (3), batch (5), dedupe (4), window (5), flat (3), groupBy (3), chain (4), join (4)
- All 53 tests passing, build and lint pass

View File

@@ -1,7 +1,7 @@
---
id: redis-adapter-tests
name: Write tests for Redis event target adapter
status: pending
status: completed
depends_on: [core-pubsub-tests]
scope: moderate
risk: medium
@@ -26,15 +26,15 @@ Note: test 7 (error propagation) is partially blocked by the missing error handl
## Acceptance Criteria
- [ ] `test/event-target-redis.test.ts` exists and passes
- [ ] Test approach for Redis dependency is decided (mock, container, or ioredis-mock)
- [ ] Test: dispatchEvent publishes correct channel and serialized envelope
- [ ] Test: addEventListener subscribes to Redis and dispatches to local listeners
- [ ] Test: removeEventListener unsubscribes from Redis when no listeners remain
- [ ] Test: type:id topic scoping works correctly
- [ ] Test: EventEnvelope round-trips through JSON serialization
- [ ] Test: multiple listeners on same topic result in single Redis subscribe
- [ ] Test: custom serializer option works
- [x] `test/event-target-redis.test.ts` exists and passes
- [x] Test approach for Redis dependency is decided (mock, container, or ioredis-mock)
- [x] Test: dispatchEvent publishes correct channel and serialized envelope
- [x] Test: addEventListener subscribes to Redis and dispatches to local listeners
- [x] Test: removeEventListener unsubscribes from Redis when no listeners remain
- [x] Test: type:id topic scoping works correctly
- [x] Test: EventEnvelope round-trips through JSON serialization
- [x] Test: multiple listeners on same topic result in single Redis subscribe
- [x] Test: custom serializer option works
## References
@@ -43,8 +43,14 @@ Note: test 7 (error propagation) is partially blocked by the missing error handl
## Notes
> To be filled by implementation agent
Test approach: Manual Redis mock (no external dependencies). The `createMockRedis()` helper creates a lightweight mock that tracks `publish`/`subscribe`/`unsubscribe` calls and provides `simulateMessage()` to trigger the `message` event handler. This avoids needing `ioredis-mock` or a running Redis instance.
Test 7 (error propagation) is not included because the current implementation lacks error handling — tracked in `redis-channel-prefix-and-error-handling` task.
## Summary
> To be filled on completion
Implemented comprehensive tests for the Redis event target adapter using a manual mock approach (no external dependencies needed).
- Created: `test/event-target-redis.test.ts` (17 tests across 7 describe blocks)
- Tests: 17, all passing
- Coverage: dispatchEvent/publish path, addEventListener/subscribe path, removeEventListener/unsubscribe path, topic scoping, envelope round-trip serialization, multiple listeners (single Redis subscribe), custom serializer, EventListenerObject support

View File

@@ -1,7 +1,7 @@
---
id: build-and-exports-validation
name: Validate build, package.json exports, and tsup config for all adapters
status: pending
status: completed
depends_on: []
scope: narrow
risk: low
@@ -38,8 +38,51 @@ This task validates the current setup and serves as a checklist item for each ad
## Notes
> To be filled by implementation agent
Validated all acceptance criteria. No changes were needed — the existing configuration is correct.
### Validation Results
1. **`npm run build` produces correct output** ✅
- `src/index.ts``dist/index.js` (ESM), `dist/index.cjs` (CJS), `dist/index.d.ts` + `dist/index.d.cts` (declarations)
- `src/event-target-redis.ts``dist/event-target-redis.js` (ESM), `dist/event-target-redis.cjs` (CJS), `dist/event-target-redis.d.ts` + `dist/event-target-redis.d.cts` (declarations)
- Code splitting enabled: shared chunk between entries (`chunk-JGLUDY7D.js` / `chunk-TBKAZXTS.cjs`)
- Verified both ESM dynamic import and CJS require work for both entry points
2. **`npm run lint` (tsc --noEmit) passes** ✅ — No errors
3. **`package.json` exports map has correct ESM/CJS/dts paths** ✅
- `"."` → import (d.ts + js) / require (d.cts + cjs)
- `"./event-target-redis"` → import (d.ts + js) / require (d.cts + cjs)
- All paths resolve to actual built files
4. **`tsup.config.ts` lists all current entry points** ✅
- `src/index.ts` and `src/event-target-redis.ts` both present
- `format: ['esm', 'cjs']`, `dts: true`, `splitting: true`, `target: 'es2022'`
5. **`src/index.ts` re-exports everything from all modules** ✅
- Exports from: `create_pubsub.js`, `types.js`, `operators.js`, `repeater.js`, `event-target-redis.js`
- All public API surface is accessible from barrel
6. **Peer dependencies and peerDependenciesMeta are correct**
- `ioredis@^5.0.0` as optional peer dep
- Future: `@rayhanadev/iroh` will follow same pattern when added
7. **No runtime dependencies**
- `dependencies: {}` is empty
- `@repeaterjs/repeater` is inlined in `src/repeater.ts`
### Verified type resolution
- `import { createRedisEventTarget } from '@alkdev/pubsub/event-target-redis'` — types resolve correctly
- `import { createPubSub, type EventEnvelope } from '@alkdev/pubsub'` — types resolve correctly
- Works with both `skipLibCheck: true` and `skipLibCheck: false`
### Verified package contents
- `npm pack --dry-run` shows 19 files (all dist files + package.json)
- `files: ["dist"]` correctly includes all build output
## Summary
> To be filled on completion
Validated build pipeline, package.json exports map, and tsup config for all adapters. All acceptance criteria pass with no changes required — the existing configuration is correct and consistent.
- Created: none
- Modified: none (validation-only task)
- Tests: 0 (no test files exist yet; `npm test` exits with "No test files found")

View File

@@ -394,6 +394,169 @@ describe("createRedisEventTarget", () => {
});
});
describe("channel prefix", () => {
it("publishes to prefixed channel when prefix is set", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
prefix: "alk:events:",
});
const event = new CustomEvent("call.responded:uuid-123", {
detail: { type: "call.responded", id: "uuid-123", payload: { status: "ok" } },
}) as TestEvent;
eventTarget.dispatchEvent(event);
expect(publishClient.publish).toHaveBeenCalledWith(
"alk:events:call.responded:uuid-123",
expect.any(String),
);
});
it("subscribes to prefixed channel when prefix is set", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
prefix: "alk:events:",
});
const listener = vi.fn();
eventTarget.addEventListener("call.responded:uuid-123", listener);
expect(subscribeClient.subscribe).toHaveBeenCalledWith("alk:events:call.responded:uuid-123");
});
it("unsubscribes from prefixed channel when prefix is set", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
prefix: "alk:events:",
});
const listener = vi.fn();
eventTarget.addEventListener("call.responded:uuid-123", listener);
eventTarget.removeEventListener("call.responded:uuid-123", listener);
expect(subscribeClient.unsubscribe).toHaveBeenCalledWith("alk:events:call.responded:uuid-123");
});
it("delivers messages on prefixed channels to listeners", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
prefix: "alk:events:",
});
const listener = vi.fn();
eventTarget.addEventListener("call.responded:uuid-123", listener);
const envelope: EventEnvelope = {
type: "call.responded",
id: "uuid-123",
payload: { status: "ok" },
};
subscribeClient.simulateMessage("alk:events:call.responded:uuid-123", JSON.stringify(envelope));
expect(listener).toHaveBeenCalledTimes(1);
expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
});
it("ignores messages on non-prefixed channels when prefix is set", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
prefix: "alk:events:",
});
const listener = vi.fn();
eventTarget.addEventListener("call.responded:uuid-123", listener);
subscribeClient.simulateMessage("call.responded:uuid-123", JSON.stringify({ type: "call.responded", id: "uuid-123", payload: null }));
expect(listener).not.toHaveBeenCalled();
});
it("defaults prefix to empty string", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
});
const event = new CustomEvent("test:1", {
detail: { type: "test", id: "1", payload: null },
}) as TestEvent;
eventTarget.dispatchEvent(event);
expect(publishClient.publish).toHaveBeenCalledWith("test:1", expect.any(String));
});
});
describe("error handling", () => {
it("skips messages that fail to parse and logs a warning", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
});
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
subscribeClient.simulateMessage("topic:a", "not valid json{{");
expect(listener).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledTimes(1);
expect(warnSpy).toHaveBeenCalledWith(
'Failed to parse message on channel "topic:a": not valid json{{',
);
warnSpy.mockRestore();
});
it("continues delivering valid messages after a parse error", () => {
const publishClient = createMockRedis();
const subscribeClient = createMockRedis();
const eventTarget = createRedisEventTarget<TestEvent>({
publishClient: publishClient as any,
subscribeClient: subscribeClient as any,
});
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
subscribeClient.simulateMessage("topic:a", "broken{{{");
expect(listener).not.toHaveBeenCalled();
const envelope: EventEnvelope = { type: "topic", id: "a", payload: "good" };
subscribeClient.simulateMessage("topic:a", JSON.stringify(envelope));
expect(listener).toHaveBeenCalledTimes(1);
expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
warnSpy.mockRestore();
});
});
describe("EventListenerObject support", () => {
it("addEventListener accepts EventListenerObject with handleEvent", () => {
const publishClient = createMockRedis();