Skip to content

Commit

Permalink
feat: support old school streams (ipfs#12)
Browse files Browse the repository at this point in the history
In order not to break our users, this PR adds support for the
[`readable-stream`](https://www.npmjs.com/package/readable-stream)
 module @v2 and @V3.
  • Loading branch information
achingbrain authored and hugomrdias committed Sep 15, 2019
1 parent 8534de8 commit 18cfa86
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
"dirty-chai": "^2.0.1",
"electron": "^6.0.6",
"electron-mocha": "^8.0.3",
"pull-stream": "^3.6.13"
"pull-stream": "^3.6.13",
"readable-stream-2": "npm:readable-stream@^2.0.0"
},
"contributors": [
"Alan Shaw <[email protected]>",
Expand Down
46 changes: 46 additions & 0 deletions src/files/normalise-input.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const { Buffer } = require('buffer')
const pullStreamToIterable = require('pull-stream-to-async-iterator')
const { isSource } = require('is-pull-stream')
const globalThis = require('../globalthis')
const { Readable } = require('stream')
const Readable3 = require('readable-stream')

/*
* Transform one of:
Expand All @@ -20,6 +22,7 @@ const globalThis = require('../globalthis')
* { path, content: Iterable<Bytes> } [single file]
* { path, content: AsyncIterable<Bytes> } [single file]
* { path, content: PullStream<Bytes> } [single file]
* { path, content: Readable<Bytes> } [single file]
* Iterable<Number> [single file]
* Iterable<Bytes> [single file]
* Iterable<Bloby> [multiple files]
Expand All @@ -31,6 +34,7 @@ const globalThis = require('../globalthis')
* Iterable<{ path, content: Iterable<Bytes> }> [multiple files]
* Iterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* Iterable<{ path, content: PullStream<Bytes> }> [multiple files]
* Iterable<{ path, content: Readable<Bytes> }> [multiple files]
* AsyncIterable<Bytes> [single file]
* AsyncIterable<Bloby> [multiple files]
* AsyncIterable<String> [multiple files]
Expand All @@ -41,6 +45,7 @@ const globalThis = require('../globalthis')
* AsyncIterable<{ path, content: Iterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: PullStream<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: Readable<Bytes> }> [multiple files]
* PullStream<Bytes> [single file]
* PullStream<Bloby> [multiple files]
* PullStream<String> [multiple files]
Expand All @@ -51,6 +56,18 @@ const globalThis = require('../globalthis')
* PullStream<{ path, content: Iterable<Bytes> }> [multiple files]
* PullStream<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* PullStream<{ path, content: PullStream<Bytes> }> [multiple files]
* PullStream<{ path, content: Readable<Bytes> }> [multiple files]
* Readable<Bytes> [single file]
* Readable<Bloby> [multiple files]
* Readable<String> [multiple files]
* Readable<{ path, content: Bytes }> [multiple files]
* Readable<{ path, content: Bloby }> [multiple files]
* Readable<{ path, content: String }> [multiple files]
* Readable<{ path, content: Iterable<Number> }> [multiple files]
* Readable<{ path, content: Iterable<Bytes> }> [multiple files]
* Readable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* Readable<{ path, content: PullStream<Bytes> }> [multiple files]
* Readable<{ path, content: Readable<Bytes> }> [multiple files]
* ```
* Into:
*
Expand Down Expand Up @@ -82,6 +99,11 @@ module.exports = function normaliseInput (input) {
})()
}

// Readable<?>
if (isOldReadable(input)) {
input = upgradeOldStream(input)
}

// Iterable<?>
if (input[Symbol.iterator]) {
return (async function * () { // eslint-disable-line require-await
Expand Down Expand Up @@ -213,6 +235,11 @@ function toAsyncIterable (input) {
return blobToAsyncGenerator(input)
}

// Readable<?>
if (isOldReadable(input)) {
input = upgradeOldStream(input)
}

// Iterator<?>
if (input[Symbol.iterator]) {
return (async function * () { // eslint-disable-line require-await
Expand Down Expand Up @@ -259,6 +286,14 @@ function toAsyncIterable (input) {
throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
}

function isOldReadable (obj) {
if (obj[Symbol.iterator] || obj[Symbol.asyncIterator]) {
return false
}

return Boolean(obj.readable)
}

function toBuffer (chunk) {
return isBytes(chunk) ? chunk : Buffer.from(chunk)
}
Expand All @@ -276,6 +311,17 @@ function isFileObject (obj) {
return typeof obj === 'object' && (obj.path || obj.content)
}

function upgradeOldStream (stream) {
if (stream[Symbol.asyncIterator] || stream[Symbol.iterator]) {
return stream
}

// in the browser the stream.Readable is not an async iterator but readble-stream@3 is...
stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator] || Readable3.prototype[Symbol.asyncIterator]

return stream
}

function blobToAsyncGenerator (blob) {
if (typeof blob.stream === 'function') {
// firefox < 69 does not support blob.stream()
Expand Down
87 changes: 87 additions & 0 deletions test/files/normalise-input.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const { supportsFileReader } = require('../../src/supports')
const { Buffer } = require('buffer')
const all = require('async-iterator-all')
const pull = require('pull-stream')
const Readable2 = require('readable-stream-2')
const Readable3 = require('readable-stream')
const ReadableNode = require('stream').Readable
const globalThis = require('../../src/globalthis')

chai.use(dirtyChai)
Expand Down Expand Up @@ -56,6 +59,42 @@ function pullStreamOf (thing) {
return pull.values([thing])
}

function readable2Of (thing) {
const stream = new Readable2({
objectMode: true,
read () {
this.push(thing)
this.push(null)
}
})

return stream
}

function readable3Of (thing) {
const stream = new Readable3({
objectMode: true,
read () {
this.push(thing)
this.push(null)
}
})

return stream
}

function readableNodeOf (thing) {
const stream = new ReadableNode({
objectMode: true,
read () {
this.push(thing)
this.push(null)
}
})

return stream
}

describe('normalise-input', function () {
function testInputType (content, name, isBytes) {
it(name, async function () {
Expand All @@ -74,6 +113,18 @@ describe('normalise-input', function () {
it(`PullStream<${name}>`, async function () {
await testContent(pullStreamOf(content))
})

it(`Readable2<${name}>`, async function () {
await testContent(readable2Of(content))
})

it(`Readable3<${name}>`, async function () {
await testContent(readable3Of(content))
})

it(`ReadableNode<${name}>`, async function () {
await testContent(readableNodeOf(content))
})
}

it(`{ path: '', content: ${name} }`, async function () {
Expand All @@ -92,6 +143,18 @@ describe('normalise-input', function () {
it(`{ path: '', content: PullStream<${name}> }`, async function () {
await testContent({ path: '', content: pullStreamOf(content) })
})

it(`{ path: '', content: Readable2<${name}> }`, async function () {
await testContent({ path: '', content: readable2Of(content) })
})

it(`{ path: '', content: Readable3<${name}> }`, async function () {
await testContent({ path: '', content: readable3Of(content) })
})

it(`{ path: '', content: ReadableNode<${name}> }`, async function () {
await testContent({ path: '', content: readableNodeOf(content) })
})
}

it(`Iterable<{ path: '', content: ${name} }`, async function () {
Expand All @@ -106,6 +169,18 @@ describe('normalise-input', function () {
await testContent(pullStreamOf({ path: '', content }))
})

it(`Readable2<{ path: '', content: ${name} }`, async function () {
await testContent(readable2Of({ path: '', content }))
})

it(`Readable3<{ path: '', content: ${name} }`, async function () {
await testContent(readable3Of({ path: '', content }))
})

it(`ReadableNode<{ path: '', content: ${name} }`, async function () {
await testContent(readableNodeOf({ path: '', content }))
})

if (isBytes) {
it(`Iterable<{ path: '', content: Iterable<${name}> }>`, async function () {
await testContent(iterableOf({ path: '', content: iterableOf(content) }))
Expand All @@ -126,6 +201,18 @@ describe('normalise-input', function () {
it(`PullStream<{ path: '', content: PullStream<${name}> }>`, async function () {
await testContent(pullStreamOf({ path: '', content: pullStreamOf(content) }))
})

it(`Readable2<{ path: '', content: Readable2<${name}> }>`, async function () {
await testContent(readable2Of({ path: '', content: readable2Of(content) }))
})

it(`Readable3<{ path: '', content: Readable3<${name}> }>`, async function () {
await testContent(readable3Of({ path: '', content: readable3Of(content) }))
})

it(`ReadableNode<{ path: '', content: Readable3<${name}> }>`, async function () {
await testContent(readableNodeOf({ path: '', content: readableNodeOf(content) }))
})
}
}

Expand Down

0 comments on commit 18cfa86

Please sign in to comment.