Indexer grpc refactor cache performance (#6711)
larry-aptos authored Mar 3, 2023
1 parent f9b75b2 commit ab3983b
Showing 3 changed files with 52 additions and 27 deletions.
50 changes: 24 additions & 26 deletions ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs
@@ -142,28 +142,24 @@ async fn process_raw_datastream_response(
         datastream::raw_datastream_response::Response::Data(data) => {
             let transaction_len = data.transactions.len();
             let start_version = data.transactions.first().unwrap().version;
+            let transactions = data
+                .transactions
+                .into_iter()
+                .map(|tx| {
+                    let timestamp_in_seconds = match tx.timestamp {
+                        Some(timestamp) => timestamp.seconds as u64,
+                        None => 0,
+                    };
+                    (tx.version, tx.encoded_proto_data, timestamp_in_seconds)
+                })
+                .collect::<Vec<(u64, String, u64)>>();
 
-            for e in data.transactions {
-                let version = e.version;
-                let timestamp_in_seconds = match e.timestamp {
-                    Some(t) => t.seconds,
-                    // For Genesis block, there is no timestamp.
-                    None => 0,
-                };
-                // Push to cache.
-                match cache_operator
-                    .update_cache_transaction(
-                        version,
-                        e.encoded_proto_data,
-                        timestamp_in_seconds as u64,
-                    )
-                    .await
-                {
-                    Ok(_) => {},
-                    Err(e) => {
-                        anyhow::bail!("Update cache with version failed: {}", e);
-                    },
-                }
+            // Push to cache.
+            match cache_operator.update_cache_transactions(transactions).await {
+                Ok(_) => {},
+                Err(e) => {
+                    anyhow::bail!("Update cache with version failed: {}", e);
+                },
             }
             Ok(GrpcDataStatus::ChunkDataOk {
                 start_version,
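
The hunk above replaces one awaited cache write per transaction with a single batched call: the chunk is first shaped into (version, encoded_proto_data, timestamp_in_seconds) tuples, with the timestamp defaulting to 0 for the Genesis transaction, and then handed to the new update_cache_transactions in one call. Below is a minimal, self-contained sketch of that collect-then-single-call shape; Transaction and MockCache are hypothetical stand-ins for the protobuf transaction type and the CacheOperator, not the actual aptos-core types.

// Minimal sketch of the batching shape used in the hunk above.
// `Transaction` and `MockCache` are hypothetical stand-ins.
struct Transaction {
    version: u64,
    encoded_proto_data: String,
    timestamp_seconds: Option<i64>,
}

struct MockCache {
    entries: Vec<(u64, String, u64)>,
}

impl MockCache {
    // Accepts the whole chunk in one call, mirroring update_cache_transactions.
    fn update_cache_transactions(&mut self, transactions: Vec<(u64, String, u64)>) {
        self.entries.extend(transactions);
    }
}

fn main() {
    let data = vec![
        Transaction {
            version: 1,
            encoded_proto_data: "proto-1".to_string(),
            timestamp_seconds: Some(1_677_801_600),
        },
        // The Genesis transaction carries no timestamp, so it falls back to 0.
        Transaction {
            version: 0,
            encoded_proto_data: "proto-0".to_string(),
            timestamp_seconds: None,
        },
    ];

    // Shape the whole chunk into (version, data, timestamp) tuples first...
    let transactions: Vec<(u64, String, u64)> = data
        .into_iter()
        .map(|tx| {
            let timestamp_in_seconds = tx.timestamp_seconds.map(|s| s as u64).unwrap_or(0);
            (tx.version, tx.encoded_proto_data, timestamp_in_seconds)
        })
        .collect();

    // ...then hand it to the cache in a single call instead of awaiting one
    // cache write per transaction.
    let mut cache = MockCache { entries: Vec::new() };
    cache.update_cache_transactions(transactions);
    assert_eq!(cache.entries.len(), 2);
}

The payoff comes from the cache side: one call per chunk lets the operator push the whole batch to Redis in a single pipeline (see the cache_operator.rs change below) instead of paying a round trip per transaction.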
@@ -184,11 +180,13 @@ async fn setup_cache_with_init_signal(
 ) {
     let (fullnode_chain_id, starting_version) =
         match init_signal.response.expect("Response type not exists.") {
-            Response::Status(status_frame) => match status_frame.r#type {
-                0 => (init_signal.chain_id, status_frame.start_version),
-                _ => {
-                    panic!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
-                },
+            Response::Status(status_frame) => {
+                match StatusType::from_i32(status_frame.r#type).expect("Invalid status type.") {
+                    StatusType::Init => (init_signal.chain_id, status_frame.start_version),
+                    _ => {
+                        panic!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
+                    },
+                }
             },
             _ => {
                 panic!("[Indexer Cache] Streaming error: first frame is not siganl frame.");
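
This second hunk stops matching on the raw r#type integer (where 0 implicitly meant INIT) and decodes it through StatusType::from_i32 first, so an unknown status code now fails loudly with "Invalid status type." and the INIT check reads by name. The sketch below shows the same decode-then-match pattern in isolation; StatusType and its from_i32 are hand-written stand-ins for the prost-generated enum, and the BatchEnd variant is assumed purely for illustration.

// Stand-alone sketch: decode a raw status code into an enum before matching,
// instead of matching on magic numbers. Hypothetical stand-in types.
#[derive(Debug, PartialEq)]
enum StatusType {
    Init,
    BatchEnd, // assumed extra variant, for illustration only
}

impl StatusType {
    // Mirrors the shape of a generated from_i32: None for unknown codes.
    fn from_i32(value: i32) -> Option<StatusType> {
        match value {
            0 => Some(StatusType::Init),
            1 => Some(StatusType::BatchEnd),
            _ => None,
        }
    }
}

fn handle_first_frame(raw_status: i32, chain_id: u32, start_version: u64) -> (u32, u64) {
    // An unknown code now panics with "Invalid status type." instead of being
    // silently lumped into the non-INIT arm.
    match StatusType::from_i32(raw_status).expect("Invalid status type.") {
        StatusType::Init => (chain_id, start_version),
        _ => panic!("[Indexer Cache] Streaming error: first frame is not INIT signal."),
    }
}

fn main() {
    assert_eq!(handle_first_frame(0, 1, 500), (1, 500));
}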
25 changes: 24 additions & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::constants::BLOB_STORAGE_SIZE;
-use redis::{AsyncCommands, RedisError};
+use redis::{AsyncCommands, RedisError, RedisResult};
 
 // Configurations for cache.
 // The cache size is estimated to be 10M transactions.
@@ -200,6 +200,29 @@ impl<T: redis::aio::ConnectionLike + Send> CacheOperator<T> {
         }
     }
 
+    pub async fn update_cache_transactions(
+        &mut self,
+        transactions: Vec<(u64, String, u64)>,
+    ) -> anyhow::Result<()> {
+        let mut redis_pipeline = redis::pipe();
+        for (version, encoded_proto_data, timestamp_in_seconds) in transactions {
+            redis_pipeline
+                .cmd("SET")
+                .arg(version)
+                .arg(encoded_proto_data)
+                .arg("EX")
+                .arg(get_ttl_in_seconds(timestamp_in_seconds))
+                .ignore();
+        }
+        let redis_result: RedisResult<()> =
+            redis_pipeline.query_async::<_, _>(&mut self.conn).await;
+
+        match redis_result {
+            Ok(_) => Ok(()),
+            Err(err) => Err(err.into()),
+        }
+    }
+
     // Update the latest version in cache.
     pub async fn update_cache_latest_version(
         &mut self,
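
This is where the batching pays off: update_cache_transactions queues one SET <version> <encoded_proto_data> EX <ttl> per transaction on a redis-rs pipeline and flushes the whole batch with a single query_async, so each chunk costs one Redis round trip instead of one per transaction. Below is a hedged, synchronous sketch of the same pipelining idea against a local Redis; the URL, keys, and TTL values are illustrative, and the diff's get_ttl_in_seconds helper is replaced by a fixed TTL.

use redis::Commands;

fn main() -> redis::RedisResult<()> {
    // Assumes a Redis server on localhost; URL and data are illustrative.
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_connection()?;

    // Pretend chunk of (version, encoded_proto_data, ttl_in_seconds).
    let transactions: Vec<(u64, String, u64)> = vec![
        (100, "proto-100".to_string(), 9_000),
        (101, "proto-101".to_string(), 9_000),
    ];

    // Queue one SET per transaction on the pipeline, then flush everything in
    // a single round trip instead of one network call per key.
    let mut pipeline = redis::pipe();
    for (version, encoded_proto_data, ttl_in_seconds) in transactions {
        pipeline
            .cmd("SET")
            .arg(version)
            .arg(encoded_proto_data)
            .arg("EX")
            .arg(ttl_in_seconds)
            .ignore();
    }
    pipeline.query::<()>(&mut conn)?;

    // Spot-check one of the keys written by the pipeline.
    let value: String = conn.get(100u64)?;
    println!("version 100 -> {}", value);
    Ok(())
}

The real method runs the same pipeline through query_async on the async connection held by CacheOperator and converts the RedisError into anyhow::Error; the sketch stays synchronous only to remain self-contained.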
@@ -67,6 +67,10 @@ impl FileStoreOperator {
 
     /// Bootstraps the file store operator. This is required before any other operations.
     pub async fn verify_storage_bucket_existence(&self) {
+        aptos_logger::info!(
+            bucket_name = self.bucket_name,
+            "Before file store operator starts, verify the bucket exists."
+        );
         // Verifies the bucket exists.
         Bucket::read(&self.bucket_name)
             .await
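
The file-store change is small: it emits a structured log carrying the bucket name before the existence check, so a failed Bucket::read is easy to attribute to a specific bucket. The sketch below shows the same log-then-verify pattern using the tracing crate as a stand-in for aptos_logger, whose structured-field syntax in the hunk above it mirrors; the bucket name and the failure condition are purely illustrative.

use tracing::info;

fn verify_storage_bucket_existence(bucket_name: &str) -> Result<(), String> {
    // Record the bucket name up front so a failed verification below is easy
    // to attribute in the logs.
    info!(bucket_name, "Before file store operator starts, verify the bucket exists.");

    // Stand-in for the real existence check against cloud storage.
    if bucket_name.is_empty() {
        return Err("bucket name must not be empty".to_string());
    }
    Ok(())
}

fn main() {
    tracing_subscriber::fmt::init();
    verify_storage_bucket_existence("indexer-grpc-file-store")
        .expect("bucket verification failed");
}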
