From ed7c904247d6226e763f96ec9a60017ca6785223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Fri, 26 Jan 2024 11:57:35 +0100 Subject: [PATCH] feat(migrate): add bufferThroughFile utility --- .../__test__/bufferThroughFile.test.ts | 89 ++++++++++++++++ .../src/fs-webstream/__test__/full.ndjson | 100 ++++++++++++++++++ .../src/fs-webstream/__test__/partial.ndjson | 11 ++ .../src/fs-webstream/bufferThroughFile.ts | 72 +++++++++++++ 4 files changed, 272 insertions(+) create mode 100644 packages/@sanity/migrate/src/fs-webstream/__test__/bufferThroughFile.test.ts create mode 100644 packages/@sanity/migrate/src/fs-webstream/__test__/full.ndjson create mode 100644 packages/@sanity/migrate/src/fs-webstream/__test__/partial.ndjson create mode 100644 packages/@sanity/migrate/src/fs-webstream/bufferThroughFile.ts diff --git a/packages/@sanity/migrate/src/fs-webstream/__test__/bufferThroughFile.test.ts b/packages/@sanity/migrate/src/fs-webstream/__test__/bufferThroughFile.test.ts new file mode 100644 index 00000000000..7fda7c0a582 --- /dev/null +++ b/packages/@sanity/migrate/src/fs-webstream/__test__/bufferThroughFile.test.ts @@ -0,0 +1,89 @@ +/* eslint-disable no-constant-condition */ +import {stat} from 'node:fs/promises' + +import {bufferThroughFile} from '../bufferThroughFile' +import {ndjson} from '../../it-utils' +import {streamAsyncIterator} from '../../utils/streamToAsyncIterator' +import {asyncIterableToStream} from '../../utils/asyncIterableToStream' + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) + +test('stops buffering when the consumer is done', async () => { + const encoder = new TextEncoder() + async function* gen() { + for (let n = 0; n < 1000; n++) { + yield encoder.encode(`{"foo": ${n},`) + // simulate a bit of delay in the producer (which is often the case) + await sleep(10) + yield encoder.encode(`"bar": ${n}, "baz": ${n}}`) + yield encoder.encode('\n') + } + } + + const bufferFile = `${__dirname}/partial.ndjson` + const fileBufferStream = bufferThroughFile(asyncIterableToStream(gen()), bufferFile) + + const lines = [] + for await (const chunk of ndjson(streamAsyncIterator(fileBufferStream))) { + lines.push(chunk) + if (lines.length === 3) { + // we only pick 3 lines and break out of the iteration. 
This should stop the buffering.
+      break
+    }
+    // simulate a slow consumer
+    // (the bufferThroughFile stream should still continue to write to the file as fast as possible)
+    await sleep(50)
+  }
+
+  expect(lines).toEqual([
+    {bar: 0, baz: 0, foo: 0},
+    {bar: 1, baz: 1, foo: 1},
+    {bar: 2, baz: 2, foo: 2},
+  ])
+
+  // Note: the stream needs to be explicitly cancelled; otherwise the source stream will run to completion.
+  // It would be nice if there were a way to "unref()" the file handle to prevent it from blocking the process,
+  // but I don't think there is.
+  await fileBufferStream.cancel()
+
+  // This asserts that the buffer file contains more bytes than the 3 lines above represent
+  const bufferFileSize = (await stat(bufferFile)).size
+
+  expect(bufferFileSize).toBeGreaterThan(90)
+  // but not the full 1000 lines
+  expect(bufferFileSize).toBe(311)
+})
+
+test('it runs to completion if the consumer needs it', async () => {
+  const encoder = new TextEncoder()
+  async function* gen() {
+    for (let n = 0; n < 100; n++) {
+      yield encoder.encode(`{"foo": ${n},`)
+      // simulate a bit of delay in the producer (which is often the case)
+      await sleep(1)
+      yield encoder.encode(`"bar": ${n}, "baz": ${n}}`)
+      yield encoder.encode('\n')
+    }
+  }
+
+  const bufferFile = `${__dirname}/full.ndjson`
+  const fileBufferStream = bufferThroughFile(asyncIterableToStream(gen()), bufferFile)
+
+  const lines = []
+  for await (const chunk of ndjson(streamAsyncIterator(fileBufferStream))) {
+    if (lines.length < 3) {
+      // in contrast to the test above, we don't break out of the iteration early but let it run to completion
+      lines.push(chunk)
+    }
+  }
+  await fileBufferStream.cancel()
+
+  expect(lines).toEqual([
+    {bar: 0, baz: 0, foo: 0},
+    {bar: 1, baz: 1, foo: 1},
+    {bar: 2, baz: 2, foo: 2},
+  ])
+
+  // This asserts that the buffer file contains all the yielded lines
+  expect((await stat(bufferFile)).size).toBe(3270)
+})
diff --git a/packages/@sanity/migrate/src/fs-webstream/__test__/full.ndjson b/packages/@sanity/migrate/src/fs-webstream/__test__/full.ndjson
new file mode 100644
index 00000000000..519132587d0
--- /dev/null
+++ b/packages/@sanity/migrate/src/fs-webstream/__test__/full.ndjson
@@ -0,0 +1,100 @@
+{"foo": 0,"bar": 0, "baz": 0}
+{"foo": 1,"bar": 1, "baz": 1}
+{"foo": 2,"bar": 2, "baz": 2}
+{"foo": 3,"bar": 3, "baz": 3}
+{"foo": 4,"bar": 4, "baz": 4}
+{"foo": 5,"bar": 5, "baz": 5}
+{"foo": 6,"bar": 6, "baz": 6}
+{"foo": 7,"bar": 7, "baz": 7}
+{"foo": 8,"bar": 8, "baz": 8}
+{"foo": 9,"bar": 9, "baz": 9}
+{"foo": 10,"bar": 10, "baz": 10}
+{"foo": 11,"bar": 11, "baz": 11}
+{"foo": 12,"bar": 12, "baz": 12}
+{"foo": 13,"bar": 13, "baz": 13}
+{"foo": 14,"bar": 14, "baz": 14}
+{"foo": 15,"bar": 15, "baz": 15}
+{"foo": 16,"bar": 16, "baz": 16}
+{"foo": 17,"bar": 17, "baz": 17}
+{"foo": 18,"bar": 18, "baz": 18}
+{"foo": 19,"bar": 19, "baz": 19}
+{"foo": 20,"bar": 20, "baz": 20}
+{"foo": 21,"bar": 21, "baz": 21}
+{"foo": 22,"bar": 22, "baz": 22}
+{"foo": 23,"bar": 23, "baz": 23}
+{"foo": 24,"bar": 24, "baz": 24}
+{"foo": 25,"bar": 25, "baz": 25}
+{"foo": 26,"bar": 26, "baz": 26}
+{"foo": 27,"bar": 27, "baz": 27}
+{"foo": 28,"bar": 28, "baz": 28}
+{"foo": 29,"bar": 29, "baz": 29}
+{"foo": 30,"bar": 30, "baz": 30}
+{"foo": 31,"bar": 31, "baz": 31}
+{"foo": 32,"bar": 32, "baz": 32}
+{"foo": 33,"bar": 33, "baz": 33}
+{"foo": 34,"bar": 34, "baz": 34}
+{"foo": 35,"bar": 35, "baz": 35}
+{"foo": 36,"bar": 36, "baz": 36}
+{"foo": 37,"bar": 37, "baz": 37}
+{"foo": 38,"bar": 38, "baz": 38}
+{"foo": 39,"bar": 39, "baz": 39}
+{"foo": 40,
40, "baz": 40} +{"foo": 41,"bar": 41, "baz": 41} +{"foo": 42,"bar": 42, "baz": 42} +{"foo": 43,"bar": 43, "baz": 43} +{"foo": 44,"bar": 44, "baz": 44} +{"foo": 45,"bar": 45, "baz": 45} +{"foo": 46,"bar": 46, "baz": 46} +{"foo": 47,"bar": 47, "baz": 47} +{"foo": 48,"bar": 48, "baz": 48} +{"foo": 49,"bar": 49, "baz": 49} +{"foo": 50,"bar": 50, "baz": 50} +{"foo": 51,"bar": 51, "baz": 51} +{"foo": 52,"bar": 52, "baz": 52} +{"foo": 53,"bar": 53, "baz": 53} +{"foo": 54,"bar": 54, "baz": 54} +{"foo": 55,"bar": 55, "baz": 55} +{"foo": 56,"bar": 56, "baz": 56} +{"foo": 57,"bar": 57, "baz": 57} +{"foo": 58,"bar": 58, "baz": 58} +{"foo": 59,"bar": 59, "baz": 59} +{"foo": 60,"bar": 60, "baz": 60} +{"foo": 61,"bar": 61, "baz": 61} +{"foo": 62,"bar": 62, "baz": 62} +{"foo": 63,"bar": 63, "baz": 63} +{"foo": 64,"bar": 64, "baz": 64} +{"foo": 65,"bar": 65, "baz": 65} +{"foo": 66,"bar": 66, "baz": 66} +{"foo": 67,"bar": 67, "baz": 67} +{"foo": 68,"bar": 68, "baz": 68} +{"foo": 69,"bar": 69, "baz": 69} +{"foo": 70,"bar": 70, "baz": 70} +{"foo": 71,"bar": 71, "baz": 71} +{"foo": 72,"bar": 72, "baz": 72} +{"foo": 73,"bar": 73, "baz": 73} +{"foo": 74,"bar": 74, "baz": 74} +{"foo": 75,"bar": 75, "baz": 75} +{"foo": 76,"bar": 76, "baz": 76} +{"foo": 77,"bar": 77, "baz": 77} +{"foo": 78,"bar": 78, "baz": 78} +{"foo": 79,"bar": 79, "baz": 79} +{"foo": 80,"bar": 80, "baz": 80} +{"foo": 81,"bar": 81, "baz": 81} +{"foo": 82,"bar": 82, "baz": 82} +{"foo": 83,"bar": 83, "baz": 83} +{"foo": 84,"bar": 84, "baz": 84} +{"foo": 85,"bar": 85, "baz": 85} +{"foo": 86,"bar": 86, "baz": 86} +{"foo": 87,"bar": 87, "baz": 87} +{"foo": 88,"bar": 88, "baz": 88} +{"foo": 89,"bar": 89, "baz": 89} +{"foo": 90,"bar": 90, "baz": 90} +{"foo": 91,"bar": 91, "baz": 91} +{"foo": 92,"bar": 92, "baz": 92} +{"foo": 93,"bar": 93, "baz": 93} +{"foo": 94,"bar": 94, "baz": 94} +{"foo": 95,"bar": 95, "baz": 95} +{"foo": 96,"bar": 96, "baz": 96} +{"foo": 97,"bar": 97, "baz": 97} +{"foo": 98,"bar": 98, "baz": 98} +{"foo": 99,"bar": 99, "baz": 99} diff --git a/packages/@sanity/migrate/src/fs-webstream/__test__/partial.ndjson b/packages/@sanity/migrate/src/fs-webstream/__test__/partial.ndjson new file mode 100644 index 00000000000..178dee2b1dc --- /dev/null +++ b/packages/@sanity/migrate/src/fs-webstream/__test__/partial.ndjson @@ -0,0 +1,11 @@ +{"foo": 0,"bar": 0, "baz": 0} +{"foo": 1,"bar": 1, "baz": 1} +{"foo": 2,"bar": 2, "baz": 2} +{"foo": 3,"bar": 3, "baz": 3} +{"foo": 4,"bar": 4, "baz": 4} +{"foo": 5,"bar": 5, "baz": 5} +{"foo": 6,"bar": 6, "baz": 6} +{"foo": 7,"bar": 7, "baz": 7} +{"foo": 8,"bar": 8, "baz": 8} +{"foo": 9,"bar": 9, "baz": 9} +{"foo": 10, \ No newline at end of file diff --git a/packages/@sanity/migrate/src/fs-webstream/bufferThroughFile.ts b/packages/@sanity/migrate/src/fs-webstream/bufferThroughFile.ts new file mode 100644 index 00000000000..e715c978e55 --- /dev/null +++ b/packages/@sanity/migrate/src/fs-webstream/bufferThroughFile.ts @@ -0,0 +1,72 @@ +import {FileHandle, open} from 'node:fs/promises' + +const CHUNK_SIZE = 1024 + +/** + * Takes a source stream that will be drained and written to the provided file name as fast as possible. + * and returns a readable stream that reads from the same buffer file. The returned stream can be read at any rate. + * Note: the returned stream needs to be explicitly cancelled, otherwise it will run to completion and potentially + * prevent the process from exiting until the whole source stream has been written. + * @param source - The source readable stream. 
Will be drained as fast as possible. + * @param filename - The filename to write to. + */ +export function bufferThroughFile(source: ReadableStream, filename: string) { + let fileHandle: FileHandle + let totalBytesRead = 0 + let totalBytesWritten = 0 + + let bufferDone = false + let readerDone = false + + async function pump(reader: ReadableStreamDefaultReader) { + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const {done, value} = await reader.read() + if (done || readerDone) return + await fileHandle.write(value) + totalBytesWritten += value.byteLength + } + } finally { + reader.releaseLock() + } + } + + async function tryReadFromBuffer() { + const {bytesRead, buffer} = await fileHandle.read( + new Uint8Array(CHUNK_SIZE), + 0, + CHUNK_SIZE, + totalBytesRead, + ) + if (bytesRead === 0 && !bufferDone && !readerDone) { + // we're waiting for more data to be written to the buffer file, try again + return tryReadFromBuffer() + } + return {bytesRead, buffer} + } + + return new ReadableStream({ + async start() { + fileHandle = await open(filename, 'w+') + // Note: do not await this, as it will block the stream from starting + pump(source.getReader()).then(() => { + bufferDone = true + }) + }, + async pull(controller) { + const {bytesRead, buffer} = await tryReadFromBuffer() + if (bytesRead === 0 && bufferDone) { + await fileHandle.close() + controller.close() + } else { + totalBytesRead += bytesRead + controller.enqueue(buffer.subarray(0, bytesRead)) + } + }, + async cancel() { + readerDone = true + await fileHandle?.close() + }, + }) +}
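
Usage sketch (illustrative, not part of the patch): the snippet below shows how the new utility might be wired up against an HTTP response body in Node 18+, where fetch exposes a web ReadableStream. The URL, the buffer-file path, and the readFirstChunks helper are assumptions made for this example, not APIs defined by this package.

import {tmpdir} from 'node:os'
import {join} from 'node:path'

import {bufferThroughFile} from './bufferThroughFile'

// Hypothetical helper: reads only the first few chunks, like the
// partial.ndjson test above, then cancels the buffered stream.
async function readFirstChunks(url: string) {
  const response = await fetch(url)
  if (!response.body) throw new Error('Response has no body')

  // Drain the response into a local buffer file as fast as possible; the
  // returned stream can then be read at any rate without applying
  // backpressure to the network connection.
  const bufferFile = join(tmpdir(), 'download-buffer.ndjson')
  const buffered = bufferThroughFile(response.body, bufferFile)

  const reader = buffered.getReader()
  try {
    for (let i = 0; i < 3; i++) {
      const {done, value} = await reader.read()
      if (done) break
      console.log('read %d bytes', value.byteLength)
    }
  } finally {
    reader.releaseLock()
    // Cancel explicitly: otherwise the pump keeps writing until the whole
    // source stream has been drained to disk.
    await buffered.cancel()
  }
}

The buffer file is what decouples the two rates: the pump writes at producer speed while the returned stream is read at consumer speed, and the explicit cancel() is what stops the pump early, as exercised by the first test above.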