From be7b80252a7a678533b51d59bb7e11e9f317aced Mon Sep 17 00:00:00 2001 From: Vadim Demedes Date: Sun, 23 Jul 2017 15:36:02 +0300 Subject: [PATCH] Add progress events (#322) --- index.js | 171 ++++++++++++++++++++++++++++++++++++++++-- package.json | 6 +- readme.md | 31 ++++++++ test/progress.js | 189 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 389 insertions(+), 8 deletions(-) create mode 100644 test/progress.js diff --git a/index.js b/index.js index 9f9c76430..0631aa280 100644 --- a/index.js +++ b/index.js @@ -3,9 +3,12 @@ const EventEmitter = require('events'); const http = require('http'); const https = require('https'); const PassThrough = require('stream').PassThrough; +const Transform = require('stream').Transform; const urlLib = require('url'); +const fs = require('fs'); const querystring = require('querystring'); const duplexer3 = require('duplexer3'); +const intoStream = require('into-stream'); const isStream = require('is-stream'); const getStream = require('get-stream'); const timedOut = require('timed-out'); @@ -13,17 +16,51 @@ const urlParseLax = require('url-parse-lax'); const urlToOptions = require('url-to-options'); const lowercaseKeys = require('lowercase-keys'); const decompressResponse = require('decompress-response'); +const mimicResponse = require('mimic-response'); const isRetryAllowed = require('is-retry-allowed'); const Buffer = require('safe-buffer').Buffer; const isURL = require('isurl'); const isPlainObj = require('is-plain-obj'); const PCancelable = require('p-cancelable'); const pTimeout = require('p-timeout'); +const pify = require('pify'); const pkg = require('./package'); const getMethodRedirectCodes = new Set([300, 301, 302, 303, 304, 305, 307, 308]); const allMethodRedirectCodes = new Set([300, 303, 307, 308]); +const isFormData = body => isStream(body) && typeof body.getBoundary === 'function'; + +const getBodySize = opts => { + const body = opts.body; + + if (opts.headers['content-length']) { + return Number(opts.headers['content-length']); + } + + if (!body && !opts.stream) { + return 0; + } + + if (typeof body === 'string') { + return Buffer.byteLength(body); + } + + if (isFormData(body)) { + return pify(body.getLength.bind(body))(); + } + + if (body instanceof fs.ReadStream) { + return pify(fs.stat)(body.path).then(stat => stat.size); + } + + if (isStream(body) && Buffer.isBuffer(body._buffer)) { + return body._buffer.length; + } + + return null; +}; + function requestAsEventEmitter(opts) { opts = opts || {}; @@ -32,6 +69,8 @@ function requestAsEventEmitter(opts) { const redirects = []; let retryCount = 0; let redirectUrl; + let uploadBodySize; + let uploaded = 0; const get = opts => { if (opts.protocol !== 'http:' && opts.protocol !== 'https:') { @@ -46,7 +85,17 @@ function requestAsEventEmitter(opts) { fn = electron.net || electron.remote.net; } + let progressInterval; + const req = fn.request(opts, res => { + clearInterval(progressInterval); + + ee.emit('uploadProgress', { + percent: 1, + transferred: uploaded, + total: uploadBodySize + }); + const statusCode = res.statusCode; res.url = redirectUrl || requestUrl; @@ -85,22 +134,65 @@ function requestAsEventEmitter(opts) { return; } + const downloadBodySize = Number(res.headers['content-length']) || null; + let downloaded = 0; + setImmediate(() => { + const progressStream = new Transform({ + transform(chunk, encoding, callback) { + downloaded += chunk.length; + + const percent = downloadBodySize ? downloaded / downloadBodySize : 0; + + // Let flush() be responsible for emitting the last event + if (percent < 1) { + ee.emit('downloadProgress', { + percent, + transferred: downloaded, + total: downloadBodySize + }); + } + + callback(null, chunk); + }, + + flush(callback) { + ee.emit('downloadProgress', { + percent: 1, + transferred: downloaded, + total: downloadBodySize + }); + + callback(); + } + }); + + mimicResponse(res, progressStream); + progressStream.redirectUrls = redirects; + const response = opts.decompress === true && typeof decompressResponse === 'function' && - req.method !== 'HEAD' ? decompressResponse(res) : res; + req.method !== 'HEAD' ? decompressResponse(progressStream) : progressStream; if (!opts.decompress && ['gzip', 'deflate'].indexOf(res.headers['content-encoding']) !== -1) { opts.encoding = null; } - response.redirectUrls = redirects; - ee.emit('response', response); + + ee.emit('downloadProgress', { + percent: 0, + transferred: 0, + total: downloadBodySize + }); + + res.pipe(progressStream); }); }); req.once('error', err => { + clearInterval(progressInterval); + const backoff = opts.retries(++retryCount, err); if (backoff) { @@ -111,7 +203,44 @@ function requestAsEventEmitter(opts) { ee.emit('error', new got.RequestError(err, opts)); }); + ee.on('request', req => { + ee.emit('uploadProgress', { + percent: 0, + transferred: 0, + total: uploadBodySize + }); + + req.connection.on('connect', () => { + const uploadEventFrequency = 150; + + progressInterval = setInterval(() => { + const lastUploaded = uploaded; + const headersSize = Buffer.byteLength(req._header); + uploaded = req.connection.bytesWritten - headersSize; + + // Prevent the known issue of `bytesWritten` being larger than body size + if (uploadBodySize && uploaded > uploadBodySize) { + uploaded = uploadBodySize; + } + + // Don't emit events with unchanged progress and + // prevent last event from being emitted, because + // it's emitted when `response` is emitted + if (uploaded === lastUploaded || uploaded === uploadBodySize) { + return; + } + + ee.emit('uploadProgress', { + percent: uploadBodySize ? uploaded / uploadBodySize : 0, + transferred: uploaded, + total: uploadBodySize + }); + }, uploadEventFrequency); + }); + }); + if (opts.gotTimeout) { + clearInterval(progressInterval); timedOut(req, opts.gotTimeout); } @@ -121,8 +250,16 @@ function requestAsEventEmitter(opts) { }; setImmediate(() => { - get(opts); + Promise.resolve(getBodySize(opts)) + .then(size => { + uploadBodySize = size; + get(opts); + }) + .catch(err => { + ee.emit('error', err); + }); }); + return ee; } @@ -131,7 +268,9 @@ function asPromise(opts) { pTimeout(requestPromise, opts.gotTimeout.request, new got.RequestError({message: 'Request timed out', code: 'ETIMEDOUT'}, opts)) : requestPromise; - return timeoutFn(new PCancelable((onCancel, resolve, reject) => { + const proxy = new EventEmitter(); + + const promise = timeoutFn(new PCancelable((onCancel, resolve, reject) => { const ee = requestAsEventEmitter(opts); let cancelOnRequest = false; @@ -191,10 +330,21 @@ function asPromise(opts) { }); ee.on('error', reject); + ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress')); + ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress')); })); + + promise.on = (name, fn) => { + proxy.on(name, fn); + return promise; + }; + + return promise; } function asStream(opts) { + opts.stream = true; + const input = new PassThrough(); const output = new PassThrough(); const proxy = duplexer3(input, output); @@ -256,6 +406,8 @@ function asStream(opts) { ee.on('redirect', proxy.emit.bind(proxy, 'redirect')); ee.on('error', proxy.emit.bind(proxy, 'error')); + ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress')); + ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress')); return proxy; } @@ -320,7 +472,7 @@ function normalizeArguments(url, opts) { throw new TypeError('options.body must be a plain Object or Array when options.form or options.json is used'); } - if (isStream(body) && typeof body.getBoundary === 'function') { + if (isFormData(body)) { // Special case for https://github.com/form-data/form-data headers['content-type'] = headers['content-type'] || `multipart/form-data; boundary=${body.getBoundary()}`; } else if (opts.form && canBodyBeStringified) { @@ -336,6 +488,13 @@ function normalizeArguments(url, opts) { headers['content-length'] = length; } + // Convert buffer to stream to receive upload progress events + // see https://github.com/sindresorhus/got/pull/322 + if (Buffer.isBuffer(body)) { + opts.body = intoStream(body); + opts.body._buffer = body; + } + opts.method = (opts.method || 'POST').toUpperCase(); } else { opts.method = (opts.method || 'GET').toUpperCase(); diff --git a/package.json b/package.json index bc8f9586d..d6a5e60e6 100644 --- a/package.json +++ b/package.json @@ -53,13 +53,16 @@ "decompress-response": "^3.2.0", "duplexer3": "^0.1.4", "get-stream": "^3.0.0", + "into-stream": "^3.1.0", "is-plain-obj": "^1.1.0", "is-retry-allowed": "^1.0.0", "is-stream": "^1.0.0", "isurl": "^1.0.0-alpha5", "lowercase-keys": "^1.0.0", + "mimic-response": "^1.0.0", "p-cancelable": "^0.3.0", "p-timeout": "^1.1.1", + "pify": "^3.0.0", "safe-buffer": "^5.0.1", "timed-out": "^4.0.0", "url-parse-lax": "^1.0.0", @@ -70,10 +73,9 @@ "coveralls": "^2.11.4", "form-data": "^2.1.1", "get-port": "^3.0.0", - "into-stream": "^3.0.0", "nyc": "^11.0.2", "pem": "^1.4.4", - "pify": "^3.0.0", + "slow-stream": "0.0.4", "tempfile": "^2.0.0", "tempy": "^0.1.0", "universal-url": "1.0.0-alpha", diff --git a/readme.md b/readme.md index f44341b1c..0f7fb1f50 100644 --- a/readme.md +++ b/readme.md @@ -21,6 +21,7 @@ Created because [`request`](https://github.com/request/request) is bloated *(sev - [Request cancelation](#aborting-the-request) - [Follows redirects](#followredirect) - [Retries on network failure](#retries) +- [Progress events](#onuploadprogress-progress) - [Handles gzip/deflate](#decompress) - [Timeout handling](#timeout) - [Errors with metadata](#errors) @@ -202,6 +203,36 @@ got.stream('github.com') `redirect` event to get the response object of a redirect. The second argument is options for the next request to the redirect location. +##### .on('uploadProgress', progress) +##### .on('downloadProgress', progress) + +Progress events for uploading (sending request) and downloading (receiving response). The `progress` argument is an object like: + +```js +{ + percent: 0.1, + transferred: 1024, + total: 10240 +} +``` + +If it's not possible to retrieve the body size (can happen when streaming), `total` will be `null`. + +**Note**: Progress events can also be used with promises. + +```js +got('todomvc.com') + .on('downloadProgress', progress => { + // Report download progress + }) + .on('uploadProgress', progress => { + // Report upload progress + }) + .then(response => { + // Done + }); +``` + ##### .on('error', error, body, response) `error` event emitted in case of protocol error (like `ENOTFOUND` etc.) or status error (4xx or 5xx). The second argument is the body of the server response in case of status error. The third argument is response object. diff --git a/test/progress.js b/test/progress.js new file mode 100644 index 000000000..bc98c87eb --- /dev/null +++ b/test/progress.js @@ -0,0 +1,189 @@ +import fs from 'fs'; +import SlowStream from 'slow-stream'; +import intoStream from 'into-stream'; +import getStream from 'get-stream'; +import FormData from 'form-data'; +import tempfile from 'tempfile'; +import pify from 'pify'; +import test from 'ava'; +import got from '..'; +import {createServer} from './helpers/server'; + +const checkEvents = (t, events, bodySize = null) => { + t.true(events.length >= 2); + + const hasBodySize = typeof bodySize === 'number'; + let lastEvent = events.shift(); + + if (!hasBodySize) { + t.is(lastEvent.percent, 0); + } + + for (const [index, event] of events.entries()) { + if (hasBodySize) { + t.is(event.percent, event.transferred / bodySize); + t.true(event.percent > lastEvent.percent); + } else { + const isLastEvent = index === events.length - 1; + t.is(event.percent, isLastEvent ? 1 : 0); + } + + t.true(event.transferred >= lastEvent.transferred); + t.is(event.total, bodySize); + + lastEvent = event; + } +}; + +const file = Buffer.alloc(1024 * 1024 * 2); +let s; + +test.before('setup', async () => { + s = await createServer(); + + s.on('/download', (req, res) => { + res.setHeader('content-length', file.length); + + intoStream(file) + .pipe(new SlowStream({maxWriteInterval: 50})) + .pipe(res); + }); + + s.on('/download/no-total', (req, res) => { + res.write('hello'); + res.end(); + }); + + s.on('/upload', (req, res) => { + req + .pipe(new SlowStream({maxWriteInterval: 100})) + .on('end', () => res.end()); + }); + + await s.listen(s.port); +}); + +test('download progress', async t => { + const events = []; + + const res = await got(`${s.url}/download`, {encoding: null}) + .on('downloadProgress', e => events.push(e)); + + checkEvents(t, events, res.body.length); +}); + +test('download progress - missing total size', async t => { + const events = []; + + await got(`${s.url}/download/no-total`) + .on('downloadProgress', e => events.push(e)); + + checkEvents(t, events); +}); + +test('download progress - stream', async t => { + const events = []; + + const stream = got.stream(`${s.url}/download`, {encoding: null}) + .on('downloadProgress', e => events.push(e)); + + await getStream(stream); + + checkEvents(t, events, file.length); +}); + +test('upload progress - file', async t => { + const events = []; + + await got.post(`${s.url}/upload`, {body: file}) + .on('uploadProgress', e => events.push(e)); + + checkEvents(t, events, file.length); +}); + +test('upload progress - file stream', async t => { + const path = tempfile(); + fs.writeFileSync(path, file); + + const events = []; + + await got.post(`${s.url}/upload`, {body: fs.createReadStream(path)}) + .on('uploadProgress', e => events.push(e)); + + checkEvents(t, events, file.length); +}); + +test('upload progress - form data', async t => { + const events = []; + + const body = new FormData(); + body.append('key', 'value'); + body.append('file', file); + + const size = await pify(body.getLength.bind(body))(); + + await got.post(`${s.url}/upload`, {body}) + .on('uploadProgress', e => events.push(e)); + + checkEvents(t, events, size); +}); + +test('upload progress - json', async t => { + const body = JSON.stringify({key: 'value'}); + const size = Buffer.byteLength(body); + const events = []; + + await got.post(`${s.url}/upload`, {body}) + .on('uploadProgress', e => events.push(e)); + + checkEvents(t, events, size); +}); + +test('upload progress - stream with known body size', async t => { + const events = []; + const options = { + headers: {'content-length': file.length} + }; + + const req = got.stream.post(`${s.url}/upload`, options) + .on('uploadProgress', e => events.push(e)); + + await getStream(intoStream(file).pipe(req)); + + checkEvents(t, events, file.length); +}); + +test('upload progress - stream with unknown body size', async t => { + const events = []; + + const req = got.stream.post(`${s.url}/upload`) + .on('uploadProgress', e => events.push(e)); + + await getStream(intoStream(file).pipe(req)); + + checkEvents(t, events); +}); + +test('upload progress - no body', async t => { + const events = []; + + await got.post(`${s.url}/upload`) + .on('uploadProgress', e => events.push(e)); + + t.deepEqual(events, [ + { + percent: 0, + transferred: 0, + total: 0 + }, + { + percent: 1, + transferred: 0, + total: 0 + } + ]); +}); + +test.after('cleanup', async () => { + await s.close(); +});