Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: unit test for TrackerConnector #250

Merged
merged 5 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/network/src/logic/node/StreamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ export class StreamManager {
return this.getStreamsAsKeys().map((key) => StreamIdAndPartition.fromKey(key))
}

*getStreamsIterable(): IterableIterator<StreamIdAndPartition> {
for (const streamKey of this.getStreamKeys()) {
yield StreamIdAndPartition.fromKey(streamKey)
}
}

// efficient way to access streams
getStreamKeys(): IterableIterator<StreamKey> {
return this.streams.keys()
Expand Down
49 changes: 26 additions & 23 deletions packages/network/src/logic/node/TrackerConnector.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Utils } from 'streamr-client-protocol'
import { StreamIdAndPartition, TrackerInfo } from '../../identifiers'
import { NodeToTracker } from '../../protocol/NodeToTracker'
import { Logger } from '../../helpers/Logger'
import { PeerInfo } from '../../connection/PeerInfo'
import { TrackerId } from '../tracker/Tracker'
import { StreamManager } from './StreamManager'
import { NameDirectory } from '../../NameDirectory'

const logger = new Logger(module)
Expand All @@ -14,38 +12,34 @@ enum ConnectionState {
ERROR
}

export class TrackerConnector {
type GetStreamsFn = () => Iterable<StreamIdAndPartition>
type ConnectToTrackerFn = (trackerAddress: string, trackerPeerInfo: PeerInfo) => Promise<unknown>
type DisconnectFromTrackerFn = (trackerId: TrackerId) => void

private readonly streamManager: StreamManager
private readonly nodeToTracker: NodeToTracker
export class TrackerConnector {
private readonly getStreams: GetStreamsFn
private readonly connectToTracker: ConnectToTrackerFn
private readonly disconnectFromTracker: DisconnectFromTrackerFn
private readonly trackerRegistry: Utils.TrackerRegistry<TrackerInfo>
private maintenanceTimer?: NodeJS.Timeout | null
private readonly maintenanceInterval: number
private connectionStates: Map<TrackerId,ConnectionState>
private connectionStates: Map<TrackerId, ConnectionState>

constructor(
streamManager: StreamManager,
nodeToTracker: NodeToTracker,
getStreams: GetStreamsFn,
connectToTracker: ConnectToTrackerFn,
disconnectFromTracker: DisconnectFromTrackerFn,
trackerRegistry: Utils.TrackerRegistry<TrackerInfo>,
maintenanceInterval: number
) {
this.streamManager = streamManager
this.nodeToTracker = nodeToTracker
this.getStreams = getStreams
this.connectToTracker = connectToTracker
this.disconnectFromTracker = disconnectFromTracker
this.trackerRegistry = trackerRegistry
this.maintenanceInterval = maintenanceInterval
this.connectionStates = new Map()
}

maintainConnections(): void {
this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => {
if (this.isActiveTracker(trackerInfo.id)) {
this.connectTo(trackerInfo)
} else {
this.nodeToTracker.disconnectFromTracker(trackerInfo.id)
}
})
}

onNewStream(streamId: StreamIdAndPartition): void {
const trackerInfo = this.trackerRegistry.getTracker(streamId.id, streamId.partition)
this.connectTo(trackerInfo)
Expand All @@ -66,8 +60,18 @@ export class TrackerConnector {
}
}

private maintainConnections(): void {
this.trackerRegistry.getAllTrackers().forEach((trackerInfo) => {
if (this.isActiveTracker(trackerInfo.id)) {
this.connectTo(trackerInfo)
} else {
this.disconnectFromTracker(trackerInfo.id)
}
})
}

private connectTo({ id, ws }: TrackerInfo): void {
this.nodeToTracker.connectToTracker(ws, PeerInfo.newTracker(id))
this.connectToTracker(ws, PeerInfo.newTracker(id))
.then(() => {
if (this.connectionStates.get(id) !== ConnectionState.SUCCESS) {
logger.info('Connected to tracker %s', NameDirectory.getName(id))
Expand All @@ -86,8 +90,7 @@ export class TrackerConnector {
}

private isActiveTracker(trackerId: TrackerId): boolean {
for (const streamKey of this.streamManager.getStreamKeys()) {
const { id: streamId, partition } = StreamIdAndPartition.fromKey(streamKey)
for (const { id: streamId, partition } of this.getStreams()) {
if (this.trackerRegistry.getTracker(streamId, partition).id === trackerId) {
return true
}
Expand Down
5 changes: 3 additions & 2 deletions packages/network/src/logic/node/TrackerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ export class TrackerManager {
this.subscriber = subscriber
this.rttUpdateInterval = opts.rttUpdateTimeout || 15000
this.trackerConnector = new TrackerConnector(
streamManager,
this.nodeToTracker,
streamManager.getStreamsIterable.bind(streamManager),
this.nodeToTracker.connectToTracker.bind(this.nodeToTracker),
this.nodeToTracker.disconnectFromTracker.bind(this.nodeToTracker),
this.trackerRegistry,
opts.trackerConnectionMaintenanceInterval ?? 5000
)
Expand Down
125 changes: 125 additions & 0 deletions packages/network/test/unit/TrackerConnector.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { Utils } from 'streamr-client-protocol'
import { TrackerConnector } from '../../src/logic/node/TrackerConnector'
import { StreamIdAndPartition, TrackerInfo } from '../../src/identifiers'
import { TrackerId } from '../../src/logic/tracker/Tracker'
import { wait } from 'streamr-test-utils'

const TTL_IN_MS = 10

const TRACKERS = [
{
id: 't1',
http: 'http://t1.xyz',
ws: 'ws://t1.xyz'
},
{
id: 't2',
http: 'http://t2.xyz',
ws: 'ws://t2.xyz'
},
{
id: 't3',
http: 'http://t3.xyz',
ws: 'ws://t3.xyz'
},
{
id: 't4',
http: 'http://t4.xyz',
ws: 'ws://t4.xyz'
},
]

const T1_STREAM = new StreamIdAndPartition('streamOne', 0)
const T2_STREAM = new StreamIdAndPartition('streamOne', 15)
const T3_STREAM = new StreamIdAndPartition('streamSix', 0)
const T4_STREAM = new StreamIdAndPartition('streamTwo', 0)

describe(TrackerConnector, () => {
let streams: Array<StreamIdAndPartition>
let activeConnections: Set<TrackerId>
let connector: TrackerConnector

beforeAll(() => {
// sanity check stream hash assignments
const trackerRegistry = new Utils.TrackerRegistry<TrackerInfo>(TRACKERS)
function checkTrackerAssignment({ id, partition }: StreamIdAndPartition, expectedTracker: TrackerInfo): void {
expect(trackerRegistry.getTracker(id, partition)).toEqual(expectedTracker)
}
checkTrackerAssignment(T1_STREAM, TRACKERS[0])
checkTrackerAssignment(T2_STREAM, TRACKERS[1])
checkTrackerAssignment(T3_STREAM, TRACKERS[2])
checkTrackerAssignment(T4_STREAM, TRACKERS[3])
})

function setUpConnector(intervalInMs: number) {
streams = []
activeConnections = new Set<TrackerId>()
connector = new TrackerConnector(
() => streams,
(_wsUrl, trackerInfo) => {
activeConnections.add(trackerInfo.peerId)
return Promise.resolve()
},
(trackerId) => {
activeConnections.delete(trackerId)
},
new Utils.TrackerRegistry<TrackerInfo>(TRACKERS),
intervalInMs
)
}

afterEach(() => {
connector?.stop()
})

it('maintains no tracker connections if no streams', async () => {
setUpConnector(TTL_IN_MS)
connector.start()
await wait(TTL_IN_MS * 2)
expect(activeConnections).toBeEmpty()
})

it('maintains tracker connections based on active streams', async () => {
setUpConnector(TTL_IN_MS)
connector.start()

streams = []
await wait(TTL_IN_MS + 1)
expect(activeConnections).toBeEmpty()

streams = [T1_STREAM]
await wait(TTL_IN_MS + 1)
expect(activeConnections).toEqual(new Set<string>(['t1']))

streams = []
await wait(TTL_IN_MS + 1)
expect(activeConnections).toBeEmpty()

streams = [T2_STREAM, T3_STREAM]
await wait(TTL_IN_MS + 1)
expect(activeConnections).toEqual(new Set<string>(['t2', 't3']))

streams = [
T4_STREAM,
T3_STREAM,
T2_STREAM
]
await wait(TTL_IN_MS + 1)
expect(activeConnections).toEqual(new Set<string>(['t2', 't3', 't4']))

streams = []
await wait(TTL_IN_MS + 1)
expect(activeConnections).toBeEmpty()
})

it('onNewStream can be used to form immediate connections', () => {
setUpConnector(1000000000)
connector.start()

connector.onNewStream(T2_STREAM)
expect(activeConnections).toEqual(new Set<string>(['t2']))

connector.onNewStream(T4_STREAM)
expect(activeConnections).toEqual(new Set<string>(['t2', 't4']))
})
})