Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Oct 5, 2023
1 parent 79c23fb commit 507b214
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;
}
Expand Down
13 changes: 6 additions & 7 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::{Mutex, MutexGuard};
use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
use utils::process_streamer_message;
use utils::serialize_to_camel_case_json_string;

use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
Expand Down Expand Up @@ -148,17 +148,15 @@ async fn handle_streamer_message(
let block_height: BlockHeight = context.streamer_message.block.header.height;

// Cache streamer message block and shards for use in real time processing
storage::setEx(
storage::set(
context.redis_connection_manager,
format!(
"{}{}{}:{}",
"{}{}",
storage::STREAMER_MESSAGE_HASH_KEY_BASE,
storage::LAKE_BUCKET_PREFIX,
context.chain_id,
block_height
),
180,
serde_json::to_string(&process_streamer_message(&context.streamer_message))?,
&serialize_to_camel_case_json_string(&context.streamer_message)?,
Some(60),
)
.await?;

Expand Down Expand Up @@ -222,6 +220,7 @@ async fn handle_streamer_message(
context.redis_connection_manager,
storage::generate_real_time_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
None,
)
.await?;
storage::xadd(
Expand Down
17 changes: 9 additions & 8 deletions indexer/queryapi_coordinator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
}
}

pub(crate) fn process_streamer_message(
pub(crate) fn serialize_to_camel_case_json_string(
streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
) -> Value {
) -> anyhow::Result<String, serde_json::Error> {
// Serialize the Message object to a JSON string
let json_str = serde_json::to_string(&streamer_message).unwrap();
let json_str = serde_json::to_string(&streamer_message)?;

// Deserialize the JSON string to a Value Object
let mut message_value: Value = serde_json::from_str(&json_str).unwrap();
let mut message_value: Value = serde_json::from_str(&json_str)?;

// Convert keys to Camel Case
to_camel_case_keys(&mut message_value);

return message_value;
return serde_json::to_string(&message_value);
}

fn to_camel_case_keys(message_value: &mut Value) {
Expand All @@ -86,9 +86,10 @@ fn to_camel_case_keys(message_value: &mut Value) {
.join("");

// Recursively process inner fields and update map with new key
let mut val = map.remove(&key).unwrap();
to_camel_case_keys(&mut val);
map.insert(new_key, val);
if let Some(mut val) = map.remove(&key) {
to_camel_case_keys(&mut val);
map.insert(new_key, val);
}
}
}
Value::Array(vec) => {
Expand Down
48 changes: 24 additions & 24 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const STORAGE: &str = "storage_alertexer";

pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const STREAMS_SET_KEY: &str = "streams";
pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:cache:";
pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
Expand Down Expand Up @@ -49,29 +49,23 @@ pub async fn set(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
expiration: Option<usize>,
) -> anyhow::Result<()> {
redis::cmd("SET")
.arg(&key)
.arg(&value)
.query_async(&mut redis_connection_manager.clone())
.await?;
tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,);
Ok(())
}

pub async fn setEx(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
expiration: usize,
value: impl ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
redis::cmd("SETEX")
.arg(&key)
.arg(expiration)
.arg(&value)
.query_async(&mut redis_connection_manager.clone())
let mut cmd = redis::cmd("SET");
cmd.arg(&key).arg(&value);

// Add expiration arguments if present
let exp_to_print: String;
if let Some(expiration) = expiration {
cmd.arg("EX").arg(expiration);
exp_to_print = format!("EX {}", expiration);
} else {
exp_to_print = "".to_string();
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;
tracing::debug!(target: STORAGE, "SETEX: {:?}: {:?} with expiration {:?}s", key, value, expiration);
tracing::debug!(target: STORAGE, "SET: {:?}: {:?} {:?}", key, value, exp_to_print);
Ok(())
}

Expand Down Expand Up @@ -131,7 +125,7 @@ pub async fn push_receipt_to_watching_list(
receipt_id: &str,
cache_value: &[u8],
) -> anyhow::Result<()> {
set(redis_connection_manager, receipt_id, cache_value).await?;
set(redis_connection_manager, receipt_id, cache_value, None).await?;
// redis::cmd("INCR")
// .arg(format!("receipts_{}", transaction_hash))
// .query_async(&mut redis_connection_manager.clone())
Expand Down Expand Up @@ -179,7 +173,13 @@ pub async fn update_last_indexed_block(
redis_connection_manager: &ConnectionManager,
block_height: u64,
) -> anyhow::Result<()> {
set(redis_connection_manager, "last_indexed_block", block_height).await?;
set(
redis_connection_manager,
"last_indexed_block",
block_height,
None
)
.await?;
redis::cmd("INCR")
.arg("blocks_processed")
.query_async(&mut redis_connection_manager.clone())
Expand Down
20 changes: 10 additions & 10 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ CREATE TABLE
});

const transparentRedis = {
getStreamerMessageFromCache: jest.fn()
getStreamerMessage: jest.fn()
} as unknown as RedisClient;

beforeEach(() => {
Expand Down Expand Up @@ -201,7 +201,7 @@ CREATE TABLE
)
);
const mockRedis = {
getStreamerMessageFromCache: mockData
getStreamerMessage: mockData
} as unknown as RedisClient;

const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis });
Expand Down Expand Up @@ -273,7 +273,7 @@ CREATE TABLE
test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => {
const blockHeight = 85233529;
const blockHash = 'xyz';
const getMessageFromCache = jest.fn()
const getMessage = jest.fn()
.mockReturnValueOnce(JSON.stringify(
{
block: {
Expand All @@ -287,15 +287,15 @@ CREATE TABLE
}
));
const mockRedis = {
getStreamerMessageFromCache: getMessageFromCache
getStreamerMessage: getMessage
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { redisClient: mockRedis });

const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false);

expect(getMessageFromCache).toHaveBeenCalledTimes(1);
expect(JSON.stringify(getMessageFromCache.mock.calls[0])).toEqual(
`["near-lake-data-mainnet",${blockHeight}]`
expect(getMessage).toHaveBeenCalledTimes(1);
expect(JSON.stringify(getMessage.mock.calls[0])).toEqual(
`[${blockHeight}]`
);
const block = Block.fromStreamerMessage(streamerMessage);

Expand Down Expand Up @@ -339,7 +339,7 @@ CREATE TABLE
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
})));
expect(transparentRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(1);
expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1);

