Skip to content

Commit

Permalink
Fix maxBuffer bug with TextDecoder() (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky authored Aug 16, 2023
1 parent eee8fbe commit 7e191bb
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 30 deletions.
9 changes: 5 additions & 4 deletions source/array-buffer.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {getStreamContents} from './contents.js';
import {throwObjectStream, getLengthProp} from './utils.js';
import {noop, throwObjectStream, getLengthProp} from './utils.js';

export async function getStreamAsArrayBuffer(stream, options) {
return getStreamContents(stream, arrayBufferMethods, options);
}

const initArrayBuffer = () => new ArrayBuffer(0);
// Initial accumulator state: an empty ArrayBuffer to grow into.
const initArrayBuffer = () => {
	const contents = new ArrayBuffer(0);
	return {contents};
};

const useTextEncoder = chunk => textEncoder.encode(chunk);
const textEncoder = new TextEncoder();
Expand All @@ -15,7 +15,7 @@ const useUint8Array = chunk => new Uint8Array(chunk);
const useUint8ArrayWithOffset = chunk => new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);

// `contents` is an increasingly growing `Uint8Array`.
const addArrayBufferChunk = (convertedChunk, contents, length, previousLength) => {
const addArrayBufferChunk = (convertedChunk, {contents, length: previousLength}, length) => {
const newContents = hasArrayBufferResize() ? resizeArrayBuffer(contents, length) : resizeArrayBufferSlow(contents, length);
new Uint8Array(newContents).set(convertedChunk, previousLength);
return newContents;
Expand Down Expand Up @@ -54,7 +54,7 @@ const getNewContentsLength = length => SCALE_FACTOR ** Math.ceil(Math.log(length

const SCALE_FACTOR = 2;

const finalizeArrayBuffer = (contents, length) => hasArrayBufferResize() ? contents : contents.slice(0, length);
// Produce the final ArrayBuffer. When resizable buffers are supported the
// contents were kept at the exact size already; otherwise trim the
// over-allocated tail down to `length`.
const finalizeArrayBuffer = (state) => {
	if (hasArrayBufferResize()) {
		return state.contents;
	}

	return state.contents.slice(0, state.length);
};

// `ArrayBuffer.slice()` is slow. When `ArrayBuffer.resize()` is available
// (Node >=20.0.0, Safari >=16.4 and Chrome), we can use it instead.
Expand All @@ -76,5 +76,6 @@ const arrayBufferMethods = {
},
getSize: getLengthProp,
addChunk: addArrayBufferChunk,
getFinalChunk: noop,
finalize: finalizeArrayBuffer,
};
9 changes: 5 additions & 4 deletions source/array.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {getStreamContents} from './contents.js';
import {identity} from './utils.js';
import {identity, noop, getContentsProp} from './utils.js';

export async function getStreamAsArray(stream, options) {
return getStreamContents(stream, arrayMethods, options);
}

const initArray = () => ([]);
// Initial accumulator state: an empty array of chunks.
const initArray = () => {
	const contents = [];
	return {contents};
};

// Each chunk counts as size 1 when collecting into an array.
const increment = function () {
	return 1;
};

const addArrayChunk = (convertedChunk, contents) => {
// Append one converted chunk to the accumulated array, in place.
const addArrayChunk = (convertedChunk, state) => {
	state.contents.push(convertedChunk);
	return state.contents;
};
Expand All @@ -26,5 +26,6 @@ const arrayMethods = {
},
getSize: increment,
addChunk: addArrayChunk,
finalize: identity,
getFinalChunk: noop,
finalize: getContentsProp,
};
43 changes: 27 additions & 16 deletions source/contents.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,45 @@
export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
if (!isAsyncIterable(stream)) {
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}

let length = 0;
let contents = init();
const textDecoder = new TextDecoder();
const state = init();
state.length = 0;

try {
for await (const chunk of stream) {
const chunkType = getChunkType(chunk);
const convertedChunk = convertChunk[chunkType](chunk, textDecoder);
const chunkSize = getSize(convertedChunk);

if (length + chunkSize > maxBuffer) {
throw new MaxBufferError();
}

const newLength = length + chunkSize;
contents = addChunk(convertedChunk, contents, newLength, length);
length = newLength;
const convertedChunk = convertChunk[chunkType](chunk, state);
appendChunk({convertedChunk, state, getSize, addChunk, maxBuffer});
}

return finalize(contents, length, textDecoder);
appendFinalChunk({state, convertChunk, getSize, addChunk, getFinalChunk, maxBuffer});
return finalize(state);
} catch (error) {
error.bufferedData = finalize(contents, length, textDecoder);
error.bufferedData = finalize(state);
throw error;
}
};

// After the stream ends, some formats (e.g. text decoding) may hold a
// trailing chunk that still needs flushing. Append it, subject to the same
// `maxBuffer` check as regular chunks.
const appendFinalChunk = ({state, getSize, addChunk, getFinalChunk, maxBuffer}) => {
	const finalChunk = getFinalChunk(state);
	if (finalChunk === undefined) {
		return;
	}

	appendChunk({convertedChunk: finalChunk, state, getSize, addChunk, maxBuffer});
};

// Fold one converted chunk into `state`, enforcing `maxBuffer` BEFORE the
// chunk is stored so the accumulated data never exceeds the limit.
const appendChunk = ({convertedChunk, state, getSize, addChunk, maxBuffer}) => {
	const updatedLength = state.length + getSize(convertedChunk);
	if (updatedLength > maxBuffer) {
		throw new MaxBufferError();
	}

	const updatedContents = addChunk(convertedChunk, state, updatedLength);
	state.contents = updatedContents;
	state.length = updatedLength;
};

// A stream is usable here iff it is a non-null object implementing the
// async-iteration protocol (Readable, ReadableStream, or any async iterable).
const isAsyncIterable = (stream) => {
	if (typeof stream !== 'object' || stream === null) {
		return false;
	}

	return typeof stream[Symbol.asyncIterator] === 'function';
};

const getChunkType = chunk => {
Expand Down
16 changes: 10 additions & 6 deletions source/string.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {getStreamContents} from './contents.js';
import {identity, throwObjectStream, getLengthProp} from './utils.js';
import {identity, getContentsProp, throwObjectStream, getLengthProp} from './utils.js';

export async function getStreamAsString(stream, options) {
return getStreamContents(stream, stringMethods, options);
}

const initString = () => '';
// Initial accumulator state for strings. The TextDecoder lives in the state
// (not module scope) so concurrent calls each keep their own decode state.
const initString = () => {
	const textDecoder = new TextDecoder();
	return {contents: '', textDecoder};
};

const useTextDecoder = (chunk, textDecoder) => textDecoder.decode(chunk, {stream: true});
// Decode a binary chunk incrementally; `stream: true` buffers any trailing
// partial multi-byte sequence inside the decoder until the next chunk.
const useTextDecoder = (chunk, state) => state.textDecoder.decode(chunk, {stream: true});

const addStringChunk = (convertedChunk, contents) => contents + convertedChunk;
// Concatenate the decoded chunk onto the accumulated string.
const addStringChunk = (convertedChunk, state) => state.contents + convertedChunk;

const finalizeString = (contents, length, textDecoder) => `${contents}${textDecoder.decode()}`;
// Flush the decoder after the stream ends: a truncated trailing multi-byte
// sequence yields a replacement character. `undefined` means nothing to add.
const getFinalStringChunk = ({textDecoder}) => {
	const flushed = textDecoder.decode();
	if (flushed === '') {
		return undefined;
	}

	return flushed;
};

const stringMethods = {
init: initString,
Expand All @@ -25,5 +28,6 @@ const stringMethods = {
},
getSize: getLengthProp,
addChunk: addStringChunk,
finalize: finalizeString,
getFinalChunk: getFinalStringChunk,
finalize: getContentsProp,
};
4 changes: 4 additions & 0 deletions source/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
// Returns its argument unchanged.
export function identity(value) {
	return value;
}

// Does nothing; always returns `undefined`.
export function noop() {}

// Extracts the accumulated `contents` from a state object.
export function getContentsProp(state) {
	return state.contents;
}

// Object-mode chunks cannot be concatenated into a buffer or string, so
// reject them with a descriptive error.
export function throwObjectStream(chunk) {
	throw new Error(`Streams in object mode are not supported: ${String(chunk)}`);
}
Expand Down
5 changes: 5 additions & 0 deletions test/string.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ test('get stream with truncated UTF-8 sequences', async t => {
t.is(result, `${multiByteString.slice(0, -1)}${INVALID_UTF8_MARKER}`);
});

test('handles truncated UTF-8 sequences over maxBuffer', async t => {
const maxBuffer = multiByteString.length - 1;
await t.throwsAsync(setupString(multiByteBuffer.slice(0, -1), {maxBuffer}), {instanceOf: MaxBufferError});
});

test('get stream with invalid UTF-8 sequences', async t => {
const result = await setupString(multiByteBuffer.slice(1, 2));
t.is(result, INVALID_UTF8_MARKER);
Expand Down

0 comments on commit 7e191bb

Please sign in to comment.