Skip to content
This repository has been archived by the owner on Aug 11, 2021. It is now read-only.

Commit

Permalink
refactor: Migrate to pull-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 8, 2016
1 parent 6019771 commit 3bc5596
Show file tree
Hide file tree
Showing 45 changed files with 195 additions and 272 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: node_js
node_js:
- 4
- 5
- "stable"

# Make sure we have new NPM.
before_install:
Expand Down
59 changes: 25 additions & 34 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ const BlockService = require('ipfs-block-service')

### `new BlockService(repo)`

- `repo: Repo`

Creates a new block service backed by [IPFS Repo][repo] `repo` for storage.

### `goOnline(bitswap)`

- `bitswap: Bitswap`

Add a bitswap instance that communicates with the network to retreive blocks
that are not in the local store.

Expand All @@ -24,53 +28,40 @@ Remove the bitswap instance and fall back to offline mode.

Returns a `Boolean` indicating if the block service is online or not.

### `addBlock(block, callback(err))`
### `put(block, callback)`

- `block: Block`
- `callback: Function`

Asynchronously adds a block instance to the underlying repo.

### `addBlocks(blocks, callback(err))`
### `putStream()`

Asynchronously adds an array of block instances to the underlying repo.
Returns a through pull-stream, which `Block`s can be written to, and
that emits the meta data about the written block.

*Does not guarantee atomicity.*
### `get(multihash [, extension], callback)`

### `getBlock(multihash, callback(err, block))`
- `multihash: Multihash`
- `extension: String`, defaults to 'data'
- `callback: Function`

Asynchronously returns the block whose content multihash matches `multihash`.
Returns an error (`err.code === 'ENOENT'`) if the block does not exist.

If the block could not be found, expect `err.code` to be `'ENOENT'`.

### `getBlocks(multihashes, callback(err, blocks))`

Asynchronously returns the blocks whose content multihashes match the array
`multihashes`.

`blocks` is an object that maps each `multihash` to an object of the form

```js
{
err: Error
block: Block
}
```

Expect `blocks[multihash].err.code === 'ENOENT'` and `blocks[multihash].block
=== null` if a block did not exist.

*Does not guarantee atomicity.*
### `getStream(multihash [, extension])`

### `deleteBlock(multihash, callback(err))`
- `multihash: Multihash`
- `extension: String`, defaults to 'data'

Asynchronously deletes the block from the store with content multihash matching
`multihash`, if it exists.
Returns a source pull-stream, which emits the requested block.

### `bs.deleteBlocks(multihashes, callback(err))`
### `delete(multihashes, [, extension], callback)`

Asynchronously deletes all blocks from the store with content multihashes matching
from the array `multihashes`.
- `multihashes: Multihash|[]Multihash`
- `extension: String`, defaults to 'data'- `extension: String`, defaults to 'data'
- `callback: Function`

*Does not guarantee atomicity.*
Deletes all blocks referenced by multihashes.

[multihash]: https://github.com/jbenet/js-multihash
[multihash]: https://github.com/multiformats/js-multihash
[repo]: https://github.com/ipfs/specs/tree/master/repo
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ const BlockService = require('ipfs-block-service')
const BlockService = require('ipfs-block-service')
const Block = require('ipfs-block')
const IPFSRepo = require('ipfs-repo') // storage repo
const memstore = require('abstract-blob-store') // in-memory store
const Store = require(interface-pull-blob-store') // in-memory store
// setup a repo
var repo = new IPFSRepo('example', { stores: memstore })
var repo = new IPFSRepo('example', { stores: Store })
// create a block
const block = new Block('hello warld')
const block = new Block('hello world)
console.log(block.data)
console.log(block.key)

Expand Down
17 changes: 8 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,21 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-block-service#readme",
"devDependencies": {
"aegir": "^3.0.0",
"bs58": "^3.0.0",
"aegir": "^8.0.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"ipfs-block": "^0.3.0",
"ipfs-repo": "^0.7.1",
"lodash": "^4.8.2",
"ipfs-repo": "^0.9.0",
"lodash": "^4.15.0",
"ncp": "^2.0.0",
"pre-commit": "^1.1.2",
"rimraf": "^2.5.1",
"pre-commit": "^1.1.3",
"rimraf": "^2.5.4",
"run-series": "^1.1.4"
},
"dependencies": {
"multihashes": "^0.2.2",
"pull-stream": "^3.4.5",
"run-parallel-limit": "^1.0.3"
},
"contributors": [
Expand Down
90 changes: 32 additions & 58 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const parallelLimit = require('run-parallel-limit')
const mh = require('multihashes')
const pull = require('pull-stream')

// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
Expand All @@ -24,88 +24,62 @@ module.exports = class BlockService {
return this._bitswap != null
}

addBlock (block, extension, callback) {
if (this.isOnline()) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

this._bitswap.hasBlock(block, callback)
} else {
this._repo.datastore.put(block, extension, callback)
}
}

addBlocks (blocks, callback) {
if (!Array.isArray(blocks)) {
return callback(new Error('expects an array of Blocks'))
put (block, callback) {
callback = callback || (() => {})
if (!block) {
return callback(new Error('Missing block'))
}

parallelLimit(blocks.map((block) => (next) => {
this.addBlock(block, next)
}), 100, callback)
pull(
pull.values([block]),
this.putStream(),
pull.onEnd(callback)
)
}

getBlock (key, extension, callback) {
putStream () {
if (this.isOnline()) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

this._bitswap.getBlock(key, callback)
} else {
this._repo.datastore.get(key, extension, callback)
return this._bitswap.putStream()
}

return this._repo.blockstore.putStream()
}

getBlocks (multihashes, extension, callback) {
get (key, extension, callback) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

if (!Array.isArray(multihashes)) {
return callback(new Error('Invalid batch of multihashes'))
}
pull(
this.getStream(key, extension),
pull.collect((err, result) => {
if (err) return callback(err)
callback(null, result[0])
})
)
}

getStream (key, extension) {
if (this.isOnline()) {
this._bitswap.getBlocks(multihashes, (results) => {
callback(null, results)
})
return
return this._bitswap.getStream(key)
}

const results = {}
parallelLimit(multihashes.map((key) => (next) => {
this._repo.datastore.get(key, extension, (error, block) => {
results[mh.toB58String(key)] = {error, block}
next()
})
}), 100, (err) => {
callback(err, results)
})
return this._repo.blockstore.getStream(key, extension)
}

deleteBlock (key, extension, callback) {
this._repo.datastore.delete(key, extension, callback)
}

deleteBlocks (multihashes, extension, callback) {
delete (keys, extension, callback) {
if (typeof extension === 'function') {
callback = extension
extension = undefined
}

if (!Array.isArray(multihashes)) {
return callback(new Error('Invalid batch of multihashes'))
if (!Array.isArray(keys)) {
keys = [keys]
}

parallelLimit(multihashes.map((multihash) => (next) => {
this.deleteBlock(multihash, extension, next)
}), 100, (err) => {
callback(err)
})
parallelLimit(keys.map((key) => (next) => {
this._repo.blockstore.delete(key, extension, next)
}), 100, callback)
}
}
Loading

0 comments on commit 3bc5596

Please sign in to comment.