Skip to content

Commit

Permalink
Web TransformStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tugrul committed May 5, 2024
1 parent dde6a3b commit 848e651
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 12 deletions.
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ npm install aline
yarn add aline
```

## Sample
## Sample Node.js Stream Transform
```javascript
const Aline = require('aline');
const {Aline} = require('aline');
const {Readable} = require('stream');

async function* generate() {
Expand All @@ -34,3 +34,24 @@ stream.on('data', function(chunk) {
console.log(chunk.toString());
});
```

## Sample Web TransformStream
```javascript
const {AlineWeb} = require('aline');

async function main() {
const response = await fetch('https://example.com/big.jsonl');

if (!response.body) {
return;
}

const readable = response.body
.pipeThrough(new TransformStream(new AlineWeb()))
.pipeThrough(new TextDecoderStream());

for await (const batch of readable) {
const jsonLines = batch.split(/\n/g).map(line => JSON.parse(line));
}
}
```
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"name": "aline",
"version": "0.0.9",
"version": "1.0.0",
"description": "Align stream chunks to bound of lines",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"engines": {
"node": ">=v6.4.0"
"node": ">=v20.0.0"
},
"scripts": {
"test": "jest",
Expand Down
123 changes: 120 additions & 3 deletions src/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,79 @@

import { Readable, Transform } from 'stream';
import { Readable, Transform } from 'node:stream';

import Aline from '.'
import { Aline, AlineWeb, concat, lastIndexOf } from '.'

describe('concat two Uint8Array', () => {
test('regular two array', () => {
const left = new Uint8Array([1, 2, 3]);
const right = new Uint8Array([4, 5, 6]);

const combined = Array.from(concat(left, right));

expect(combined).toEqual([1, 2, 3, 4, 5, 6]);
});

test('left array is empty', () => {
const left = new Uint8Array();
const right = new Uint8Array([4, 5, 6]);

const combined = Array.from(concat(left, right));

expect(combined).toEqual([4, 5, 6]);
});

test('right array is empty', () => {
const left = new Uint8Array([1, 2, 3]);
const right = new Uint8Array();

const combined = Array.from(concat(left, right));

expect(combined).toEqual([1, 2, 3]);
});
});



describe('lastIndexOf Uint8Array in Uint8Array', () => {
test('regular two array', () => {
const left = new Uint8Array([1, 2, 3, 4, 5, 6]);

expect(lastIndexOf(left, new Uint8Array([1]))).toBe(0);

expect(lastIndexOf(left, new Uint8Array([1, 2, 3]))).toBe(0);

expect(lastIndexOf(left, new Uint8Array([6]))).toBe(5);

expect(lastIndexOf(left, new Uint8Array([6, 7, 8]))).toBe(-1);

expect(lastIndexOf(left, new Uint8Array([3, 4, 5]))).toBe(2);

expect(lastIndexOf(left, new Uint8Array([3, 4, 5, 6]))).toBe(2);

expect(lastIndexOf(left, new Uint8Array([3, 4, 5, 7]))).toBe(-1);

expect(lastIndexOf(left, new Uint8Array([3, 4]))).toBe(2);

expect(lastIndexOf(left, new Uint8Array([3]))).toBe(2);

expect(lastIndexOf(left, new Uint8Array([3, 5]))).toBe(-1);

expect(lastIndexOf(left, new Uint8Array([7]))).toBe(-1);

expect(lastIndexOf(left, new Uint8Array([4, 5, 6, 7]))).toBe(-1);
});

test('empty array', () => {
const fill = new Uint8Array([1, 2, 3]);
const empty = new Uint8Array();

expect(lastIndexOf(fill, empty)).toBe(-1);

expect(lastIndexOf(empty, fill)).toBe(-1);

expect(lastIndexOf(empty, empty)).toBe(-1);
});
});


function combineData<T extends Transform>(stream: T) {
Expand Down Expand Up @@ -62,5 +134,50 @@ const fixture: Array<FixtureItem> = [
['\nfoo\n']]
];

fixture.forEach(([name, generate, target]) => test(name, async() => expect(await combineData(Readable.from(generate()).pipe(new Aline()))).toEqual(target)));
describe('node transform', () => {
fixture.forEach(([name, generate, target]) => test(name, async() => {
expect(await combineData(Readable.from(generate()).pipe(new Aline()))).toEqual(target)
}));
});


function readableStreamFrom<T>(iterable: AsyncIterable<T>) {
return new ReadableStream<T>({
async start(controller): Promise<void> {
for await (const chunk of iterable) {
controller.enqueue(chunk);
}

controller.close();
}
});
}

async function arrayFromAsync<T>(readable: ReadableStream<T>): Promise<Array<T>> {
const chunks: Array<T> = [];

const reader = readable.getReader();

while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
chunks.push(value);
}

reader.releaseLock();

return chunks;
}

describe('web transform stream', () => {

fixture.forEach(([name, generate, target]) => test(name, async() => {
expect(await arrayFromAsync(readableStreamFrom(generate())
.pipeThrough(new TextEncoderStream())
.pipeThrough(new TransformStream(new AlineWeb()))
.pipeThrough(new TextDecoderStream()))).toEqual(target);
}));
});

115 changes: 110 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@


import { Transform } from 'stream';
import type { TransformCallback, TransformOptions } from 'stream';
import { Transform } from 'node:stream';
import type { TransformCallback, TransformOptions } from 'node:stream';

type AlineOptions = TransformOptions & {separator: string};
type AlineOptions = {separator: string};

export default class Aline extends Transform {
export class Aline extends Transform {

_tail: Buffer;
_separator: string;

constructor(options?: AlineOptions) {
constructor(options?: TransformOptions & AlineOptions) {
super();

this._tail = Buffer.alloc(0);
Expand Down Expand Up @@ -43,3 +43,108 @@ export default class Aline extends Transform {
callback(null, this._tail);
}
}


/**
* Concatenates two Uint8Array instances into a single Uint8Array.
* @param left - The first Uint8Array to concatenate.
* @param right - The second Uint8Array to concatenate.
* @returns A new Uint8Array containing the concatenated elements of the input arrays.
* If one of the input arrays is empty, the function returns the other array unchanged.
* If both input arrays are empty, an empty Uint8Array is returned.
*/
export function concat(left: Uint8Array, right: Uint8Array): Uint8Array {
if (left.length === 0) {
return right;
}

if (right.length === 0) {
return left;
}

const merged = new Uint8Array(left.length + right.length);
merged.set(left);
merged.set(right, left.length);
return merged;
}

/**
* Finds the last occurrence of an exact sequence of bytes (data) within another sequence of bytes (right).
* @param data - The sequence of bytes to search within.
* @param right - The exact sequence of bytes to search for.
* @returns The index of the last occurrence of the specified exact sequence of bytes within the given sequence, or -1 if not found.
*/
export function lastIndexOf(data: Uint8Array, search: Uint8Array): number {

if (search.length === 0 || data.length === 0) {
return -1;
}

const index = data.lastIndexOf(search[0]);

if (index > -1) {
if (data.length - index < search.length) {
return -1;
}

for (let i = 0; i < search.length; i++) {
if (data[index + i] !== search[i]) {
return -1;
}
}
}

return index;
}


export class AlineWeb implements Transformer<Uint8Array, Uint8Array> {

_tail: Uint8Array;
_seperator: Uint8Array;

constructor(options?: AlineOptions) {
const encoder = new TextEncoder();

this._seperator = encoder.encode((options && options.separator) || '\n');
this._tail = new Uint8Array();
}

start() {
this._tail = new Uint8Array();
}

transform(chunk: Uint8Array, controller: TransformStreamDefaultController<Uint8Array>): void {

if (!chunk) {
return;
}

const index = lastIndexOf(chunk, this._seperator);

if (index === -1) {
this._tail = concat(this._tail, chunk);
return;
}

if (index === chunk.length - 1) {
const tail = this._tail;
this._tail = new Uint8Array();
const data = concat(tail, chunk);
data.length > 0 && controller.enqueue(data);
return;
}

const head = concat(this._tail, chunk.slice(0, index + 1));
this._tail = chunk.slice(index + 1);

head.length > 0 && controller.enqueue(head);
}

flush(controller: TransformStreamDefaultController<Uint8Array>): void {
this._tail.length > 0 && controller.enqueue(this._tail);
}
}



0 comments on commit 848e651

Please sign in to comment.