Merge pull request #554 from near/fix-memory-error

feat: Add Metrics for Memory Statistics

darunrs authored Feb 9, 2024
2 parents c92f477 + a86206b · commit 8864a2a

Showing 6 changed files with 82 additions and 23 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -70,6 +70,7 @@ services:
       PGPASSWORD: postgrespassword
       PGDATABASE: postgres
       PORT: 9180
+      AWS_REGION: eu-central-1
       AWS_ACCESS_KEY_ID:
       AWS_SECRET_ACCESS_KEY:
       GRPC_SERVER_PORT: 7001
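The added AWS_REGION pins the region for the service's S3 reads instead of leaving it to whatever the host environment supplies. As a minimal sketch of how a TypeScript service could consume it, assuming the AWS SDK v3 S3 client (this wiring is not part of the diff):

import { S3Client } from '@aws-sdk/client-s3';

// Hypothetical consumer of the new variable: the SDK can also read
// AWS_REGION from the environment on its own, but reading it explicitly
// makes the dependency visible and gives a deliberate fallback.
const s3Client = new S3Client({ region: process.env.AWS_REGION ?? 'eu-central-1' });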
5 changes: 3 additions & 2 deletions indexer/queryapi_coordinator/src/main.rs
@@ -141,8 +141,9 @@ async fn main() -> anyhow::Result<()> {
 }

 async fn fetch_denylist(redis_connection_manager: &ConnectionManager) -> anyhow::Result<Denylist> {
-    let raw_denylist: String =
-        storage::get(redis_connection_manager, storage::DENYLIST_KEY).await?;
+    let raw_denylist: String = storage::get(redis_connection_manager, storage::DENYLIST_KEY)
+        .await
+        .unwrap_or("".to_owned());
     let denylist: Denylist =
         serde_json::from_str(&raw_denylist).context("Failed to parse denylist")?;

21 changes: 21 additions & 0 deletions runner/src/metrics.ts
@@ -1,6 +1,24 @@
 import express from 'express';
 import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client';

+const HEAP_TOTAL_ALLOCATION = new Gauge({
+  name: 'queryapi_runner_heap_total_allocation_megabytes',
+  help: 'Size of heap allocation for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
+const HEAP_USED = new Gauge({
+  name: 'queryapi_runner_heap_used_megabytes',
+  help: 'Size of used heap space for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
+const PREFETCH_QUEUE_COUNT = new Gauge({
+  name: 'queryapi_runner_prefetch_queue_count',
+  help: 'Count of items in prefetch queue for indexer function',
+  labelNames: ['indexer', 'type'],
+});
+
 const BLOCK_WAIT_DURATION = new Histogram({
   name: 'queryapi_runner_block_wait_duration_milliseconds',
   help: 'Time an indexer function waited for a block before processing',
@@ -37,6 +55,9 @@ const EXECUTION_DURATION = new Histogram({
 });

 export const METRICS = {
+  HEAP_TOTAL_ALLOCATION,
+  HEAP_USED,
+  PREFETCH_QUEUE_COUNT,
   BLOCK_WAIT_DURATION,
   CACHE_HIT,
   CACHE_MISS,
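All three additions follow prom-client's labelled-gauge pattern, which the diff itself relies on: declare the gauge once with its labelNames, then record per-indexer samples at runtime. A short usage sketch (the label values here are illustrative):

import { Gauge } from 'prom-client';

const HEAP_USED = new Gauge({
  name: 'queryapi_runner_heap_used_megabytes',
  help: 'Size of used heap space for indexer function',
  labelNames: ['indexer', 'type'],
});

// Each distinct label combination becomes its own time series.
HEAP_USED
  .labels({ indexer: 'example.near/my_indexer', type: 'historical' })
  .set(process.memoryUsage().heapUsed / (1024 * 1024));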
3 changes: 2 additions & 1 deletion runner/src/server/runner-service.ts
@@ -107,7 +107,8 @@ function getRunnerService (executors: Map<string, StreamHandler>, StreamHandlerT
         schema: '',
       };
       context = {
-        status: Status.RUNNING
+        status: Status.RUNNING,
+        block_height: context.block_height,
       };
     }
     response.push({
26 changes: 24 additions & 2 deletions runner/src/stream-handler/stream-handler.ts
@@ -17,8 +17,19 @@ export interface IndexerConfig {
   version: number
 }

+export enum WorkerMessageType {
+  METRICS = 'METRICS',
+  BLOCK_HEIGHT = 'BLOCK_HEIGHT',
+}
+
+export interface WorkerMessage {
+  type: WorkerMessageType
+  data: any
+}
+
 interface ExecutorContext {
   status: Status
+  block_height: number
 }

 export default class StreamHandler {
@@ -38,6 +49,7 @@ export default class StreamHandler {
     });
     this.executorContext = {
       status: Status.RUNNING,
+      block_height: indexerConfig?.version ?? 0,
     };

     this.worker.on('message', this.handleMessage.bind(this));
@@ -61,12 +73,22 @@
     indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => {
       console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e);
     });
+    indexer.writeLog(functionName, this.executorContext.block_height, `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`).catch((e) => {
+      console.log(`Failed to write log for stream: ${this.streamKey}`, e);
+    });
     this.worker.terminate().catch(() => {
       console.log(`Failed to terminate thread for stream: ${this.streamKey}`);
     });
   }

-  private handleMessage (message: string): void {
-    registerWorkerMetrics(this.worker.threadId, message);
+  private handleMessage (message: WorkerMessage): void {
+    switch (message.type) {
+      case WorkerMessageType.BLOCK_HEIGHT:
+        this.executorContext.block_height = message.data;
+        break;
+      case WorkerMessageType.METRICS:
+        registerWorkerMetrics(this.worker.threadId, message.data);
+        break;
+    }
   }
 }
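Taken together, WorkerMessageType and WorkerMessage turn the worker channel into a small typed protocol: the worker posts tagged envelopes, and the parent dispatches on the tag instead of assuming every message is metrics JSON. A self-contained sketch of that round trip, runnable outside the repo (the single-file main/worker split is illustrative, not how the runner actually spawns workers):

import { Worker, isMainThread, parentPort } from 'worker_threads';

enum WorkerMessageType { METRICS = 'METRICS', BLOCK_HEIGHT = 'BLOCK_HEIGHT' }
interface WorkerMessage { type: WorkerMessageType, data: any }

if (isMainThread) {
  const worker = new Worker(__filename);
  // Parent side: one handler dispatches on the envelope tag,
  // mirroring StreamHandler.handleMessage above.
  worker.on('message', (message: WorkerMessage) => {
    if (message.type === WorkerMessageType.BLOCK_HEIGHT) {
      console.log('latest processed block height:', message.data);
      void worker.terminate();
    }
  });
} else {
  // Worker side: every payload travels inside a typed envelope.
  const message: WorkerMessage = { type: WorkerMessageType.BLOCK_HEIGHT, data: 12345 };
  parentPort?.postMessage(message);
}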
49 changes: 31 additions & 18 deletions runner/src/stream-handler/worker.ts
@@ -6,7 +6,7 @@ import RedisClient, { type StreamType } from '../redis-client';
 import { METRICS } from '../metrics';
 import type { Block } from '@near-lake/primitives';
 import LakeClient from '../lake-client';
-import { type IndexerConfig } from './stream-handler';
+import { WorkerMessageType, type IndexerConfig, type WorkerMessage } from './stream-handler';

 if (isMainThread) {
   throw new Error('Worker should not be run on main thread');
@@ -57,35 +57,40 @@ function incrementId (id: string): string {
 }

 async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
-  const HISTORICAL_BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '10');
+  const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10');
   let streamMessageStartId = '0';

   while (true) {
     const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
-    if (preFetchCount <= 0) {
-      await sleep(100);
-      continue;
-    }
-    const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
-    if (messages == null) {
-      await sleep(100);
-      continue;
-    }
+    try {
+      if (preFetchCount <= 0) {
+        await sleep(100);
+        continue;
+      }
+      const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount);
+      if (messages == null) {
+        await sleep(100);
+        continue;
+      }

-    for (const streamMessage of messages) {
-      const { id, message } = streamMessage;
-      workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id));
-    }
+      for (const streamMessage of messages) {
+        const { id, message } = streamMessage;
+        workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id));
+      }

-    streamMessageStartId = incrementId(messages[messages.length - 1].id);
+      streamMessageStartId = incrementId(messages[messages.length - 1].id);
+    } catch (err) {
+      console.error('Error fetching stream messages', err);
+      await sleep(500);
+    }
   }
 }

 async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
   const indexer = new Indexer();
   const isHistorical = workerContext.streamType === 'historical';
   let streamMessageId = '';
-  let indexerName = '';
+  let indexerName = streamKey.split(':')[0];
   let currBlockHeight = 0;

   while (true) {
Expand Down Expand Up @@ -114,6 +119,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
       }
       const block = queueMessage.block;
       currBlockHeight = block.blockHeight;
+      const blockHeightMessage: WorkerMessage = { type: WorkerMessageType.BLOCK_HEIGHT, data: currBlockHeight };
+      parentPort?.postMessage(blockHeightMessage);
       streamMessageId = queueMessage.streamMessageId;

       if (block === undefined || block.blockHeight == null) {
@@ -133,11 +140,17 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
     } catch (err) {
       await sleep(10000);
       console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err);
+      throw err;
     } finally {
       const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey);
       METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount);
+      const memoryUsage = process.memoryUsage();
+      METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapTotal / (1024 * 1024));
+      METRICS.HEAP_USED.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapUsed / (1024 * 1024));
+      METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerName, type: workerContext.streamType }).set(workerContext.queue.length);

-      parentPort?.postMessage(await promClient.register.getMetricsAsJSON());
+      const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
+      parentPort?.postMessage(metricsMessage);
     }
   }
 }
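The heap figures come straight from Node's process.memoryUsage(), which reports sizes in bytes; the division by 1024 * 1024 converts them to match the *_megabytes metric names. A standalone sketch of just that conversion:

// process.memoryUsage() reports bytes; the new gauges are *_megabytes.
const toMegabytes = (bytes: number): number => bytes / (1024 * 1024);

const { heapTotal, heapUsed } = process.memoryUsage();
console.log(`heap total: ${toMegabytes(heapTotal).toFixed(1)} MB`);
console.log(`heap used:  ${toMegabytes(heapUsed).toFixed(1)} MB`);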