const block = Block.fromStreamerMessage(streamerMessage);

Expand Down Expand Up @@ -371,7 +371,7 @@ CREATE TABLE
send: mockSend,
} as unknown as S3Client;
const mockRedis = {
getStreamerMessageFromCache: jest.fn()
getStreamerMessage: jest.fn()
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });

Expand All @@ -386,7 +386,7 @@ CREATE TABLE
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
})));
expect(mockRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(0);
expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0);

const block = Block.fromStreamerMessage(streamerMessage);

Expand Down
8 changes: 4 additions & 4 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ export default class Indexer {

async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> {
if (!isHistorical) {
const cachedMessage = await this.deps.redisClient.getStreamerMessageFromCache(`near-lake-data-${this.network}`, blockHeight);
if (cachedMessage) { // Cache hit on streamer message
METRICS.CACHE_HIT_STREAMER_MESSAGE.labels(isHistorical ? 'historical' : 'realtime').inc(); // increment the cache hit counter
const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight);
if (cachedMessage) {
METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
const parsedMessage = JSON.parse(cachedMessage);
return parsedMessage;
} else {
METRICS.CACHE_MISS_STREAMER_MESSAGE.labels(isHistorical ? 'historical' : 'realtime').inc(); // increment the cache miss counter
METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
}
}
const blockPromise = this.fetchBlockPromise(blockHeight);
Expand Down
20 changes: 10 additions & 10 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ const EXECUTION_DURATION = new promClient.Gauge({
labelNames: ['indexer', 'type'],
});

const CACHE_HIT_STREAMER_MESSAGE = new promClient.Counter({
name: 'redis_cache_hit_for_streamer_message',
help: 'The number of times the streamer message cache was hit',
labelNames: ['type']
const CACHE_HIT = new promClient.Counter({
name: 'queryapi_runner_cache_hit',
help: 'The number of times cache was hit successfully',
labelNames: ['type', 'key']
});

const CACHE_MISS_STREAMER_MESSAGE = new promClient.Counter({
name: 'redis_cache_miss_for_streamer_message',
help: 'The number of times the streamer message cache was missed',
labelNames: ['type']
const CACHE_MISS = new promClient.Counter({
name: 'queryapi_runner_cache_miss',
help: 'The number of times cache was missed',
labelNames: ['type', 'key']
});

export const METRICS = {
EXECUTION_DURATION,
UNPROCESSED_STREAM_MESSAGES,
CACHE_HIT_STREAMER_MESSAGE,
CACHE_MISS_STREAMER_MESSAGE
CACHE_HIT,
CACHE_MISS
};

export const startServer = async (): Promise<void> => {
Expand Down
4 changes: 2 additions & 2 deletions runner/src/redis-client/redis-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ describe('RedisClient', () => {
} as any;

const client = new RedisClient(mockClient);
await client.getStreamerMessageFromCache('near-lake-data-mainnet', 1000);
await client.getStreamerMessage(1000);

expect(mockClient.get).toHaveBeenCalledWith('streamer:message:cache:near-lake-data-mainnet:1000');
expect(mockClient.get).toHaveBeenCalledWith('streamer:message:1000');
});
});
6 changes: 3 additions & 3 deletions runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export default class RedisClient {
SMALLEST_STREAM_ID = '0';
LARGEST_STREAM_ID = '+';
STREAMS_SET_KEY = 'streams';
STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:cache:';
STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:';

constructor (
private readonly client: RedisClientType = createClient({ url: process.env.REDIS_CONNECTION_STRING })
Expand Down Expand Up @@ -85,7 +85,7 @@ export default class RedisClient {
return await this.client.sMembers(this.STREAMS_SET_KEY);
}

async getStreamerMessageFromCache (source: string, blockHeight: number): Promise<string | null> {
return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${source}:${blockHeight}`);
async getStreamerMessage (blockHeight: number): Promise<string | null> {
return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${blockHeight}`);
}
}

0 comments on commit 507b214

Please sign in to comment.