Skip to content

Commit

Permalink
Experiment with Blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Nov 6, 2023
1 parent 947ad5d commit ae1aef8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 14 deletions.
7 changes: 7 additions & 0 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const UNPROCESSED_STREAM_MESSAGES = new Gauge({
labelNames: ['indexer', 'type'],
});

const LAST_PROCESSED_BLOCK_HEIGHT = new Gauge({
name: 'queryapi_runner_last_processed_block_height',
help: 'Previous block height processed by an indexer',
labelNames: ['indexer', 'type'],
});

const EXECUTION_DURATION = new Histogram({
name: 'queryapi_runner_execution_duration_milliseconds',
help: 'Time taken to execute an indexer function',
Expand All @@ -34,6 +40,7 @@ export const METRICS = {
CACHE_HIT,
CACHE_MISS,
UNPROCESSED_STREAM_MESSAGES,
LAST_PROCESSED_BLOCK_HEIGHT,
EXECUTION_DURATION,
};

Expand Down
20 changes: 19 additions & 1 deletion runner/src/redis-client/redis-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,25 @@ describe('RedisClient', () => {

expect(mockClient.xRead).toHaveBeenCalledWith(
{ key: 'streamKey', id: '0' },
{ COUNT: 1 }
{ BLOCK: 0, COUNT: 1 }
);
expect(message).toBeUndefined();
});

it('returns count of messages after id with block', async () => {
const mockClient = {
on: jest.fn(),
connect: jest.fn().mockResolvedValue(null),
xRead: jest.fn().mockResolvedValue(null),
} as any;

const client = new RedisClient(mockClient);

const message = await client.getStreamMessages('streamKey', 10, '123-0', 1000);

expect(mockClient.xRead).toHaveBeenCalledWith(
{ key: 'streamKey', id: '123-0' },
{ BLOCK: 1000, COUNT: 10 }
);
expect(message).toBeUndefined();
});
Expand Down
5 changes: 3 additions & 2 deletions runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ export default class RedisClient {
async getStreamMessages (
streamKey: string,
count = 1,
streamId = this.SMALLEST_STREAM_ID
streamId = this.SMALLEST_STREAM_ID,
block = 0
): Promise<StreamMessage[] | null> {
const results = await this.client.xRead(
{ key: streamKey, id: streamId },
{ COUNT: count }
{ COUNT: count, BLOCK: block }
);

return results?.[0].messages as StreamMessage[];
Expand Down
35 changes: 24 additions & 11 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,26 @@ function incrementId (id: string): string {
return `${Number(main) + 1}-${sequence}`;
}

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
async function waitForQueueSpace (workerContext: WorkerContext): Promise<number> {
const HISTORICAL_BATCH_SIZE = 100;
return await new Promise<number>((resolve) => {
const intervalId = setInterval(() => {
const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
if (preFetchCount > 0) {
clearInterval(intervalId);
resolve(preFetchCount);
}
}, 100);
});
}

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
let streamMessageStartId = '0';

while (true) {
const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
if (preFetchCount <= 0) {
await sleep(300);
continue;
}
const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId);
const preFetchCount = await waitForQueueSpace(workerContext);
const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId, 1000);
if (messages == null) {
await sleep(1000);
continue;
}
console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`);
Expand Down Expand Up @@ -96,13 +103,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
while (true) {
let streamMessageId = '';
try {
const startTime = performance.now();
const blockStartTime = startTime;
while (workerContext.queue.length === 0) {
await sleep(100);
}
const queueMessage = await workerContext.queue.at(0);
if (queueMessage === undefined) {
await sleep(1000);
continue;
}
const startTime = performance.now();
const blockStartTime = startTime;
const block = queueMessage.block;
streamMessageId = queueMessage.streamMessageId;

Expand All @@ -118,6 +127,10 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri

METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime);

if (streamType === 'historical') {
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: streamType }).set(block.blockHeight);
}

console.log(`Success: ${indexerName}`);
} catch (err) {
await sleep(10000);
Expand Down

0 comments on commit ae1aef8

Please sign in to comment.