-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatcher.ts
35 lines (31 loc) · 1.09 KB
/
batcher.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import {Transform} from 'stream';
/**
 * Batcher transforms a stream of <T> into a stream of Array<T>, where each
 * array has at most `batchSize` elements (the last batch may have fewer, but
 * never zero). The stream is constructed in object mode.
 */
export class Batcher<T> extends Transform {
  /** Items received so far that have not yet been emitted as a batch. */
  protected batchBuffer: T[] = [];

  /**
   * @param batchSize Maximum number of items per emitted batch. Must be >= 1.
   * @throws RangeError if `batchSize` is NaN or less than 1. With such a size
   *   `splice(0, batchSize)` removes nothing, so the stream would emit only
   *   empty arrays and silently drop every input item.
   */
  constructor(protected batchSize: number) {
    super({objectMode: true});
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(batchSize >= 1)) {
      throw new RangeError(`batchSize must be a number >= 1, got ${batchSize}`);
    }
  }

  /**
   * Emits one batch if a full batch is buffered, or if the input has ended
   * and any items remain. Called by both _transform and _flush, with
   * different `end` values.
   *
   * @param end True when the input is finished and a partial batch should be
   *   emitted.
   * @param callback Invoked with no arguments once the check is done.
   */
  protected checkFlush(end: boolean, callback: (error?: Error) => void) {
    const hasFullBatch = this.batchBuffer.length >= this.batchSize;
    const hasLeftovers = end && this.batchBuffer.length > 0;
    if (hasFullBatch || hasLeftovers) {
      // splice(start, deleteCount) removes up to `deleteCount` items from the
      // front of the buffer in place and returns them as a new array.
      const batch = this.batchBuffer.splice(0, this.batchSize);
      this.push(batch);
    }
    callback();
  }

  /**
   * Buffers one incoming item and emits a batch once `batchSize` items have
   * accumulated. `encoding` is not used.
   */
  _transform(chunk: T,
             encoding: BufferEncoding,
             callback: (error?: Error, outputChunk?: T[]) => void) {
    this.batchBuffer.push(chunk);
    this.checkFlush(false, callback);
  }

  /** Emits any remaining items as a final, possibly smaller, batch. */
  _flush(callback: (error?: Error) => void) {
    this.checkFlush(true, callback);
  }
}