From c8ff7d12fbba902450b82faaa14af3eaa1aaf468 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 14 Feb 2024 11:30:45 +1300 Subject: [PATCH] fix: Prevent skipping blocks from Redis Stream (#558) `xread` returns messages _after_ the provided ID, there's no need to increment the ID ourselves. If we increment to an ID that actually exists within the stream, the message will be skipped permanently. e.g. we have messages `1-0` and `1-1`, on first read we get `1-0`, we increment to `1-1`, then on the second read we get nothing. --- runner/src/stream-handler/worker.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 1e3b6c267..0b16ec185 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -51,11 +51,6 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P void blockQueueConsumer(workerContext, streamKey); } -function incrementId (id: string): string { - const [main, sequence] = id.split('-'); - return `${main}-${Number(sequence) + 1}`; -} - async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10'); let streamMessageStartId = '0'; @@ -78,7 +73,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); } - streamMessageStartId = incrementId(messages[messages.length - 1].id); + streamMessageStartId = messages[messages.length - 1].id; } catch (err) { console.error('Error fetching stream messages', err); await sleep(500);