Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: dispatch connection event from mock upgrader (#345)
Browse files Browse the repository at this point in the history
* Dispatch a connection event as per the upgrader interface
* Simplify args for MockConnectionManager
* Use base64pad for MockMuxer data transfer
  • Loading branch information
achingbrain authored Mar 7, 2023
1 parent 9f6eb55 commit b691b1f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 15 deletions.
13 changes: 9 additions & 4 deletions packages/interface-mocks/src/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ class MockNetwork {

export const mockNetwork = new MockNetwork()

export interface MockConnectionManagerComponents {
peerId: PeerId
registrar: Registrar
}

class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable {
private connections: Connection[] = []
private readonly components: MockNetworkComponents
private readonly components: MockConnectionManagerComponents
private started = false

constructor (components: MockNetworkComponents) {
constructor (components: MockConnectionManagerComponents) {
super()

this.components = components
Expand Down Expand Up @@ -99,7 +104,7 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
this.connections.push(aToB)
;(componentsB.connectionManager as MockConnectionManager).connections.push(bToA)

this.components.connectionManager.safeDispatchEvent<Connection>('peer:connect', {
this.safeDispatchEvent<Connection>('peer:connect', {
detail: aToB
})

Expand Down Expand Up @@ -159,6 +164,6 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
}
}

export function mockConnectionManager (components: MockNetworkComponents): ConnectionManager {
export function mockConnectionManager (components: MockConnectionManagerComponents): ConnectionManager {
return new MockConnectionManager(components)
}
2 changes: 1 addition & 1 deletion packages/interface-mocks/src/multiaddr-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in

const outbound: MultiaddrConnection = {
...outboundStream,
remoteAddr: remoteAddr.encapsulate(`/p2p/${remotePeer.toString()}`),
remoteAddr: remoteAddr.toString().includes(`/p2p/${remotePeer.toString()}`) ? remoteAddr : remoteAddr.encapsulate(`/p2p/${remotePeer.toString()}`),
timeline: {
open: Date.now()
},
Expand Down
5 changes: 2 additions & 3 deletions packages/interface-mocks/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,10 @@ class MuxedStream {

while (list.length > 0) {
const available = Math.min(list.length, MAX_MESSAGE_SIZE)
const subList = list.subarray(0, available)
const dataMsg: DataMessage = {
id,
type: 'data',
chunk: uint8ArrayToString(subList.slice(), 'base64'),
chunk: uint8ArrayToString(list.subarray(0, available), 'base64pad'),
direction: this.type
}

Expand Down Expand Up @@ -344,7 +343,7 @@ class MockMuxer implements StreamMuxer {
}

if (message.type === 'data') {
muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64')))
muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64pad')))
} else if (message.type === 'reset') {
this.log('-> reset stream %s %s', muxedStream.type, muxedStream.stream.id)
muxedStream.stream.reset()
Expand Down
14 changes: 11 additions & 3 deletions packages/interface-mocks/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { mockConnection } from './connection.js'
import type { Upgrader, UpgraderEvents, UpgraderOptions } from '@libp2p/interface-transport'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { Registrar } from '@libp2p/interface-registrar'
import { EventEmitter } from '@libp2p/interfaces/events'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'

export interface MockUpgraderInit {
registrar?: Registrar
Expand All @@ -18,19 +18,27 @@ class MockUpgrader extends EventEmitter<UpgraderEvents> implements Upgrader {
}

async upgradeOutbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
return mockConnection(multiaddrConnection, {
const connection = mockConnection(multiaddrConnection, {
direction: 'outbound',
registrar: this.registrar,
...opts
})

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: connection }))

return connection
}

async upgradeInbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
return mockConnection(multiaddrConnection, {
const connection = mockConnection(multiaddrConnection, {
direction: 'inbound',
registrar: this.registrar,
...opts
})

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: connection }))

return connection
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))
const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -89,7 +89,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))
const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream()))

void Promise.all(
streams.map(async stream => {
Expand Down Expand Up @@ -132,7 +132,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const streams = await Promise.all(Array(expectedStreams).fill(0).map(() => dialer.newStream()))
const streams = await Promise.all(Array(expectedStreams).fill(0).map(async () => await dialer.newStream()))

const streamPipes = streams.map(async stream => {
await pipe(
Expand Down Expand Up @@ -201,7 +201,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const streams = await Promise.all(Array.from(Array(5), () => dialer.newStream()))
const streams = await Promise.all(Array.from(Array(5), async () => await dialer.newStream()))
let closed = false
const controllers: AbortController[] = []

Expand Down

0 comments on commit b691b1f

Please sign in to comment.