Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
Merge pull request #84 from ipfs/pull
Browse files Browse the repository at this point in the history
refactor: use pull-streams
  • Loading branch information
daviddias authored Sep 7, 2016
2 parents 0eabb18 + 716e738 commit 16ad893
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 97 deletions.
3 changes: 2 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const multiaddr = require('multiaddr')
const Node = require('libp2p-ipfs').Node
const Peer = require('peer-info')
const Id = require('peer-id')
const pull = require('pull-stream')

const sigServer = require('libp2p-webrtc-star/src/signalling-server')
let sigS
Expand All @@ -21,7 +22,7 @@ gulp.task('libnode:start', (done) => {
node = new Node(peer)
node.start(() => {
node.handle('/echo/1.0.0', (conn) => {
conn.pipe(conn)
pull(conn, conn)
})
ready()
})
Expand Down
21 changes: 12 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,25 @@
"homepage": "https://github.com/ipfs/js-libp2p-ipfs-browser#readme",
"devDependencies": {
"aegir": "^8.0.0",
"bl": "^1.1.2",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"libp2p-ipfs": "^0.12.1",
"libp2p-ipfs": "^0.13.0",
"peer-id": "^0.7.0",
"pre-commit": "^1.1.3",
"pull-goodbye": "0.0.1",
"pull-serializer": "^0.3.2",
"pull-stream": "^3.4.5",
"run-parallel": "^1.1.6",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"babel-runtime": "^6.9.0",
"libp2p-spdy": "^0.8.1",
"libp2p-swarm": "^0.22.2",
"libp2p-webrtc-star": "^0.3.2",
"libp2p-websockets": "^0.7.1",
"mafmt": "^2.1.1",
"babel-runtime": "^6.11.6",
"libp2p-secio": "^0.4.2",
"libp2p-spdy": "^0.9.0",
"libp2p-swarm": "^0.23.0",
"libp2p-webrtc-star": "^0.4.3",
"libp2p-websockets": "^0.8.1",
"mafmt": "^2.1.2",
"multiaddr": "^2.0.2",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
Expand All @@ -59,4 +62,4 @@
"dignifiedquire <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
6 changes: 4 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
'use strict'

const Swarm = require('libp2p-swarm')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const WS = require('libp2p-websockets')
const WebRTCStar = require('libp2p-webrtc-star')
const spdy = require('libp2p-spdy')
const secio = require('libp2p-secio')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const EE = require('events').EventEmitter
const multiaddr = require('multiaddr')
const PeerBook = require('peer-book')
Expand Down Expand Up @@ -37,6 +38,7 @@ exports.Node = function Node (pInfo, pBook) {
this.swarm = new Swarm(pInfo)
this.swarm.connection.addStreamMuxer(spdy)
this.swarm.connection.reuse()
this.swarm.connection.crypto(secio.tag, secio.encrypt)

this.swarm.on('peer-mux-established', (peerInfo) => {
this.peerBook.put(peerInfo)
Expand Down
21 changes: 12 additions & 9 deletions test/webrtc-star-only.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const multiaddr = require('multiaddr')
const PeerInfo = require('peer-info')
const peerId = require('peer-id')
const parallel = require('run-parallel')
const bl = require('bl')
const pull = require('pull-stream')

const libp2p = require('../src')

Expand Down Expand Up @@ -47,7 +47,7 @@ describe('libp2p-ipfs-browser (webrtc only)', function () {

it('handle a protocol on the first node', (done) => {
node2.handle('/echo/1.0.0', (conn) => {
conn.pipe(conn)
pull(conn, conn)
})
done()
})
Expand All @@ -65,13 +65,16 @@ describe('libp2p-ipfs-browser (webrtc only)', function () {
const peers2 = node2.peerBook.getAll()
expect(err).to.not.exist
expect(Object.keys(peers2)).to.have.length(1)
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.toString()).to.equal(text)
done()
}))
conn.write(text)
conn.end()

pull(
pull.values([Buffer(text)]),
conn,
pull.collect((err, data) => {
expect(err).to.not.exist
expect(data[0].toString()).to.equal(text)
done()
})
)
}
})
})
Expand Down
137 changes: 61 additions & 76 deletions test/websockets-only.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
/* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */
'use strict'

const expect = require('chai').expect
const multiaddr = require('multiaddr')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const serializer = require('pull-serializer')

const libp2p = require('../src')
const rawPeer = require('./peer.json')
const id = PeerId.createFromPrivKey(rawPeer.privKey)
const bl = require('bl')

describe('libp2p-ipfs-browser (websockets only)', function () {
this.timeout(20 * 1000)

let peerB
let nodeA

Expand All @@ -24,6 +25,10 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
done()
})

after((done) => {
nodeA.stop(done)
})

it('create libp2pNode', () => {
nodeA = new libp2p.Node()
})
Expand Down Expand Up @@ -56,13 +61,16 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
const peers = nodeA.peerBook.getAll()
expect(err).to.not.exist
expect(Object.keys(peers)).to.have.length(1)
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.toString()).to.equal('hey')
done()
}))
conn.write('hey')
conn.end()

