Skip to content

Commit

Permalink
fix: Prevent skipping blocks from Redis Stream (#558)
Browse files Browse the repository at this point in the history
`xread` returns messages _after_ the provided ID, there's no need to
increment the ID ourselves. If we increment to an ID that actually
exists within the stream, the message will be skipped permanently. e.g.
we have messages `1-0` and `1-1`, on first read we get `1-0`, we
increment to `1-1`, then on the second read we get nothing.
  • Loading branch information
morgsmccauley authored Feb 13, 2024
1 parent e2e8e53 commit c8ff7d1
Showing 1 changed file with 1 addition and 6 deletions.
7 changes: 1 addition & 6 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P
void blockQueueConsumer(workerContext, streamKey);
}

function incrementId (id: string): string {
const [main, sequence] = id.split('-');
return `${main}-${Number(sequence) + 1}`;
}

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10');
let streamMessageStartId = '0';
Expand All @@ -78,7 +73,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id));
}

streamMessageStartId = incrementId(messages[messages.length - 1].id);
streamMessageStartId = messages[messages.length - 1].id;
} catch (err) {
console.error('Error fetching stream messages', err);
await sleep(500);
Expand Down

0 comments on commit c8ff7d1

Please sign in to comment.