/*
 * Adapted from @graphql-yoga/redis-event-target
 * Original source: https://github.com/graphql-hive/graphql-yoga
 * License: MIT
 *
 * Copyright (c) 2024 The Guild, GraphQL Yoga Contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 * Changes from original:
 * - Uses native CustomEvent instead of @whatwg-node/events ponyfill (Deno has CustomEvent)
 * - Uses our TypedEventTarget/TypedEvent types from types.ts
 * - Removed tslib dependency
 * - Uses ioredis types directly (already a dependency)
 * - Serializes full EventEnvelope for cross-process transport
 */
import type { Cluster, Redis } from "ioredis";
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";

/** Options accepted by {@link createRedisEventTarget}. */
export type CreateRedisEventTargetArgs = {
  /** Client on which `publish` is called for every dispatched event. */
  publishClient: Redis | Cluster;
  /**
   * Client on which `subscribe`/`unsubscribe` are called and whose "message"
   * events are consumed.
   */
  subscribeClient: Redis | Cluster;
  /** Converts event payloads to and from strings. Defaults to `JSON`. */
  serializer?: {
    stringify: (message: unknown) => string;
    parse: (message: string) => unknown;
  };
  /** String prepended to every topic to form the Redis channel name. Defaults to `""`. */
  prefix?: string;
};

/**
 * Creates an event target whose events travel through Redis pub/sub.
 *
 * - `addEventListener(topic, listener)` subscribes to the channel
 *   `prefix + topic` when the first listener for that topic is added.
 * - `removeEventListener(topic, listener)` unsubscribes from the channel when
 *   the last listener for that topic is removed.
 * - `dispatchEvent(event)` publishes the serialized `event.detail` on the
 *   channel `prefix + event.type` and always returns `true`; local listeners
 *   are only invoked once the message arrives on the subscribe client.
 *
 * Listeners receive a `CustomEvent` whose `type` is the topic they registered
 * for (without the prefix) and whose `detail` is the parsed payload.
 *
 * Failures are logged rather than thrown: unparseable messages (`console.warn`),
 * listeners that throw, and rejected Redis commands (`console.error`).
 *
 * @param args Redis clients plus optional serializer and channel prefix.
 * @returns An event target backed by the given Redis clients.
 */
export function createRedisEventTarget<TEvent extends TypedEvent>(
  args: CreateRedisEventTargetArgs,
): TypedEventTarget<TEvent> {
  const { publishClient, subscribeClient } = args;
  const serializer = args.serializer ?? JSON;
  const prefix = args.prefix ?? "";

  // Keyed by the *prefixed* channel name. The listener itself (function or
  // object) is stored, so identity is preserved for removal and listener
  // objects sharing a prototype `handleEvent` stay distinct entries.
  const listenersForChannel = new Map<
    string,
    Set<EventListenerOrEventListenerObject>
  >();

  /** Logs a rejected Redis command instead of leaving an unhandled rejection. */
  function logRejection(pending: unknown, action: string, channel: string): void {
    Promise.resolve(pending).catch((error: unknown) => {
      console.error(`Failed to ${action} Redis channel "${channel}":`, error);
    });
  }

  /** Fans a message received from Redis out to the listeners of its channel. */
  function onMessage(channel: string, message: string): void {
    const listeners = listenersForChannel.get(channel);
    if (listeners === undefined) {
      return;
    }

    let envelope: EventEnvelope;
    try {
      envelope = serializer.parse(message) as EventEnvelope;
    } catch {
      console.warn(
        `Failed to parse message on channel "${channel}": ${message}`,
      );
      return;
    }

    // Every key in the map starts with `prefix`, so slicing restores the
    // topic name the listeners registered under.
    const event = new CustomEvent(channel.slice(prefix.length), {
      detail: envelope,
    }) as TEvent;

    for (const listener of listeners) {
      // Isolate each listener so one failure cannot starve the others or
      // propagate into the Redis client's "message" emitter.
      try {
        if (typeof listener === "function") {
          listener(event);
        } else {
          // Called as a method so `this` is the listener object.
          listener.handleEvent(event);
        }
      } catch (error: unknown) {
        console.error(`Listener for channel "${channel}" threw:`, error);
      }
    }
  }

  (subscribeClient as Redis).on("message", onMessage);

  /** Registers a listener, subscribing to the channel on first use. */
  function addListener(
    topic: string,
    listener: EventListenerOrEventListenerObject,
  ): void {
    const channel = prefix + topic;
    let listeners = listenersForChannel.get(channel);
    if (listeners === undefined) {
      listeners = new Set();
      listenersForChannel.set(channel, listeners);
      logRejection(subscribeClient.subscribe(channel), "subscribe to", channel);
    }
    listeners.add(listener);
  }

  /** Unregisters a listener, unsubscribing once the channel has none left. */
  function removeListener(
    topic: string,
    listener: EventListenerOrEventListenerObject,
  ): void {
    const channel = prefix + topic;
    const listeners = listenersForChannel.get(channel);
    if (listeners === undefined) {
      return;
    }
    listeners.delete(listener);
    if (listeners.size > 0) {
      return;
    }
    listenersForChannel.delete(channel);
    logRejection(
      subscribeClient.unsubscribe(channel),
      "unsubscribe from",
      channel,
    );
  }

  return {
    addEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) {
      // Null/undefined listeners are ignored.
      if (callbackOrOptions != null) {
        addListener(topic, callbackOrOptions);
      }
    },
    dispatchEvent(event: TEvent) {
      const channel = prefix + event.type;
      logRejection(
        publishClient.publish(channel, serializer.stringify(event.detail)),
        "publish to",
        channel,
      );
      return true;
    },
    removeEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) {
      if (callbackOrOptions != null) {
        removeListener(topic, callbackOrOptions);
      }
    },
  };
}