feat: Performance improvements and new async api (#360)

* Update dependencies

* Memoize preprocessRow

* Skip preprocessing if no preprocessing options enabled

* Improve Regexps

* Remove unnecessary intermediate variable

* Replace indexOf by includes

* Improve fields gathering

* Fix a bug that created nested field getters if reusing the same parser

* Flatten using push and fallback to concat

* Add new Async Parser

* Add tests for new Async Parser

* Ensure that async tests finish properly even when there are exceptions

* Create promise before pushing the data in the AsyncParser

juanjoDiaz authored and knownasilya committed Mar 10, 2019
1 parent 58bd0ae commit d59dea1
Showing 16 changed files with 4,453 additions and 3,169 deletions.
82 changes: 76 additions & 6 deletions README.md
@@ -192,12 +192,12 @@ The programatic APIs take a configuration object very equivalent to the CLI options

`json2csv` can also be used programmatically as a synchronous converter using its `parse` method.
```javascript
const Json2csvParser = require('json2csv').Parser;
const { Parser } = require('json2csv');
const fields = ['field1', 'field2', 'field3'];
const opts = { fields };

try {
const parser = new Json2csvParser(opts);
const parser = new Parser(opts);
const csv = parser.parse(myData);
console.log(csv);
} catch (err) {
@@ -220,13 +220,83 @@ try {
}
```

### json2csv transform (Streaming API)
Both of the methods above load the entire JSON in memory and do the whole processing in memory while blocking the JavaScript event loop. For that reason, it is rarely a good idea to use them unless your data is very small or your application doesn't do anything else.

### json2csv async parser (Streaming API)

The synchronous API has the downside of loading the entire JSON array in memory and blocking JavaScript's event loop while processing the data. This means that your server won't be able to process more requests, or your UI will become unresponsive, while data is being processed. For those reasons, it is rarely a good idea to use it unless your data is very small or your application doesn't do anything else.

The async parser processes the data as a non-blocking stream. This approach ensures a consistent memory footprint and avoids blocking JavaScript's event loop. Thus, it's better suited for large datasets or systems with high concurrency.

The parse method is really good but has the downside of loading the entire JSON array in memory. This might not be optimal or even possible for large JSON files.
One very important difference between the asynchronous and the synchronous APIs is that, using the asynchronous API, JSON objects are processed one by one. In practice, this means that only the fields in the first object of the array are automatically detected and other fields are just ignored. To avoid this, it's advisable to ensure that all the objects contain exactly the same fields or to provide the list of fields using the `fields` option.
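
For example, here is a minimal sketch (with made-up field names and data, not taken from the library docs) of providing the field list explicitly so that keys missing from the first object are not dropped:

```javascript
const { AsyncParser } = require('json2csv');

// 'age' only appears in the second object, so automatic detection
// (which only inspects the first object) would silently drop it.
// Passing `fields` explicitly keeps it in the output.
const asyncParser = new AsyncParser({ fields: ['name', 'city', 'age'] });

let csv = '';
asyncParser.processor
  .on('data', chunk => (csv += chunk.toString()))
  .on('end', () => console.log(csv))
  .on('error', err => console.error(err));

asyncParser.input.push(JSON.stringify([
  { name: 'Ada', city: 'London' },
  { name: 'Linus', city: 'Helsinki', age: 28 }
]));
asyncParser.input.push(null);
```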

The async API takes a second options argument that's directly passed to the underlying streams and accepts the same options as the standard [Node.js streams](https://nodejs.org/api/stream.html#stream_new_stream_duplex_options).

Instances of `AsyncParser` expose three objects:
* *input:* Allows you to push more data into the parser.
* *processor:* A readable stream representing the whole data processing. You can listen to all the standard events of Node.js streams.
* *transform:* The json2csv transform. See below for more details.

```javascript
const { AsyncParser } = require('json2csv');
const fields = ['field1', 'field2', 'field3'];
const opts = { fields };
const transformOpts = { highWaterMark: 8192 };

const asyncParser = new AsyncParser(opts, transformOpts);

let csv = '';
asyncParser.processor
.on('data', chunk => (csv += chunk.toString()))
.on('end', () => console.log(csv))
.on('error', err => console.error(err));

// You can also listen for events on the conversion and see how the header or the lines are coming out.
asyncParser.transform
.on('header', header => console.log(header))
.on('line', line => console.log(line))
.on('error', err => console.log(err));

asyncParser.input.push(data); // This data might come from an HTTP request, etc.
asyncParser.input.push(null); // Sending `null` to a stream signals that no more data is expected and ends it.
```
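
As a sketch of how the streaming input might be wired up in practice (hypothetical server code, not part of the library), an incoming request body containing a JSON array can be fed straight through the parser and out as CSV:

```javascript
const http = require('http');
const { AsyncParser } = require('json2csv');

http.createServer((req, res) => {
  const asyncParser = new AsyncParser({ fields: ['field1', 'field2', 'field3'] });

  res.setHeader('Content-Type', 'text/csv');
  asyncParser.processor.pipe(res);

  // Feed the JSON request body into the parser chunk by chunk.
  req.on('data', chunk => asyncParser.input.push(chunk));
  req.on('end', () => asyncParser.input.push(null));
}).listen(3000);
```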

For such cases, json2csv offers a stream transform: pipe your JSON content into it and it will output the CSV.
`AsyncParser` also exposes some convenience methods:
* `fromInput` allows you to set the input stream.
* `throughTransform` allows you to add transforms to the stream.
* `toOutput` allows you to set the output stream.
* `promise` returns a promise that resolves when the stream ends or errors.

```javascript
const fs = require('fs');
const { AsyncParser } = require('json2csv');
const fields = ['field1', 'field2', 'field3'];
const opts = { fields };
const transformOpts = { highWaterMark: 8192 };

const input = fs.createReadStream(inputPath, { encoding: 'utf8' });
const output = fs.createWriteStream(outputPath, { encoding: 'utf8' });
const asyncParser = new AsyncParser(opts, transformOpts);
asyncParser.fromInput(input).toOutput(output).promise()
.then(csv => console.log(csv))
  .catch(err => console.error(err));
```
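
`throughTransform` is not shown above. A minimal sketch of chaining it in (assuming zlib's gzip transform as the extra stage and placeholder file paths) could look like this:

```javascript
const fs = require('fs');
const zlib = require('zlib');
const { AsyncParser } = require('json2csv');

const asyncParser = new AsyncParser({ fields: ['field1', 'field2', 'field3'] });

// Read JSON, convert it to CSV and gzip the result in a single streaming pipeline.
asyncParser
  .fromInput(fs.createReadStream('data.json', { encoding: 'utf8' }))
  .throughTransform(zlib.createGzip())
  .toOutput(fs.createWriteStream('data.csv.gz'));
```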

You can also use the convenience method `parseAsync`, which takes the data and the options and returns a promise.

```javascript
const { parseAsync } = require('json2csv');
const fields = ['field1', 'field2', 'field3'];
const opts = { fields };

parseAsync(myData, opts)
.then(csv => console.log(csv))
.catch(err => console.error(err));
```

### json2csv transform (Streaming API)

One very important difference between the transform and the parser is that the JSON objects are processed one by one. In practice, this means that only the fields in the first object of the array are considered, and fields in other objects that were not present in the first one are just ignored. To avoid this, it's advisable to ensure that all the objects contain exactly the same fields or provide the list of fields using the `fields` option.
json2csv also exposes the raw stream transform so you can pipe your JSON content into it. This is the same Transform that `AsyncParser` uses under the hood.

```javascript
const fs = require('fs');
// ...
```
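
A fuller sketch of using the transform (hedged: it assumes the transform is exported as `Transform` and reuses the placeholder `inputPath`/`outputPath` variables from the examples above):

```javascript
const fs = require('fs');
const { Transform } = require('json2csv');

const fields = ['field1', 'field2', 'field3'];
const opts = { fields };
const transformOpts = { highWaterMark: 16384, encoding: 'utf-8' };

const input = fs.createReadStream(inputPath, { encoding: 'utf8' });
const output = fs.createWriteStream(outputPath, { encoding: 'utf8' });
const json2csv = new Transform(opts, transformOpts);

const processor = input.pipe(json2csv).pipe(output);

// The same 'header' and 'line' events as on the AsyncParser's transform are available here.
json2csv
  .on('header', header => console.log(header))
  .on('line', line => console.log(line))
  .on('error', err => console.log(err));
```
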
6 changes: 3 additions & 3 deletions bin/json2csv.js
@@ -226,11 +226,11 @@ Promise.resolve()
});
})
.catch((err) => {
if (inputPath && err.message.indexOf(inputPath) !== -1) {
if (inputPath && err.message.includes(inputPath)) {
err = new Error('Invalid input file. (' + err.message + ')');
} else if (outputPath && err.message.indexOf(outputPath) !== -1) {
} else if (outputPath && err.message.includes(outputPath)) {
err = new Error('Invalid output file. (' + err.message + ')');
} else if (fieldsConfigPath && err.message.indexOf(fieldsConfigPath) !== -1) {
} else if (fieldsConfigPath && err.message.includes(fieldsConfigPath)) {
err = new Error('Invalid fields config file. (' + err.message + ')');
}
// eslint-disable-next-line no-console
52 changes: 52 additions & 0 deletions lib/JSON2CSVAsyncParser.js
@@ -0,0 +1,52 @@
'use strict';

const { Transform } = require('stream');
const JSON2CSVTransform = require('./JSON2CSVTransform');

class JSON2CSVAsyncParser {
constructor(opts, transformOpts) {
this.input = new Transform(transformOpts);
this.input._read = () => {};

this.transform = new JSON2CSVTransform(opts, transformOpts);
this.processor = this.input.pipe(this.transform);
}

fromInput(input) {
if (this._input) {
throw new Error('Async parser already has an input.');
}
this._input = input;
this.input = this._input.pipe(this.processor);
return this;
}

throughTransform(transform) {
if (this._output) {
throw new Error('Can\'t add transforms once an output has been added.');
}
this.processor = this.processor.pipe(transform);
return this;
}

toOutput(output) {
if (this._output) {
throw new Error('Async parser already has an output.');
}
this._output = output;
this.processor = this.processor.pipe(output);
return this;
}

promise() {
return new Promise((resolve, reject) => {
let csv = '';
this.processor
.on('data', chunk => (csv += chunk.toString()))
.on('finish', () => resolve(csv))
.on('error', err => reject(err));
});
}
}

module.exports = JSON2CSVAsyncParser
72 changes: 41 additions & 31 deletions lib/JSON2CSVBase.js
@@ -2,17 +2,12 @@

const os = require('os');
const lodashGet = require('lodash.get');

const setProp = (obj, path, value) => {
const pathArray = Array.isArray(path) ? path : path.split('.');
const key = pathArray[0];
const newValue = pathArray.length > 1 ? setProp(obj[key] || {}, pathArray.slice(1), value) : value;
return Object.assign({}, obj, { [key]: newValue });
};
const { setProp, flattenReducer } = require('./utils');

class JSON2CSVBase {
constructor(opts) {
this.opts = this.preprocessOpts(opts);
this.preprocessRow = this.memoizePreprocessRow();
}

/**
@@ -103,22 +98,39 @@ class JSON2CSVBase {
.join(this.opts.delimiter);
}

memoizePreprocessRow() {
if (this.opts.unwind && this.opts.unwind.length) {
if (this.opts.flatten) {
return function (row) {
return this.unwindData(row, this.opts.unwind)
.map(row => this.flatten(row, this.opts.flattenSeparator));
};
}

return function (row) {
return this.unwindData(row, this.opts.unwind);
};
}

if (this.opts.flatten) {
return function (row) {
return [this.flatten(row, this.opts.flattenSeparator)];
};
}

return function (row) {
return [row];
};
}

/**
* Preprocess each object according to the given opts (unwind, flatten, etc.).
* The actual body of the function is dynamically set on the constructor by the
* `memoizePreprocessRow` method after parsing the options.
*
* @param {Object} row JSON object to be converted in a CSV row
*/
preprocessRow(row) {
const processedRow = (this.opts.unwind && this.opts.unwind.length)
? this.unwindData(row, this.opts.unwind)
: [row];

if (this.opts.flatten) {
return processedRow.map(row => this.flatten(row, this.opts.flattenSeparator));
}

return processedRow;
}
preprocessRow() {}

/**
* Create the content of a specific CSV row
@@ -175,7 +187,7 @@ class JSON2CSVBase {
? JSON.stringify(value)
: value);

if (typeof value === 'object' && !/^"(.*)"$/.test(stringifiedValue)) {
if (typeof value === 'object' && !/^".*"$/.test(stringifiedValue)) {
// Stringify object that are not stringified to a
// JSON string (like Date) to escape commas, quotes, etc.
stringifiedValue = JSON.stringify(stringifiedValue);
@@ -192,24 +204,22 @@
.replace(/\u21E5/g, '\t');
}

// Replace automatically escaped single quotes by doubleQuotes
stringifiedValue = stringifiedValue
.replace(/\\"(?!$)/g, this.opts.doubleQuote);

if (this.opts.quote === '"') {
// Replace automatically escaped single quotes by doubleQuotes
stringifiedValue = stringifiedValue
.replace(/(\\")(?!$)/g, this.opts.doubleQuote);
} else {
// Unescape automatically escaped double quote symbol
if (this.opts.quote !== '"') {
// Replace single quote with double quote
// Replace wrapping quotes
stringifiedValue = stringifiedValue
.replace(/(\\")(?!$)/g, '"')
.replace(new RegExp(this.opts.quote, 'g'), this.opts.doubleQuote)
.replace(/^"(.*)"$/, this.opts.quote + '$1' + this.opts.quote);
.replace(/^"/, this.opts.quote)
.replace(/"$/, this.opts.quote);
}

// Remove double backslashes
stringifiedValue = stringifiedValue
.replace(/\\\\/g, '\\');
// Remove double backslashes
stringifiedValue = stringifiedValue
.replace(/\\\\/g, '\\');

if (this.opts.excelStrings && typeof value === 'string') {
stringifiedValue = '"="' + stringifiedValue + '""';
@@ -281,7 +291,7 @@ class JSON2CSVBase {
return setProp(clonedRow, unwindPath, unwindRow);
});
})
.reduce((a, e) => a.concat(e), []);
.reduce(flattenReducer, []);
};

return unwindPaths.reduce(unwind, [dataRow]);
31 changes: 23 additions & 8 deletions lib/JSON2CSVParser.js
@@ -1,8 +1,15 @@
'use strict';

const JSON2CSVBase = require('./JSON2CSVBase');
const { flattenReducer } = require('./utils');

class JSON2CSVParser extends JSON2CSVBase {
constructor(opts) {
super(opts);
if (this.opts.fields) {
this.opts.fields = this.preprocessFieldsInfo(this.opts.fields);
}
}
/**
* Main function that converts json to csv.
*
@@ -13,16 +20,20 @@ class JSON2CSVParser extends JSON2CSVBase {
const processedData = this.preprocessData(data);

if (!this.opts.fields) {
const dataFields = processedData
.map(item => Object.keys(item))
.reduce((tempData, rows) => tempData.concat(rows), []);
this.opts.fields = processedData
.reduce((fields, item) => {
Object.keys(item).forEach((field) => {
if (!fields.includes(field)) {
fields.push(field)
}
});

this.opts.fields = dataFields
.filter((field, pos, arr) => arr.indexOf(field) == pos);
return fields
}, []);

this.opts.fields = this.preprocessFieldsInfo(this.opts.fields);
}

this.opts.fields = this.preprocessFieldsInfo(this.opts.fields);

const header = this.opts.header ? this.getHeader() : '';
const rows = this.processData(processedData);
const csv = (this.opts.withBOM ? '\ufeff' : '')
@@ -46,9 +57,13 @@ class JSON2CSVParser extends JSON2CSVBase {
throw new Error('Data should not be empty or the "fields" option should be included');
}

if ((!this.opts.unwind || !this.opts.unwind.length) && !this.opts.flatten) {
return processedData;
}

return processedData
.map(row => this.preprocessRow(row))
.reduce((tempData, rows) => tempData.concat(rows), []);
.reduce(flattenReducer, []);
}

/**
10 changes: 5 additions & 5 deletions lib/JSON2CSVTransform.js
@@ -1,6 +1,6 @@
'use strict';

const Transform = require('stream').Transform;
const { Transform } = require('stream');
const Parser = require('jsonparse');
const JSON2CSVBase = require('./JSON2CSVBase');

@@ -13,6 +13,7 @@ class JSON2CSVTransform extends Transform {
Object.getOwnPropertyNames(JSON2CSVBase.prototype)
.forEach(key => (this[key] = JSON2CSVBase.prototype[key]));
this.opts = this.preprocessOpts(opts);
this.preprocessRow = this.memoizePreprocessRow();

this._data = '';
this._hasWritten = false;
@@ -118,6 +119,7 @@ class JSON2CSVTransform extends Transform {
&& this.mode !== Parser.C.OBJECT) {
this.onError(new Error('Data should not be empty or the "fields" option should be included'));
}

if (this.stack.length === 1) {
if(this.depthToEmit === undefined) {
// If Array emit its content, else emit itself
@@ -136,7 +138,7 @@
}

this.parser.onError = function (err) {
if(err.message.indexOf('Unexpected') > -1) {
if(err.message.includes('Unexpected')) {
err.message = 'Invalid JSON (' + err.message + ')';
}
transform.emit('error', err);
@@ -192,10 +194,8 @@
processedData.forEach(row => {
const line = this.processRow(row, this.opts);
if (line === undefined) return;
const eoledLine = (this._hasWritten ? this.opts.eol : '')
+ line;
this.emit('line', line);
this.push(eoledLine);
this.push(this._hasWritten ? this.opts.eol + line : line);
this._hasWritten = true;
});
}
