From 13bc2ad5a88c24a752c958bef561fb04c64ad156 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sun, 10 Sep 2023 15:44:16 -0300 Subject: [PATCH] trick typescript into accepting our types. --- pool.ts | 9 ++++--- relay.ts | 82 +++++++++++++++++++++++++------------------------------- 2 files changed, 43 insertions(+), 48 deletions(-) diff --git a/pool.ts b/pool.ts index 76358122..ce0c95ff 100644 --- a/pool.ts +++ b/pool.ts @@ -1,4 +1,4 @@ -import { relayInit, type Relay, type Sub, type SubscriptionOptions } from './relay.ts' +import { relayInit, eventsGenerator, type Relay, type Sub, type SubscriptionOptions } from './relay.ts' import { normalizeURL } from './utils.ts' import type { Event } from './event.ts' @@ -118,10 +118,10 @@ export class SimplePool { } }) - let greaterSub: Sub = { + let greaterSub: Sub = { sub(filters, opts) { subs.forEach(sub => sub.sub(filters, opts)) - return greaterSub + return greaterSub as any }, unsub() { subs.forEach(sub => sub.unsub()) @@ -138,6 +138,9 @@ export class SimplePool { eventListeners.delete(cb) } else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise) }, + get events() { + return eventsGenerator(greaterSub) + }, } return greaterSub diff --git a/relay.ts b/relay.ts index e8cf8f5a..87cdd204 100644 --- a/relay.ts +++ b/relay.ts @@ -15,7 +15,7 @@ type RelayEvent = { export type CountPayload = { count: number } -type SubEvent = { +export type SubEvent = { event: (event: Event) => void | Promise count: (payload: CountPayload) => void | Promise eose: () => void | Promise @@ -242,48 +242,7 @@ export function relayInit( } trySend([verb, subid, ...filters]) - async function* eventsGenerator(): AsyncGenerator, void, unknown> { - let nextResolve: ((event: Event) => void) | undefined - const eventQueue: Event[] = [] - - const pushToQueue = (event: Event) => { - if (nextResolve) { - nextResolve(event) - nextResolve = undefined - } else { - eventQueue.push(event) - } - } - - // Register the event listener - if (!subListeners[subid]) { - subListeners[subid] = { - event: [], - count: [], - eose: [] - } - } - subListeners[subid].event.push(pushToQueue) - - try { - while (true) { - if (eventQueue.length > 0) { - yield eventQueue.shift()! - } else { - const event = await new Promise>((resolve) => { - nextResolve = resolve - }) - yield event - } - } - } finally { - // Unregister the event listener when the generator is done - const idx = subListeners[subid].event.indexOf(pushToQueue) - if (idx >= 0) subListeners[subid].event.splice(idx, 1) - } - } - - return { + let subscription: Sub = { sub: (newFilters, newOpts = {}) => sub(newFilters || filters, { skipVerification: newOpts.skipVerification || skipVerification, @@ -309,9 +268,11 @@ export function relayInit( if (idx >= 0) listeners[type].splice(idx, 1) }, get events() { - return eventsGenerator() - } + return eventsGenerator(subscription) + }, } + + return subscription } function _publishEvent(event: Event, type: string) { @@ -404,3 +365,34 @@ export function relayInit( }, } } + +export async function* eventsGenerator(sub: Sub): AsyncGenerator, void, unknown> { + let nextResolve: ((event: Event) => void) | undefined + const eventQueue: Event[] = [] + + const pushToQueue = (event: Event) => { + if (nextResolve) { + nextResolve(event) + nextResolve = undefined + } else { + eventQueue.push(event) + } + } + + sub.on('event', pushToQueue) + + try { + while (true) { + if (eventQueue.length > 0) { + yield eventQueue.shift()! + } else { + const event = await new Promise>(resolve => { + nextResolve = resolve + }) + yield event + } + } + } finally { + sub.off('event', pushToQueue) + } +}