// Source file: pubsub/src/event-target-redis.ts (TypeScript, 131 lines, 4.4 KiB)

/*
* Adapted from @graphql-yoga/redis-event-target
* Original source: https://github.com/graphql-hive/graphql-yoga
* License: MIT
*
* Copyright (c) 2024 The Guild, GraphQL Yoga Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* Changes from original:
* - Uses native CustomEvent instead of @whatwg-node/events ponyfill (Deno has CustomEvent)
* - Uses our TypedEventTarget/TypedEvent types from types.ts
* - Removed tslib dependency
* - Uses ioredis types directly (already a dependency)
* - Serializes full EventEnvelope for cross-process transport
*/
import type { Cluster, Redis } from "ioredis";
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";
/**
 * Arguments accepted by `createRedisEventTarget`.
 */
export type CreateRedisEventTargetArgs = {
/** Client whose `publish` is called by `dispatchEvent`. */
publishClient: Redis | Cluster;
/** Client that receives the `"message"` listener and the `subscribe`/`unsubscribe` calls. */
subscribeClient: Redis | Cluster;
/** Converts event payloads to and from the string sent over Redis; `JSON` is used when omitted. */
serializer?: {
stringify: (message: unknown) => string;
parse: (message: string) => unknown;
};
/** String prepended to every topic to form the Redis channel name; defaults to `""`. */
prefix?: string;
};
/**
 * Creates an event target whose events travel through Redis Pub/Sub.
 *
 * - `addEventListener(topic, listener)` subscribes `subscribeClient` to
 *   `prefix + topic` when the first listener for that topic is added.
 * - `removeEventListener(topic, listener)` unsubscribes again once the last
 *   listener for that topic is removed.
 * - `dispatchEvent(event)` publishes the serialized `event.detail` on
 *   `prefix + event.type` via `publishClient` and always returns `true`.
 *
 * Incoming messages are parsed with the serializer and handed to listeners as
 * a `CustomEvent` whose `type` is the (prefixed) channel name and whose
 * `detail` is the parsed payload. Messages that fail to parse are logged with
 * `console.warn` and dropped.
 *
 * Failures of the Redis commands themselves (rejected promises) are logged
 * with `console.error` rather than left as unhandled rejections.
 *
 * NOTE(review): the `"message"` listener registered on `subscribeClient` is
 * never removed by this function.
 *
 * @param args Redis clients plus optional serializer and channel prefix.
 * @returns An event target backed by the given Redis clients.
 */
export function createRedisEventTarget<TEvent extends TypedEvent>(
  args: CreateRedisEventTargetArgs,
): TypedEventTarget<TEvent> {
  const { publishClient, subscribeClient } = args;
  const serializer = args.serializer ?? JSON;
  const prefix = args.prefix ?? "";

  // Prefixed channel name -> callbacks registered for that channel.
  const callbacksForTopic = new Map<string, Set<EventListener>>();

  // Listener objects are invoked through a wrapper so that `handleEvent` runs
  // with the object as `this`. The wrapper is cached per object so that the
  // same object can be de-duplicated on add and found again on remove.
  const wrapperForListenerObject = new WeakMap<
    EventListenerObject,
    EventListener
  >();

  /** Logs a rejected Redis command instead of leaving the rejection unhandled. */
  function logRejection(
    action: string,
    channel: string,
    pending: unknown,
  ): void {
    // Promise.resolve also tolerates clients (or test doubles) whose commands
    // do not return a promise.
    Promise.resolve(pending).catch((error: unknown) => {
      console.error(`Failed to ${action} channel "${channel}":`, error);
    });
  }

  /**
   * Maps a listener (function or listener object) to the callback stored in
   * `callbacksForTopic`. With `create === false`, an object that was never
   * registered yields `undefined`.
   */
  function resolveCallback(
    listener: EventListenerOrEventListenerObject,
    create: boolean,
  ): EventListener | undefined {
    if (typeof listener === "function") {
      return listener;
    }
    const listenerObject: EventListenerObject = listener;
    let wrapper = wrapperForListenerObject.get(listenerObject);
    if (wrapper === undefined && create) {
      // `handleEvent` is looked up at dispatch time, like a DOM EventTarget does.
      wrapper = (event: Event): void => listenerObject.handleEvent(event);
      wrapperForListenerObject.set(listenerObject, wrapper);
    }
    return wrapper;
  }

  /** Handles a raw Redis message by fanning it out to the channel's callbacks. */
  function onMessage(channel: string, message: string) {
    const callbacks = callbacksForTopic.get(channel);
    if (callbacks === undefined) {
      return;
    }

    let envelope: EventEnvelope;
    try {
      envelope = serializer.parse(message) as EventEnvelope;
    } catch {
      console.warn(
        `Failed to parse message on channel "${channel}": ${message}`,
      );
      return;
    }

    const event = new CustomEvent(channel, {
      detail: envelope,
    }) as TEvent;

    for (const callback of callbacks) {
      // One failing listener must not prevent the others from running, nor
      // throw into the Redis client's "message" emitter.
      try {
        callback(event);
      } catch (error) {
        console.error(
          `Listener for channel "${channel}" threw an error:`,
          error,
        );
      }
    }
  }

  (subscribeClient as Redis).on("message", onMessage);

  /** Registers a callback, subscribing to the channel on first use. */
  function addCallback(topic: string, callback: EventListener) {
    const prefixedTopic = prefix + topic;
    let callbacks = callbacksForTopic.get(prefixedTopic);
    if (callbacks === undefined) {
      callbacks = new Set();
      callbacksForTopic.set(prefixedTopic, callbacks);
      logRejection(
        "subscribe to",
        prefixedTopic,
        subscribeClient.subscribe(prefixedTopic),
      );
    }
    callbacks.add(callback);
  }

  /** Unregisters a callback, unsubscribing once the channel has none left. */
  function removeCallback(topic: string, callback: EventListener) {
    const prefixedTopic = prefix + topic;
    const callbacks = callbacksForTopic.get(prefixedTopic);
    if (callbacks === undefined) {
      return;
    }
    callbacks.delete(callback);
    if (callbacks.size > 0) {
      return;
    }
    callbacksForTopic.delete(prefixedTopic);
    logRejection(
      "unsubscribe from",
      prefixedTopic,
      subscribeClient.unsubscribe(prefixedTopic),
    );
  }

  return {
    addEventListener(
      topic,
      callbackOrOptions: EventListenerOrEventListenerObject,
    ) {
      if (callbackOrOptions == null) {
        return;
      }
      const callback = resolveCallback(callbackOrOptions, true);
      if (callback !== undefined) {
        addCallback(topic, callback);
      }
    },
    dispatchEvent(event: TEvent) {
      const channel = prefix + event.type;
      logRejection(
        "publish to",
        channel,
        publishClient.publish(channel, serializer.stringify(event.detail)),
      );
      return true;
    },
    removeEventListener(
      topic,
      callbackOrOptions: EventListenerOrEventListenerObject,
    ) {
      if (callbackOrOptions == null) {
        return;
      }
      const callback = resolveCallback(callbackOrOptions, false);
      if (callback !== undefined) {
        removeCallback(topic, callback);
      }
    },
  };
}