From 12b7a2e6a98569d6a9ca4c3c01c6c74646a50102 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 18 Nov 2019 15:51:40 +0000 Subject: [PATCH] refactor: switch tar-stream for it-tar (#1161) This switches out `tar-stream` for [`it-tar`](https://www.npmjs.com/package/it-tar), which is a fork that uses pure async iterables. This removes `readable-stream` from this branch of the tree (which will allow us to drop it entirely in the future) and allows us to drop the `event-iterator` dependency. --- package.json | 3 +- src/get.js | 18 ++++++- src/utils/load-commands.js | 24 +++++++-- src/utils/tar-stream-to-objects.js | 85 ------------------------------ 4 files changed, 38 insertions(+), 92 deletions(-) delete mode 100644 src/utils/tar-stream-to-objects.js diff --git a/package.json b/package.json index acd90bb63a..bfc80c5693 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,6 @@ "detect-node": "^2.0.4", "end-of-stream": "^1.4.1", "err-code": "^2.0.0", - "event-iterator": "^1.2.0", "explain-error": "^1.0.4", "flatmap": "0.0.3", "form-data": "^3.0.0", @@ -74,6 +73,7 @@ "iso-stream-http": "~0.1.2", "iso-url": "~0.4.6", "it-glob": "0.0.6", + "it-tar": "^1.1.0", "it-to-stream": "^0.1.1", "iterable-ndjson": "^1.1.0", "just-kebab-case": "^1.1.0", @@ -101,7 +101,6 @@ "qs": "^6.5.2", "readable-stream": "^3.1.1", "stream-to-pull-stream": "^1.7.2", - "tar-stream": "^2.0.1", "through2": "^3.0.1" }, "devDependencies": { diff --git a/src/get.js b/src/get.js index 62adfd1e9a..c06fc6d36a 100644 --- a/src/get.js +++ b/src/get.js @@ -1,8 +1,9 @@ 'use strict' const configure = require('./lib/configure') -const tarStreamToObjects = require('./utils/tar-stream-to-objects') +const Tar = require('it-tar') const IsIpfs = require('is-ipfs') +const toIterable = require('./lib/stream-to-iterable') const cleanCID = require('./utils/clean-cid') module.exports = configure(({ ky }) => { @@ -43,6 +44,19 @@ module.exports = configure(({ ky }) => { searchParams }) - yield * tarStreamToObjects(res.body) + const extractor = Tar.extract() + + for await (const { header, body } of extractor(toIterable(res.body))) { + if (header.type === 'directory') { + yield { + path: header.name + } + } else { + yield { + path: header.name, + content: body + } + } + } } }) diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 14545b5641..3e51acdb20 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -7,6 +7,8 @@ const { concatify, collectify, pullify, streamify } = require('../lib/converters const toPullStream = require('async-iterator-to-pull-stream') const pull = require('pull-stream/pull') const map = require('pull-stream/throughs/map') +const toStream = require('it-to-stream') +const BufferList = require('bl/BufferList') function requireCommands (send, config) { const add = require('../add')(config) @@ -58,7 +60,7 @@ function requireCommands (send, config) { for await (const entry of get(path, options)) { if (entry.content) { - entry.content = Buffer.concat(await all(entry.content)) + entry.content = new BufferList(await all(entry.content)).slice() } output.push(entry) @@ -66,13 +68,29 @@ function requireCommands (send, config) { return output }), - getReadableStream: streamify.readable(get), + getReadableStream: streamify.readable((path, options) => (async function * () { + for await (const file of get(path, options)) { + if (file.content) { + const { content } = file + file.content = toStream((async function * () { + for await (const chunk of content) { + yield chunk.slice() // Convert bl to Buffer + } + })()) + } + + yield file + } + })()), getPullStream: (path, options) => { return pull( toPullStream(get(path, options)), map(file => { if (file.content) { - file.content = toPullStream(file.content) + file.content = pull( + toPullStream(file.content), + map(chunk => chunk.slice()) // Convert bl to Buffer + ) } return file diff --git a/src/utils/tar-stream-to-objects.js b/src/utils/tar-stream-to-objects.js deleted file mode 100644 index 7241b34452..0000000000 --- a/src/utils/tar-stream-to-objects.js +++ /dev/null @@ -1,85 +0,0 @@ -'use strict' - -const tar = require('tar-stream') -const { EventIterator } = require('event-iterator') - -function pipe (reader, writable) { - reader.read() - .then(({ done, value }) => { - if (done) { - writable.end() - - return - } - - if (value) { - const beneathHighWaterMark = writable.write(value) - - if (beneathHighWaterMark) { - pipe(reader, writable) - } else { - writable.once('drain', () => { - pipe(reader, writable) - }) - } - } - }, (err) => { - writable.emit('error', err) - }) -} - -/* - Transform a tar readable stream into an async iterator of objects: - - Output format: - { path: 'string', content: AsyncIterator } -*/ -async function * tarStreamToObjects (inputStream) { - const extractStream = tar.extract() - let onEntry - - const tarStream = new EventIterator( - (push, stop, fail) => { - onEntry = (header, stream, next) => { - push({ header, stream }) - - next() - } - - extractStream.addListener('entry', onEntry) - extractStream.addListener('finish', stop) - extractStream.addListener('error', fail) - }, - (push, stop, fail) => { - extractStream.removeListener('entry', onEntry) - extractStream.removeListener('finish', stop) - extractStream.removeListener('error', fail) - extractStream.destroy() - } - ) - - if (inputStream.pipe) { - // node stream - inputStream.pipe(extractStream) - } else if (inputStream.getReader) { - // browser readable stream - pipe(inputStream.getReader(), extractStream) - } else { - throw new Error('Unknown stream type') - } - - for await (const { header, stream } of tarStream) { - if (header.type === 'directory') { - yield { - path: header.name - } - } else { - yield { - path: header.name, - content: stream - } - } - } -} - -module.exports = tarStreamToObjects