From 7e191bb0d166e3c76d63f5636ddb85f61eca2740 Mon Sep 17 00:00:00 2001 From: ehmicky Date: Thu, 17 Aug 2023 00:03:32 +0100 Subject: [PATCH] Fix `maxBuffer` bug with `TextDecoder()` (#105) --- source/array-buffer.js | 9 +++++---- source/array.js | 9 +++++---- source/contents.js | 43 ++++++++++++++++++++++++++---------------- source/string.js | 16 ++++++++++------ source/utils.js | 4 ++++ test/string.js | 5 +++++ 6 files changed, 56 insertions(+), 30 deletions(-) diff --git a/source/array-buffer.js b/source/array-buffer.js index 4207ad8..e1dc03b 100644 --- a/source/array-buffer.js +++ b/source/array-buffer.js @@ -1,11 +1,11 @@ import {getStreamContents} from './contents.js'; -import {throwObjectStream, getLengthProp} from './utils.js'; +import {noop, throwObjectStream, getLengthProp} from './utils.js'; export async function getStreamAsArrayBuffer(stream, options) { return getStreamContents(stream, arrayBufferMethods, options); } -const initArrayBuffer = () => new ArrayBuffer(0); +const initArrayBuffer = () => ({contents: new ArrayBuffer(0)}); const useTextEncoder = chunk => textEncoder.encode(chunk); const textEncoder = new TextEncoder(); @@ -15,7 +15,7 @@ const useUint8Array = chunk => new Uint8Array(chunk); const useUint8ArrayWithOffset = chunk => new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); // `contents` is an increasingly growing `Uint8Array`. -const addArrayBufferChunk = (convertedChunk, contents, length, previousLength) => { +const addArrayBufferChunk = (convertedChunk, {contents, length: previousLength}, length) => { const newContents = hasArrayBufferResize() ? resizeArrayBuffer(contents, length) : resizeArrayBufferSlow(contents, length); new Uint8Array(newContents).set(convertedChunk, previousLength); return newContents; @@ -54,7 +54,7 @@ const getNewContentsLength = length => SCALE_FACTOR ** Math.ceil(Math.log(length const SCALE_FACTOR = 2; -const finalizeArrayBuffer = (contents, length) => hasArrayBufferResize() ? contents : contents.slice(0, length); +const finalizeArrayBuffer = ({contents, length}) => hasArrayBufferResize() ? contents : contents.slice(0, length); // `ArrayBuffer.slice()` is slow. When `ArrayBuffer.resize()` is available // (Node >=20.0.0, Safari >=16.4 and Chrome), we can use it instead. @@ -76,5 +76,6 @@ const arrayBufferMethods = { }, getSize: getLengthProp, addChunk: addArrayBufferChunk, + getFinalChunk: noop, finalize: finalizeArrayBuffer, }; diff --git a/source/array.js b/source/array.js index 30ddedc..c7b4c79 100644 --- a/source/array.js +++ b/source/array.js @@ -1,15 +1,15 @@ import {getStreamContents} from './contents.js'; -import {identity} from './utils.js'; +import {identity, noop, getContentsProp} from './utils.js'; export async function getStreamAsArray(stream, options) { return getStreamContents(stream, arrayMethods, options); } -const initArray = () => ([]); +const initArray = () => ({contents: []}); const increment = () => 1; -const addArrayChunk = (convertedChunk, contents) => { +const addArrayChunk = (convertedChunk, {contents}) => { contents.push(convertedChunk); return contents; }; @@ -26,5 +26,6 @@ const arrayMethods = { }, getSize: increment, addChunk: addArrayChunk, - finalize: identity, + getFinalChunk: noop, + finalize: getContentsProp, }; diff --git a/source/contents.js b/source/contents.js index f22bf00..dfc028f 100644 --- a/source/contents.js +++ b/source/contents.js @@ -1,34 +1,45 @@ -export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => { +export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => { if (!isAsyncIterable(stream)) { throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.'); } - let length = 0; - let contents = init(); - const textDecoder = new TextDecoder(); + const state = init(); + state.length = 0; try { for await (const chunk of stream) { const chunkType = getChunkType(chunk); - const convertedChunk = convertChunk[chunkType](chunk, textDecoder); - const chunkSize = getSize(convertedChunk); - - if (length + chunkSize > maxBuffer) { - throw new MaxBufferError(); - } - - const newLength = length + chunkSize; - contents = addChunk(convertedChunk, contents, newLength, length); - length = newLength; + const convertedChunk = convertChunk[chunkType](chunk, state); + appendChunk({convertedChunk, state, getSize, addChunk, maxBuffer}); } - return finalize(contents, length, textDecoder); + appendFinalChunk({state, convertChunk, getSize, addChunk, getFinalChunk, maxBuffer}); + return finalize(state); } catch (error) { - error.bufferedData = finalize(contents, length, textDecoder); + error.bufferedData = finalize(state); throw error; } }; +const appendFinalChunk = ({state, getSize, addChunk, getFinalChunk, maxBuffer}) => { + const convertedChunk = getFinalChunk(state); + if (convertedChunk !== undefined) { + appendChunk({convertedChunk, state, getSize, addChunk, maxBuffer}); + } +}; + +const appendChunk = ({convertedChunk, state, getSize, addChunk, maxBuffer}) => { + const chunkSize = getSize(convertedChunk); + const newLength = state.length + chunkSize; + + if (newLength > maxBuffer) { + throw new MaxBufferError(); + } + + state.contents = addChunk(convertedChunk, state, newLength); + state.length = newLength; +}; + const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function'; const getChunkType = chunk => { diff --git a/source/string.js b/source/string.js index 46c45e6..d6997b3 100644 --- a/source/string.js +++ b/source/string.js @@ -1,17 +1,20 @@ import {getStreamContents} from './contents.js'; -import {identity, throwObjectStream, getLengthProp} from './utils.js'; +import {identity, getContentsProp, throwObjectStream, getLengthProp} from './utils.js'; export async function getStreamAsString(stream, options) { return getStreamContents(stream, stringMethods, options); } -const initString = () => ''; +const initString = () => ({contents: '', textDecoder: new TextDecoder()}); -const useTextDecoder = (chunk, textDecoder) => textDecoder.decode(chunk, {stream: true}); +const useTextDecoder = (chunk, {textDecoder}) => textDecoder.decode(chunk, {stream: true}); -const addStringChunk = (convertedChunk, contents) => contents + convertedChunk; +const addStringChunk = (convertedChunk, {contents}) => contents + convertedChunk; -const finalizeString = (contents, length, textDecoder) => `${contents}${textDecoder.decode()}`; +const getFinalStringChunk = ({textDecoder}) => { + const finalChunk = textDecoder.decode(); + return finalChunk === '' ? undefined : finalChunk; +}; const stringMethods = { init: initString, @@ -25,5 +28,6 @@ const stringMethods = { }, getSize: getLengthProp, addChunk: addStringChunk, - finalize: finalizeString, + getFinalChunk: getFinalStringChunk, + finalize: getContentsProp, }; diff --git a/source/utils.js b/source/utils.js index 4ba12b3..af8d5e2 100644 --- a/source/utils.js +++ b/source/utils.js @@ -1,5 +1,9 @@ export const identity = value => value; +export const noop = () => undefined; + +export const getContentsProp = ({contents}) => contents; + export const throwObjectStream = chunk => { throw new Error(`Streams in object mode are not supported: ${String(chunk)}`); }; diff --git a/test/string.js b/test/string.js index a3b620c..2a9b1a6 100644 --- a/test/string.js +++ b/test/string.js @@ -112,6 +112,11 @@ test('get stream with truncated UTF-8 sequences', async t => { t.is(result, `${multiByteString.slice(0, -1)}${INVALID_UTF8_MARKER}`); }); +test('handles truncated UTF-8 sequences over maxBuffer', async t => { + const maxBuffer = multiByteString.length - 1; + await t.throwsAsync(setupString(multiByteBuffer.slice(0, -1), {maxBuffer}), {instanceOf: MaxBufferError}); +}); + test('get stream with invalid UTF-8 sequences', async t => { const result = await setupString(multiByteBuffer.slice(1, 2)); t.is(result, INVALID_UTF8_MARKER);