Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Crypto Endeavour #41

Merged
merged 1 commit into from
Nov 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions .aegir.js

This file was deleted.

3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ build/Release
node_modules

dist
lib

test/test-repo-for*
test/test-repo-for*
24 changes: 18 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
sudo: false
language: node_js
node_js:
- 4
- stable
matrix:
include:
- node_js: 4
env: CXX=g++-4.8
- node_js: 6
env:
- SAUCE=true
- CXX=g++-4.8
- node_js: stable
env: CXX=g++-4.8

# Make sure we have new NPM.
before_install:
Expand All @@ -13,12 +20,17 @@ script:
- npm test
- npm run coverage

addons:
firefox: 'latest'

before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start

after_success:
- npm run coverage-publish

addons:
firefox: 'latest'
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-4.8
7 changes: 4 additions & 3 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ Cancel previously requested keys.
### `putStream()`

Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored.
Objects passed into here should be of the form `{data: Buffer, key: Multihash}`

### `put(block, cb)`
### `put(blockAndKey, cb)`

- `block: IpfsBlock`
- `blockAndKey: {data: Buffer, key: Multihash}`
- `cb: Function`

Announce that the current node now has the `block`. This will store it
Announce that the current node now has the block containing `data`. This will store it
in the local database and attempt to serve it to all peers that are known
to have requested it. The callback is called when we are sure that the block
is stored.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
[![Travis CI](https://travis-ci.org/ipfs/js-ipfs-bitswap.svg?branch=master)](https://travis-ci.org/ipfs/js-ipfs-bitswap)
[![Circle CI](https://circleci.com/gh/ipfs/js-ipfs-bitswap.svg?style=svg)](https://circleci.com/gh/ipfs/js-ipfs-bitswap)
[![Dependency Status](https://david-dm.org/ipfs/js-ipfs-bitswap.svg?style=flat-square)](https://david-dm.org/ipfs/js-ipfs-bitswap) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square)
![](https://img.shields.io/badge/Node.js-%3E%3D4.0.0-orange.svg?style=flat-square)

[![Sauce Test Status](https://saucelabs.com/browser-matrix/js-ipfs-bitswap.svg)](https://saucelabs.com/u/js-ipfs-bitswap)

> Node.js implementation of the Bitswap 'data exchange' protocol used by IPFS

Expand Down
39 changes: 21 additions & 18 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
"name": "ipfs-bitswap",
"version": "0.7.1",
"description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS",
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"main": "src/index.js",
"browser": {
"libp2p-ipfs": false
},
"scripts": {
"test": "aegir-test",
"test:browser": "aegir-test browser",
Expand Down Expand Up @@ -33,38 +35,39 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"aegir": "^8.0.1",
"aegir": "^9.1.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-pull-blob-store": "^0.4.1",
"idb-pull-blob-store": "^0.4.0",
"interface-pull-blob-store": "^0.5.0",
"ipfs-repo": "^0.9.0",
"libp2p-ipfs": "^0.14.1",
"lodash": "^4.16.2",
"idb-pull-blob-store": "^0.5.1",
"interface-pull-blob-store": "^0.6.0",
"ipfs-repo": "^0.11.1",
"libp2p-ipfs": "^0.15.0",
"lodash": "^4.16.6",
"multiaddr": "^2.0.3",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.1",
"peer-id": "^0.8.0",
"peer-info": "^0.8.0",
"rimraf": "^2.5.4",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.1.0",
"debug": "^2.3.1",
"async": "^2.1.2",
"cids": "^0.2.0",
"debug": "^2.3.2",
"heap": "^0.2.6",
"ipfs-block": "^0.3.0",
"ipfs-block": "^0.5.0",
"lodash.debounce": "^4.0.8",
"lodash.isequalwith": "^4.4.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6",
"protocol-buffers": "^3.1.8",
"pull-defer": "^0.2.2",
"pull-generate": "^2.2.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.1.6",
"pull-paramap": "^1.2.0",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.4.5"
"pull-stream": "^3.5.0"
},
"contributors": [
"David Dias <[email protected]>",
Expand All @@ -74,4 +77,4 @@
"greenkeeperio-bot <[email protected]>",
"npmcdn-to-unpkg-bot <[email protected]>"
]
}
}
136 changes: 77 additions & 59 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
const debug = require('debug')
const mh = require('multihashes')
const pull = require('pull-stream')
const generate = require('pull-generate')
const whilst = require('async/whilst')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const debounce = require('lodash.debounce')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand All @@ -26,37 +29,45 @@ module.exports = class Engine {
this.peerRequestQueue = new PeerRequestQueue()

this._running = false

this._outbox = debounce(this._outboxExec.bind(this), 100)
}

_sendBlock (env, cb) {
const msg = new Message(false)
msg.addBlock(env.block)

log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())

this.network.sendMessage(env.peer, msg, (err) => {
msg.addBlock(env.block, (err) => {
if (err) {
log('sendblock error: %s', err.message)
return cb(err)
}
cb(null, 'done')

log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())

this.network.sendMessage(env.peer, msg, (err) => {
if (err) {
log('sendblock error: %s', err.message)
}
cb(null, 'done')
})
})
}

_outbox () {
if (!this._running) return
_outboxExec () {
let nextTask
log('outbox')

const doIt = (cb) => pull(
generate(null, (state, cb) => {
log('generating', this._running)
whilst(
() => {
if (!this._running) {
return cb(true)
return
}

const nextTask = this.peerRequestQueue.pop()
nextTask = this.peerRequestQueue.pop()
log('check', this._running && nextTask)
return Boolean(nextTask)
},
(next) => {
log('generating')
log('got task', nextTask)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two logs next to each other, better coalesce to one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhhh, there I don't wait for you one single time..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in master

if (!nextTask) {
return cb(true)
}

pull(
this.blockstore.getStream(nextTask.entry.key),
Expand All @@ -65,31 +76,20 @@ module.exports = class Engine {
const block = blocks[0]
if (err || !block) {
nextTask.done()
return cb(null, false)
return next()
}

cb(null, {
this._sendBlock({
peer: nextTask.target,
block: block,
sent: () => {
sent () {
nextTask.done()
}
})
}, next)
})
)
}),
pull.filter(Boolean),
pull.asyncMap(this._sendBlock.bind(this)),
pull.onEnd(cb)
}
)

if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 50)
}
}

wantlistForPeer (peerId) {
Expand Down Expand Up @@ -118,20 +118,25 @@ module.exports = class Engine {
ledger.wantlist = new Wantlist()
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
cb()
})
)
this._processBlocks(msg.blocks, ledger, (err) => {
if (err) {
log.error(`failed to process blocks: ${err.message}`)
}

log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
cb()
})
)
})
}

receivedBlock (key) {
Expand Down Expand Up @@ -173,23 +178,36 @@ module.exports = class Engine {
}
}

_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)
_processBlocks (blocks, ledger, callback) {
each(blocks.values(), (block, cb) => {
block.key((err, key) => {
if (err) {
return cb(err)
}
log('got block %s (%s bytes)', mh.toB58String(key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block.key)
}
this.receivedBlock(key)
cb()
})
}, callback)
}

// Clear up all accounting things after message was sent
messageSent (peerId, msg) {
messageSent (peerId, msg, callback) {
const ledger = this._findOrCreate(peerId)
for (let block of msg.blocks.values()) {
each(msg.blocks.values(), (block, cb) => {
ledger.sentBytes(block.data.length)
ledger.wantlist.remove(block.key)
this.peerRequestQueue.remove(block.key, peerId)
}
block.key((err, key) => {
if (err) {
return cb(err)
}

ledger.wantlist.remove(key)
this.peerRequestQueue.remove(key, peerId)
cb()
})
}, callback)
}

numBytesSentTo (peerId) {
Expand Down
Loading