diff --git a/docker-compose.yml b/docker-compose.yml index 48103d4ae..7dc2e5181 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,6 +70,7 @@ services: PGPASSWORD: postgrespassword PGDATABASE: postgres PORT: 9180 + AWS_REGION: eu-central-1 AWS_ACCESS_KEY_ID: AWS_SECRET_ACCESS_KEY: GRPC_SERVER_PORT: 7001 diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index dbb08ba85..9f0ffbdf6 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -141,8 +141,9 @@ async fn main() -> anyhow::Result<()> { } async fn fetch_denylist(redis_connection_manager: &ConnectionManager) -> anyhow::Result { - let raw_denylist: String = - storage::get(redis_connection_manager, storage::DENYLIST_KEY).await?; + let raw_denylist: String = storage::get(redis_connection_manager, storage::DENYLIST_KEY) + .await + .unwrap_or("".to_owned()); let denylist: Denylist = serde_json::from_str(&raw_denylist).context("Failed to parse denylist")?; diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 9224c374b..b30d2011c 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,6 +1,24 @@ import express from 'express'; import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; +const HEAP_TOTAL_ALLOCATION = new Gauge({ + name: 'queryapi_runner_heap_total_allocation_megabytes', + help: 'Size of heap allocation for indexer function', + labelNames: ['indexer', 'type'], +}); + +const HEAP_USED = new Gauge({ + name: 'queryapi_runner_heap_used_megabytes', + help: 'Size of used heap space for indexer function', + labelNames: ['indexer', 'type'], +}); + +const PREFETCH_QUEUE_COUNT = new Gauge({ + name: 'queryapi_runner_prefetch_queue_count', + help: 'Count of items in prefetch queue for indexer function', + labelNames: ['indexer', 'type'], +}); + const BLOCK_WAIT_DURATION = new Histogram({ name: 'queryapi_runner_block_wait_duration_milliseconds', help: 'Time an indexer function waited for a block before processing', @@ -37,6 +55,9 @@ const EXECUTION_DURATION = new Histogram({ }); export const METRICS = { + HEAP_TOTAL_ALLOCATION, + HEAP_USED, + PREFETCH_QUEUE_COUNT, BLOCK_WAIT_DURATION, CACHE_HIT, CACHE_MISS, diff --git a/runner/src/server/runner-service.ts b/runner/src/server/runner-service.ts index 96d295e56..2118591a9 100644 --- a/runner/src/server/runner-service.ts +++ b/runner/src/server/runner-service.ts @@ -107,7 +107,8 @@ function getRunnerService (executors: Map, StreamHandlerT schema: '', }; context = { - status: Status.RUNNING + status: Status.RUNNING, + block_height: context.block_height, }; } response.push({ diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 490be6559..0f7268a09 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -17,8 +17,19 @@ export interface IndexerConfig { version: number } +export enum WorkerMessageType { + METRICS = 'METRICS', + BLOCK_HEIGHT = 'BLOCK_HEIGHT', +} + +export interface WorkerMessage { + type: WorkerMessageType + data: any +} + interface ExecutorContext { status: Status + block_height: number } export default class StreamHandler { @@ -38,6 +49,7 @@ export default class StreamHandler { }); this.executorContext = { status: Status.RUNNING, + block_height: indexerConfig?.version ?? 0, }; this.worker.on('message', this.handleMessage.bind(this)); @@ -61,12 +73,22 @@ export default class StreamHandler { indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => { console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e); }); + indexer.writeLog(functionName, this.executorContext.block_height, `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`).catch((e) => { + console.log(`Failed to write log for stream: ${this.streamKey}`, e); + }); this.worker.terminate().catch(() => { console.log(`Failed to terminate thread for stream: ${this.streamKey}`); }); } - private handleMessage (message: string): void { - registerWorkerMetrics(this.worker.threadId, message); + private handleMessage (message: WorkerMessage): void { + switch (message.type) { + case WorkerMessageType.BLOCK_HEIGHT: + this.executorContext.block_height = message.data; + break; + case WorkerMessageType.METRICS: + registerWorkerMetrics(this.worker.threadId, message.data); + break; + } } } diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 311657112..1e3b6c267 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -6,7 +6,7 @@ import RedisClient, { type StreamType } from '../redis-client'; import { METRICS } from '../metrics'; import type { Block } from '@near-lake/primitives'; import LakeClient from '../lake-client'; -import { type IndexerConfig } from './stream-handler'; +import { WorkerMessageType, type IndexerConfig, type WorkerMessage } from './stream-handler'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); @@ -57,27 +57,32 @@ function incrementId (id: string): string { } async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { - const HISTORICAL_BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '10'); + const HISTORICAL_BATCH_SIZE = parseInt(process.env.PREFETCH_QUEUE_LIMIT ?? '10'); let streamMessageStartId = '0'; while (true) { const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; - if (preFetchCount <= 0) { - await sleep(100); - continue; - } - const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); - if (messages == null) { - await sleep(100); - continue; - } + try { + if (preFetchCount <= 0) { + await sleep(100); + continue; + } + const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); + if (messages == null) { + await sleep(100); + continue; + } - for (const streamMessage of messages) { - const { id, message } = streamMessage; - workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); - } + for (const streamMessage of messages) { + const { id, message } = streamMessage; + workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); + } - streamMessageStartId = incrementId(messages[messages.length - 1].id); + streamMessageStartId = incrementId(messages[messages.length - 1].id); + } catch (err) { + console.error('Error fetching stream messages', err); + await sleep(500); + } } } @@ -85,7 +90,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri const indexer = new Indexer(); const isHistorical = workerContext.streamType === 'historical'; let streamMessageId = ''; - let indexerName = ''; + let indexerName = streamKey.split(':')[0]; let currBlockHeight = 0; while (true) { @@ -114,6 +119,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri } const block = queueMessage.block; currBlockHeight = block.blockHeight; + const blockHeightMessage: WorkerMessage = { type: WorkerMessageType.BLOCK_HEIGHT, data: currBlockHeight }; + parentPort?.postMessage(blockHeightMessage); streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -133,11 +140,17 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri } catch (err) { await sleep(10000); console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); + throw err; } finally { const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); + const memoryUsage = process.memoryUsage(); + METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapTotal / (1024 * 1024)); + METRICS.HEAP_USED.labels({ indexer: indexerName, type: workerContext.streamType }).set(memoryUsage.heapUsed / (1024 * 1024)); + METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerName, type: workerContext.streamType }).set(workerContext.queue.length); - parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); + const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() }; + parentPort?.postMessage(metricsMessage); } } }