From 6d6b20d1371f4bf7fe95fa2a682ccefa70fb4feb Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 27 Sep 2023 11:47:10 -0700 Subject: [PATCH] Migrate Redis Caching to Coordinator --- indexer/queryapi_coordinator/src/main.rs | 17 ++ indexer/storage/src/lib.rs | 18 ++ runner/src/indexer/indexer.test.ts | 226 +++++++++-------------- runner/src/indexer/indexer.ts | 67 +++---- runner/src/redis-client/redis-client.ts | 17 +- 5 files changed, 144 insertions(+), 201 deletions(-) diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 28c2dc3f3..d9330c0fc 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -1,3 +1,4 @@ +use actix_web::cookie::Expiration; use cached::SizedCache; use futures::stream::{self, StreamExt}; use near_jsonrpc_client::JsonRpcClient; @@ -27,6 +28,7 @@ pub(crate) const INTERVAL: std::time::Duration = std::time::Duration::from_milli pub(crate) const MAX_DELAY_TIME: std::time::Duration = std::time::Duration::from_millis(4000); pub(crate) const RETRY_COUNT: usize = 2; + type SharedIndexerRegistry = std::sync::Arc>; #[derive(Debug, Default, Clone, Copy)] @@ -146,6 +148,21 @@ async fn handle_streamer_message( let block_height: BlockHeight = context.streamer_message.block.header.height; + // Cache streamer message block and shards for use in real time processing + storage::setEx( + context.redis_connection_manager, + format!( + "{}{}{}:{}", + storage::STREAMER_MESSAGE_HASH_KEY_BASE, + storage::LAKE_BUCKET_PREFIX, + context.chain_id, + block_height + ), + 180, + serde_json::to_string(&context.streamer_message)?, + ) + .await?; + let spawned_indexers = indexer_registry::index_registry_changes( block_height, &mut indexer_registry_locked, diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 6c449b6b2..4ee362e3f 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -2,7 +2,9 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; +pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-"; pub const STREAMS_SET_KEY: &str = "streams"; +pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:cache:"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -57,6 +59,22 @@ pub async fn set( Ok(()) } +pub async fn setEx( + redis_connection_manager: &ConnectionManager, + key: impl ToRedisArgs + std::fmt::Debug, + expiration: usize, + value: impl ToRedisArgs + std::fmt::Debug, +) -> anyhow::Result<()> { + redis::cmd("SETEX") + .arg(&key) + .arg(expiration) + .arg(&value) + .query_async(&mut redis_connection_manager.clone()) + .await?; + tracing::debug!(target: STORAGE, "SETEX: {:?}: {:?} with expiration {:?}s", key, value, expiration); + Ok(()) +} + pub async fn get( redis_connection_manager: &ConnectionManager, key: impl ToRedisArgs + std::fmt::Debug, diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 3a17bb52f..0a1d0c8e7 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -164,10 +164,7 @@ CREATE TABLE }); const transparentRedis = { - getStreamerBlockFromCache: jest.fn(), - addStreamerBlockToCache: jest.fn(), - getStreamerShardFromCache: jest.fn(), - addStreamerShardToCache: jest.fn() + getStreamerMessageFromCache: jest.fn() } as unknown as RedisClient; beforeEach(() => { @@ -191,16 +188,20 @@ CREATE TABLE })); const blockHeight = 456; const mockData = jest.fn().mockResolvedValue( - JSON.stringify({ - chunks: [], - header: { - height: blockHeight + JSON.stringify( + { + block: { + chunks: [], + header: { + height: blockHeight + } + }, + shards: {} } - }) + ) ); const mockRedis = { - getStreamerShardFromCache: mockData, - getStreamerBlockFromCache: mockData + getStreamerMessageFromCache: mockData } as unknown as RedisClient; const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis }); @@ -218,30 +219,7 @@ CREATE TABLE expect(mockFetch.mock.calls).toMatchSnapshot(); }); - test('Indexer.fetchBlock() should fetch a block from cache', async () => { - const author = 'dokiacapital.poolv1.near'; - const mockData = jest.fn().mockResolvedValue( - JSON.stringify({ - author - }) - ); - const mockRedis = { - getStreamerBlockFromCache: mockData - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { redisClient: mockRedis }); - - const blockHeight = 84333960; - const block = await indexer.fetchBlockPromise(blockHeight, false); - - expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(1); - expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledWith( - 'near-lake-data-mainnet', - `${blockHeight.toString().padStart(12, '0')}/block.json` - ); - expect(block.author).toEqual(author); - }); - - test('Indexer.fetchBlock() should fetch a block from the S3 after cache miss', async () => { + test('Indexer.fetchBlock() should fetch a block from S3', async () => { const author = 'dokiacapital.poolv1.near'; const mockData = JSON.stringify({ author @@ -258,7 +236,7 @@ CREATE TABLE const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); const blockHeight = 84333960; - const block = await indexer.fetchBlockPromise(blockHeight, false); + const block = await indexer.fetchBlockPromise(blockHeight); const params = { Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/block.json` @@ -266,61 +244,10 @@ CREATE TABLE expect(mockS3.send).toHaveBeenCalledTimes(1); expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); - expect(transparentRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(1); - expect(transparentRedis.addStreamerBlockToCache).toHaveBeenCalledWith(params.Bucket, params.Key, mockData); expect(block.author).toEqual(author); }); - test('Indexer.fetchBlock() should fetch a block not from cache but from the S3 if historical', async () => { - const author = 'dokiacapital.poolv1.near'; - const mockSend = jest.fn().mockResolvedValue({ - Body: { - transformToString: () => JSON.stringify({ - author - }) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const mockRedis = { - getStreamerBlockFromCache: jest.fn(), - addStreamerBlockToCache: jest.fn() - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); - - const blockHeight = 84333960; - const block = await indexer.fetchBlockPromise(blockHeight, true); - - expect(mockS3.send).toHaveBeenCalledTimes(1); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(0); - expect(mockRedis.addStreamerBlockToCache).toHaveBeenCalledTimes(0); - expect(block.author).toEqual(author); - }); - - test('Indexer.fetchShard() should fetch the steamer message from cache', async () => { - const mockData = jest.fn().mockResolvedValue(JSON.stringify({})); - const mockRedis = { - getStreamerShardFromCache: mockData - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { redisClient: mockRedis }); - - const blockHeight = 82699904; - const shard = 0; - await indexer.fetchShardPromise(blockHeight, shard, false); - - expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(1); - expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledWith( - 'near-lake-data-mainnet', - `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` - ); - }); - - test('Indexer.fetchShard() should fetch the steamer message from S3 after cache miss', async () => { + test('Indexer.fetchShard() should fetch a shard from S3', async () => { const mockData = JSON.stringify({}); const mockSend = jest.fn().mockResolvedValue({ Body: { @@ -330,11 +257,7 @@ CREATE TABLE const mockS3 = { send: mockSend, } as unknown as S3Client; - const mockRedis = { - getStreamerShardFromCache: jest.fn(), - addStreamerShardToCache: jest.fn() - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); const blockHeight = 82699904; const shard = 0; @@ -342,68 +265,37 @@ CREATE TABLE Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` }; - await indexer.fetchShardPromise(blockHeight, shard, false); + await indexer.fetchShardPromise(blockHeight, shard); expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); - expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(1); - expect(mockRedis.addStreamerShardToCache).toHaveBeenCalledWith(params.Bucket, params.Key, mockData); - }); - - test('Indexer.fetchShard() should fetch the steamer message not from cache but from S3 if historical', async () => { - const mockSend = jest.fn().mockResolvedValue({ - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const mockRedis = { - getStreamerShardFromCache: jest.fn(), - addStreamerShardToCache: jest.fn() - } as unknown as RedisClient; - const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); - - const blockHeight = 82699904; - const shard = 0; - await indexer.fetchShardPromise(blockHeight, shard, true); - - expect(mockS3.send).toHaveBeenCalledTimes(1); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` - }))); - expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(0); - expect(mockRedis.addStreamerShardToCache).toHaveBeenCalledTimes(0); }); - test('Indexer.fetchStreamerMessage() should fetch the block/shards from cache and construct the streamer message', async () => { + test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => { const blockHeight = 85233529; const blockHash = 'xyz'; - const getBlockFromCache = jest.fn() - .mockReturnValueOnce(JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, + const getMessageFromCache = jest.fn() + .mockReturnValueOnce(JSON.stringify( + { + block: { + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }, + shards: {} } - })); - const getShardFromCache = jest.fn().mockReturnValue(JSON.stringify({})); + )); const mockRedis = { - getStreamerBlockFromCache: getBlockFromCache, - getStreamerShardFromCache: getShardFromCache + getStreamerMessageFromCache: getMessageFromCache } as unknown as RedisClient; const indexer = new Indexer('mainnet', { redisClient: mockRedis }); const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); - expect(getBlockFromCache).toHaveBeenCalledTimes(1); - expect(getShardFromCache).toHaveBeenCalledTimes(4); - expect(JSON.stringify(getBlockFromCache.mock.calls[0])).toEqual( - `["near-lake-data-mainnet","${blockHeight.toString().padStart(12, '0')}/block.json"]` - ); - expect(JSON.stringify(getShardFromCache.mock.calls[1])).toEqual( - `["near-lake-data-mainnet","${blockHeight.toString().padStart(12, '0')}/shard_1.json"]`, + expect(getMessageFromCache).toHaveBeenCalledTimes(1); + expect(JSON.stringify(getMessageFromCache.mock.calls[0])).toEqual( + `["near-lake-data-mainnet",${blockHeight}]` ); const block = Block.fromStreamerMessage(streamerMessage); @@ -411,7 +303,7 @@ CREATE TABLE expect(block.blockHash).toEqual(blockHash); }); - test('Indexer.fetchStreamerMessage() should fetch the block/shards from S3 and construct the streamer message', async () => { + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 upon cache miss', async () => { const blockHeight = 85233529; const blockHash = 'xyz'; const mockSend = jest.fn() @@ -447,6 +339,54 @@ CREATE TABLE Bucket: 'near-lake-data-mainnet', Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` }))); + expect(transparentRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(1); + + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 and not cache and construct the streamer message if historical', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const mockRedis = { + getStreamerMessageFromCache: jest.fn() + } as unknown as RedisClient; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(mockRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(0); const block = Block.fromStreamerMessage(streamerMessage); diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index d87f3d6d8..82e6e08e6 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -136,8 +136,15 @@ export default class Indexer { } async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { - const blockPromise = this.fetchBlockPromise(blockHeight, isHistorical); - const shardsPromises = await this.fetchShardsPromises(blockHeight, 4, isHistorical); + if (!isHistorical) { + const cachedMessage = await this.deps.redisClient.getStreamerMessageFromCache(`near-lake-data-${this.network}`, blockHeight); + if (cachedMessage) { + const parsedMessage = JSON.parse(cachedMessage, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); + return parsedMessage; + } + } + const blockPromise = this.fetchBlockPromise(blockHeight); + const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); const results = await Promise.all([blockPromise, ...shardsPromises]); const block = results.shift(); @@ -148,59 +155,31 @@ export default class Indexer { }; } - async fetchShardsPromises (blockHeight: number, numberOfShards: number, isHistorical: boolean): Promise>> { + async fetchShardsPromises (blockHeight: number, numberOfShards: number): Promise>> { return ([...Array(numberOfShards).keys()].map(async (shardId) => - await this.fetchShardPromise(blockHeight, shardId, isHistorical) + await this.fetchShardPromise(blockHeight, shardId) )); } - async fetchShardPromise (blockHeight: number, shardId: number, isHistorical: boolean): Promise { - const bucket = `near-lake-data-${this.network}`; - const shardKey = `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`; + async fetchShardPromise (blockHeight: number, shardId: number): Promise { const params = { - Bucket: bucket, - Key: shardKey, + Bucket: `near-lake-data-${this.network}`, + Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, }; - let shardData; - - if (!isHistorical) { // Do not attempt hitting cache if historical process - shardData = await this.deps.redisClient.getStreamerShardFromCache(bucket, shardKey); - } - - if (!shardData) { // If cache miss or not historical, poll S3 - const response = await this.deps.s3.send(new GetObjectCommand(params)); - shardData = await response.Body?.transformToString() ?? '{}'; - - if (!isHistorical) { - await this.deps.redisClient.addStreamerShardToCache(bucket, shardKey, shardData); - } - } - + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const shardData = await response.Body?.transformToString() ?? '{}'; return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); } - async fetchBlockPromise (blockHeight: number, isHistorical: boolean): Promise { - const bucket = `near-lake-data-${this.network}`; - const blockKey = `${this.normalizeBlockHeight(blockHeight)}/block.json`; + async fetchBlockPromise (blockHeight: number): Promise { + const file = 'block.json'; + const folder = this.normalizeBlockHeight(blockHeight); const params = { - Bucket: bucket, - Key: blockKey, + Bucket: 'near-lake-data-' + this.network, + Key: `${folder}/${file}`, }; - let blockData; - - if (!isHistorical) { // Do not attempt hitting cache if historical process - blockData = await this.deps.redisClient.getStreamerBlockFromCache(bucket, blockKey); - } - - if (!blockData) { // If cache miss or not historical, poll S3 - const response = await this.deps.s3.send(new GetObjectCommand(params)); - blockData = await response.Body?.transformToString() ?? '{}'; - - if (!isHistorical) { - await this.deps.redisClient.addStreamerBlockToCache(bucket, blockKey, blockData); - } - } - + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const blockData = await response.Body?.transformToString() ?? '{}'; return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); } diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 17a01e9be..45a5ee43d 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -20,6 +20,7 @@ export default class RedisClient { SMALLEST_STREAM_ID = '0'; LARGEST_STREAM_ID = '+'; STREAMS_SET_KEY = 'streams'; + STREAMER_MESSAGE_HASH_KEY_BASE = 'streamer:message:cache:'; STREAMER_BLOCK_HASH_KEY_BASE = 'streamer:block:cache:'; STREAMER_SHARD_HASH_KEY_BASE = 'streamer:shard:cache:'; @@ -86,19 +87,7 @@ export default class RedisClient { return await this.client.sMembers(this.STREAMS_SET_KEY); } - async addStreamerBlockToCache (hashKey: string, key: string, blockData: string): Promise { - await this.client.setEx(`${this.STREAMER_BLOCK_HASH_KEY_BASE}${hashKey}:${key}`, 30, blockData); - } - - async getStreamerBlockFromCache (hashKey: string, key: string): Promise { - return await this.client.get(`${this.STREAMER_BLOCK_HASH_KEY_BASE}${hashKey}:${key}`); - } - - async addStreamerShardToCache (hashKey: string, key: string, blockData: string): Promise { - await this.client.setEx(`${this.STREAMER_BLOCK_HASH_KEY_BASE}${hashKey}:${key}`, 30, blockData); - } - - async getStreamerShardFromCache (hashKey: string, key: string): Promise { - return await this.client.get(`${this.STREAMER_BLOCK_HASH_KEY_BASE}${hashKey}:${key}`); + async getStreamerMessageFromCache (source: string, blockHeight: number): Promise { + return await this.client.get(`${this.STREAMER_MESSAGE_HASH_KEY_BASE}${source}:${blockHeight}`); } }