From 2096c3919ab67970f744cc17ef19f557647407ae Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 1 Nov 2023 13:46:37 +1300 Subject: [PATCH] fix: Write `UNPROCESSED_STREAM_MESSAGES` metric on both failure/success (#352) --- runner/src/stream-handler/worker.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a80e854ee..09569f4fa 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -19,11 +19,11 @@ void (async function main () { console.log('Started processing stream: ', streamKey); let indexerName = ''; + const streamType = redisClient.getStreamType(streamKey); while (true) { try { const startTime = performance.now(); - const streamType = redisClient.getStreamType(streamKey); const messages = await redisClient.getNextStreamMessage(streamKey); const indexerConfig = await redisClient.getStreamStorage(streamKey); @@ -52,14 +52,6 @@ void (async function main () { await redisClient.deleteStreamMessage(streamKey, id); - const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - - parentPort?.postMessage({ - type: 'UNPROCESSED_STREAM_MESSAGES', - labels: { indexer: indexerName, type: streamType }, - value: unprocessedMessages?.length ?? 0, - } satisfies Message); - parentPort?.postMessage({ type: 'EXECUTION_DURATION', labels: { indexer: indexerName, type: streamType }, @@ -70,6 +62,14 @@ void (async function main () { } catch (err) { await sleep(10000); console.log(`Failed: ${indexerName}`, err); + } finally { + const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + + parentPort?.postMessage({ + type: 'UNPROCESSED_STREAM_MESSAGES', + labels: { indexer: indexerName, type: streamType }, + value: unprocessedMessages?.length ?? 0, + } satisfies Message); } } })();