diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts
index 2ff612ece..e76d4d86d 100644
--- a/runner/src/metrics.ts
+++ b/runner/src/metrics.ts
@@ -23,6 +23,12 @@ const UNPROCESSED_STREAM_MESSAGES = new Gauge({
   labelNames: ['indexer', 'type'],
 });
 
+const LAST_PROCESSED_BLOCK_HEIGHT = new Gauge({
+  name: 'queryapi_runner_last_processed_block_height',
+  help: 'Previous block height processed by an indexer',
+  labelNames: ['indexer', 'type'],
+});
+
 const EXECUTION_DURATION = new Histogram({
   name: 'queryapi_runner_execution_duration_milliseconds',
   help: 'Time taken to execute an indexer function',
@@ -34,6 +40,7 @@ export const METRICS = {
   CACHE_HIT,
   CACHE_MISS,
   UNPROCESSED_STREAM_MESSAGES,
+  LAST_PROCESSED_BLOCK_HEIGHT,
   EXECUTION_DURATION,
 };
 
diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts
index 36f0a36bf..f9d2669d7 100644
--- a/runner/src/redis-client/redis-client.test.ts
+++ b/runner/src/redis-client/redis-client.test.ts
@@ -14,7 +14,25 @@ describe('RedisClient', () => {
 
     expect(mockClient.xRead).toHaveBeenCalledWith(
       { key: 'streamKey', id: '0' },
-      { COUNT: 1 }
+      { BLOCK: 0, COUNT: 1 }
+    );
+    expect(message).toBeUndefined();
+  });
+
+  it('returns count of messages after id with block', async () => {
+    const mockClient = {
+      on: jest.fn(),
+      connect: jest.fn().mockResolvedValue(null),
+      xRead: jest.fn().mockResolvedValue(null),
+    } as any;
+
+    const client = new RedisClient(mockClient);
+
+    const message = await client.getStreamMessages('streamKey', 10, '123-0', 1000);
+
+    expect(mockClient.xRead).toHaveBeenCalledWith(
+      { key: 'streamKey', id: '123-0' },
+      { BLOCK: 1000, COUNT: 10 }
     );
     expect(message).toBeUndefined();
   });
diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts
index 9b2ae7309..ea763988f 100644
--- a/runner/src/redis-client/redis-client.ts
+++ b/runner/src/redis-client/redis-client.ts
@@ -47,11 +47,12 @@ export default class RedisClient {
   async getStreamMessages (
     streamKey: string,
     count = 1,
-    streamId = this.SMALLEST_STREAM_ID
+    streamId = this.SMALLEST_STREAM_ID,
+    block = 0
   ): Promise<StreamMessage[] | undefined> {
     const results = await this.client.xRead(
       { key: streamKey, id: streamId },
-      { COUNT: count }
+      { COUNT: count, BLOCK: block }
     );
 
     return results?.[0].messages as StreamMessage[];
diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index c7cb52828..a41552557 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -52,19 +52,26 @@ function incrementId (id: string): string {
   return `${Number(main) + 1}-${sequence}`;
 }
 
-async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
+async function waitForQueueSpace (workerContext: WorkerContext): Promise<number> {
   const HISTORICAL_BATCH_SIZE = 100;
+  return await new Promise((resolve) => {
+    const intervalId = setInterval(() => {
+      const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
+      if (preFetchCount > 0) {
+        clearInterval(intervalId);
+        resolve(preFetchCount);
+      }
+    }, 100);
+  });
+}
+
+async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
   let streamMessageStartId = '0';
 
   while (true) {
-    const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
-    if (preFetchCount <= 0) {
-      await sleep(300);
-      continue;
-    }
-    const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId);
+    const preFetchCount = await waitForQueueSpace(workerContext);
+    const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId, 1000);
     if (messages == null) {
-      await sleep(1000);
       continue;
     }
     console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`);
@@ -96,13 +103,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
   while (true) {
     let streamMessageId = '';
     try {
-      const startTime = performance.now();
-      const blockStartTime = startTime;
+      while (workerContext.queue.length === 0) {
+        await sleep(100);
+      }
       const queueMessage = await workerContext.queue.at(0);
       if (queueMessage === undefined) {
-        await sleep(1000);
         continue;
       }
+      const startTime = performance.now();
+      const blockStartTime = startTime;
       const block = queueMessage.block;
       streamMessageId = queueMessage.streamMessageId;
 
@@ -118,6 +127,10 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
 
       METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime);
 
+      if (streamType === 'historical') {
+        METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: streamType }).set(block.blockHeight);
+      }
+
       console.log(`Success: ${indexerName}`);
     } catch (err) {
       await sleep(10000);