feat(migrate): add bufferThroughFile utility
bjoerge committed Jan 30, 2024
1 parent 0563e7b commit ed7c904
Showing 4 changed files with 272 additions and 0 deletions.
89 changes: 89 additions & 0 deletions packages/@sanity/migrate/src/fs-webstream/__test__/bufferThroughFile.test.ts
@@ -0,0 +1,89 @@
/* eslint-disable no-constant-condition */
import {stat} from 'node:fs/promises'

import {bufferThroughFile} from '../bufferThroughFile'
import {ndjson} from '../../it-utils'
import {streamAsyncIterator} from '../../utils/streamToAsyncIterator'
import {asyncIterableToStream} from '../../utils/asyncIterableToStream'

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

test('stops buffering when the consumer is done', async () => {
  const encoder = new TextEncoder()
  async function* gen() {
    for (let n = 0; n < 1000; n++) {
      yield encoder.encode(`{"foo": ${n},`)
      // simulate a bit of delay in the producer (which is often the case)
      await sleep(10)
      yield encoder.encode(`"bar": ${n}, "baz": ${n}}`)
      yield encoder.encode('\n')
    }
  }

  const bufferFile = `${__dirname}/partial.ndjson`
  const fileBufferStream = bufferThroughFile(asyncIterableToStream(gen()), bufferFile)

  const lines = []
  for await (const chunk of ndjson(streamAsyncIterator(fileBufferStream))) {
    lines.push(chunk)
    if (lines.length === 3) {
      // we only pick 3 lines and break out of the iteration. This should stop the buffering
      break
    }
    // simulate a slow consumer
    // (the bufferThroughFile stream should still continue to write to the file as fast as possible)
    await sleep(50)
  }

  expect(lines).toEqual([
    {bar: 0, baz: 0, foo: 0},
    {bar: 1, baz: 1, foo: 1},
    {bar: 2, baz: 2, foo: 2},
  ])

  // Note: the stream needs to be explicitly cancelled, otherwise the source stream will run to completion.
  // It would be nice if there were a way to "unref()" the file handle to prevent it from blocking the
  // process, but I don't think there is.
  await fileBufferStream.cancel()

  // This asserts that the buffer file contains more bytes than the 3 lines above represent
  const bufferFileSize = (await stat(bufferFile)).size

  // the 3 consumed lines are only 30 bytes each, i.e. 90 bytes in total
  expect(bufferFileSize).toBeGreaterThan(90)
  // ...but not the full 1000 lines: 10 complete 30-byte lines plus the 11-byte partial line `{"foo": 10,`
  expect(bufferFileSize).toBe(311)
})

test('it runs to completion if the consumer needs it', async () => {
  const encoder = new TextEncoder()
  async function* gen() {
    for (let n = 0; n < 100; n++) {
      yield encoder.encode(`{"foo": ${n},`)
      // simulate a bit of delay in the producer (which is often the case)
      await sleep(1)
      yield encoder.encode(`"bar": ${n}, "baz": ${n}}`)
      yield encoder.encode('\n')
    }
  }

  const bufferFile = `${__dirname}/full.ndjson`
  const fileBufferStream = bufferThroughFile(asyncIterableToStream(gen()), bufferFile)

  const lines = []
  for await (const chunk of ndjson(streamAsyncIterator(fileBufferStream))) {
    if (lines.length < 3) {
      // in contrast to the test above, we don't break out of the iteration early, but let it run to completion
      lines.push(chunk)
    }
  }
  await fileBufferStream.cancel()

  expect(lines).toEqual([
    {bar: 0, baz: 0, foo: 0},
    {bar: 1, baz: 1, foo: 1},
    {bar: 2, baz: 2, foo: 2},
  ])

  // This asserts that the buffer file contains all 100 yielded lines:
  // 10 single-digit lines at 30 bytes plus 90 double-digit lines at 33 bytes = 300 + 2970 = 3270
  expect((await stat(bufferFile)).size).toBe(3270)
})
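
Aside: the tests above convert between WHATWG streams and async iterables using two small repo utilities. As a rough sketch (illustrative only; the real implementation in ../../utils/streamToAsyncIterator may differ), a ReadableStream-to-AsyncIterable adapter like `streamAsyncIterator` can be written as:

async function* streamAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
  const reader = stream.getReader()
  try {
    while (true) {
      const {done, value} = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    // release the lock so the stream can still be cancelled or read elsewhere
    reader.releaseLock()
  }
}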
100 changes: 100 additions & 0 deletions packages/@sanity/migrate/src/fs-webstream/__test__/full.ndjson
@@ -0,0 +1,100 @@
{"foo": 0,"bar": 0, "baz": 0}
{"foo": 1,"bar": 1, "baz": 1}
{"foo": 2,"bar": 2, "baz": 2}
{"foo": 3,"bar": 3, "baz": 3}
{"foo": 4,"bar": 4, "baz": 4}
{"foo": 5,"bar": 5, "baz": 5}
{"foo": 6,"bar": 6, "baz": 6}
{"foo": 7,"bar": 7, "baz": 7}
{"foo": 8,"bar": 8, "baz": 8}
{"foo": 9,"bar": 9, "baz": 9}
{"foo": 10,"bar": 10, "baz": 10}
{"foo": 11,"bar": 11, "baz": 11}
{"foo": 12,"bar": 12, "baz": 12}
{"foo": 13,"bar": 13, "baz": 13}
{"foo": 14,"bar": 14, "baz": 14}
{"foo": 15,"bar": 15, "baz": 15}
{"foo": 16,"bar": 16, "baz": 16}
{"foo": 17,"bar": 17, "baz": 17}
{"foo": 18,"bar": 18, "baz": 18}
{"foo": 19,"bar": 19, "baz": 19}
{"foo": 20,"bar": 20, "baz": 20}
{"foo": 21,"bar": 21, "baz": 21}
{"foo": 22,"bar": 22, "baz": 22}
{"foo": 23,"bar": 23, "baz": 23}
{"foo": 24,"bar": 24, "baz": 24}
{"foo": 25,"bar": 25, "baz": 25}
{"foo": 26,"bar": 26, "baz": 26}
{"foo": 27,"bar": 27, "baz": 27}
{"foo": 28,"bar": 28, "baz": 28}
{"foo": 29,"bar": 29, "baz": 29}
{"foo": 30,"bar": 30, "baz": 30}
{"foo": 31,"bar": 31, "baz": 31}
{"foo": 32,"bar": 32, "baz": 32}
{"foo": 33,"bar": 33, "baz": 33}
{"foo": 34,"bar": 34, "baz": 34}
{"foo": 35,"bar": 35, "baz": 35}
{"foo": 36,"bar": 36, "baz": 36}
{"foo": 37,"bar": 37, "baz": 37}
{"foo": 38,"bar": 38, "baz": 38}
{"foo": 39,"bar": 39, "baz": 39}
{"foo": 40,"bar": 40, "baz": 40}
{"foo": 41,"bar": 41, "baz": 41}
{"foo": 42,"bar": 42, "baz": 42}
{"foo": 43,"bar": 43, "baz": 43}
{"foo": 44,"bar": 44, "baz": 44}
{"foo": 45,"bar": 45, "baz": 45}
{"foo": 46,"bar": 46, "baz": 46}
{"foo": 47,"bar": 47, "baz": 47}
{"foo": 48,"bar": 48, "baz": 48}
{"foo": 49,"bar": 49, "baz": 49}
{"foo": 50,"bar": 50, "baz": 50}
{"foo": 51,"bar": 51, "baz": 51}
{"foo": 52,"bar": 52, "baz": 52}
{"foo": 53,"bar": 53, "baz": 53}
{"foo": 54,"bar": 54, "baz": 54}
{"foo": 55,"bar": 55, "baz": 55}
{"foo": 56,"bar": 56, "baz": 56}
{"foo": 57,"bar": 57, "baz": 57}
{"foo": 58,"bar": 58, "baz": 58}
{"foo": 59,"bar": 59, "baz": 59}
{"foo": 60,"bar": 60, "baz": 60}
{"foo": 61,"bar": 61, "baz": 61}
{"foo": 62,"bar": 62, "baz": 62}
{"foo": 63,"bar": 63, "baz": 63}
{"foo": 64,"bar": 64, "baz": 64}
{"foo": 65,"bar": 65, "baz": 65}
{"foo": 66,"bar": 66, "baz": 66}
{"foo": 67,"bar": 67, "baz": 67}
{"foo": 68,"bar": 68, "baz": 68}
{"foo": 69,"bar": 69, "baz": 69}
{"foo": 70,"bar": 70, "baz": 70}
{"foo": 71,"bar": 71, "baz": 71}
{"foo": 72,"bar": 72, "baz": 72}
{"foo": 73,"bar": 73, "baz": 73}
{"foo": 74,"bar": 74, "baz": 74}
{"foo": 75,"bar": 75, "baz": 75}
{"foo": 76,"bar": 76, "baz": 76}
{"foo": 77,"bar": 77, "baz": 77}
{"foo": 78,"bar": 78, "baz": 78}
{"foo": 79,"bar": 79, "baz": 79}
{"foo": 80,"bar": 80, "baz": 80}
{"foo": 81,"bar": 81, "baz": 81}
{"foo": 82,"bar": 82, "baz": 82}
{"foo": 83,"bar": 83, "baz": 83}
{"foo": 84,"bar": 84, "baz": 84}
{"foo": 85,"bar": 85, "baz": 85}
{"foo": 86,"bar": 86, "baz": 86}
{"foo": 87,"bar": 87, "baz": 87}
{"foo": 88,"bar": 88, "baz": 88}
{"foo": 89,"bar": 89, "baz": 89}
{"foo": 90,"bar": 90, "baz": 90}
{"foo": 91,"bar": 91, "baz": 91}
{"foo": 92,"bar": 92, "baz": 92}
{"foo": 93,"bar": 93, "baz": 93}
{"foo": 94,"bar": 94, "baz": 94}
{"foo": 95,"bar": 95, "baz": 95}
{"foo": 96,"bar": 96, "baz": 96}
{"foo": 97,"bar": 97, "baz": 97}
{"foo": 98,"bar": 98, "baz": 98}
{"foo": 99,"bar": 99, "baz": 99}
11 changes: 11 additions & 0 deletions packages/@sanity/migrate/src/fs-webstream/__test__/partial.ndjson
@@ -0,0 +1,11 @@
{"foo": 0,"bar": 0, "baz": 0}
{"foo": 1,"bar": 1, "baz": 1}
{"foo": 2,"bar": 2, "baz": 2}
{"foo": 3,"bar": 3, "baz": 3}
{"foo": 4,"bar": 4, "baz": 4}
{"foo": 5,"bar": 5, "baz": 5}
{"foo": 6,"bar": 6, "baz": 6}
{"foo": 7,"bar": 7, "baz": 7}
{"foo": 8,"bar": 8, "baz": 8}
{"foo": 9,"bar": 9, "baz": 9}
{"foo": 10,
72 changes: 72 additions & 0 deletions packages/@sanity/migrate/src/fs-webstream/bufferThroughFile.ts
@@ -0,0 +1,72 @@
import {FileHandle, open} from 'node:fs/promises'

const CHUNK_SIZE = 1024

/**
 * Takes a source stream that will be drained and written to the provided file name as fast as possible,
 * and returns a readable stream that reads from the same buffer file. The returned stream can be read at any rate.
 * Note: the returned stream needs to be explicitly cancelled, otherwise it will run to completion and potentially
 * prevent the process from exiting until the whole source stream has been written.
 * @param source - The source readable stream. Will be drained as fast as possible.
 * @param filename - The filename to write to.
 */
export function bufferThroughFile(source: ReadableStream, filename: string) {
  let fileHandle: FileHandle
  let totalBytesRead = 0
  let totalBytesWritten = 0

  let bufferDone = false
  let readerDone = false

  // drains the source stream, appending every chunk to the buffer file as fast as possible
  async function pump(reader: ReadableStreamDefaultReader) {
    try {
      // eslint-disable-next-line no-constant-condition
      while (true) {
        const {done, value} = await reader.read()
        if (done || readerDone) return
        await fileHandle.write(value)
        totalBytesWritten += value.byteLength
      }
    } finally {
      reader.releaseLock()
    }
  }

  // reads the next chunk from the buffer file, polling until the pump has written more data
  async function tryReadFromBuffer() {
    const {bytesRead, buffer} = await fileHandle.read(
      new Uint8Array(CHUNK_SIZE),
      0,
      CHUNK_SIZE,
      totalBytesRead,
    )
    if (bytesRead === 0 && !bufferDone && !readerDone) {
      // we're waiting for more data to be written to the buffer file, try again
      return tryReadFromBuffer()
    }
    return {bytesRead, buffer}
  }

  return new ReadableStream({
    async start() {
      fileHandle = await open(filename, 'w+')
      // Note: do not await this, as it would block the stream from starting
      pump(source.getReader()).then(() => {
        bufferDone = true
      })
    },
    async pull(controller) {
      const {bytesRead, buffer} = await tryReadFromBuffer()
      if (bytesRead === 0 && bufferDone) {
        await fileHandle.close()
        controller.close()
      } else {
        totalBytesRead += bytesRead
        controller.enqueue(buffer.subarray(0, bytesRead))
      }
    },
    async cancel() {
      readerDone = true
      await fileHandle?.close()
    },
  })
}
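
For illustration, a minimal usage sketch of the new utility (not part of this commit; the source stream argument and buffer file path are placeholder assumptions): buffer a fast-producing source to disk at full speed while consuming it at a slower pace, then cancel to release the file handle.

import {bufferThroughFile} from './bufferThroughFile'

async function example(source: ReadableStream<Uint8Array>) {
  // hypothetical buffer file location
  const buffered = bufferThroughFile(source, '/tmp/buffer.ndjson')
  const reader = buffered.getReader()
  // consume only the first few chunks, while the pump keeps draining the source to disk
  for (let i = 0; i < 3; i++) {
    const {done, value} = await reader.read()
    if (done) break
    console.log(value.byteLength)
  }
  reader.releaseLock()
  // cancelling stops the background pump and closes the file handle,
  // allowing the process to exit
  await buffered.cancel()
}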
