Skip to content

Commit

Permalink
feat: allow providing a buffer for pubsub subscribe call
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ru4l committed Oct 3, 2024
1 parent fbe0beb commit e70c28c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
20 changes: 20 additions & 0 deletions .changeset/angry-clocks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
'@graphql-yoga/subscription': minor
---

Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature.

```ts
import { createPubSub } from 'graphql-yoga'
import { SlidingBuffer } from '@repeaterjs/repeater'

const pubSub = createPubSub()

pubSub.subscribe({
topic: "userChanged",
id: "1",
buffer: new SlidingBuffer(1_000)
})
```

Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values).
32 changes: 23 additions & 9 deletions packages/subscription/src/create-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TypedEventTarget } from '@graphql-yoga/typed-event-target';
import { Repeater } from '@repeaterjs/repeater';
import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater';
import { CustomEvent } from '@whatwg-node/events';

type PubSubPublishArgsByKey = {
Expand Down Expand Up @@ -45,9 +45,15 @@ export type PubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
* Subscribe to a topic.
*/
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
args: {
topic: string;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? MapToNull<TPubSubPublishArgsByKey[TKey][0]>
Expand Down Expand Up @@ -78,15 +84,23 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
target.dispatchEvent(event);
},
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
args: {
/** Topic to subscribe to */
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
> {
const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`;
const topic = args.id === undefined ? args.topic : `${args.topic}:${args.id}`;

return new Repeater(function subscriptionRepeater(next, stop) {
stop.then(function subscriptionRepeaterStopHandler() {
Expand All @@ -98,7 +112,7 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
next(event.detail);
}
});
}, args.buffer);
},
};
};

0 comments on commit e70c28c

Please sign in to comment.