From 507b214da1e02919fad1655ce50e0e67032b3940 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 3 Oct 2023 10:30:16 -0700 Subject: [PATCH] Address PR Comments --- .../src/historical_block_processing.rs | 1 + indexer/queryapi_coordinator/src/main.rs | 13 +++-- indexer/queryapi_coordinator/src/utils.rs | 17 +++---- indexer/storage/src/lib.rs | 48 +++++++++---------- runner/src/indexer/indexer.test.ts | 20 ++++---- runner/src/indexer/indexer.ts | 8 ++-- runner/src/metrics.ts | 20 ++++---- runner/src/redis-client/redis-client.test.ts | 4 +- runner/src/redis-client/redis-client.ts | 6 +-- 9 files changed, 69 insertions(+), 68 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index ca7125d7c..82b60b8c3 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages( redis_connection_manager, storage::generate_historical_storage_key(&indexer_function.get_full_name()), serde_json::to_string(&indexer_function)?, + None, ) .await?; } diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 8c4fed966..b62c0fad6 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -6,7 +6,7 @@ use tokio::sync::{Mutex, MutexGuard}; use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch}; use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight}; use near_lake_framework::near_indexer_primitives::{types, StreamerMessage}; -use utils::process_streamer_message; +use utils::serialize_to_camel_case_json_string; use crate::indexer_types::IndexerFunction; use indexer_types::{IndexerQueueMessage, IndexerRegistry}; @@ -148,17 +148,15 @@ async fn handle_streamer_message( let block_height: BlockHeight = context.streamer_message.block.header.height; // Cache streamer message block and shards for use in real time processing - storage::setEx( + storage::set( context.redis_connection_manager, format!( - "{}{}{}:{}", + "{}{}", storage::STREAMER_MESSAGE_HASH_KEY_BASE, - storage::LAKE_BUCKET_PREFIX, - context.chain_id, block_height ), - 180, - serde_json::to_string(&process_streamer_message(&context.streamer_message))?, + &serialize_to_camel_case_json_string(&context.streamer_message)?, + Some(60), ) .await?; @@ -222,6 +220,7 @@ async fn handle_streamer_message( context.redis_connection_manager, storage::generate_real_time_storage_key(&indexer_function.get_full_name()), serde_json::to_string(indexer_function)?, + None, ) .await?; storage::xadd( diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 9626c2154..9f8c2585a 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -52,19 +52,19 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) } } -pub(crate) fn process_streamer_message( +pub(crate) fn serialize_to_camel_case_json_string( streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage, -) -> Value { +) -> anyhow::Result { // Serialize the Message object to a JSON string - let json_str = serde_json::to_string(&streamer_message).unwrap(); + let json_str = serde_json::to_string(&streamer_message)?; // Deserialize the JSON string to a Value Object - let mut message_value: Value = serde_json::from_str(&json_str).unwrap(); + let mut message_value: Value = serde_json::from_str(&json_str)?; // Convert keys to Camel Case to_camel_case_keys(&mut message_value); - return message_value; + return serde_json::to_string(&message_value); } fn to_camel_case_keys(message_value: &mut Value) { @@ -86,9 +86,10 @@ fn to_camel_case_keys(message_value: &mut Value) { .join(""); // Recursively process inner fields and update map with new key - let mut val = map.remove(&key).unwrap(); - to_camel_case_keys(&mut val); - map.insert(new_key, val); + if let Some(mut val) = map.remove(&key) { + to_camel_case_keys(&mut val); + map.insert(new_key, val); + } } } Value::Array(vec) => { diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 4ee362e3f..cf6c21432 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -4,7 +4,7 @@ const STORAGE: &str = "storage_alertexer"; pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; pub const STREAMS_SET_KEY: &str = "streams"; -pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:cache:"; +pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -49,29 +49,23 @@ pub async fn set( redis_connection_manager: &ConnectionManager, key: impl ToRedisArgs + std::fmt::Debug, value: impl ToRedisArgs + std::fmt::Debug, + expiration: Option, ) -> anyhow::Result<()> { - redis::cmd("SET") - .arg(&key) - .arg(&value) - .query_async(&mut redis_connection_manager.clone()) - .await?; - tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,); - Ok(()) -} - -pub async fn setEx( - redis_connection_manager: &ConnectionManager, - key: impl ToRedisArgs + std::fmt::Debug, - expiration: usize, - value: impl ToRedisArgs + std::fmt::Debug, -) -> anyhow::Result<()> { - redis::cmd("SETEX") - .arg(&key) - .arg(expiration) - .arg(&value) - .query_async(&mut redis_connection_manager.clone()) + let mut cmd = redis::cmd("SET"); + cmd.arg(&key).arg(&value); + + // Add expiration arguments if present + let exp_to_print: String; + if let Some(expiration) = expiration { + cmd.arg("EX").arg(expiration); + exp_to_print = format!("EX {}", expiration); + } else { + exp_to_print = "".to_string(); + } + + cmd.query_async(&mut redis_connection_manager.clone()) .await?; - tracing::debug!(target: STORAGE, "SETEX: {:?}: {:?} with expiration {:?}s", key, value, expiration); + tracing::debug!(target: STORAGE, "SET: {:?}: {:?} {:?}", key, value, exp_to_print); Ok(()) } @@ -131,7 +125,7 @@ pub async fn push_receipt_to_watching_list( receipt_id: &str, cache_value: &[u8], ) -> anyhow::Result<()> { - set(redis_connection_manager, receipt_id, cache_value).await?; + set(redis_connection_manager, receipt_id, cache_value, None).await?; // redis::cmd("INCR") // .arg(format!("receipts_{}", transaction_hash)) // .query_async(&mut redis_connection_manager.clone()) @@ -179,7 +173,13 @@ pub async fn update_last_indexed_block( redis_connection_manager: &ConnectionManager, block_height: u64, ) -> anyhow::Result<()> { - set(redis_connection_manager, "last_indexed_block", block_height).await?; + set( + redis_connection_manager, + "last_indexed_block", + block_height, + None + ) + .await?; redis::cmd("INCR") .arg("blocks_processed") .query_async(&mut redis_connection_manager.clone()) diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 0a1d0c8e7..31faeba83 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -164,7 +164,7 @@ CREATE TABLE }); const transparentRedis = { - getStreamerMessageFromCache: jest.fn() + getStreamerMessage: jest.fn() } as unknown as RedisClient; beforeEach(() => { @@ -201,7 +201,7 @@ CREATE TABLE ) ); const mockRedis = { - getStreamerMessageFromCache: mockData + getStreamerMessage: mockData } as unknown as RedisClient; const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis }); @@ -273,7 +273,7 @@ CREATE TABLE test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => { const blockHeight = 85233529; const blockHash = 'xyz'; - const getMessageFromCache = jest.fn() + const getMessage = jest.fn() .mockReturnValueOnce(JSON.stringify( { block: { @@ -287,15 +287,15 @@ CREATE TABLE } )); const mockRedis = { - getStreamerMessageFromCache: getMessageFromCache + getStreamerMessage: getMessage } as unknown as RedisClient; const indexer = new Indexer('mainnet', { redisClient: mockRedis }); const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); - expect(getMessageFromCache).toHaveBeenCalledTimes(1); - expect(JSON.stringify(getMessageFromCache.mock.calls[0])).toEqual( - `["near-lake-data-mainnet",${blockHeight}]` + expect(getMessage).toHaveBeenCalledTimes(1); + expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( + `[${blockHeight}]` ); const block = Block.fromStreamerMessage(streamerMessage); @@ -339,7 +339,7 @@ CREATE TABLE Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` }))); - expect(transparentRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(1); + expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); const block = Block.fromStreamerMessage(streamerMessage); @@ -371,7 +371,7 @@ CREATE TABLE send: mockSend, } as unknown as S3Client; const mockRedis = { - getStreamerMessageFromCache: jest.fn() + getStreamerMessage: jest.fn() } as unknown as RedisClient; const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); @@ -386,7 +386,7 @@ CREATE TABLE Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` }))); - expect(mockRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(0); + expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); const block = Block.fromStreamerMessage(streamerMessage); diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 7bf33081b..248f27960 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -138,13 +138,13 @@ export default class Indexer { async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { if (!isHistorical) { - const cachedMessage = await this.deps.redisClient.getStreamerMessageFromCache(`near-lake-data-${this.network}`, blockHeight); - if (cachedMessage) { // Cache hit on streamer message - METRICS.CACHE_HIT_STREAMER_MESSAGE.labels(isHistorical ? 'historical' : 'realtime').inc(); // increment the cache hit counter + const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); + if (cachedMessage) { + METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); const parsedMessage = JSON.parse(cachedMessage); return parsedMessage; } else { - METRICS.CACHE_MISS_STREAMER_MESSAGE.labels(isHistorical ? 'historical' : 'realtime').inc(); // increment the cache miss counter + METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); } } const blockPromise = this.fetchBlockPromise(blockHeight); diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 5cefffd28..ee757adc7 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -13,23 +13,23 @@ const EXECUTION_DURATION = new promClient.Gauge({ labelNames: ['indexer', 'type'], }); -const CACHE_HIT_STREAMER_MESSAGE = new promClient.Counter({ - name: 'redis_cache_hit_for_streamer_message', - help: 'The number of times the streamer message cache was hit', - labelNames: ['type'] +const CACHE_HIT = new promClient.Counter({ + name: 'queryapi_runner_cache_hit', + help: 'The number of times cache was hit successfully', + labelNames: ['type', 'key'] }); -const CACHE_MISS_STREAMER_MESSAGE = new promClient.Counter({ - name: 'redis_cache_miss_for_streamer_message', - help: 'The number of times the streamer message cache was missed', - labelNames: ['type'] +const CACHE_MISS = new promClient.Counter({ + name: 'queryapi_runner_cache_miss', + help: 'The number of times cache was missed', + labelNames: ['type', 'key'] }); export const METRICS = { EXECUTION_DURATION, UNPROCESSED_STREAM_MESSAGES, - CACHE_HIT_STREAMER_MESSAGE, - CACHE_MISS_STREAMER_MESSAGE + CACHE_HIT, + CACHE_MISS }; export const startServer = async (): Promise => { diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index f9e3b29f2..26030f249 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -90,8 +90,8 @@ describe('RedisClient', () => { } as any; const client = new RedisClient(mockClient); - await client.getStreamerMessageFromCache('near-lake-data-mainnet', 1000); + await client.getStreamerMessage(1000); - expect(mockClient.get).toHaveBeenCalledWith('streamer:message:cache:near-lake-data-mainnet:1000'); + expect(mockClient.get).toHaveBeenCalledWith('streamer:message:1000'); }); }); diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index a29918c5c..18e11b854 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -20,7 +20,7 @@ export default class RedisClient { SMALLEST_STREAM_ID = '0'; LARGEST_STREAM_ID = '+'; STREAMS_SET_KEY = 'streams'; - STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:cache:'; + STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:'; constructor ( private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING }) @@ -85,7 +85,7 @@ export default class RedisClient { return await this.client.sMembers(this.STREAMS_SET_KEY); } - async getStreamerMessageFromCache (source: string, blockHeight: number): Promise { - return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${source}:${blockHeight}`); + async getStreamerMessage (blockHeight: number): Promise { + return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${blockHeight}`); } }