-
Notifications
You must be signed in to change notification settings - Fork 214
/
CsvFormatterStream.ts
68 lines (61 loc) · 2.21 KB
/
CsvFormatterStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import { Transform, TransformCallback } from 'stream';
import { FormatterOptions } from './FormatterOptions';
import { Row, RowTransformFunction } from './types';
import { RowFormatter } from './formatter';
/**
 * Transform stream that converts rows written to it into CSV output.
 *
 * Every incoming row is handed to a `RowFormatter`; the strings it produces are
 * pushed downstream as UTF-8 encoded `Buffer`s. When `formatterOptions.writeBOM`
 * is set, `formatterOptions.BOM` is emitted exactly once, ahead of the first
 * piece of output.
 */
export class CsvFormatterStream<I extends Row, O extends Row> extends Transform {
    private readonly formatterOptions: FormatterOptions<I, O>;

    private readonly rowFormatter: RowFormatter<I, O>;

    /** `true` once the BOM has been emitted, or when no BOM was requested at all. */
    private hasWrittenBOM = false;

    /**
     * @param formatterOptions - Options controlling formatting; `objectMode`,
     * `writeBOM` and `BOM` are read directly by this stream, and the whole object
     * is forwarded to the `RowFormatter`.
     */
    public constructor(formatterOptions: FormatterOptions<I, O>) {
        // Only the writable side may receive row objects. The readable side emits
        // encoded text, so it must stay a regular byte stream rather than being
        // switched to object mode together with the input side.
        super({ writableObjectMode: formatterOptions.objectMode });
        this.formatterOptions = formatterOptions;
        this.rowFormatter = new RowFormatter(formatterOptions);
        // When no BOM is requested, treat it as already written so it is never
        // emitted; otherwise leave it pending until the first output.
        this.hasWrittenBOM = !formatterOptions.writeBOM;
    }

    /**
     * Registers the function used to transform each row before it is formatted.
     *
     * @param transformFunction - Row transform assigned to the row formatter.
     * @returns This stream, to allow chaining.
     */
    public transform(transformFunction: RowTransformFunction<I, O>): CsvFormatterStream<I, O> {
        this.rowFormatter.rowTransform = transformFunction;
        return this;
    }

    /**
     * Formats a single row and pushes the resulting text downstream.
     *
     * @param row - The row to format.
     * @param encoding - Unused; rows are always emitted as UTF-8.
     * @param cb - Stream callback, invoked once the row has been handled.
     * @throws Re-throws any error raised after `cb` has already been invoked, so
     * that `cb` is never called twice for the same row.
     */
    public _transform(row: I, encoding: string, cb: TransformCallback): void {
        let cbCalled = false;
        try {
            this.writeBOMIfPending();
            this.rowFormatter.format(row, (err, rows): void => {
                if (err) {
                    cbCalled = true;
                    return cb(err);
                }
                this.pushRows(rows);
                cbCalled = true;
                return cb();
            });
        } catch (e) {
            if (cbCalled) {
                throw e;
            }
            cb(CsvFormatterStream.toError(e));
        }
    }

    /**
     * Emits whatever the row formatter still has to output at end of input.
     *
     * If the BOM is still pending at this point (no row went through
     * `_transform`) and the formatter produces output, the BOM is written first
     * so that it always precedes the content. Nothing is emitted when the
     * formatter has no output.
     *
     * @param cb - Stream callback, invoked once flushing is complete.
     * @throws Re-throws any error raised after `cb` has already been invoked.
     */
    public _flush(cb: TransformCallback): void {
        let cbCalled = false;
        try {
            this.rowFormatter.finish((err, rows): void => {
                if (err) {
                    cbCalled = true;
                    return cb(err);
                }
                if (rows && rows.length > 0) {
                    this.writeBOMIfPending();
                    this.pushRows(rows);
                }
                cbCalled = true;
                return cb();
            });
        } catch (e) {
            if (cbCalled) {
                throw e;
            }
            cb(CsvFormatterStream.toError(e));
        }
    }

    /** Pushes the BOM downstream as a UTF-8 `Buffer` unless it was already handled. */
    private writeBOMIfPending(): void {
        if (this.hasWrittenBOM) {
            return;
        }
        // Emit a Buffer, like every row chunk, so consumers never see mixed chunk types.
        this.push(Buffer.from(this.formatterOptions.BOM, 'utf8'));
        this.hasWrittenBOM = true;
    }

    /** Pushes each formatted row downstream as a UTF-8 `Buffer`; a missing list is a no-op. */
    private pushRows(rows?: readonly string[] | null): void {
        if (!rows) {
            return;
        }
        for (const formatted of rows) {
            this.push(Buffer.from(formatted, 'utf8'));
        }
    }

    /** Normalizes a caught value so the stream callback always receives an `Error`. */
    private static toError(e: unknown): Error {
        return e instanceof Error ? e : new Error(String(e));
    }
}