Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(encoding/csv/streaming): Added skipFirstRow and columns options #3184

Merged
merged 8 commits into from
Feb 17, 2023
19 changes: 4 additions & 15 deletions encoding/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/

import { assert } from "../_util/asserts.ts";
import type { ReadOptions } from "./csv/_io.ts";
import { convertRowToObject, type ReadOptions } from "./csv/_io.ts";
import { Parser } from "./csv/_parser.ts";

export {
Expand Down Expand Up @@ -407,31 +407,20 @@ export function parse(

if (opt.skipFirstRow || opt.columns) {
let headers: string[] = [];
let i = 0;

if (opt.skipFirstRow) {
const head = r.shift();
assert(head != null);
headers = head;
i++;
}

if (opt.columns) {
headers = opt.columns;
}

return r.map((e) => {
if (e.length !== headers.length) {
throw new Error(
`Error number of fields line: ${i}\nNumber of fields found: ${headers.length}\nExpected number of fields: ${e.length}`,
);
}
i++;
const out: Record<string, unknown> = {};
for (let j = 0; j < e.length; j++) {
out[headers[j]] = e[j];
}
return out;
const firstLineIndex = opt.skipFirstRow ? 1 : 0;
return r.map((row, i) => {
return convertRowToObject(row, headers, firstLineIndex + i);
});
}
return r;
Expand Down
24 changes: 24 additions & 0 deletions encoding/csv/_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,27 @@ export const ERR_BARE_QUOTE = 'bare " in non-quoted-field';
export const ERR_QUOTE = 'extraneous or missing " in quoted-field';
export const ERR_INVALID_DELIM = "Invalid Delimiter";
export const ERR_FIELD_COUNT = "wrong number of fields";

export function convertRowToObject(
row: string[],
headers: string[],
index: number,
) {
if (row.length !== headers.length) {
throw new Error(
`Error number of fields line: ${index}\nNumber of fields found: ${headers.length}\nExpected number of fields: ${row.length}`,
);
}
const out: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
out[headers[i]] = row[i];
}
return out;
}

export type RowType<ParseOptions, T> = T extends
Omit<ParseOptions, "columns"> & { columns: string[] }
? Record<string, unknown>
: T extends Omit<ParseOptions, "skipFirstRow"> & { skipFirstRow: true }
? Record<string, unknown>
: string[];
63 changes: 53 additions & 10 deletions encoding/csv/stream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { defaultReadOptions, parseRecord } from "./_io.ts";
import type { LineReader } from "./_io.ts";
import {
convertRowToObject,
defaultReadOptions,
type LineReader,
parseRecord,
type RowType,
} from "./_io.ts";
import { TextDelimiterStream } from "../../streams/text_delimiter_stream.ts";

export interface CsvStreamOptions {
separator?: string;
comment?: string;
skipFirstRow?: boolean;
columns?: string[];
}

class StreamLineReader implements LineReader {
Expand Down Expand Up @@ -39,29 +46,37 @@ function stripLastCR(s: string): string {
return s.endsWith("\r") ? s.slice(0, -1) : s;
}

export class CsvStream implements TransformStream<string, Array<string>> {
readonly #readable: ReadableStream<Array<string>>;
export class CsvStream<T extends CsvStreamOptions>
implements TransformStream<string, RowType<CsvStreamOptions, T>> {
readonly #readable: ReadableStream<
string[] | Record<string, string | unknown>
>;
readonly #options: CsvStreamOptions;
readonly #lineReader: StreamLineReader;
readonly #lines: TextDelimiterStream;
#lineIndex = 0;
#isFirstRow = true;

constructor(options: CsvStreamOptions = defaultReadOptions) {
#headers: string[] = [];

constructor(options: T = defaultReadOptions as T) {
this.#options = {
...defaultReadOptions,
...options,
};

this.#lines = new TextDelimiterStream("\n");
this.#lineReader = new StreamLineReader(this.#lines.readable.getReader());
this.#readable = new ReadableStream<Array<string>>({
this.#readable = new ReadableStream({
pull: (controller) => this.#pull(controller),
cancel: () => this.#lineReader.cancel(),
});
}

async #pull(
controller: ReadableStreamDefaultController<Array<string>>,
controller: ReadableStreamDefaultController<
string[] | Record<string, string | unknown>
>,
): Promise<void> {
const line = await this.#lineReader.readLine();
if (line === "") {
Expand All @@ -88,16 +103,44 @@ export class CsvStream implements TransformStream<string, Array<string>> {
return;
}

if (this.#isFirstRow) {
this.#isFirstRow = false;
if (this.#options.skipFirstRow || this.#options.columns) {
this.#headers = [];

if (this.#options.skipFirstRow) {
const head = record;
this.#headers = head;
}

if (this.#options.columns) {
this.#headers = this.#options.columns;
}
}

if (this.#options.skipFirstRow) {
return this.#pull(controller);
}
}

this.#lineIndex++;
if (record.length > 0) {
controller.enqueue(record);
if (this.#options.skipFirstRow || this.#options.columns) {
controller.enqueue(convertRowToObject(
record,
this.#headers,
this.#lineIndex,
));
} else {
controller.enqueue(record);
}
} else {
return this.#pull(controller);
}
}

get readable(): ReadableStream<Array<string>> {
return this.#readable;
get readable() {
return this.#readable as ReadableStream<RowType<CsvStreamOptions, T>>;
}

get writable(): WritableStream<string> {
Expand Down
142 changes: 138 additions & 4 deletions encoding/csv/stream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
assertRejects,
assertStringIncludes,
} from "../../testing/asserts.ts";
import type { AssertTrue, Has } from "../../testing/types.ts";
import { fromFileUrl, join } from "../../path/mod.ts";
import { StringReader } from "../../io/string_reader.ts";

Expand Down Expand Up @@ -266,6 +267,54 @@ x,,,
input: `""""""""`,
output: [[`"""`]],
},
{
name: "simple",
input: "a,b,c",
output: [["a", "b", "c"]],
skipFirstRow: false,
},
{
Comment on lines +270 to +276
Copy link
Contributor Author

@ayame113 ayame113 Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases were copied from csv_test.ts.

name: "multiline",
input: "a,b,c\ne,f,g\n",
output: [
["a", "b", "c"],
["e", "f", "g"],
],
skipFirstRow: false,
},
{
name: "header mapping boolean",
input: "a,b,c\ne,f,g\n",
output: [{ a: "e", b: "f", c: "g" }],
skipFirstRow: true,
},
{
name: "header mapping array",
input: "a,b,c\ne,f,g\n",
output: [
{ this: "a", is: "b", sparta: "c" },
{ this: "e", is: "f", sparta: "g" },
],
columns: ["this", "is", "sparta"],
},
{
name: "provides both opts.skipFirstRow and opts.columns",
input: "a,b,1\nc,d,2\ne,f,3",
output: [
{ foo: "c", bar: "d", baz: "2" },
{ foo: "e", bar: "f", baz: "3" },
],
skipFirstRow: true,
columns: ["foo", "bar", "baz"],
},
{
name: "mismatching number of headers and fields",
input: "a,b,c\nd,e",
skipFirstRow: true,
columns: ["foo", "bar", "baz"],
errorMessage:
"Error number of fields line: 1\nNumber of fields found: 3\nExpected number of fields: 2",
},
];
for (const testCase of testCases) {
await t.step(testCase.name, async () => {
Expand All @@ -276,13 +325,26 @@ x,,,
if (testCase.comment) {
options.comment = testCase.comment;
}
if (testCase.skipFirstRow) {
options.skipFirstRow = testCase.skipFirstRow;
}
if (testCase.columns) {
options.columns = testCase.columns;
}
const readable = createReadableStreamFromString(testCase.input)
.pipeThrough(new CsvStream(options));
const actual = [] as Array<Array<string>>;
for await (const record of readable) {
actual.push(record);

if (testCase.output) {
const actual = [] as Array<Array<string>>;
for await (const record of readable) {
actual.push(record);
}
assertEquals(actual, testCase.output);
} else {
await assertRejects(async () => {
for await (const _ of readable);
}, testCase.errorMessage);
}
assertEquals(actual, testCase.output);
});
}
},
Expand Down Expand Up @@ -324,3 +386,75 @@ Deno.test({
}
},
});

Deno.test({
name: "[encoding/csv/stream] correct typing",
fn() {
{
const { readable } = new CsvStream();
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({});
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ skipFirstRow: undefined });
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ skipFirstRow: false });
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ skipFirstRow: true });
type _ = AssertTrue<
Has<typeof readable, ReadableStream<Record<string, unknown>>>
>;
}
{
const { readable } = new CsvStream({ columns: undefined });
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ columns: ["aaa"] });
type _ = AssertTrue<
Has<typeof readable, ReadableStream<Record<string, unknown>>>
>;
}
{
const { readable } = new CsvStream({
skipFirstRow: false,
columns: undefined,
});
type _ = AssertTrue<Has<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({
skipFirstRow: true,
columns: undefined,
});
type _ = AssertTrue<
Has<typeof readable, ReadableStream<Record<string, unknown>>>
>;
}
{
const { readable } = new CsvStream({
skipFirstRow: false,
columns: ["aaa"],
});
type _ = AssertTrue<
Has<typeof readable, ReadableStream<Record<string, unknown>>>
>;
}
{
const { readable } = new CsvStream({
skipFirstRow: true,
columns: ["aaa"],
});
type _ = AssertTrue<
Has<typeof readable, ReadableStream<Record<string, unknown>>>
>;
}
},
});
Loading