Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexer grpc refactor cache performance #6711

Merged
merged 2 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,28 +142,24 @@ async fn process_raw_datastream_response(
datastream::raw_datastream_response::Response::Data(data) => {
let transaction_len = data.transactions.len();
let start_version = data.transactions.first().unwrap().version;
let transactions = data
.transactions
.into_iter()
.map(|tx| {
let timestamp_in_seconds = match tx.timestamp {
Some(timestamp) => timestamp.seconds as u64,
None => 0,
};
(tx.version, tx.encoded_proto_data, timestamp_in_seconds)
})
.collect::<Vec<(u64, String, u64)>>();

for e in data.transactions {
let version = e.version;
let timestamp_in_seconds = match e.timestamp {
Some(t) => t.seconds,
// For Genesis block, there is no timestamp.
None => 0,
};
// Push to cache.
match cache_operator
.update_cache_transaction(
version,
e.encoded_proto_data,
timestamp_in_seconds as u64,
)
.await
{
Ok(_) => {},
Err(e) => {
anyhow::bail!("Update cache with version failed: {}", e);
},
}
// Push to cache.
match cache_operator.update_cache_transactions(transactions).await {
Ok(_) => {},
Err(e) => {
anyhow::bail!("Update cache with version failed: {}", e);
},
}
Ok(GrpcDataStatus::ChunkDataOk {
start_version,
Expand All @@ -184,11 +180,13 @@ async fn setup_cache_with_init_signal(
) {
let (fullnode_chain_id, starting_version) =
match init_signal.response.expect("Response type not exists.") {
Response::Status(status_frame) => match status_frame.r#type {
0 => (init_signal.chain_id, status_frame.start_version),
_ => {
panic!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
},
Response::Status(status_frame) => {
match StatusType::from_i32(status_frame.r#type).expect("Invalid status type.") {
StatusType::Init => (init_signal.chain_id, status_frame.start_version),
_ => {
panic!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
},
}
},
_ => {
panic!("[Indexer Cache] Streaming error: first frame is not siganl frame.");
Expand Down
25 changes: 24 additions & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::constants::BLOB_STORAGE_SIZE;
use redis::{AsyncCommands, RedisError};
use redis::{AsyncCommands, RedisError, RedisResult};

// Configurations for cache.
// The cache size is estimated to be 10M transactions.
Expand Down Expand Up @@ -200,6 +200,29 @@ impl<T: redis::aio::ConnectionLike + Send> CacheOperator<T> {
}
}

pub async fn update_cache_transactions(
&mut self,
transactions: Vec<(u64, String, u64)>,
) -> anyhow::Result<()> {
let mut redis_pipeline = redis::pipe();
for (version, encoded_proto_data, timestamp_in_seconds) in transactions {
redis_pipeline
.cmd("SET")
.arg(version)
.arg(encoded_proto_data)
.arg("EX")
.arg(get_ttl_in_seconds(timestamp_in_seconds))
.ignore();
}
let redis_result: RedisResult<()> =
redis_pipeline.query_async::<_, _>(&mut self.conn).await;

match redis_result {
Ok(_) => Ok(()),
Err(err) => Err(err.into()),
}
}

// Update the latest version in cache.
pub async fn update_cache_latest_version(
&mut self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl FileStoreOperator {

/// Bootstraps the file store operator. This is required before any other operations.
pub async fn verify_storage_bucket_existence(&self) {
aptos_logger::info!(
bucket_name = self.bucket_name,
"Before file store operator starts, verify the bucket exists."
);
// Verifies the bucket exists.
Bucket::read(&self.bucket_name)
.await
Expand Down