diff --git a/packages/network/src/logic/node/StreamManager.ts b/packages/network/src/logic/node/StreamManager.ts index 95bab09c83..72575047ef 100644 --- a/packages/network/src/logic/node/StreamManager.ts +++ b/packages/network/src/logic/node/StreamManager.ts @@ -117,6 +117,12 @@ export class StreamManager { return this.getStreamsAsKeys().map((key) => StreamIdAndPartition.fromKey(key)) } + *getStreamsIterable(): IterableIterator { + for (const streamKey of this.getStreamKeys()) { + yield StreamIdAndPartition.fromKey(streamKey) + } + } + // efficient way to access streams getStreamKeys(): IterableIterator { return this.streams.keys() diff --git a/packages/network/src/logic/node/TrackerConnector.ts b/packages/network/src/logic/node/TrackerConnector.ts index 99ce85749a..7ed53d86af 100644 --- a/packages/network/src/logic/node/TrackerConnector.ts +++ b/packages/network/src/logic/node/TrackerConnector.ts @@ -1,10 +1,8 @@ import { Utils } from 'streamr-client-protocol' import { StreamIdAndPartition, TrackerInfo } from '../../identifiers' -import { NodeToTracker } from '../../protocol/NodeToTracker' import { Logger } from '../../helpers/Logger' import { PeerInfo } from '../../connection/PeerInfo' import { TrackerId } from '../tracker/Tracker' -import { StreamManager } from './StreamManager' import { NameDirectory } from '../../NameDirectory' const logger = new Logger(module) @@ -14,38 +12,34 @@ enum ConnectionState { ERROR } -export class TrackerConnector { +type GetStreamsFn = () => Iterable +type ConnectToTrackerFn = (trackerAddress: string, trackerPeerInfo: PeerInfo) => Promise +type DisconnectFromTrackerFn = (trackerId: TrackerId) => void - private readonly streamManager: StreamManager - private readonly nodeToTracker: NodeToTracker +export class TrackerConnector { + private readonly getStreams: GetStreamsFn + private readonly connectToTracker: ConnectToTrackerFn + private readonly disconnectFromTracker: DisconnectFromTrackerFn private readonly trackerRegistry: Utils.TrackerRegistry private maintenanceTimer?: NodeJS.Timeout | null private readonly maintenanceInterval: number - private connectionStates: Map + private connectionStates: Map constructor( - streamManager: StreamManager, - nodeToTracker: NodeToTracker, + getStreams: GetStreamsFn, + connectToTracker: ConnectToTrackerFn, + disconnectFromTracker: DisconnectFromTrackerFn, trackerRegistry: Utils.TrackerRegistry, maintenanceInterval: number ) { - this.streamManager = streamManager - this.nodeToTracker = nodeToTracker + this.getStreams = getStreams + this.connectToTracker = connectToTracker + this.disconnectFromTracker = disconnectFromTracker this.trackerRegistry = trackerRegistry this.maintenanceInterval = maintenanceInterval this.connectionStates = new Map() } - maintainConnections(): void { - this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => { - if (this.isActiveTracker(trackerInfo.id)) { - this.connectTo(trackerInfo) - } else { - this.nodeToTracker.disconnectFromTracker(trackerInfo.id) - } - }) - } - onNewStream(streamId: StreamIdAndPartition): void { const trackerInfo = this.trackerRegistry.getTracker(streamId.id, streamId.partition) this.connectTo(trackerInfo) @@ -66,8 +60,18 @@ export class TrackerConnector { } } + private maintainConnections(): void { + this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => { + if (this.isActiveTracker(trackerInfo.id)) { + this.connectTo(trackerInfo) + } else { + this.disconnectFromTracker(trackerInfo.id) + } + }) + } + private connectTo({ id, ws }: TrackerInfo): void { - this.nodeToTracker.connectToTracker(ws, PeerInfo.newTracker(id)) + this.connectToTracker(ws, PeerInfo.newTracker(id)) .then(() => { if (this.connectionStates.get(id) !== ConnectionState.SUCCESS) { logger.info('Connected to tracker %s', NameDirectory.getName(id)) @@ -86,8 +90,7 @@ export class TrackerConnector { } private isActiveTracker(trackerId: TrackerId): boolean { - for (const streamKey of this.streamManager.getStreamKeys()) { - const { id: streamId, partition } = StreamIdAndPartition.fromKey(streamKey) + for (const { id: streamId, partition } of this.getStreams()) { if (this.trackerRegistry.getTracker(streamId, partition).id === trackerId) { return true } diff --git a/packages/network/src/logic/node/TrackerManager.ts b/packages/network/src/logic/node/TrackerManager.ts index f7a2b66306..5bcbb7cbce 100644 --- a/packages/network/src/logic/node/TrackerManager.ts +++ b/packages/network/src/logic/node/TrackerManager.ts @@ -71,8 +71,9 @@ export class TrackerManager { this.subscriber = subscriber this.rttUpdateInterval = opts.rttUpdateTimeout || 15000 this.trackerConnector = new TrackerConnector( - streamManager, - this.nodeToTracker, + streamManager.getStreamsIterable.bind(streamManager), + this.nodeToTracker.connectToTracker.bind(this.nodeToTracker), + this.nodeToTracker.disconnectFromTracker.bind(this.nodeToTracker), this.trackerRegistry, opts.trackerConnectionMaintenanceInterval ?? 5000 ) diff --git a/packages/network/test/unit/TrackerConnector.test.ts b/packages/network/test/unit/TrackerConnector.test.ts new file mode 100644 index 0000000000..5c7755f75c --- /dev/null +++ b/packages/network/test/unit/TrackerConnector.test.ts @@ -0,0 +1,125 @@ +import { Utils } from 'streamr-client-protocol' +import { TrackerConnector } from '../../src/logic/node/TrackerConnector' +import { StreamIdAndPartition, TrackerInfo } from '../../src/identifiers' +import { TrackerId } from '../../src/logic/tracker/Tracker' +import { wait } from 'streamr-test-utils' + +const TTL_IN_MS = 10 + +const TRACKERS = [ + { + id: 't1', + http: 'http://t1.xyz', + ws: 'ws://t1.xyz' + }, + { + id: 't2', + http: 'http://t2.xyz', + ws: 'ws://t2.xyz' + }, + { + id: 't3', + http: 'http://t3.xyz', + ws: 'ws://t3.xyz' + }, + { + id: 't4', + http: 'http://t4.xyz', + ws: 'ws://t4.xyz' + }, +] + +const T1_STREAM = new StreamIdAndPartition('streamOne', 0) +const T2_STREAM = new StreamIdAndPartition('streamOne', 15) +const T3_STREAM = new StreamIdAndPartition('streamSix', 0) +const T4_STREAM = new StreamIdAndPartition('streamTwo', 0) + +describe(TrackerConnector, () => { + let streams: Array + let activeConnections: Set + let connector: TrackerConnector + + beforeAll(() => { + // sanity check stream hash assignments + const trackerRegistry = new Utils.TrackerRegistry(TRACKERS) + function checkTrackerAssignment({ id, partition }: StreamIdAndPartition, expectedTracker: TrackerInfo): void { + expect(trackerRegistry.getTracker(id, partition)).toEqual(expectedTracker) + } + checkTrackerAssignment(T1_STREAM, TRACKERS[0]) + checkTrackerAssignment(T2_STREAM, TRACKERS[1]) + checkTrackerAssignment(T3_STREAM, TRACKERS[2]) + checkTrackerAssignment(T4_STREAM, TRACKERS[3]) + }) + + function setUpConnector(intervalInMs: number) { + streams = [] + activeConnections = new Set() + connector = new TrackerConnector( + () => streams, + (_wsUrl, trackerInfo) => { + activeConnections.add(trackerInfo.peerId) + return Promise.resolve() + }, + (trackerId) => { + activeConnections.delete(trackerId) + }, + new Utils.TrackerRegistry(TRACKERS), + intervalInMs + ) + } + + afterEach(() => { + connector?.stop() + }) + + it('maintains no tracker connections if no streams', async () => { + setUpConnector(TTL_IN_MS) + connector.start() + await wait(TTL_IN_MS * 2) + expect(activeConnections).toBeEmpty() + }) + + it('maintains tracker connections based on active streams', async () => { + setUpConnector(TTL_IN_MS) + connector.start() + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + + streams = [T1_STREAM] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t1'])) + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + + streams = [T2_STREAM, T3_STREAM] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t2', 't3'])) + + streams = [ + T4_STREAM, + T3_STREAM, + T2_STREAM + ] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t2', 't3', 't4'])) + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + }) + + it('onNewStream can be used to form immediate connections', () => { + setUpConnector(1000000000) + connector.start() + + connector.onNewStream(T2_STREAM) + expect(activeConnections).toEqual(new Set(['t2'])) + + connector.onNewStream(T4_STREAM) + expect(activeConnections).toEqual(new Set(['t2', 't4'])) + }) +})