From 4df36588d65c37e1bddeafc0dc6db02e7f21f557 Mon Sep 17 00:00:00 2001 From: Eric Andrews Date: Fri, 15 Oct 2021 01:02:53 +0300 Subject: [PATCH 1/5] feat(StreamManager): add method getStreamsIterable --- packages/network/src/logic/node/StreamManager.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/network/src/logic/node/StreamManager.ts b/packages/network/src/logic/node/StreamManager.ts index 95bab09c83..72575047ef 100644 --- a/packages/network/src/logic/node/StreamManager.ts +++ b/packages/network/src/logic/node/StreamManager.ts @@ -117,6 +117,12 @@ export class StreamManager { return this.getStreamsAsKeys().map((key) => StreamIdAndPartition.fromKey(key)) } + *getStreamsIterable(): IterableIterator { + for (const streamKey of this.getStreamKeys()) { + yield StreamIdAndPartition.fromKey(streamKey) + } + } + // efficient way to access streams getStreamKeys(): IterableIterator { return this.streams.keys() From 7fd1bbcdef0e9166aab34db4425a8fd90c78189b Mon Sep 17 00:00:00 2001 From: Eric Andrews Date: Fri, 15 Oct 2021 01:03:59 +0300 Subject: [PATCH 2/5] refactor(TrackerConnector): replace class deps with function deps --- .../src/logic/node/TrackerConnector.ts | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/packages/network/src/logic/node/TrackerConnector.ts b/packages/network/src/logic/node/TrackerConnector.ts index 99ce85749a..7ed53d86af 100644 --- a/packages/network/src/logic/node/TrackerConnector.ts +++ b/packages/network/src/logic/node/TrackerConnector.ts @@ -1,10 +1,8 @@ import { Utils } from 'streamr-client-protocol' import { StreamIdAndPartition, TrackerInfo } from '../../identifiers' -import { NodeToTracker } from '../../protocol/NodeToTracker' import { Logger } from '../../helpers/Logger' import { PeerInfo } from '../../connection/PeerInfo' import { TrackerId } from '../tracker/Tracker' -import { StreamManager } from './StreamManager' import { NameDirectory } from '../../NameDirectory' const logger = new Logger(module) @@ -14,38 +12,34 @@ enum ConnectionState { ERROR } -export class TrackerConnector { +type GetStreamsFn = () => Iterable +type ConnectToTrackerFn = (trackerAddress: string, trackerPeerInfo: PeerInfo) => Promise +type DisconnectFromTrackerFn = (trackerId: TrackerId) => void - private readonly streamManager: StreamManager - private readonly nodeToTracker: NodeToTracker +export class TrackerConnector { + private readonly getStreams: GetStreamsFn + private readonly connectToTracker: ConnectToTrackerFn + private readonly disconnectFromTracker: DisconnectFromTrackerFn private readonly trackerRegistry: Utils.TrackerRegistry private maintenanceTimer?: NodeJS.Timeout | null private readonly maintenanceInterval: number - private connectionStates: Map + private connectionStates: Map constructor( - streamManager: StreamManager, - nodeToTracker: NodeToTracker, + getStreams: GetStreamsFn, + connectToTracker: ConnectToTrackerFn, + disconnectFromTracker: DisconnectFromTrackerFn, trackerRegistry: Utils.TrackerRegistry, maintenanceInterval: number ) { - this.streamManager = streamManager - this.nodeToTracker = nodeToTracker + this.getStreams = getStreams + this.connectToTracker = connectToTracker + this.disconnectFromTracker = disconnectFromTracker this.trackerRegistry = trackerRegistry this.maintenanceInterval = maintenanceInterval this.connectionStates = new Map() } - maintainConnections(): void { - this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => { - if (this.isActiveTracker(trackerInfo.id)) { - this.connectTo(trackerInfo) - } else { - this.nodeToTracker.disconnectFromTracker(trackerInfo.id) - } - }) - } - onNewStream(streamId: StreamIdAndPartition): void { const trackerInfo = this.trackerRegistry.getTracker(streamId.id, streamId.partition) this.connectTo(trackerInfo) @@ -66,8 +60,18 @@ export class TrackerConnector { } } + private maintainConnections(): void { + this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => { + if (this.isActiveTracker(trackerInfo.id)) { + this.connectTo(trackerInfo) + } else { + this.disconnectFromTracker(trackerInfo.id) + } + }) + } + private connectTo({ id, ws }: TrackerInfo): void { - this.nodeToTracker.connectToTracker(ws, PeerInfo.newTracker(id)) + this.connectToTracker(ws, PeerInfo.newTracker(id)) .then(() => { if (this.connectionStates.get(id) !== ConnectionState.SUCCESS) { logger.info('Connected to tracker %s', NameDirectory.getName(id)) @@ -86,8 +90,7 @@ export class TrackerConnector { } private isActiveTracker(trackerId: TrackerId): boolean { - for (const streamKey of this.streamManager.getStreamKeys()) { - const { id: streamId, partition } = StreamIdAndPartition.fromKey(streamKey) + for (const { id: streamId, partition } of this.getStreams()) { if (this.trackerRegistry.getTracker(streamId, partition).id === trackerId) { return true } From 25ce02659cd3850fb93381acd11818e17792c2b1 Mon Sep 17 00:00:00 2001 From: Eric Andrews Date: Fri, 15 Oct 2021 01:04:46 +0300 Subject: [PATCH 3/5] feat(network): wip --- packages/network/src/logic/node/TrackerManager.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/network/src/logic/node/TrackerManager.ts b/packages/network/src/logic/node/TrackerManager.ts index f7a2b66306..5bcbb7cbce 100644 --- a/packages/network/src/logic/node/TrackerManager.ts +++ b/packages/network/src/logic/node/TrackerManager.ts @@ -71,8 +71,9 @@ export class TrackerManager { this.subscriber = subscriber this.rttUpdateInterval = opts.rttUpdateTimeout || 15000 this.trackerConnector = new TrackerConnector( - streamManager, - this.nodeToTracker, + streamManager.getStreamsIterable.bind(streamManager), + this.nodeToTracker.connectToTracker.bind(this.nodeToTracker), + this.nodeToTracker.disconnectFromTracker.bind(this.nodeToTracker), this.trackerRegistry, opts.trackerConnectionMaintenanceInterval ?? 5000 ) From c77eb8fdaae5b1a09a083a6c89ebed01146a3ce5 Mon Sep 17 00:00:00 2001 From: Eric Andrews Date: Fri, 15 Oct 2021 01:05:30 +0300 Subject: [PATCH 4/5] feat(TrackerConnector): add unit test --- .../test/unit/TrackerConnector.test.ts | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 packages/network/test/unit/TrackerConnector.test.ts diff --git a/packages/network/test/unit/TrackerConnector.test.ts b/packages/network/test/unit/TrackerConnector.test.ts new file mode 100644 index 0000000000..7af1d07ce0 --- /dev/null +++ b/packages/network/test/unit/TrackerConnector.test.ts @@ -0,0 +1,123 @@ +import { Utils } from 'streamr-client-protocol' +import { TrackerConnector } from '../../src/logic/node/TrackerConnector' +import { StreamIdAndPartition, TrackerInfo } from '../../src/identifiers' +import { TrackerId } from '../../src/logic/tracker/Tracker' +import { wait } from 'streamr-test-utils' + +const TTL_IN_MS = 10 + +const TRACKERS = [ + { + id: 't1', + http: 'http://t1.xyz', + ws: 'ws://t1.xyz' + }, + { + id: 't2', + http: 'http://t2.xyz', + ws: 'ws://t2.xyz' + }, + { + id: 't3', + http: 'http://t3.xyz', + ws: 'ws://t3.xyz' + }, + { + id: 't4', + http: 'http://t4.xyz', + ws: 'ws://t4.xyz' + }, +] + +const STREAMS = [ + new StreamIdAndPartition('streamOne', 0), // t1 + new StreamIdAndPartition('streamOne', 15), // t2 + new StreamIdAndPartition('streamSix', 0), // t3 + new StreamIdAndPartition('streamTwo', 0) // t4 +] + +describe(TrackerConnector, () => { + let streams: Array + let activeConnections: Set + let connector: TrackerConnector + + beforeAll(() => { + // sanity check stream hash assignments + const trackerRegistry = new Utils.TrackerRegistry(TRACKERS) + for (let i = 0; i < STREAMS.length; ++i) { + expect(trackerRegistry.getTracker(STREAMS[i].id, STREAMS[i].partition)).toEqual(TRACKERS[i]) + } + }) + + function setUpConnector(intervalInMs: number) { + streams = [] + activeConnections = new Set() + connector = new TrackerConnector( + () => streams, + (_wsUrl, trackerInfo) => { + activeConnections.add(trackerInfo.peerId) + return Promise.resolve() + }, + (trackerId) => { + activeConnections.delete(trackerId) + }, + new Utils.TrackerRegistry(TRACKERS), + intervalInMs + ) + } + + afterEach(() => { + connector?.stop() + }) + + it('maintains no tracker connections if no streams', async () => { + setUpConnector(TTL_IN_MS) + connector.start() + await wait(TTL_IN_MS * 2) + expect(activeConnections).toBeEmpty() + }) + + it('maintains tracker connections based on active streams', async () => { + setUpConnector(TTL_IN_MS) + connector.start() + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + + streams = [STREAMS[0]] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t1'])) + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + + streams = [STREAMS[1], STREAMS[2]] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t2', 't3'])) + + streams = [ + STREAMS[3], + STREAMS[2], + STREAMS[1] + ] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toEqual(new Set(['t2', 't3', 't4'])) + + streams = [] + await wait(TTL_IN_MS + 1) + expect(activeConnections).toBeEmpty() + }) + + it('onNewStream can be used to form immediate connections', () => { + setUpConnector(1000000000) + connector.start() + + connector.onNewStream(STREAMS[1]) + expect(activeConnections).toEqual(new Set(['t2'])) + + connector.onNewStream(STREAMS[3]) + expect(activeConnections).toEqual(new Set(['t2', 't4'])) + }) +}) From 382bab56c6719894e34dda43530bbd8ed7179593 Mon Sep 17 00:00:00 2001 From: Eric Andrews Date: Fri, 15 Oct 2021 14:12:27 +0300 Subject: [PATCH 5/5] style(network): address PR comment --- .../test/unit/TrackerConnector.test.ts | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/packages/network/test/unit/TrackerConnector.test.ts b/packages/network/test/unit/TrackerConnector.test.ts index 7af1d07ce0..5c7755f75c 100644 --- a/packages/network/test/unit/TrackerConnector.test.ts +++ b/packages/network/test/unit/TrackerConnector.test.ts @@ -29,12 +29,10 @@ const TRACKERS = [ }, ] -const STREAMS = [ - new StreamIdAndPartition('streamOne', 0), // t1 - new StreamIdAndPartition('streamOne', 15), // t2 - new StreamIdAndPartition('streamSix', 0), // t3 - new StreamIdAndPartition('streamTwo', 0) // t4 -] +const T1_STREAM = new StreamIdAndPartition('streamOne', 0) +const T2_STREAM = new StreamIdAndPartition('streamOne', 15) +const T3_STREAM = new StreamIdAndPartition('streamSix', 0) +const T4_STREAM = new StreamIdAndPartition('streamTwo', 0) describe(TrackerConnector, () => { let streams: Array @@ -44,9 +42,13 @@ describe(TrackerConnector, () => { beforeAll(() => { // sanity check stream hash assignments const trackerRegistry = new Utils.TrackerRegistry(TRACKERS) - for (let i = 0; i < STREAMS.length; ++i) { - expect(trackerRegistry.getTracker(STREAMS[i].id, STREAMS[i].partition)).toEqual(TRACKERS[i]) + function checkTrackerAssignment({ id, partition }: StreamIdAndPartition, expectedTracker: TrackerInfo): void { + expect(trackerRegistry.getTracker(id, partition)).toEqual(expectedTracker) } + checkTrackerAssignment(T1_STREAM, TRACKERS[0]) + checkTrackerAssignment(T2_STREAM, TRACKERS[1]) + checkTrackerAssignment(T3_STREAM, TRACKERS[2]) + checkTrackerAssignment(T4_STREAM, TRACKERS[3]) }) function setUpConnector(intervalInMs: number) { @@ -85,7 +87,7 @@ describe(TrackerConnector, () => { await wait(TTL_IN_MS + 1) expect(activeConnections).toBeEmpty() - streams = [STREAMS[0]] + streams = [T1_STREAM] await wait(TTL_IN_MS + 1) expect(activeConnections).toEqual(new Set(['t1'])) @@ -93,14 +95,14 @@ describe(TrackerConnector, () => { await wait(TTL_IN_MS + 1) expect(activeConnections).toBeEmpty() - streams = [STREAMS[1], STREAMS[2]] + streams = [T2_STREAM, T3_STREAM] await wait(TTL_IN_MS + 1) expect(activeConnections).toEqual(new Set(['t2', 't3'])) streams = [ - STREAMS[3], - STREAMS[2], - STREAMS[1] + T4_STREAM, + T3_STREAM, + T2_STREAM ] await wait(TTL_IN_MS + 1) expect(activeConnections).toEqual(new Set(['t2', 't3', 't4'])) @@ -114,10 +116,10 @@ describe(TrackerConnector, () => { setUpConnector(1000000000) connector.start() - connector.onNewStream(STREAMS[1]) + connector.onNewStream(T2_STREAM) expect(activeConnections).toEqual(new Set(['t2'])) - connector.onNewStream(STREAMS[3]) + connector.onNewStream(T4_STREAM) expect(activeConnections).toEqual(new Set(['t2', 't4'])) }) })