Skip to content

Commit

Permalink
move code
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Jul 31, 2022
1 parent fe1198e commit 6c1e093
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions js/src/bin/arrow2csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,6 @@ type ToStringState = {
maxColWidths: number[];
};

try {
const sources = argv.help ? [] : [
...files.map((file) => () => fs.createReadStream(file)),
...(process.stdin.isTTY ? [] : [() => process.stdin])
].filter(Boolean) as (() => NodeJS.ReadableStream)[];

let reader: RecordBatchReader | null;
let hasReaders = false;

for (const source of sources) {
if (state.closed) { break; }
for await (reader of recordBatchReaders(source)) {
hasReaders = true;
const transformToString = batchesToString(state, reader.schema);
await pipeTo(
reader.pipe(transformToString),
process.stdout, { end: false }
).catch(() => state.closed = true); // Handle EPIPE errors
}
if (state.closed) { break; }
}

const code = hasReaders ? 0 : print_usage();

process.exit(+code || 0);
} catch (err: any) {
if (err) {
console.error(`${err?.stack || err}`);
}

process.exit(process.exitCode || 1);
}

function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) {
return new Promise((resolve, reject) => {

Expand Down Expand Up @@ -337,3 +304,36 @@ function print_usage() {
]));
return 1;
}

try {
const sources = argv.help ? [] : [
...files.map((file) => () => fs.createReadStream(file)),
...(process.stdin.isTTY ? [] : [() => process.stdin])
].filter(Boolean) as (() => NodeJS.ReadableStream)[];

let reader: RecordBatchReader | null;
let hasReaders = false;

for (const source of sources) {
if (state.closed) { break; }
for await (reader of recordBatchReaders(source)) {
hasReaders = true;
const transformToString = batchesToString(state, reader.schema);
await pipeTo(
reader.pipe(transformToString),
process.stdout, { end: false }
).catch(() => state.closed = true); // Handle EPIPE errors
}
if (state.closed) { break; }
}

const code = hasReaders ? 0 : print_usage();

process.exit(+code || 0);
} catch (err: any) {
if (err) {
console.error(`${err?.stack || err}`);
}

process.exit(process.exitCode || 1);
}

0 comments on commit 6c1e093

Please sign in to comment.