Skip to content

Commit

Permalink
Added headers event #321
Browse files Browse the repository at this point in the history
* [ADDED] A new `headers` event that will be emitted when headers are parsed #321
  • Loading branch information
doug-martin committed Feb 14, 2020
1 parent 87b13ab commit c0d8f72
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 15 deletions.
4 changes: 4 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v4.1.0

* [ADDED] A new `headers` event that will be emitted when headers are parsed [#321] (https://github.com/C2FO/fast-csv/issues/321)

# v4.0.3

* [FIXED] Issue where invalid rows were not accounted for when skipRows was set [#317](https://github.com/C2FO/fast-csv/issues/317)
Expand Down
5 changes: 5 additions & 0 deletions packages/parse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ import * as format csv '@fast-csv/parse';
<a name="parsing-events"></a>
## Events
* `headers`: Emitted when the headers are parsed
* **NOTE** If the headers option is a function that transform headers, the array passed to this event will be the transformed headers
* **NOTE** If the headers option is set to an array of headers, the emitted header will be the option passed in.
* **NOTE** If the headers option is set to `true` the headers will be the parsed headers from the csv.
* **NOTE** If headers is set to `false` or the csv has no rows then the event WILL NOT be emitted.
* `data`: Emitted when a record is parsed.
* If headers are present then all rows will be an object.
* If headers are not present then all rows will be an array.
Expand Down
94 changes: 80 additions & 14 deletions packages/parse/__tests__/CsvParsingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
import * as fs from 'fs';
import * as domain from 'domain';
import partition from 'lodash.partition';
import { ParserOptions, CsvParserStream, parseFile, ParserOptionsArgs, Row, RowMap, RowValidateCallback } from '../src';
import {
ParserOptions,
CsvParserStream,
parseFile,
ParserOptionsArgs,
Row,
RowMap,
RowValidateCallback,
HeaderArray,
} from '../src';
import {
PathAndContent,
ParseResults,
Expand Down Expand Up @@ -52,19 +61,7 @@ describe('CsvParserStream', () => {
data: PathAndContent<R>,
options: ParserOptionsArgs = {},
): Promise<ParseResults<R>> => {
return new Promise((res, rej) => {
const rows: R[] = [];
const invalidRows: Row[] = [];
const parser = createParserStream(options)
.on('data', row => rows.push(row))
.on('data-invalid', row => invalidRows.push(row))
.on('error', rej)
.on('end', (count: number) => {
res({ count, rows, invalidRows });
});
parser.write(data.content);
parser.end();
});
return parseContentAndCollectFromStream(data, createParserStream(options));
};

it('should parse a csv without quotes or escapes', () =>
Expand Down Expand Up @@ -461,6 +458,75 @@ describe('CsvParserStream', () => {
listenForError(stream, "Parse Error: expected: ',' OR new line got: 'a'. at 'a \", Las", next);
});

describe('headers event', () => {
it('should emit a headers event one time when headers are discovered', async () => {
const parsedHeaders: string[] = [];
let eventCount = 0;
const stream = createParserStream({ headers: true });
stream.on('headers', hs => {
eventCount += 1;
parsedHeaders.push(...hs);
});
await expectParsed(parseContentAndCollectFromStream(withHeaders, stream), withHeaders.parsed);
expect(eventCount).toBe(1);
expect(parsedHeaders).toEqual(['first_name', 'last_name', 'email_address']);
});

it('should emit a headers event one time with transformed headers', async () => {
const parsedHeaders: string[] = [];
let eventCount = 0;
const headersTransform = (hs: HeaderArray): HeaderArray => hs.map(h => h?.toUpperCase());
const stream = createParserStream({ headers: headersTransform });
stream.on('headers', hs => {
eventCount += 1;
parsedHeaders.push(...hs);
});
await expectParsed(
parseContentAndCollectFromStream(withHeaders, stream),
withHeaders.parsed.map(r => ({
FIRST_NAME: r.first_name,
LAST_NAME: r.last_name,
EMAIL_ADDRESS: r.email_address,
})),
);
expect(eventCount).toBe(1);
expect(parsedHeaders).toEqual(['FIRST_NAME', 'LAST_NAME', 'EMAIL_ADDRESS']);
});

it('should emit a headers provided headers', async () => {
const parsedHeaders: string[] = [];
let eventCount = 0;
const headers = ['first_name', 'last_name', 'email_address', 'address'];
const stream = createParserStream({ headers });
stream.on('headers', hs => {
eventCount += 1;
parsedHeaders.push(...hs);
});
const expected = noHeadersAndQuotes.parsed.map(r => ({
first_name: r[0],
last_name: r[1],
email_address: r[2],
address: r[3],
}));
await expectParsed(parseContentAndCollectFromStream(noHeadersAndQuotes, stream), expected);
expect(eventCount).toBe(1);
expect(parsedHeaders).toEqual(headers);
});

it('should not a headers provided headers', async () => {
const parsedHeaders: string[] = [];
let eventCount = 0;
const stream = createParserStream();
stream.on('headers', hs => {
eventCount += 1;
parsedHeaders.push(...hs);
});
await expectParsed(parseContentAndCollectFromStream(noHeadersAndQuotes, stream), noHeadersAndQuotes.parsed);
expect(eventCount).toBe(0);
expect(parsedHeaders).toHaveLength(0);
});
});

describe('#validate', () => {
const syncValidator = (row: RowMap): boolean =>
parseInt(row.first_name ? row.first_name.replace(/^First/, '') : '0', 10) % 2 === 1;
Expand Down
10 changes: 10 additions & 0 deletions packages/parse/src/CsvParserStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export class CsvParserStream<I extends Row, O extends Row> extends Transform {

private endEmitted = false;

private headersEmitted = false;

public constructor(parserOptions: ParserOptions) {
super({ objectMode: parserOptions.objectMode });
this.parserOptions = parserOptions;
Expand Down Expand Up @@ -121,6 +123,7 @@ export class CsvParserStream<I extends Row, O extends Row> extends Transform {
}
return iterate(i + 1);
};
this.checkAndEmitHeaders();
// if we have emitted all rows or we have hit the maxRows limit option
// then end
if (i >= rowsLength || this.hasHitRowLimit) {
Expand Down Expand Up @@ -186,6 +189,13 @@ export class CsvParserStream<I extends Row, O extends Row> extends Transform {
}
}

private checkAndEmitHeaders(): void {
if (!this.headersEmitted && this.headerTransformer.headers) {
this.headersEmitted = true;
this.emit('headers', this.headerTransformer.headers);
}
}

private skipRow(cb: RowValidatorCallback<O>): void {
// skipped because of skipRows option remove from total row count
this.rowCount -= 1;
Expand Down
2 changes: 1 addition & 1 deletion packages/parse/src/transforms/HeaderTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
export class HeaderTransformer<O extends Row> {
private readonly parserOptions: ParserOptions;

private headers: HeaderArray | null = null;
headers: HeaderArray | null = null;

private receivedHeaders = false;

Expand Down

0 comments on commit c0d8f72

Please sign in to comment.