-
Notifications
You must be signed in to change notification settings - Fork 9
/
echo.ts
83 lines (68 loc) · 2.31 KB
/
echo.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import all from 'it-all'
import { pipe } from 'it-pipe'
import defer from 'p-defer'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { Daemon, DaemonFactory, SpawnOptions } from '../index.js'
export function echoStreamTests (name: string, factory: DaemonFactory, optionsA: SpawnOptions, optionsB: SpawnOptions): void {
describe(name, () => {
let daemonA: Daemon
let daemonB: Daemon
// Start Daemons
before(async function () {
this.timeout(20 * 1000)
daemonA = await factory.spawn(optionsA)
daemonB = await factory.spawn(optionsB)
// connect them
const identify0 = await daemonA.client.identify()
await daemonB.client.connect(identify0.peerId, identify0.addrs)
// jsDaemon1 will take some time to get the peers
await new Promise(resolve => setTimeout(resolve, 1000))
})
// Stop daemons
after(async function () {
await Promise.all(
[daemonA, daemonB]
.filter(Boolean)
.map(async d => { await d.stop() })
)
})
it(`${optionsA.type} sender to ${optionsB.type} listener`, async function () {
this.timeout(10 * 1000)
const receivingIdentity = await daemonB.client.identify()
const protocol = '/echo/1.0.0'
const input = [uint8ArrayFromString('hello world')]
await daemonB.client.registerStreamHandler(protocol, async (stream) => {
await pipe(
stream,
async function * (source) {
for await (const buf of source) {
yield buf.subarray()
}
},
stream
)
})
const stream = await daemonA.client.openStream(receivingIdentity.peerId, protocol)
// without this the socket can close before we receive a response
const responseReceived = defer()
const output = await pipe(
input,
async function * (source) {
yield * source
await responseReceived.promise
},
stream,
async function * (source) {
for await (const buf of source) {
yield buf.subarray()
responseReceived.resolve()
}
},
async (source) => all(source)
)
expect(output).to.deep.equal(input)
})
})
}