From 3b853548c6464253e71569d22d4eccfc63fa7b2e Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Thu, 3 Oct 2024 12:20:34 +0200 Subject: [PATCH] feat: allow providing a buffer for pubsub subscribe call --- .changeset/angry-clocks-sniff.md | 20 ++++ packages/subscription/src/create-pub-sub.ts | 114 +++++++++++++++----- 2 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 .changeset/angry-clocks-sniff.md diff --git a/.changeset/angry-clocks-sniff.md b/.changeset/angry-clocks-sniff.md new file mode 100644 index 0000000000..64b312f91a --- /dev/null +++ b/.changeset/angry-clocks-sniff.md @@ -0,0 +1,20 @@ +--- +'@graphql-yoga/subscription': minor +--- + +Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature. + +```ts +import { createPubSub } from 'graphql-yoga' +import { SlidingBuffer } from '@repeaterjs/repeater' + +const pubSub = createPubSub() + +pubSub.subscribe({ + topic: "userChanged", + id: "1", + buffer: new SlidingBuffer(1_000) +}) +``` + +Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values). diff --git a/packages/subscription/src/create-pub-sub.ts b/packages/subscription/src/create-pub-sub.ts index 447d093a5a..5138ba9882 100644 --- a/packages/subscription/src/create-pub-sub.ts +++ b/packages/subscription/src/create-pub-sub.ts @@ -1,5 +1,5 @@ import type { TypedEventTarget } from '@graphql-yoga/typed-event-target'; -import { Repeater } from '@repeaterjs/repeater'; +import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater'; import { CustomEvent } from '@whatwg-node/events'; type PubSubPublishArgsByKey = { @@ -45,9 +45,22 @@ export type PubSub = { * Subscribe to a topic. */ subscribe>( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] + ...args: + | (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? [key: TKey] + : [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]]) + | [ + args: { + topic: string; + buffer?: RepeaterBuffer | undefined; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + }), + ] ): Repeater< TPubSubPublishArgsByKey[TKey][1] extends undefined ? MapToNull @@ -64,6 +77,75 @@ export const createPubSub = ); + function subscribe>( + ...args: TPubSubPublishArgsByKey[TKey][1] extends undefined + ? [routingKey: TKey] + : [routingKey: TKey, id: TPubSubPublishArgsByKey[TKey][0]] + ): Repeater< + TPubSubPublishArgsByKey[TKey][1] extends undefined + ? TPubSubPublishArgsByKey[TKey][0] + : TPubSubPublishArgsByKey[TKey][1] + >; + function subscribe>( + args: { + topic: string; + buffer?: RepeaterBuffer | undefined; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + }), + ): Repeater< + TPubSubPublishArgsByKey[TKey][1] extends undefined + ? TPubSubPublishArgsByKey[TKey][0] + : TPubSubPublishArgsByKey[TKey][1] + >; + function subscribe>( + ...args: + | (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? [key: TKey] + : [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]]) + | [ + args: { + topic: string; + buffer?: RepeaterBuffer | undefined; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + }), + ] + ): Repeater< + TPubSubPublishArgsByKey[TKey][1] extends undefined + ? TPubSubPublishArgsByKey[TKey][0] + : TPubSubPublishArgsByKey[TKey][1] + > { + let topic: string; + let buffer: RepeaterBuffer | undefined; + if (typeof args[0] === 'string') { + topic = args[1] === undefined ? args[0] : `${args[0]}:${args[1]}`; + } else { + topic = args[0].id === undefined ? args[0].topic : `${args[0].topic}:${args[0].id}`; + buffer = args[0].buffer; + } + + return new Repeater(function subscriptionRepeater(next, stop) { + stop.then(function subscriptionRepeaterStopHandler() { + target.removeEventListener(topic, pubsubEventListener); + }); + + target.addEventListener(topic, pubsubEventListener); + + function pubsubEventListener(event: PubSubEvent) { + next(event.detail); + } + }, buffer); + } + return { publish>( routingKey: TKey, @@ -77,28 +159,6 @@ export const createPubSub = >( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] - ): Repeater< - TPubSubPublishArgsByKey[TKey][1] extends undefined - ? TPubSubPublishArgsByKey[TKey][0] - : TPubSubPublishArgsByKey[TKey][1] - > { - const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`; - - return new Repeater(function subscriptionRepeater(next, stop) { - stop.then(function subscriptionRepeaterStopHandler() { - target.removeEventListener(topic, pubsubEventListener); - }); - - target.addEventListener(topic, pubsubEventListener); - - function pubsubEventListener(event: PubSubEvent) { - next(event.detail); - } - }); - }, + subscribe, }; };