-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
138 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,102 +1,24 @@ | ||
import { createWriteStream } from 'fs' | ||
import { dirname, resolve } from 'path' | ||
import { promisify } from 'util' | ||
import { SpanStatusCode } from '@opentelemetry/api' | ||
import program from 'commander' | ||
import rdf from '@zazuko/env' | ||
import fromFile from 'rdf-utils-fs/fromFile.js' | ||
import { finished, PassThrough } from 'readable-stream' | ||
import fromStream from 'rdf-dataset-ext/fromStream.js' | ||
import findPipeline from '../findPipeline.js' | ||
import runner from '../runner.js' | ||
import bufferDebug from './bufferDebug.js' | ||
import tracer from './tracer.js' | ||
|
||
async function fileToDataset(filename) { | ||
return fromStream(rdf.dataset(), fromFile(filename)) | ||
} | ||
|
||
function createOutputStream(output) { | ||
if (output === '-') { | ||
// Use a PassThrough stream instead of just process.stdout to avoid closing | ||
// stdout too early | ||
const stream = new PassThrough() | ||
|
||
stream.pipe(process.stdout) | ||
|
||
return stream | ||
} | ||
|
||
return createWriteStream(output) | ||
} | ||
|
||
function setVariable(str, all) { | ||
let [key, value] = str.split('=', 2) | ||
|
||
if (typeof value === 'undefined') { | ||
value = process.env[key] | ||
} | ||
|
||
return all.set(key, value) | ||
} | ||
|
||
const runCommand = program | ||
import runAction from './cli/runAction.js' | ||
import * as otelOptions from './cli/otelOptions.js' | ||
import * as commonOptions from './cli/commonOptions.js' | ||
|
||
program | ||
.addOption(commonOptions.variable) | ||
.addOption(commonOptions.variableAll) | ||
.addOption(commonOptions.verbose) | ||
.addOption(commonOptions.enableBufferMonitor) | ||
.addOption(otelOptions.debug) | ||
.addOption(otelOptions.metricsExporter) | ||
.addOption(otelOptions.metricsInterval) | ||
.addOption(otelOptions.tracesExporter) | ||
|
||
program | ||
.command('run <filename>') | ||
.option('--output [filename]', 'output file', '-') | ||
.option('--pipeline [iri]', 'IRI of the pipeline description') | ||
.option('--variable <name=value>', 'variable key value pairs', setVariable, new Map()) | ||
.option('--variable-all', 'Import all environment variables') | ||
.option('-v, --verbose', 'enable diagnostic console output', (v, total) => ++total, 0) | ||
.option('--enable-buffer-monitor', 'enable histogram of buffer usage') | ||
.action(async (filename, { output, pipeline: iri, variable: variables, variableAll, verbose, enableBufferMonitor } = {}) => { | ||
await tracer.startActiveSpan('barnard59 run', async span => { | ||
try { | ||
const level = ['error', 'info', 'debug'][verbose] || 'error' | ||
|
||
const dataset = await fileToDataset(filename) | ||
const ptr = findPipeline(dataset, iri) | ||
|
||
if (variableAll) { | ||
for (const [key, value] of Object.entries(process.env)) { | ||
variables.set(key, value) | ||
} | ||
} | ||
|
||
span.setAttribute('iri', ptr.value) | ||
|
||
const outputStream = createOutputStream(output) | ||
const { finished: runFinished, pipeline } = await runner(ptr, { | ||
basePath: resolve(dirname(filename)), | ||
level, | ||
outputStream, | ||
variables, | ||
}) | ||
|
||
if (enableBufferMonitor) { | ||
bufferDebug(pipeline) | ||
} | ||
|
||
await runFinished | ||
// TODO: this has some issues | ||
await promisify(finished)(outputStream) | ||
} catch (err) { | ||
span.recordException(err) | ||
span.setStatus({ code: SpanStatusCode.ERROR, message: err.message }) | ||
throw err | ||
} finally { | ||
span.end() | ||
} | ||
}) | ||
}) | ||
|
||
async function run(commonOptions) { | ||
// Add the common options that were parsed earlier to properly have them | ||
// shown in --help | ||
for (const option of commonOptions) { | ||
runCommand.addOption(option) | ||
} | ||
.action(runAction) | ||
|
||
export default async function () { | ||
await program.parseAsync(process.argv) | ||
} | ||
|
||
export { run } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import { Option } from 'commander' | ||
|
||
function setVariable(str, all) { | ||
let [key, value] = str.split('=', 2) | ||
|
||
if (typeof value === 'undefined') { | ||
value = process.env[key] | ||
} | ||
|
||
return all.set(key, value) | ||
} | ||
|
||
export const variable = new Option('--variable <name=value>', 'variable key value pairs') | ||
.default(new Map()) | ||
.argParser(setVariable) | ||
|
||
export const variableAll = new Option('--variable-all', 'Import all environment variables') | ||
|
||
export const verbose = new Option('-v, --verbose', 'enable diagnostic console output') | ||
.default(0) | ||
.argParser((v, total) => ++total) | ||
|
||
export const enableBufferMonitor = new Option('--enable-buffer-monitor', 'enable histogram of buffer usage') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import { Option } from 'commander' | ||
import { DiagLogLevel } from '@opentelemetry/api' | ||
|
||
export const tracesExporter = new Option('--otel-traces-exporter <exporter>', 'OpenTelemetry Traces exporter to use') | ||
.choices(['otlp', 'none']) | ||
.default('none') | ||
|
||
export const metricsExporter = new Option('--otel-metrics-exporter <exporter>', 'OpenTelemetry Metrics exporter to use') | ||
.choices(['otlp', 'none']) | ||
.default('none') | ||
|
||
export const metricsInterval = new Option('--otel-metrics-interval <milliseconds>', 'Export Metrics interval') | ||
.argParser(value => Number.parseInt(value, 10)) | ||
.default(10000) | ||
|
||
export const debug = new Option('--otel-debug <level>', 'Enable OpenTelemetry console diagnostic output') | ||
.choices([...Object.keys(DiagLogLevel)].filter(opt => isNaN(Number.parseInt(opt, 10)))) | ||
.default('ERROR') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import { dirname, resolve } from 'path' | ||
import { promisify } from 'util' | ||
import { createWriteStream } from 'fs' | ||
import { finished, PassThrough } from 'readable-stream' | ||
import { SpanStatusCode } from '@opentelemetry/api' | ||
import fromStream from 'rdf-dataset-ext/fromStream.js' | ||
import rdf from '@zazuko/env' | ||
import fromFile from 'rdf-utils-fs/fromFile.js' | ||
import runner from '../../runner.js' | ||
import findPipeline from '../../findPipeline.js' | ||
import bufferDebug from './../bufferDebug.js' | ||
import tracer from './../tracer.js' | ||
|
||
async function fileToDataset(filename) { | ||
return fromStream(rdf.dataset(), fromFile(filename)) | ||
} | ||
|
||
function createOutputStream(output) { | ||
if (output === '-') { | ||
// Use a PassThrough stream instead of just process.stdout to avoid closing | ||
// stdout too early | ||
const stream = new PassThrough() | ||
|
||
stream.pipe(process.stdout) | ||
|
||
return stream | ||
} | ||
|
||
return createWriteStream(output) | ||
} | ||
|
||
export default async function (filename, { output, pipeline: iri, variable: variables, variableAll, verbose, enableBufferMonitor } = {}) { | ||
await tracer.startActiveSpan('barnard59 run', async span => { | ||
try { | ||
const level = ['error', 'info', 'debug'][verbose] || 'error' | ||
|
||
const dataset = await fileToDataset(filename) | ||
const ptr = findPipeline(dataset, iri) | ||
|
||
if (variableAll) { | ||
for (const [key, value] of Object.entries(process.env)) { | ||
variables.set(key, value) | ||
} | ||
} | ||
|
||
span.setAttribute('iri', ptr.value) | ||
|
||
const outputStream = createOutputStream(output) | ||
const { finished: runFinished, pipeline } = await runner(ptr, { | ||
basePath: resolve(dirname(filename)), | ||
level, | ||
outputStream, | ||
variables, | ||
}) | ||
|
||
if (enableBufferMonitor) { | ||
bufferDebug(pipeline) | ||
} | ||
|
||
await runFinished | ||
// TODO: this has some issues | ||
await promisify(finished)(outputStream) | ||
} catch (err) { | ||
span.recordException(err) | ||
span.setStatus({ code: SpanStatusCode.ERROR, message: err.message }) | ||
throw err | ||
} finally { | ||
span.end() | ||
} | ||
}) | ||
} |