Skip to content

Commit

Permalink
feat(csv-parse): implement TransformStream (#445)
Browse files Browse the repository at this point in the history
* feat(csv-parse): implement TransformStream

* feat(csv-parse): add test api.web_stream and fix controller.terminate

* feat(csv-parse): added errors handle to TransformStream

* test(csv-parse): error thrown by webstream

* build(csv-parse): exclude web_stream ts test from ci in node 14

* fix(csv-parse): satisfy test and re-habilitate node stream import

---------

Co-authored-by: David Worms <[email protected]>
  • Loading branch information
uasan and wdavidw authored Nov 20, 2024
1 parent 6ecf900 commit 1213de8
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 27 deletions.
11 changes: 11 additions & 0 deletions packages/csv-parse/dist/esm/stream.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

import { Options } from './index.js';

declare function parse(options?: Options): TransformStream;
// export default parse;
export { parse };

export {
CastingContext, CastingFunction, CastingDateFunction,
ColumnOption, Options, Info, CsvErrorCode, CsvError
} from './index.js';
11 changes: 11 additions & 0 deletions packages/csv-parse/lib/stream.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

import { Options } from './index.js';

declare function parse(options?: Options): TransformStream;
// export default parse;
export { parse };

export {
CastingContext, CastingFunction, CastingDateFunction,
ColumnOption, Options, Info, CsvErrorCode, CsvError
} from './index.js';
58 changes: 32 additions & 26 deletions packages/csv-parse/lib/stream.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,40 @@
import { TransformStream } from "node:stream/web";
import { TransformStream, CountQueuingStrategy } from "node:stream/web";
import { transform } from "./api/index.js";

const parse = (opts) => {
const api = transform(opts);
return new TransformStream({
async transform(chunk, controller) {
api.parse(
chunk,
false,
(record) => {
controller.enqueue(record);
},
() => {
controller.close();
},
);
},
async flush(controller) {
api.parse(
undefined,
true,
(record) => {
controller.enqueue(record);
},
() => {
controller.close();
},
);
let controller;
const enqueue = (record) => {
controller.enqueue(record);
};
const terminate = () => {
controller.terminate();
};
return new TransformStream(
{
start(ctr) {
controller = ctr;
},
transform(chunk) {
const error = api.parse(chunk, false, enqueue, terminate);

if (error) {
controller.error(error);
throw error;
}
},
flush() {
const error = api.parse(undefined, true, enqueue, terminate);

if (error) {
controller.error(error);
throw error;
}
},
},
});
new CountQueuingStrategy({ highWaterMark: 1024 }),
new CountQueuingStrategy({ highWaterMark: 1024 }),
);
};

export { parse };
12 changes: 11 additions & 1 deletion packages/csv-parse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@
"default": "./dist/cjs/sync.cjs"
}
},
"./stream": {
"import": {
"types": "./lib/stream.d.ts",
"default": "./lib/stream.js"
},
"require": {
"types": "./dist/cjs/stream.d.cts",
"default": "./dist/cjs/stream.cjs"
}
},
"./browser/esm": {
"types": "./dist/esm/index.d.ts",
"default": "./dist/esm/index.js"
Expand Down Expand Up @@ -123,7 +133,7 @@
"preversion": "npm run build && git add dist",
"pretest": "npm run build",
"test": "mocha 'test/**/*.{coffee,ts}'",
"test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'"
"test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.web_stream.ts --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'"
},
"type": "module",
"types": "dist/esm/index.d.ts",
Expand Down
45 changes: 45 additions & 0 deletions packages/csv-parse/test/api.web_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import 'should'
import {parse as parseStream} from '../lib/stream.js'
import { CsvError } from '../lib/index.js'

describe('API Web Stream', () => {

describe('stream/web/TransformStream', () => {

it('simple parse', async () => {
const stream = parseStream();
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
await writer.write(Buffer.from("A,B,C\nD,E,F"));
await writer.close();
await reader.read().should.finally.eql({
done: false,
value: ['A', 'B', 'C'],
});
await reader.read().should.finally.eql({
done: false,
value: ['D', 'E', 'F'],
});
await reader.read().should.finally.eql({
done: true,
value: undefined,
});
})

it("cat error parse", async function () {
const stream = parseStream();
const writer = stream.writable.getWriter();
try {
await writer.write(Buffer.from("A,B,C\nD,E"));
await writer.close();
throw Error("Shall not be called");
} catch (err) {
if (!(err instanceof CsvError)) {
throw Error("Invalid error type");
}
err.code.should.eql("CSV_RECORD_INCONSISTENT_FIELDS_LENGTH");
}
});

})
})

0 comments on commit 1213de8

Please sign in to comment.