diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs index 01231225200ae..d17ba87d3c75f 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs @@ -142,28 +142,24 @@ async fn process_raw_datastream_response( datastream::raw_datastream_response::Response::Data(data) => { let transaction_len = data.transactions.len(); let start_version = data.transactions.first().unwrap().version; + let transactions = data + .transactions + .into_iter() + .map(|tx| { + let timestamp_in_seconds = match tx.timestamp { + Some(timestamp) => timestamp.seconds as u64, + None => 0, + }; + (tx.version, tx.encoded_proto_data, timestamp_in_seconds) + }) + .collect::>(); - for e in data.transactions { - let version = e.version; - let timestamp_in_seconds = match e.timestamp { - Some(t) => t.seconds, - // For Genesis block, there is no timestamp. - None => 0, - }; - // Push to cache. - match cache_operator - .update_cache_transaction( - version, - e.encoded_proto_data, - timestamp_in_seconds as u64, - ) - .await - { - Ok(_) => {}, - Err(e) => { - anyhow::bail!("Update cache with version failed: {}", e); - }, - } + // Push to cache. + match cache_operator.update_cache_transactions(transactions).await { + Ok(_) => {}, + Err(e) => { + anyhow::bail!("Update cache with version failed: {}", e); + }, } Ok(GrpcDataStatus::ChunkDataOk { start_version, @@ -184,11 +180,13 @@ async fn setup_cache_with_init_signal( ) { let (fullnode_chain_id, starting_version) = match init_signal.response.expect("Response type not exists.") { - Response::Status(status_frame) => match status_frame.r#type { - 0 => (init_signal.chain_id, status_frame.start_version), - _ => { - panic!("[Indexer Cache] Streaming error: first frame is not INIT signal."); - }, + Response::Status(status_frame) => { + match StatusType::from_i32(status_frame.r#type).expect("Invalid status type.") { + StatusType::Init => (init_signal.chain_id, status_frame.start_version), + _ => { + panic!("[Indexer Cache] Streaming error: first frame is not INIT signal."); + }, + } }, _ => { panic!("[Indexer Cache] Streaming error: first frame is not siganl frame."); diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs index 69a256d0ea82a..c33665c4f9f98 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::constants::BLOB_STORAGE_SIZE; -use redis::{AsyncCommands, RedisError}; +use redis::{AsyncCommands, RedisError, RedisResult}; // Configurations for cache. // The cache size is estimated to be 10M transactions. @@ -200,6 +200,29 @@ impl CacheOperator { } } + pub async fn update_cache_transactions( + &mut self, + transactions: Vec<(u64, String, u64)>, + ) -> anyhow::Result<()> { + let mut redis_pipeline = redis::pipe(); + for (version, encoded_proto_data, timestamp_in_seconds) in transactions { + redis_pipeline + .cmd("SET") + .arg(version) + .arg(encoded_proto_data) + .arg("EX") + .arg(get_ttl_in_seconds(timestamp_in_seconds)) + .ignore(); + } + let redis_result: RedisResult<()> = + redis_pipeline.query_async::<_, _>(&mut self.conn).await; + + match redis_result { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } + } + // Update the latest version in cache. pub async fn update_cache_latest_version( &mut self, diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs index bd6b0212c22c3..8c4142b09b652 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs @@ -67,6 +67,10 @@ impl FileStoreOperator { /// Bootstraps the file store operator. This is required before any other operations. pub async fn verify_storage_bucket_existence(&self) { + aptos_logger::info!( + bucket_name = self.bucket_name, + "Before file store operator starts, verify the bucket exists." + ); // Verifies the bucket exists. Bucket::read(&self.bucket_name) .await