diff --git a/docs/architecture/api-surface.md b/docs/architecture/api-surface.md index e1da375..05ac4b7 100644 --- a/docs/architecture/api-surface.md +++ b/docs/architecture/api-surface.md @@ -90,7 +90,7 @@ The `Repeater` automatically cleans up its `addEventListener` when the consumer |--------|--------|-------------| | `EventEnvelope` | `types.ts` | Cross-platform envelope: `{ type, id, payload }`. JSON-serializable. | | `TypedEvent` | `types.ts` | Event with typed `type` and `detail`. Omits `CustomEvent`'s untyped fields. | -| `TypedEventTarget` | `types.ts` | Extends `EventTarget` with typed `addEventListener`, `dispatchEvent`, `removeEventListener`. | +| `TypedEventTarget` | `types.ts` | Extends `EventTarget` with typed `addEventListener`, `dispatchEvent`, `removeEventListener`. All adapters' `dispatchEvent` returns `true` (events are non-cancelable). | | `TypedEventListener` | `types.ts` | `(evt: TEvent) => void` | | `TypedEventListenerObject` | `types.ts` | `{ handleEvent(object: TEvent): void }` | | `TypedEventListenerOrEventListenerObject` | `types.ts` | Union of the above | @@ -99,6 +99,27 @@ The `Repeater` automatically cleans up its `addEventListener` when the consumer | `PubSubEvent` | `create_pubsub.ts` | Derived `TypedEvent` for a specific event type, with `detail` as `EventEnvelope` | | `PubSubEventTarget` | `create_pubsub.ts` | `TypedEventTarget>` | +## Adapter Lifecycle + +All transport adapters provide a `close()` method for graceful teardown. After `close()`: + +- The adapter is unusable (no-op for `addEventListener`, `removeEventListener`, `dispatchEvent`) +- All subscriptions are cleaned up (Redis channels unsubscribed, `__unsubscribe` sent for WebSocket topics, callbacks cleared) +- Intercepted handlers are restored to their originals +- The underlying transport (Redis connection, WebSocket, Worker) is **not** destroyed — the caller owns it + +`close()` is idempotent. Calling it multiple times is safe. + +Adapter return types reflect this: + +| Adapter | Return type | +|---------|-------------| +| Redis | `RedisEventTarget` (extends `TypedEventTarget`, adds `close()`) | +| WebSocket Client | `WebSocketClientEventTarget` (extends `TypedEventTarget`, adds `close()`) | +| WebSocket Server | `WebSocketServerEventTarget` (extends `TypedEventTarget`, adds `addConnection`, `removeConnection`, `close()`) | +| Worker Host | `WorkerHostEventTarget` (extends `TypedEventTarget`, adds `close()`) | +| Worker Thread | `WorkerThreadEventTarget` (extends `TypedEventTarget`, adds `close()`) | + ## Operators All operators work with any `AsyncIterable`. Operators that return `Repeater` provide backpressure-aware push semantics. diff --git a/docs/architecture/build-distribution.md b/docs/architecture/build-distribution.md index 48f3030..ed89b43 100644 --- a/docs/architecture/build-distribution.md +++ b/docs/architecture/build-distribution.md @@ -28,22 +28,25 @@ No logger dependency. No TypeBox dependency (call protocol and schemas moved to types.ts # TypedEvent, TypedEventTarget, EventEnvelope create_pubsub.ts # createPubSub factory (adapted from graphql-yoga) operators.ts # filter, map, pipe, take, reduce, toArray, - # batch, dedupe, window, flat, groupBy, chain, join + # batch, dedupe, window, flat, groupBy, chain, join repeater.ts # Inlined from @repeaterjs/repeater (MIT) event-target-redis.ts # createRedisEventTarget (peer dep: ioredis) - # Future adapters (each is its own entry point + peer dep island): - # event-target-websocket.ts # (peer dep: none, web standard) - # event-target-worker.ts # (peer dep: none, web standard) + event-target-websocket-client.ts # createWebSocketClientEventTarget + event-target-websocket-server.ts # createWebSocketServerEventTarget, WebSocketLike, SpokeEventTarget + event-target-worker.ts # createWorkerHostEventTarget, createWorkerThreadEventTarget + # Future adapters: # event-target-iroh.ts # (peer dep: @rayhanadev/iroh) test/ create_pubsub.test.ts operators.test.ts event-target-redis.test.ts - # event-target-websocket.test.ts - # event-target-worker.test.ts - # event-target-iroh.test.ts + event-target-websocket-client.test.ts + event-target-websocket-server.test.ts + event-target-worker.test.ts + integration-pubsub-redis.test.ts + integration-websocket.test.ts docs/ - architecture.md + architecture/ architecture/ research/ package.json @@ -61,7 +64,8 @@ We use explicit sub-path exports rather than barrel-only + tree-shaking. Each ad "exports": { ".": { ... }, "./event-target-redis": { ... }, - "./event-target-websocket": { ... }, + "./event-target-websocket-client": { ... }, + "./event-target-websocket-server": { ... }, "./event-target-worker": { ... }, "./event-target-iroh": { ... } } diff --git a/docs/architecture/event-targets.md b/docs/architecture/event-targets.md index ec7ef63..3190342 100644 --- a/docs/architecture/event-targets.md +++ b/docs/architecture/event-targets.md @@ -16,6 +16,7 @@ Every adapter must implement: | `addEventListener(type, callback)` | Register listener for event type. Callback receives `CustomEvent` with typed `detail` (an `EventEnvelope`). | | `dispatchEvent(event)` | Send/dispatch event. Returns `boolean` (always `true` for non-cancelable events). | | `removeEventListener(type, callback)` | Unregister listener. Clean up underlying subscription when no listeners remain for a topic. | +| `close()` | Teardown: clean up all subscriptions, restore any intercepted handlers, remove message listeners. Adapter is unusable after `close()`. Idempotent. Does **not** destroy the underlying transport (which the caller owns). | ## Topology Model @@ -56,14 +57,35 @@ See [ADR-003](decisions/003-subscription-control-protocol.md). This is analogous to Redis's `SUBSCRIBE`/`UNSUBSCRIBE` commands — control messages share the same wire format and connection as data. +## Lifecycle + +All adapters that acquire resources (handler interception, message listeners, subscriptions) provide a `close()` method for graceful teardown. `close()` is idempotent — calling it more than once is a no-op. + +`close()` does **not** destroy the underlying transport (Redis connection, WebSocket, Worker). The caller owns the transport and decides when to disconnect it. `close()` only cleans up the adapter's own state: + +- Removes message listeners from the transport +- Restores any original `onmessage`/`onclose` handlers that were intercepted +- Unsubscribes from all Redis channels / sends `__unsubscribe` for all active topics +- Clears internal maps (subscription tracking, callbacks) + +After `close()`, the adapter is unusable: `addEventListener`, `removeEventListener`, and `dispatchEvent` become no-ops. This is intentional — the caller should create a new adapter if they need to reconnect. + +| Adapter | What `close()` does | +|---------|---------------------| +| Redis | Unsubscribes all channels, removes `message` listener, clears callback map | +| WebSocket Client | Sends `__unsubscribe` for all active topics, restores original `onmessage`, clears callback map | +| WebSocket Server | Removes all connections (restoring their original handlers, firing `onDisconnection`), clears local listener map | +| Worker Host | Restores original `worker.onmessage`, clears callback map | +| Worker Thread | Restores original `globalThis.onmessage`, clears callback map | + ## Adapter Docs | Adapter | Import | Status | |---------|--------|--------| -| [In-Process](in-process.md) | (default, no import) | Implemented (built-in `EventTarget`) | -| [Redis](redis.md) | `@alkdev/pubsub/event-target-redis` | Implemented. Needs tests. | -| [WebSocket Client](websocket-client.md) | `@alkdev/pubsub/event-target-websocket-client` | Not yet implemented | -| [WebSocket Server](websocket-server.md) | `@alkdev/pubsub/event-target-websocket-server` | Not yet implemented | -| [Worker](worker.md) | `@alkdev/pubsub/event-target-worker` | Not yet implemented (R&D on Node vs Web Worker) | -| [Iroh Spoke](iroh-spoke.md) | `@alkdev/pubsub/event-target-iroh-spoke` | Deferred (pending fork of iroh-ts) | -| [Iroh Hub](iroh-hub.md) | `@alkdev/pubsub/event-target-iroh-hub` | Deferred (pending fork of iroh-ts) | \ No newline at end of file +| [In-Process](event-targets/in-process.md) | (default, no import) | Implemented (built-in `EventTarget`) | +| [Redis](event-targets/redis.md) | `@alkdev/pubsub/event-target-redis` | Implemented | +| [WebSocket Client](event-targets/websocket-client.md) | `@alkdev/pubsub/event-target-websocket-client` | Implemented | +| [WebSocket Server](event-targets/websocket-server.md) | `@alkdev/pubsub/event-target-websocket-server` | Implemented | +| [Worker](event-targets/worker.md) | `@alkdev/pubsub/event-target-worker` | Implemented | +| [Iroh Spoke](iroh-transport.md) | `@alkdev/pubsub/event-target-iroh-spoke` | Deferred (pending fork of iroh-ts) | +| [Iroh Hub](iroh-transport.md) | `@alkdev/pubsub/event-target-iroh-hub` | Deferred (pending fork of iroh-ts) | \ No newline at end of file diff --git a/package.json b/package.json index 2fdf015..e3265fc 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "publishConfig": { "access": "public" }, + "sideEffects": false, "files": [ "dist" ], diff --git a/src/event-target-redis.ts b/src/event-target-redis.ts index 9a931f7..f8ee730 100644 --- a/src/event-target-redis.ts +++ b/src/event-target-redis.ts @@ -44,9 +44,13 @@ export type CreateRedisEventTargetArgs = { prefix?: string; }; +export interface RedisEventTarget extends TypedEventTarget { + close(): void; +} + export function createRedisEventTarget( args: CreateRedisEventTargetArgs, -): TypedEventTarget { +): RedisEventTarget { const { publishClient, subscribeClient } = args; const serializer = args.serializer ?? JSON; @@ -127,5 +131,13 @@ export function createRedisEventTarget( removeCallback(topic, callback); } }, + close() { + const topics = [...callbacksForTopic.keys()]; + callbacksForTopic.clear(); + for (const topic of topics) { + subscribeClient.unsubscribe(topic); + } + (subscribeClient as Redis).off("message", onMessage); + }, }; } \ No newline at end of file diff --git a/src/event-target-websocket-client.ts b/src/event-target-websocket-client.ts index 8fcd750..cdf694a 100644 --- a/src/event-target-websocket-client.ts +++ b/src/event-target-websocket-client.ts @@ -1,10 +1,17 @@ import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; +export interface WebSocketClientEventTarget extends TypedEventTarget { + close(): void; +} + export function createWebSocketClientEventTarget( ws: WebSocket, -): TypedEventTarget { +): WebSocketClientEventTarget { const callbacksForTopic = new Map>(); + const originalOnmessage = ws.onmessage; + let closed = false; + ws.onmessage = (event: MessageEvent) => { let envelope: EventEnvelope; try { @@ -36,6 +43,7 @@ export function createWebSocketClientEventTarget( }; function addCallback(topic: string, callback: EventListener) { + if (closed) return; let callbacks = callbacksForTopic.get(topic); const isFirst = callbacks === undefined; if (isFirst) { @@ -56,6 +64,7 @@ export function createWebSocketClientEventTarget( } function removeCallback(topic: string, callback: EventListener) { + if (closed) return; const callbacks = callbacksForTopic.get(topic); if (callbacks === undefined) { return; @@ -86,6 +95,7 @@ export function createWebSocketClientEventTarget( } }, dispatchEvent(event: TEvent) { + if (closed) return true; ws.send(JSON.stringify(event.detail)); return true; }, @@ -96,5 +106,22 @@ export function createWebSocketClientEventTarget( removeCallback(topic, callback); } }, + close() { + if (closed) return; + closed = true; + for (const [topic, callbacks] of callbacksForTopic) { + if (callbacks.size > 0) { + ws.send( + JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic }, + }), + ); + } + } + callbacksForTopic.clear(); + ws.onmessage = originalOnmessage; + }, }; } \ No newline at end of file diff --git a/src/event-target-websocket-server.ts b/src/event-target-websocket-server.ts index bcda0d3..c488588 100644 --- a/src/event-target-websocket-server.ts +++ b/src/event-target-websocket-server.ts @@ -22,6 +22,7 @@ export interface CreateWebSocketServerEventTargetArgs export interface WebSocketServerEventTarget extends TypedEventTarget { addConnection(ws: WebSocketLike): void; removeConnection(ws: WebSocketLike): void; + close(): void; } export function createWebSocketServerEventTarget( @@ -96,6 +97,7 @@ export function createWebSocketServerEventTarget( } function removeConnection(ws: WebSocketLike) { + if (!spokeTargets.has(ws)) return; const topics = connectionSubscriptions.get(ws); if (topics !== undefined) { for (const topic of topics) { @@ -276,6 +278,12 @@ export function createWebSocketServerEventTarget( } return true; }, + close() { + for (const ws of [...spokeTargets.keys()]) { + removeConnection(ws); + } + localListeners.clear(); + }, }; return serverTarget; diff --git a/src/event-target-worker.ts b/src/event-target-worker.ts index 681408f..9407152 100644 --- a/src/event-target-worker.ts +++ b/src/event-target-worker.ts @@ -1,10 +1,20 @@ import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; +export interface WorkerHostEventTarget extends TypedEventTarget { + close(): void; +} + +export interface WorkerThreadEventTarget extends TypedEventTarget { + close(): void; +} + export function createWorkerHostEventTarget( worker: Worker, -): TypedEventTarget { +): WorkerHostEventTarget { const callbacksForTopic = new Map>(); + const originalOnmessage = worker.onmessage; + worker.onmessage = (event: MessageEvent) => { const envelope = event.data as EventEnvelope; if (typeof envelope?.type !== "string" || envelope.type.startsWith("__")) { @@ -65,10 +75,14 @@ export function createWorkerHostEventTarget( removeCallback(topic, callback); } }, + close() { + callbacksForTopic.clear(); + worker.onmessage = originalOnmessage; + }, }; } -export function createWorkerThreadEventTarget(): TypedEventTarget { +export function createWorkerThreadEventTarget(): WorkerThreadEventTarget { const callbacksForTopic = new Map>(); const global = globalThis as unknown as { @@ -76,6 +90,14 @@ export function createWorkerThreadEventTarget(): Type postMessage: (message: unknown) => void; }; + if (typeof global.postMessage !== "function") { + throw new Error( + "createWorkerThreadEventTarget must be called inside a Worker context where globalThis.postMessage is available", + ); + } + + const originalOnmessage = global.onmessage; + global.onmessage = (event: MessageEvent) => { const envelope = event.data as EventEnvelope; if (typeof envelope?.type !== "string" || envelope.type.startsWith("__")) { @@ -136,5 +158,9 @@ export function createWorkerThreadEventTarget(): Type removeCallback(topic, callback); } }, + close() { + callbacksForTopic.clear(); + global.onmessage = originalOnmessage; + }, }; } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index f4bf5fa..d7a5981 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type Pu export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; -export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; -export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js"; +export { createRedisEventTarget, type CreateRedisEventTargetArgs, type RedisEventTarget } from "./event-target-redis.js"; +export { createWebSocketClientEventTarget, type WebSocketClientEventTarget } from "./event-target-websocket-client.js"; export { createWebSocketServerEventTarget, type WebSocketLike, type SpokeEventTarget, type CreateWebSocketServerEventTargetArgs, type WebSocketServerEventTarget } from "./event-target-websocket-server.js"; -export { createWorkerHostEventTarget, createWorkerThreadEventTarget } from "./event-target-worker.js"; \ No newline at end of file +export { createWorkerHostEventTarget, createWorkerThreadEventTarget, type WorkerHostEventTarget, type WorkerThreadEventTarget } from "./event-target-worker.js"; \ No newline at end of file diff --git a/test/event-target-redis.test.ts b/test/event-target-redis.test.ts index e13987d..e7f5068 100644 --- a/test/event-target-redis.test.ts +++ b/test/event-target-redis.test.ts @@ -26,6 +26,11 @@ function createMockRedis() { } return {} as any; }), + off: vi.fn((event: string, callback: (channel: string, message: string) => void) => { + if (event === "message" && messageListener === callback) { + messageListener = null; + } + }), publications, subscriptions, unsubscriptions, @@ -595,4 +600,102 @@ describe("createRedisEventTarget", () => { expect(subscribeClient.unsubscribe).toHaveBeenCalledWith("obj:test2"); }); }); + + describe("close()", () => { + it("unsubscribes from all active channels", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + + const listener1 = vi.fn(); + const listener2 = vi.fn(); + eventTarget.addEventListener("topic:a", listener1); + eventTarget.addEventListener("topic:b", listener2); + + eventTarget.close(); + + expect(subscribeClient.unsubscribe).toHaveBeenCalledWith("topic:a"); + expect(subscribeClient.unsubscribe).toHaveBeenCalledWith("topic:b"); + }); + + it("removes the message listener from subscribeClient", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + + eventTarget.close(); + + expect(subscribeClient.off).toHaveBeenCalledWith("message", expect.any(Function)); + }); + + it("does not receive messages after close", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + + const envelope: EventEnvelope = { type: "topic", id: "a", payload: "hello" }; + subscribeClient.simulateMessage("topic:a", JSON.stringify(envelope)); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("is idempotent", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + eventTarget.close(); + + expect(subscribeClient.unsubscribe).toHaveBeenCalledTimes(1); + }); + + it("handles close with no subscriptions", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + + expect(() => eventTarget.close()).not.toThrow(); + }); + + it("unsubscribes from prefixed channels correctly", () => { + const publishClient = createMockRedis(); + const subscribeClient = createMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + prefix: "alk:events:", + }); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + + expect(subscribeClient.unsubscribe).toHaveBeenCalledWith("alk:events:topic:a"); + }); + }); }); \ No newline at end of file diff --git a/test/event-target-websocket-client.test.ts b/test/event-target-websocket-client.test.ts index a6a0f79..e4ead97 100644 --- a/test/event-target-websocket-client.test.ts +++ b/test/event-target-websocket-client.test.ts @@ -626,6 +626,100 @@ describe("createWebSocketClientEventTarget", () => { }); }); + describe("close()", () => { + it("sends __unsubscribe for all active subscriptions", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + const listener2 = vi.fn(); + eventTarget.addEventListener("topic:a", listener1); + eventTarget.addEventListener("topic:b", listener2); + + (ws.send as ReturnType).mockClear(); + + eventTarget.close(); + + const sent = ws.sent.map((s: string) => JSON.parse(s)); + const unsubscribes = sent.filter((e: any) => e.type === "__unsubscribe"); + expect(unsubscribes).toHaveLength(2); + const topics = unsubscribes.map((e: any) => e.payload.topic); + expect(topics).toContain("topic:a"); + expect(topics).toContain("topic:b"); + }); + + it("restores original onmessage handler", () => { + const ws = createMockWebSocket(); + const originalOnmessage = vi.fn(); + ws.onmessage = originalOnmessage; + + const eventTarget = createWebSocketClientEventTarget(ws as any); + expect(ws.onmessage).not.toBe(originalOnmessage); + + eventTarget.close(); + + expect(ws.onmessage).toBe(originalOnmessage); + }); + + it("does not deliver messages after close", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + + const envelope: EventEnvelope = { type: "topic", id: "a", payload: "hello" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("does not send __subscribe after close", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + eventTarget.close(); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + expect(ws.send).not.toHaveBeenCalledWith( + expect.stringContaining("__subscribe"), + ); + }); + + it("dispatchEvent returns true but does not send after close", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + eventTarget.close(); + + (ws.send as ReturnType).mockClear(); + + const event = new CustomEvent("test:event", { + detail: { type: "test", id: "event", payload: null }, + }) as TestEvent; + + const result = eventTarget.dispatchEvent(event); + expect(result).toBe(true); + expect(ws.send).not.toHaveBeenCalled(); + }); + + it("is idempotent", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + eventTarget.close(); + + const sentCalls = ws.sent.filter((s: string) => JSON.parse(s).type === "__unsubscribe"); + expect(sentCalls).toHaveLength(1); + }); + }); + describe("dispatchEvent (send path) edge cases", () => { it("sends envelope with null payload", () => { const ws = createMockWebSocket(); diff --git a/test/event-target-websocket-server.test.ts b/test/event-target-websocket-server.test.ts index 59ce9da..6cc4a2e 100644 --- a/test/event-target-websocket-server.test.ts +++ b/test/event-target-websocket-server.test.ts @@ -819,4 +819,90 @@ describe("createWebSocketServerEventTarget", () => { expect(rawWs).toBe(ws); }); }); + + describe("close()", () => { + it("removes all connections and clears local listeners", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + server.close(); + + expect(onDisconnection).toHaveBeenCalledTimes(2); + }); + + it("no longer delivers events to removed connections after close", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + server.close(); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("no longer delivers events to local listeners after close", () => { + const server = createWebSocketServerEventTarget(); + const listener = vi.fn(); + server.addEventListener("chat:room1", listener); + + server.close(); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("restores original onmessage and onclose for all connections", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const originalOnmessage = vi.fn(); + const originalOnclose = vi.fn(); + ws.onmessage = originalOnmessage; + ws.onclose = originalOnclose; + + server.addConnection(ws as any); + expect(ws.onmessage).not.toBe(originalOnmessage); + + server.close(); + + expect(ws.onmessage).toBe(originalOnmessage); + expect(ws.onclose).toBe(originalOnclose); + }); + + it("does not close the WebSocket connections", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + server.close(); + + expect(ws.close).not.toHaveBeenCalled(); + }); + + it("is idempotent", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + server.close(); + server.close(); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + }); + }); }); \ No newline at end of file diff --git a/test/event-target-worker.test.ts b/test/event-target-worker.test.ts index 9ce8060..5de8b08 100644 --- a/test/event-target-worker.test.ts +++ b/test/event-target-worker.test.ts @@ -473,6 +473,47 @@ describe("createWorkerHostEventTarget", () => { expect(listener).not.toHaveBeenCalled(); }); }); + + describe("close()", () => { + it("restores original worker.onmessage handler", () => { + const worker = createMockWorker(); + const originalOnmessage = vi.fn(); + worker.onmessage = originalOnmessage as any; + + const eventTarget = createWorkerHostEventTarget(worker as any); + expect(worker.onmessage).not.toBe(originalOnmessage); + + eventTarget.close(); + expect(worker.onmessage).toBe(originalOnmessage); + }); + + it("clears all listeners so events are no longer delivered", () => { + const worker = createMockWorker(); + const eventTarget = createWorkerHostEventTarget(worker as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + eventTarget.close(); + + const envelope: EventEnvelope = { type: "topic", id: "a", payload: "data" }; + worker.simulateMessage(envelope); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("is idempotent", () => { + const worker = createMockWorker(); + const originalOnmessage = vi.fn(); + worker.onmessage = originalOnmessage as any; + + const eventTarget = createWorkerHostEventTarget(worker as any); + eventTarget.close(); + eventTarget.close(); + + expect(worker.onmessage).toBe(originalOnmessage); + }); + }); }); describe("createWorkerThreadEventTarget", () => { @@ -769,6 +810,20 @@ describe("createWorkerThreadEventTarget", () => { }); }); +describe("createWorkerThreadEventTarget context guard", () => { + it("throws if globalThis.postMessage is not available", () => { + const originalPostMessage = (globalThis as any).postMessage; + delete (globalThis as any).postMessage; + try { + expect(() => createWorkerThreadEventTarget()).toThrow( + "createWorkerThreadEventTarget must be called inside a Worker context where globalThis.postMessage is available", + ); + } finally { + (globalThis as any).postMessage = originalPostMessage; + } + }); +}); + describe("bidirectional communication (host + thread)", () => { it("host sends envelope that thread receives", () => { const worker = createMockWorker();