Skip to content

Commit

Permalink
trick typescript into accepting our types.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Sep 10, 2023
1 parent 55f032d commit 13bc2ad
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 48 deletions.
9 changes: 6 additions & 3 deletions pool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { relayInit, type Relay, type Sub, type SubscriptionOptions } from './relay.ts'
import { relayInit, eventsGenerator, type Relay, type Sub, type SubscriptionOptions } from './relay.ts'
import { normalizeURL } from './utils.ts'

import type { Event } from './event.ts'
Expand Down Expand Up @@ -118,10 +118,10 @@ export class SimplePool {
}
})

let greaterSub: Sub = {
let greaterSub: Sub<K> = {
sub(filters, opts) {
subs.forEach(sub => sub.sub(filters, opts))
return greaterSub
return greaterSub as any
},
unsub() {
subs.forEach(sub => sub.unsub())
Expand All @@ -138,6 +138,9 @@ export class SimplePool {
eventListeners.delete(cb)
} else if (type === 'eose') eoseListeners.delete(cb as () => void | Promise<void>)
},
get events() {
return eventsGenerator(greaterSub)
},
}

return greaterSub
Expand Down
82 changes: 37 additions & 45 deletions relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type RelayEvent = {
export type CountPayload = {
count: number
}
type SubEvent<K extends number> = {
export type SubEvent<K extends number> = {
event: (event: Event<K>) => void | Promise<void>
count: (payload: CountPayload) => void | Promise<void>
eose: () => void | Promise<void>
Expand Down Expand Up @@ -242,48 +242,7 @@ export function relayInit(
}
trySend([verb, subid, ...filters])

async function* eventsGenerator(): AsyncGenerator<Event<K>, void, unknown> {
let nextResolve: ((event: Event<K>) => void) | undefined
const eventQueue: Event<K>[] = []

const pushToQueue = (event: Event<K>) => {
if (nextResolve) {
nextResolve(event)
nextResolve = undefined
} else {
eventQueue.push(event)
}
}

// Register the event listener
if (!subListeners[subid]) {
subListeners[subid] = {
event: [],
count: [],
eose: []
}
}
subListeners[subid].event.push(pushToQueue)

try {
while (true) {
if (eventQueue.length > 0) {
yield eventQueue.shift()!
} else {
const event = await new Promise<Event<K>>((resolve) => {
nextResolve = resolve
})
yield event
}
}
} finally {
// Unregister the event listener when the generator is done
const idx = subListeners[subid].event.indexOf(pushToQueue)
if (idx >= 0) subListeners[subid].event.splice(idx, 1)
}
}

return {
let subscription: Sub<K> = {
sub: (newFilters, newOpts = {}) =>
sub(newFilters || filters, {
skipVerification: newOpts.skipVerification || skipVerification,
Expand All @@ -309,9 +268,11 @@ export function relayInit(
if (idx >= 0) listeners[type].splice(idx, 1)
},
get events() {
return eventsGenerator()
}
return eventsGenerator(subscription)
},
}

return subscription
}

function _publishEvent(event: Event<number>, type: string) {
Expand Down Expand Up @@ -404,3 +365,34 @@ export function relayInit(
},
}
}

export async function* eventsGenerator<K extends number>(sub: Sub<K>): AsyncGenerator<Event<K>, void, unknown> {
let nextResolve: ((event: Event<K>) => void) | undefined
const eventQueue: Event<K>[] = []

const pushToQueue = (event: Event<K>) => {
if (nextResolve) {
nextResolve(event)
nextResolve = undefined
} else {
eventQueue.push(event)
}
}

sub.on('event', pushToQueue)

try {
while (true) {
if (eventQueue.length > 0) {
yield eventQueue.shift()!
} else {
const event = await new Promise<Event<K>>(resolve => {
nextResolve = resolve
})
yield event
}
}
} finally {
sub.off('event', pushToQueue)
}
}

0 comments on commit 13bc2ad

Please sign in to comment.