Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
feat: track progress events
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov committed Oct 2, 2017
1 parent c749c36 commit 490ea61
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 23 deletions.
2 changes: 1 addition & 1 deletion examples/upload-file-via-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"devDependencies": {
"babel-core": "^5.4.7",
"babel-loader": "^5.1.2",
"ipfs-api": "^12.1.7",
"ipfs-api": "../../",
"json-loader": "^0.5.4",
"react": "^15.4.2",
"react-dom": "^15.4.2",
Expand Down
2 changes: 1 addition & 1 deletion examples/upload-file-via-browser/src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class App extends React.Component {
saveToIpfs (reader) {
let ipfsId
const buffer = Buffer.from(reader.result)
this.ipfsApi.add(buffer)
this.ipfsApi.add(buffer, { progress: (prog) => console.log(`received: ${prog}`) })
.then((response) => {
console.log(response)
ipfsId = response[0].hash
Expand Down
11 changes: 7 additions & 4 deletions src/files/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const isStream = require('is-stream')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../utils/dagnode-stream')
const ProgressStream = require('../utils/progress-stream')

module.exports = (send) => {
return promisify((files, opts, callback) => {
Expand All @@ -14,8 +15,8 @@ module.exports = (send) => {
opts = opts || {}

const ok = Buffer.isBuffer(files) ||
isStream.readable(files) ||
Array.isArray(files)
isStream.readable(files) ||
Array.isArray(files)

if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
Expand All @@ -41,10 +42,12 @@ module.exports = (send) => {
qs.hash = opts.hashAlg
}

const request = { path: 'add', files: files, qs: opts, progress: opts.progress }
const request = { path: 'add', files: files, qs: qs, progress: opts.progress }

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
const transform = (res, callback) => DAGNodeStream.streamToValue(send,
ProgressStream.fromStream(opts.progress, res),
callback)
send.andTransform(request, transform, callback)
})
}
50 changes: 50 additions & 0 deletions src/utils/progress-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict'

const Transform = require('readable-stream').Transform

/*
A transform stream to track progress events on file upload
When the progress flag is passed to the HTTP api, the stream
emits progress events like such:
{
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
Size string `json:",omitempty"`
}
This class will take care of detecting such
events and calling the associated track method
with the bytes sent so far as parameter. It will
also skip them from the stream, emitting only
when the final object has been uploaded and we
got a hash.
*/
class ProgressStream extends Transform {
constructor (opts) {
opts = Object.assign(opts || {}, { objectMode: true })
super(opts)
this._track = opts.track || (() => {})
this._res = []
}

static fromStream (track, stream) {
const prog = new ProgressStream({ track })
return stream.pipe(prog)
}

_transform (chunk, encoding, callback) {
if (chunk &&
typeof chunk.Bytes !== 'undefined' &&
typeof chunk.Hash === 'undefined') {
this._track(chunk.Bytes)
return callback()
}

callback(null, chunk)
}
}

module.exports = ProgressStream
16 changes: 4 additions & 12 deletions src/utils/request-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const getFilesStream = require('./get-files-stream')
const streamToValue = require('./stream-to-value')
const streamToJsonValue = require('./stream-to-json-value')
const request = require('./request')
const Transform = require('readable-stream').Transform

// -- Internal

Expand Down Expand Up @@ -82,6 +81,9 @@ function requestAPI (config, options, callback) {
if (options.files && !Array.isArray(options.files)) {
options.files = [options.files]
}
if (options.progress) {
options.qs.progress = true
}

if (options.qs.r) {
options.qs.recursive = options.qs.r
Expand Down Expand Up @@ -161,17 +163,7 @@ function requestAPI (config, options, callback) {
})

if (options.files) {
if (options.progress && typeof options.progress === 'function') {
const progressStream = new Transform({
transform: (chunk, encoding, cb) => {
options.progress(chunk.byteLength)
cb(null, chunk)
}
})
stream.pipe(progressStream).pipe(req)
} else {
stream.pipe(req)
}
stream.pipe(req)
} else {
req.end()
}
Expand Down
14 changes: 9 additions & 5 deletions test/files.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ describe('.files (the MFS API part)', function () {
})

it.only('files.add with progress options', (done) => {
ipfs.files.add(testfile, {progress: false}, (err, res) => {
let progress = 0
ipfs.files.add(testfile, { progress: (p) => { progress = p } }, (err, res) => {
expect(err).to.not.exist()

expect(res).to.have.length(1)
expect(progress).to.be.greaterThan(0)
done()
})
})
Expand Down Expand Up @@ -158,7 +160,7 @@ describe('.files (the MFS API part)', function () {

it('files.write', (done) => {
ipfs.files
.write('/test-folder/test-file-2.txt', Buffer.from('hello world'), {create: true}, (err) => {
.write('/test-folder/test-file-2.txt', Buffer.from('hello world'), { create: true }, (err) => {
expect(err).to.not.exist()

ipfs.files.read('/test-folder/test-file-2.txt', (err, stream) => {
Expand Down Expand Up @@ -254,7 +256,7 @@ describe('.files (the MFS API part)', function () {
})

it('files.rm', (done) => {
ipfs.files.rm('/test-folder', {recursive: true}, done)
ipfs.files.rm('/test-folder', { recursive: true }, done)
})
})

Expand Down Expand Up @@ -330,7 +332,7 @@ describe('.files (the MFS API part)', function () {

it('files.write', (done) => {
ipfs.files
.write('/test-folder/test-file-2.txt', Buffer.from('hello world'), {create: true})
.write('/test-folder/test-file-2.txt', Buffer.from('hello world'), { create: true })
.then(() => {
return ipfs.files.read('/test-folder/test-file-2.txt')
})
Expand Down Expand Up @@ -396,7 +398,9 @@ describe('.files (the MFS API part)', function () {
})

it('files.read', (done) => {
if (!isNode) { return done() }
if (!isNode) {
return done()
}

ipfs.files.read('/test-folder/test-file')
.then((stream) => {
Expand Down

0 comments on commit 490ea61

Please sign in to comment.