feat: Add metrics for memory footprint
darunrs committed Feb 9, 2024
1 parent 161b31f · commit 3c24beb
Showing 3 changed files with 46 additions and 16 deletions.
indexer/queryapi_coordinator/src/main.rs (2 changes: 1 addition & 1 deletion)

@@ -142,7 +142,7 @@ async fn main() -> anyhow::Result<()> {

 async fn fetch_denylist(redis_connection_manager: &ConnectionManager) -> anyhow::Result<Denylist> {
     let raw_denylist: String =
-        storage::get(redis_connection_manager, storage::DENYLIST_KEY).await?;
+        storage::get(redis_connection_manager, storage::DENYLIST_KEY).await.unwrap_or("".to_owned());
     let denylist: Denylist =
         serde_json::from_str(&raw_denylist).context("Failed to parse denylist")?;
 
runner/src/metrics.ts (21 changes: 21 additions & 0 deletions)
@@ -1,6 +1,24 @@
 import express from 'express';
 import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client';
 
+const HEAP_TOTAL_ALLOCATION = new Gauge({
+  name: 'queryapi_runner_heap_total_allocation_megabytes',
+  help: 'Size of heap allocation for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
+const HEAP_USED = new Gauge({
+  name: 'queryapi_runner_heap_used_megabytes',
+  help: 'Size of used heap space for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
+const PREFETCH_QUEUE_COUNT = new Gauge({
+  name: 'queryapi_runner_prefetch_queue_count',
+  help: 'Count of items in prefetch queue for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
 const BLOCK_WAIT_DURATION = new Histogram({
   name: 'queryapi_runner_block_wait_duration_milliseconds',
   help: 'Time an indexer function waited for a block before processing',
@@ -37,6 +55,9 @@ const EXECUTION_DURATION = new Histogram({
 });
 
 export const METRICS = {
+  HEAP_TOTAL_ALLOCATION,
+  HEAP_USED,
+  PREFETCH_QUEUE_COUNT,
   BLOCK_WAIT_DURATION,
   CACHE_HIT,
   CACHE_MISS,
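The three new gauges follow the standard prom-client Gauge API: each sample is recorded per (indexer, type) label pair, and registration happens on prom-client's default registry at construction time. A minimal usage sketch; the label values and the relative import path are illustrative, not taken from the commit:

import promClient from 'prom-client';
import { METRICS } from './metrics';

// Hypothetical label values; in the runner they come from the worker context.
const labels = { indexer: 'example.near/feed-indexer', type: 'historical' };

// A Gauge is set to the current value on each observation, so stale
// readings are simply overwritten by the next sample.
METRICS.PREFETCH_QUEUE_COUNT.labels(labels).set(3);
METRICS.HEAP_USED.labels(labels).set(process.memoryUsage().heapUsed / (1024 * 1024));

// Render everything on the default registry in Prometheus text format.
promClient.register.metrics().then((text) => { console.log(text); });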
runner/src/stream-handler/worker.ts (39 changes: 24 additions & 15 deletions)

@@ -57,27 +57,32 @@ function incrementId (id: string): string {
 }
 
 async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
-  const HISTORICAL_BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '10');
+  const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10');
   let streamMessageStartId = '0';
 
   while (true) {
     const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
-    if (preFetchCount <= 0) {
-      await sleep(100);
-      continue;
-    }
-    const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
-    if (messages == null) {
-      await sleep(100);
-      continue;
-    }
+    try {
+      if (preFetchCount <= 0) {
+        await sleep(100);
+        continue;
+      }
+      const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
+      if (messages == null) {
+        await sleep(100);
+        continue;
+      }
 
-    for (const streamMessage of messages) {
-      const { id, message } = streamMessage;
-      workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id));
-    }
+      for (const streamMessage of messages) {
+        const { id, message } = streamMessage;
+        workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id));
+      }
 
-    streamMessageStartId = incrementId(messages[messages.length - 1].id);
+      streamMessageStartId = incrementId(messages[messages.length - 1].id);
+    } catch (err) {
+      console.error('Error fetching stream messages', err);
+      await sleep(500);
+    }
   }
 }
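The renamed PREFETCH_QUEUE_LIMIT variable caps how far the producer runs ahead of the consumer: each pass fetches at most the headroom left in the queue, and a full queue makes it sleep rather than call Redis. A self-contained sketch of that back-pressure arithmetic, with a plain array standing in for WorkerContext.queue:

// PREFETCH_QUEUE_LIMIT defaults to 10, mirroring the diff above.
const PREFETCH_QUEUE_LIMIT = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10');

// Stand-in for WorkerContext.queue: three messages already buffered.
const queue: Array<{ blockHeight: number }> = [
  { blockHeight: 110 },
  { blockHeight: 111 },
  { blockHeight: 112 },
];

// Fetch at most the remaining headroom; 0 or less means skip the fetch
// and sleep for 100 ms instead.
const preFetchCount = PREFETCH_QUEUE_LIMIT - queue.length;
console.log(preFetchCount); // 7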

@@ -139,6 +144,10 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
     } finally {
       const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey);
       METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount);
+      const memoryUsage = process.memoryUsage();
+      METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapTotal / (1024 * 1024));
+      METRICS.HEAP_USED.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapUsed / (1024 * 1024));
+      METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerName, type: workerContext.streamType }).set(workerContext.queue.length);
 
       const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
       parentPort?.postMessage(metricsMessage);
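The divisions by 1024 * 1024 in the finally block exist because process.memoryUsage() reports V8 heap sizes in bytes, while the gauge names promise megabytes. A standalone sketch of the conversion, using nothing beyond the standard Node API:

// process.memoryUsage() returns byte counts for the V8 heap.
const { heapTotal, heapUsed } = process.memoryUsage();

const toMegabytes = (bytes: number): number => bytes / (1024 * 1024);

console.log(`heapTotal: ${toMegabytes(heapTotal).toFixed(1)} MB`);
console.log(`heapUsed:  ${toMegabytes(heapUsed).toFixed(1)} MB`);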
