Skip to content

Commit

Permalink
feat: allow providing a buffer for pubsub subscribe call
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ru4l committed Oct 3, 2024
1 parent fbe0beb commit 3b85354
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 27 deletions.
20 changes: 20 additions & 0 deletions .changeset/angry-clocks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
'@graphql-yoga/subscription': minor
---

Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature.

```ts
import { createPubSub } from 'graphql-yoga'
import { SlidingBuffer } from '@repeaterjs/repeater'

const pubSub = createPubSub()

pubSub.subscribe({
topic: "userChanged",
id: "1",
buffer: new SlidingBuffer(1_000)
})
```

Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values).
114 changes: 87 additions & 27 deletions packages/subscription/src/create-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TypedEventTarget } from '@graphql-yoga/typed-event-target';
import { Repeater } from '@repeaterjs/repeater';
import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater';
import { CustomEvent } from '@whatwg-node/events';

type PubSubPublishArgsByKey = {
Expand Down Expand Up @@ -45,9 +45,22 @@ export type PubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
* Subscribe to a topic.
*/
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
...args:
| (TPubSubPublishArgsByKey[TKey][1] extends undefined
? [key: TKey]
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
| [
args: {
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? MapToNull<TPubSubPublishArgsByKey[TKey][0]>
Expand All @@ -64,6 +77,75 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
const target =
config?.eventTarget ?? (new EventTarget() as PubSubEventTarget<TPubSubPublishArgsByKey>);

function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...args: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [routingKey: TKey]
: [routingKey: TKey, id: TPubSubPublishArgsByKey[TKey][0]]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
>;
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
args: {
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
>;
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...args:
| (TPubSubPublishArgsByKey[TKey][1] extends undefined
? [key: TKey]
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
| [
args: {
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
> {
let topic: string;
let buffer: RepeaterBuffer | undefined;
if (typeof args[0] === 'string') {
topic = args[1] === undefined ? args[0] : `${args[0]}:${args[1]}`;
} else {
topic = args[0].id === undefined ? args[0].topic : `${args[0].topic}:${args[0].id}`;
buffer = args[0].buffer;
}

return new Repeater(function subscriptionRepeater(next, stop) {
stop.then(function subscriptionRepeaterStopHandler() {
target.removeEventListener(topic, pubsubEventListener);
});

target.addEventListener(topic, pubsubEventListener);

function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
next(event.detail);
}
}, buffer);
}

return {
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
routingKey: TKey,
Expand All @@ -77,28 +159,6 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
});
target.dispatchEvent(event);
},
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
> {
const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`;

return new Repeater(function subscriptionRepeater(next, stop) {
stop.then(function subscriptionRepeaterStopHandler() {
target.removeEventListener(topic, pubsubEventListener);
});

target.addEventListener(topic, pubsubEventListener);

function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
next(event.detail);
}
});
},
subscribe,
};
};

0 comments on commit 3b85354

Please sign in to comment.