diff --git a/README.md b/README.md index d98ec82..f76daad 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,9 @@ npm install aline yarn add aline ``` -## Sample +## Sample Node.js Stream Transform ```javascript -const Aline = require('aline'); +const {Aline} = require('aline'); const {Readable} = require('stream'); async function* generate() { @@ -34,3 +34,24 @@ stream.on('data', function(chunk) { console.log(chunk.toString()); }); ``` + +## Sample Web TransformStream +```javascript +const {AlineWeb} = require('aline'); + +async function main() { + const response = await fetch('https://example.com/big.jsonl'); + + if (!response.body) { + return; + } + + const readable = response.body + .pipeThrough(new TransformStream(new AlineWeb())) + .pipeThrough(new TextDecoderStream()); + + for await (const batch of readable) { + const jsonLines = batch.split(/\n/g).map(line => JSON.parse(line)); + } +} +``` diff --git a/package.json b/package.json index 172cd0a..c8d5a8e 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { "name": "aline", - "version": "0.0.9", + "version": "1.0.0", "description": "Align stream chunks to bound of lines", "main": "./dist/index.js", "types": "./dist/index.d.ts", "engines": { - "node": ">=v6.4.0" + "node": ">=v20.0.0" }, "scripts": { "test": "jest", diff --git a/src/index.test.ts b/src/index.test.ts index 22b301a..7c6de3d 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -1,7 +1,79 @@ -import { Readable, Transform } from 'stream'; +import { Readable, Transform } from 'node:stream'; -import Aline from '.' +import { Aline, AlineWeb, concat, lastIndexOf } from '.' 
+ +describe('concat two Uint8Array', () => { + test('regular two array', () => { + const left = new Uint8Array([1, 2, 3]); + const right = new Uint8Array([4, 5, 6]); + + const combined = Array.from(concat(left, right)); + + expect(combined).toEqual([1, 2, 3, 4, 5, 6]); + }); + + test('left array is empty', () => { + const left = new Uint8Array(); + const right = new Uint8Array([4, 5, 6]); + + const combined = Array.from(concat(left, right)); + + expect(combined).toEqual([4, 5, 6]); + }); + + test('right array is empty', () => { + const left = new Uint8Array([1, 2, 3]); + const right = new Uint8Array(); + + const combined = Array.from(concat(left, right)); + + expect(combined).toEqual([1, 2, 3]); + }); +}); + + + +describe('lastIndexOf Uint8Array in Uint8Array', () => { + test('regular two array', () => { + const left = new Uint8Array([1, 2, 3, 4, 5, 6]); + + expect(lastIndexOf(left, new Uint8Array([1]))).toBe(0); + + expect(lastIndexOf(left, new Uint8Array([1, 2, 3]))).toBe(0); + + expect(lastIndexOf(left, new Uint8Array([6]))).toBe(5); + + expect(lastIndexOf(left, new Uint8Array([6, 7, 8]))).toBe(-1); + + expect(lastIndexOf(left, new Uint8Array([3, 4, 5]))).toBe(2); + + expect(lastIndexOf(left, new Uint8Array([3, 4, 5, 6]))).toBe(2); + + expect(lastIndexOf(left, new Uint8Array([3, 4, 5, 7]))).toBe(-1); + + expect(lastIndexOf(left, new Uint8Array([3, 4]))).toBe(2); + + expect(lastIndexOf(left, new Uint8Array([3]))).toBe(2); + + expect(lastIndexOf(left, new Uint8Array([3, 5]))).toBe(-1); + + expect(lastIndexOf(left, new Uint8Array([7]))).toBe(-1); + + expect(lastIndexOf(left, new Uint8Array([4, 5, 6, 7]))).toBe(-1); + }); + + test('empty array', () => { + const fill = new Uint8Array([1, 2, 3]); + const empty = new Uint8Array(); + + expect(lastIndexOf(fill, empty)).toBe(-1); + + expect(lastIndexOf(empty, fill)).toBe(-1); + + expect(lastIndexOf(empty, empty)).toBe(-1); + }); +}); function combineData(stream: T) { @@ -62,5 +134,50 @@ const fixture: Array = [ 
['\nfoo\n']] ]; -fixture.forEach(([name, generate, target]) => test(name, async() => expect(await combineData(Readable.from(generate()).pipe(new Aline()))).toEqual(target))); +describe('node transform', () => { + fixture.forEach(([name, generate, target]) => test(name, async() => { + expect(await combineData(Readable.from(generate()).pipe(new Aline()))).toEqual(target) + })); +}); + + +function readableStreamFrom(iterable: AsyncIterable) { + return new ReadableStream({ + async start(controller): Promise { + for await (const chunk of iterable) { + controller.enqueue(chunk); + } + + controller.close(); + } + }); +} + +async function arrayFromAsync(readable: ReadableStream): Promise> { + const chunks: Array = []; + + const reader = readable.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + + reader.releaseLock(); + + return chunks; +} + +describe('web transform stream', () => { + + fixture.forEach(([name, generate, target]) => test(name, async() => { + expect(await arrayFromAsync(readableStreamFrom(generate()) + .pipeThrough(new TextEncoderStream()) + .pipeThrough(new TransformStream(new AlineWeb())) + .pipeThrough(new TextDecoderStream()))).toEqual(target); + })); +}); diff --git a/src/index.ts b/src/index.ts index 5db393d..25601a8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,16 +1,16 @@ -import { Transform } from 'stream'; -import type { TransformCallback, TransformOptions } from 'stream'; +import { Transform } from 'node:stream'; +import type { TransformCallback, TransformOptions } from 'node:stream'; -type AlineOptions = TransformOptions & {separator: string}; +type AlineOptions = {separator: string}; -export default class Aline extends Transform { +export class Aline extends Transform { _tail: Buffer; _separator: string; - constructor(options?: AlineOptions) { + constructor(options?: TransformOptions & AlineOptions) { super(); this._tail = Buffer.alloc(0); @@ -43,3 +43,108 
/**
 * Concatenates two Uint8Array instances into a single Uint8Array.
 *
 * When one side is empty the OTHER INPUT ITSELF is returned (no copy is
 * made), so callers must not assume the result is a fresh buffer.
 *
 * @param left - Bytes that come first in the result.
 * @param right - Bytes that come second in the result.
 * @returns `right` if `left` is empty, `left` if `right` is empty, otherwise a
 * newly allocated array holding `left` followed by `right`.
 */
export function concat(left: Uint8Array, right: Uint8Array): Uint8Array {
  if (left.length === 0) {
    return right;
  }

  if (right.length === 0) {
    return left;
  }

  const merged = new Uint8Array(left.length + right.length);
  merged.set(left);
  merged.set(right, left.length);
  return merged;
}

/**
 * Finds the last occurrence of an exact byte sequence inside another one.
 *
 * Every candidate position is examined, walking backwards from the last
 * position where `search` could still fit, so a partial match near the end
 * of `data` does not hide a complete match earlier on.
 *
 * @param data - The sequence of bytes to search within.
 * @param search - The exact sequence of bytes to search for.
 * @returns The start index of the last complete occurrence of `search` in
 * `data`, or -1 if there is none. An empty `search` (or empty `data`) always
 * yields -1.
 */
export function lastIndexOf(data: Uint8Array, search: Uint8Array): number {
  if (search.length === 0 || search.length > data.length) {
    return -1;
  }

  const first = search[0];

  // Highest start index at which the whole of `search` still fits in `data`.
  let from = data.length - search.length;

  // `from` must stay non-negative: a negative fromIndex would be interpreted
  // by TypedArray#lastIndexOf as an offset from the end of the array.
  while (from >= 0) {
    const candidate = data.lastIndexOf(first, from);

    if (candidate === -1) {
      return -1;
    }

    let matched = true;
    for (let i = 1; i < search.length; i++) {
      if (data[candidate + i] !== search[i]) {
        matched = false;
        break;
      }
    }

    if (matched) {
      return candidate;
    }

    from = candidate - 1;
  }

  return -1;
}

/**
 * Transformer for a web `TransformStream` that re-chunks a byte stream so
 * that every emitted chunk ends right after a separator; only the final chunk
 * emitted by `flush` may lack a trailing separator.
 *
 * Bytes following the last separator of a chunk are held back in `_tail` and
 * prepended to the next output. A separator that is split across two input
 * chunks is not detected at the boundary; those bytes simply stay buffered
 * until a later complete separator (or `flush`) releases them.
 */
export class AlineWeb implements Transformer {
  /** Bytes received after the most recent separator, not yet emitted. */
  _tail: Uint8Array;

  /** UTF-8 encoded separator. The historical spelling of the name is kept because the field is publicly visible. */
  _seperator: Uint8Array;

  /**
   * @param options - Optional settings. `separator` is the line delimiter;
   * when missing or an empty string, `'\n'` is used.
   */
  constructor(options?: AlineOptions) {
    const encoder = new TextEncoder();

    // `||` (not `??`) on purpose: an empty separator can never be found, so it falls back too.
    this._seperator = encoder.encode(options?.separator || '\n');
    this._tail = new Uint8Array();
  }

  /** Resets the buffered tail when the stream starts. */
  start(): void {
    this._tail = new Uint8Array();
  }

  /**
   * Emits everything up to and including the last separator found in `chunk`
   * (prefixed with the buffered tail) and buffers the remainder.
   *
   * @param chunk - Incoming bytes; a falsy chunk is ignored.
   * @param controller - Receives the separator-aligned output.
   */
  transform(chunk: Uint8Array, controller: TransformStreamDefaultController): void {
    if (!chunk) {
      return;
    }

    const index = lastIndexOf(chunk, this._seperator);

    if (index === -1) {
      // No complete separator in this chunk: keep accumulating.
      this._tail = concat(this._tail, chunk);
      return;
    }

    // First byte AFTER the separator; using the separator length keeps
    // multi-byte separators (e.g. "\r\n") intact in the emitted chunk.
    const end = index + this._seperator.length;

    let head: Uint8Array;
    if (end === chunk.length) {
      // Chunk ends exactly on a separator: forward it whole, nothing to buffer.
      head = concat(this._tail, chunk);
      this._tail = new Uint8Array();
    } else {
      head = concat(this._tail, chunk.slice(0, end));
      this._tail = chunk.slice(end);
    }

    if (head.length > 0) {
      controller.enqueue(head);
    }
  }

  /**
   * Emits whatever is still buffered once the input has ended.
   *
   * @param controller - Receives the remaining bytes, if any.
   */
  flush(controller: TransformStreamDefaultController): void {
    if (this._tail.length > 0) {
      controller.enqueue(this._tail);
      // Drop the reference so the emitted bytes are not kept (or re-emitted).
      this._tail = new Uint8Array();
    }
  }
}