Skip to content

Commit

Permalink
fix(fileimport-service): write valid messages from underlying parsers (
Browse files Browse the repository at this point in the history
  • Loading branch information
iainsproat authored Jan 16, 2024
1 parent 2b3a26b commit ceeb30e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 25 deletions.
18 changes: 12 additions & 6 deletions packages/fileimport-service/ifc/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ module.exports = class ServerAPI {
this.prepInsertionObjectBatch(batch)
await Objects().insert(batch).onConflict().ignore()
this.logger.info(
`Inserted ${batch.length} objects from batch ${index + 1} of ${
batches.length
}`
{
currentBatchCount: batch.length,
currentBatchId: index + 1,
totalNumberOfBatches: batches.length
},
'Inserted {currentBatchCount} objects from batch {currentBatchId} of {totalNumberOfBatches}'
)
}
}
Expand All @@ -140,9 +143,12 @@ module.exports = class ServerAPI {
this.prepInsertionClosureBatch(batch)
await Closures().insert(batch).onConflict().ignore()
this.logger.info(
`Inserted ${batch.length} closures from batch ${index + 1} of ${
batches.length
}`
{
currentBatchCount: batch.length,
currentBatchId: index + 1,
totalNumberOfBatches: batches.length
},
'Inserted {currentBatchCount} closures from batch {currentBatchId} of {totalNumberOfBatches}'
)
}
}
Expand Down
14 changes: 10 additions & 4 deletions packages/fileimport-service/ifc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ async function parseAndCreateCommit({

const start = performance.now()
const { id, tCount } = await myParser.parse(data)
logger = logger.child({ objectId: id })
const end = performance.now()
logger.info(`Total processing time V2: ${(end - start).toFixed(2)}ms`)
logger.info(
{
fileProcessingDurationMs: (end - start).toFixed(2)
},
'Total processing time V2: {fileProcessingDurationMs}ms'
)

const commit = {
streamId,
Expand All @@ -43,7 +49,7 @@ async function parseAndCreateCommit({
})

if (!branch) {
logger.info('Branch not found, creating it.')
logger.info("Branch '{branchName}' not found, creating it.")
await serverApi.createBranch({
name: branchName,
streamId,
Expand All @@ -55,7 +61,7 @@ async function parseAndCreateCommit({
const userToken = process.env.USER_TOKEN

const serverBaseUrl = process.env.SPECKLE_SERVER_URL || 'http://127.0.0.1:3000'
logger.info(`Creating commit for object (${id}), with message "${message}"`)
logger.info(`Creating commit for object ({objectId}), with message "${message}"`)
const response = await fetch(serverBaseUrl + '/graphql', {
method: 'POST',
headers: {
Expand All @@ -72,7 +78,7 @@ async function parseAndCreateCommit({
})

const json = await response.json()
logger.info(json)
logger.info(json, 'Commit created')

return json.data.commitCreate
}
Expand Down
46 changes: 31 additions & 15 deletions packages/fileimport-service/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async function doTask(task) {
let branchMetadata = { streamId: null, branchName: null }

try {
taskLogger.info('Doing task.')
taskLogger.info("Doing task '{taskId}'.")
const info = await FileUploads().where({ id: taskId }).first()
if (!info) {
throw new Error('Internal error: DB inconsistent')
Expand Down Expand Up @@ -239,27 +239,17 @@ function runProcessWithTimeout(processLogger, cmd, cmdArgs, extraEnv, timeoutMs)

boundLogger = boundLogger.child({ pid: childProc.pid })
childProc.stdout.on('data', (data) => {
try {
JSON.parse(data.toString()) // data is already in JSON format
process.stdout.write(data.string())
} catch {
boundLogger.info('Parser: %s', data.toString())
}
handleData(data, false, boundLogger)
})

childProc.stderr.on('data', (data) => {
try {
JSON.parse(data.toString()) // data is already in JSON format
process.stderr.write(data.string())
} catch {
boundLogger.info('Parser: %s', data.toString())
}
handleData(data, true, boundLogger)
})

let timedOut = false

const timeout = setTimeout(() => {
boundLogger.warn('Process timeout. Killing process...')
boundLogger.warn('Process timed out. Killing process...')

timedOut = true
childProc.kill(9)
Expand All @@ -273,7 +263,7 @@ function runProcessWithTimeout(processLogger, cmd, cmdArgs, extraEnv, timeoutMs)
}, timeoutMs)

childProc.on('close', (code) => {
boundLogger.info({ exitCode: code }, `Process exited with code ${code}`)
boundLogger.info({ exitCode: code }, "Process exited with code '{exitCode}'")

if (timedOut) {
return // ignore `close` calls after killing (the promise was already rejected)
Expand All @@ -290,6 +280,32 @@ function runProcessWithTimeout(processLogger, cmd, cmdArgs, extraEnv, timeoutMs)
})
}

function handleData(data, isErr, logger) {
try {
Buffer.isBuffer(data) && (data = data.toString())
data.split('\n').forEach((line) => {
if (!line) return
try {
JSON.parse(line) // verify if the data is already in JSON format
process.stdout.write(line)
process.stdout.write('\n')
} catch {
wrapLogLine(line, isErr, logger)
}
})
} catch {
wrapLogLine(JSON.stringify(data), isErr, logger)
}
}

function wrapLogLine(line, isErr, logger) {
if (isErr) {
logger.error({ parserLogLine: line }, 'ParserLog: {parserLogLine}')
return
}
logger.info({ parserLogLine: line }, 'ParserLog: {parserLogLine}')
}

async function tick() {
if (shouldExit) {
process.exit(0)
Expand Down

0 comments on commit ceeb30e

Please sign in to comment.