pull(
pull.values([Buffer('hey')]),
conn,
pull.collect((err, data) => {
expect(err).to.not.exist
expect(data).to.be.eql([Buffer('hey')])
done()
})
)
})
})

Expand Down Expand Up @@ -103,13 +111,16 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
const peers = nodeA.peerBook.getAll()
expect(err).to.not.exist
expect(Object.keys(peers)).to.have.length(1)
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.toString()).to.equal('hey')
done()
}))
conn.write('hey')
conn.end()

pull(
pull.values([Buffer('hey')]),
conn,
pull.collect((err, data) => {
expect(err).to.not.exist
expect(data).to.be.eql([Buffer('hey')])
done()
})
)
})
})

Expand All @@ -134,70 +145,44 @@ describe('libp2p-ipfs-browser (websockets only)', function () {
it.skip('libp2p.dialById on Protocol nodeA to nodeB', (done) => {})
it.skip('libp2p.hangupById nodeA to nodeB', (done) => {})

it('stress test: one big write', (done) => {
const message = new Buffer(1000000).fill('a').toString('hex')

nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist

conn.write(message)
conn.write('STOP')

let result = ''

conn.on('data', (data) => {
if (data.toString() === 'STOP') {
conn.end()
return
}
result += data.toString()
})

conn.on('end', () => {
expect(result).to.equal(message)
done()
describe('stress', () => {
it('one big write', (done) => {
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist
const rawMessage = new Buffer(1000000).fill('a')

const s = serializer(goodbye({
source: pull.values([rawMessage]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.have.length(1)
expect(Buffer(results[0])).to.have.length(rawMessage.length)
done()
})
}))
pull(s, conn, s)
})
})
})

it('stress test: many writes in 2 batches', (done) => {
let expected = ''
let counter = 0

nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist

while (++counter < 10000) {
conn.write(`${counter} `)
expected += `${counter} `
}

while (++counter < 20000) {
conn.write(`${counter} `)
expected += `${counter} `
}

setTimeout(() => {
conn.write('STOP')
}, 2000)

let result = ''
conn.on('data', (data) => {
if (data.toString() === 'STOP') {
conn.end()
return
}
result += data.toString()
})
it('many writes', (done) => {
nodeA.dialByPeerInfo(peerB, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist

conn.on('end', () => {
expect(result).to.equal(expected)
done()
const s = serializer(goodbye({
source: pull(
pull.infinite(),
pull.take(1000),
pull.map((val) => Buffer(val.toString()))
),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.have.length(1000)
done()
})
}))

pull(s, conn, s)
})
})
})

it('stop the libp2pnode', (done) => {
nodeA.stop(done)
})
})

0 comments on commit 16ad893

Please sign in to comment.