From b691b1fa28e23b549c32e89d6b7c98d6a50c7b8f Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 7 Mar 2023 12:14:20 +0100 Subject: [PATCH] fix: dispatch connection event from mock upgrader (#345) * Dispatch a connection event as per the upgrader interface * Simplify args for MockConnectionManager * Use base64pad for MockMuxer data transfer --- packages/interface-mocks/src/connection-manager.ts | 13 +++++++++---- .../interface-mocks/src/multiaddr-connection.ts | 2 +- packages/interface-mocks/src/muxer.ts | 5 ++--- packages/interface-mocks/src/upgrader.ts | 14 +++++++++++--- .../src/close-test.ts | 8 ++++---- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/packages/interface-mocks/src/connection-manager.ts b/packages/interface-mocks/src/connection-manager.ts index ef12adbc1..9521b4b93 100644 --- a/packages/interface-mocks/src/connection-manager.ts +++ b/packages/interface-mocks/src/connection-manager.ts @@ -40,12 +40,17 @@ class MockNetwork { export const mockNetwork = new MockNetwork() +export interface MockConnectionManagerComponents { + peerId: PeerId + registrar: Registrar +} + class MockConnectionManager extends EventEmitter implements ConnectionManager, Startable { private connections: Connection[] = [] - private readonly components: MockNetworkComponents + private readonly components: MockConnectionManagerComponents private started = false - constructor (components: MockNetworkComponents) { + constructor (components: MockConnectionManagerComponents) { super() this.components = components @@ -99,7 +104,7 @@ class MockConnectionManager extends EventEmitter implem this.connections.push(aToB) ;(componentsB.connectionManager as MockConnectionManager).connections.push(bToA) - this.components.connectionManager.safeDispatchEvent('peer:connect', { + this.safeDispatchEvent('peer:connect', { detail: aToB }) @@ -159,6 +164,6 @@ class MockConnectionManager extends EventEmitter implem } } -export function mockConnectionManager (components: MockNetworkComponents): ConnectionManager { +export function mockConnectionManager (components: MockConnectionManagerComponents): ConnectionManager { return new MockConnectionManager(components) } diff --git a/packages/interface-mocks/src/multiaddr-connection.ts b/packages/interface-mocks/src/multiaddr-connection.ts index 6fe40bd82..9a50f35b8 100644 --- a/packages/interface-mocks/src/multiaddr-connection.ts +++ b/packages/interface-mocks/src/multiaddr-connection.ts @@ -37,7 +37,7 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in const outbound: MultiaddrConnection = { ...outboundStream, - remoteAddr: remoteAddr.encapsulate(`/p2p/${remotePeer.toString()}`), + remoteAddr: remoteAddr.toString().includes(`/p2p/${remotePeer.toString()}`) ? remoteAddr : remoteAddr.encapsulate(`/p2p/${remotePeer.toString()}`), timeline: { open: Date.now() }, diff --git a/packages/interface-mocks/src/muxer.ts b/packages/interface-mocks/src/muxer.ts index 14a1927d9..3eda1b6bd 100644 --- a/packages/interface-mocks/src/muxer.ts +++ b/packages/interface-mocks/src/muxer.ts @@ -152,11 +152,10 @@ class MuxedStream { while (list.length > 0) { const available = Math.min(list.length, MAX_MESSAGE_SIZE) - const subList = list.subarray(0, available) const dataMsg: DataMessage = { id, type: 'data', - chunk: uint8ArrayToString(subList.slice(), 'base64'), + chunk: uint8ArrayToString(list.subarray(0, available), 'base64pad'), direction: this.type } @@ -344,7 +343,7 @@ class MockMuxer implements StreamMuxer { } if (message.type === 'data') { - muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64'))) + muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64pad'))) } else if (message.type === 'reset') { this.log('-> reset stream %s %s', muxedStream.type, muxedStream.stream.id) muxedStream.stream.reset() diff --git a/packages/interface-mocks/src/upgrader.ts b/packages/interface-mocks/src/upgrader.ts index af3a340fc..40b0781d0 100644 --- a/packages/interface-mocks/src/upgrader.ts +++ b/packages/interface-mocks/src/upgrader.ts @@ -2,7 +2,7 @@ import { mockConnection } from './connection.js' import type { Upgrader, UpgraderEvents, UpgraderOptions } from '@libp2p/interface-transport' import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection' import type { Registrar } from '@libp2p/interface-registrar' -import { EventEmitter } from '@libp2p/interfaces/events' +import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' export interface MockUpgraderInit { registrar?: Registrar @@ -18,19 +18,27 @@ class MockUpgrader extends EventEmitter implements Upgrader { } async upgradeOutbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { - return mockConnection(multiaddrConnection, { + const connection = mockConnection(multiaddrConnection, { direction: 'outbound', registrar: this.registrar, ...opts }) + + this.dispatchEvent(new CustomEvent('connection', { detail: connection })) + + return connection } async upgradeInbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise { - return mockConnection(multiaddrConnection, { + const connection = mockConnection(multiaddrConnection, { direction: 'inbound', registrar: this.registrar, ...opts }) + + this.dispatchEvent(new CustomEvent('connection', { detail: connection })) + + return connection } } diff --git a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts index 438437fdc..95cbfa571 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts @@ -47,7 +47,7 @@ export default (common: TestSetup): void => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream())) void Promise.all( streams.map(async stream => { @@ -89,7 +89,7 @@ export default (common: TestSetup): void => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream())) void Promise.all( streams.map(async stream => { @@ -132,7 +132,7 @@ export default (common: TestSetup): void => { void pipe(p[0], dialer, p[0]) void pipe(p[1], listener, p[1]) - const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream())) + const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream())) const streamPipes = streams.map(async stream => { await pipe( @@ -201,7 +201,7 @@ export default (common: TestSetup): void => { void pipe(p[1], listener, p[1]) const stream = await dialer.newStream() - const streams = await Promise.all(Array.from(Array(5), () => dialer.newStream())) + const streams = await Promise.all(Array.from(Array(5), async () => await dialer.newStream())) let closed = false const controllers: AbortController[] = []