From e7663c3ac1f6754c457c0bcbfc4f4f856629be56 Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Thu, 11 Apr 2024 16:55:04 -0700 Subject: [PATCH] reduce logs and clean up. --- Cargo.lock | 25 -- Cargo.toml | 2 - .../indexer-grpc-cache-worker/src/worker.rs | 5 - .../indexer-grpc-data-access/Cargo.toml | 34 -- .../src/access_trait.rs | 98 ----- .../indexer-grpc-data-access/src/gcs.rs | 376 ---------------- .../indexer-grpc-data-access/src/in_memory.rs | 101 ----- .../src/in_memory_storage/mod.rs | 4 - .../src/in_memory_storage/storage.rs | 336 --------------- .../indexer-grpc-data-access/src/lib.rs | 145 ------- .../src/local_file.rs | 272 ------------ .../indexer-grpc-data-access/src/redis.rs | 217 ---------- .../indexer-grpc-data-service/Cargo.toml | 3 - .../src/grpc_response_stream.rs | 63 --- .../indexer-grpc-data-service/src/lib.rs | 2 - .../grpc_response_dispatcher.rs | 405 ------------------ .../src/response_dispatcher/mod.rs | 35 -- .../indexer-grpc-utils/src/in_memory_cache.rs | 15 - 18 files changed, 2138 deletions(-) delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/Cargo.toml delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/access_trait.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/gcs.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/mod.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/storage.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/lib.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/local_file.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-access/src/redis.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service/src/grpc_response_stream.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/grpc_response_dispatcher.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 42aeac3c33941..25d0a3cc86a8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1947,38 +1947,13 @@ dependencies = [ "url", ] -[[package]] -name = "aptos-indexer-grpc-data-access" -version = "1.0.0" -dependencies = [ - "anyhow", - "aptos-protos 1.3.0", - "async-trait", - "base64 0.13.1", - "dashmap", - "enum_dispatch", - "google-cloud-storage", - "prost 0.12.3", - "redis", - "redis-test", - "serde", - "serde_json", - "tempfile", - "thiserror", - "tokio", - "tokio-util 0.7.10", - "tracing", -] - [[package]] name = "aptos-indexer-grpc-data-service" version = "1.0.0" dependencies = [ "anyhow", - "aptos-indexer-grpc-data-access", "aptos-indexer-grpc-server-framework", "aptos-indexer-grpc-utils", - "aptos-logger", "aptos-metrics-core", "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf)", "aptos-protos 1.3.0", diff --git a/Cargo.toml b/Cargo.toml index bf4283068415e..89f0f4d6c971e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,7 +113,6 @@ members = [ "devtools/aptos-cargo-cli", "dkg", "ecosystem/indexer-grpc/indexer-grpc-cache-worker", - "ecosystem/indexer-grpc/indexer-grpc-data-access", "ecosystem/indexer-grpc/indexer-grpc-data-service", "ecosystem/indexer-grpc/indexer-grpc-file-store", "ecosystem/indexer-grpc/indexer-grpc-fullnode", @@ -342,7 +341,6 @@ aptos-id-generator = { path = 
"crates/aptos-id-generator" } aptos-indexer = { path = "crates/indexer" } aptos-indexer-grpc-cache-worker = { path = "ecosystem/indexer-grpc/indexer-grpc-cache-worker" } aptos-indexer-grpc-data-service = { path = "ecosystem/indexer-grpc/indexer-grpc-data-service" } -aptos-indexer-grpc-data-access = { path = "ecosystem/indexer-grpc/indexer-grpc-data-access" } aptos-indexer-grpc-file-store = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store" } aptos-indexer-grpc-fullnode = { path = "ecosystem/indexer-grpc/indexer-grpc-fullnode" } aptos-indexer-grpc-table-info = { path = "ecosystem/indexer-grpc/indexer-grpc-table-info" } diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs index 2af05115f8de5..73452678c950e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs @@ -437,11 +437,6 @@ async fn process_streaming_response( } // Cleanup. tasks_to_run = vec![]; - info!( - start_version = start_version, - num_of_transactions = num_of_transactions, - "[Indexer Cache] End signal received for current batch.", - ); if current_version != start_version + num_of_transactions { error!( current_version = current_version, diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-access/Cargo.toml deleted file mode 100644 index 593dd3f78e521..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/Cargo.toml +++ /dev/null @@ -1,34 +0,0 @@ -[package] -name = "aptos-indexer-grpc-data-access" -description = "Indexer gRPC Data Access." -version = "1.0.0" - -# Workspace inherited keys -authors = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -repository = { workspace = true } -rust-version = { workspace = true } - -[dependencies] -anyhow = { workspace = true } -aptos-protos = { workspace = true } -async-trait = { workspace = true } -base64 = { workspace = true } -dashmap = { workspace = true } -enum_dispatch = { workspace = true } -google-cloud-storage = { workspace = true } -prost = { workspace = true } -redis = { workspace = true } -redis-test = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -tokio-util = { workspace = true } -tracing = { workspace = true } - -[dev-dependencies] -tempfile = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/access_trait.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/access_trait.rs deleted file mode 100644 index 9f92da24aba2d..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/access_trait.rs +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use aptos_protos::transaction::v1::Transaction; -use std::fmt::Debug; -use thiserror::Error; - -#[derive(Clone, Debug, PartialEq)] -pub enum StorageReadStatus { - // Requested version is available for the given storage. - Ok(Vec), - // Requested version is not available yet for the given storage. - NotAvailableYet, - // Requested version is not available anymore for the given storage. - NotFound, -} - -#[derive(Error, Debug)] -pub enum StorageReadError { - // Storage is not available; but you can try again later. 
- #[error("[{0}] Storage access transient error: {1:#}")] - TransientError(&'static str, #[source] anyhow::Error), - // Storage is not available; and you should not try again. - #[error("[{0}] Storage access permanent error: {1:#}")] - PermenantError(&'static str, #[source] anyhow::Error), -} - -#[derive(Clone, Debug, Default)] -pub struct AccessMetadata { - // The chain id of the transactions; this is used to check if the transactions are from the same chain. - pub chain_id: u64, - // The next version in the storage to process. - pub next_version: u64, -} - -impl PartialEq for AccessMetadata { - fn eq(&self, other: &Self) -> bool { - self.chain_id == other.chain_id - } -} - -/// StorageTransactionRead is the interface for reading transactions from storage. It's expected to be implemented by all storages and -/// cloning the trait object should be cheap. -#[async_trait::async_trait] -#[enum_dispatch::enum_dispatch(StorageClient)] -pub trait StorageTransactionRead: Send + Sync + Clone { - // Fetches the transactions from storage starting from the given version. - // The response returned has the following semantics: - // - If the requested version is available, the response will contain the transactions starting from the requested version. - // - If the requested version is not available yet, NotAvailableYet will be returned. - // - If the requested version is not available anymore, NotFound will be returned. - async fn get_transactions( - &self, - batch_starting_version: u64, - size_hint: Option, - ) -> Result; - - // Fetches the metadata from storage and check against the other storages. - // E.g., redis metadata == gcs metadata == in-memory metadata. - async fn get_metadata(&self) -> Result; - - async fn is_storage_ready(&self) -> bool { - self.get_metadata().await.is_ok() - } -} - -// TODO: Add write interface for cache worker + file storage. - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_access_metadata_different_chain() { - let mainnet_metadata = AccessMetadata { - chain_id: 1, - next_version: 100, - }; - let testnet_metadata = AccessMetadata { - chain_id: 2, - next_version: 100, - }; - assert_ne!(mainnet_metadata, testnet_metadata); - } - - #[tokio::test] - async fn test_access_metadata_same_chain() { - let mainnet_metadata = AccessMetadata { - chain_id: 1, - next_version: 100, - }; - let testnet_metadata = AccessMetadata { - chain_id: 1, - next_version: 101, - }; - assert_eq!(mainnet_metadata, testnet_metadata); - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/gcs.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/gcs.rs deleted file mode 100644 index 025e80fb3fc33..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/gcs.rs +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - access_trait::{AccessMetadata, StorageReadError, StorageReadStatus, StorageTransactionRead}, - get_transactions_file_name, FileMetadata, TransactionsFile, -}; -use anyhow::Context; -use aptos_protos::transaction::v1::Transaction; -use google_cloud_storage::{ - client::{Client, ClientConfig}, - http::{ - objects::{download::Range, get::GetObjectRequest}, - Error, - }, -}; -use serde::{Deserialize, Serialize}; -use std::sync::{Arc, Mutex}; - -const GCS_STORAGE_NAME: &str = "Google Cloud Storage"; -const METADATA_FILE_NAME: &str = "metadata.json"; -// Avoid reading metadata file too often and use stale metadata instead. 
-const METADATA_FILE_MAX_STALENESS_IN_SECS: u64 = 30; // 30 seconds. - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct GcsClientConfig { - bucket_name: String, -} - -pub type GcsClient = GcsInternalClient; - -impl GcsClient { - pub async fn new(config: GcsClientConfig) -> anyhow::Result { - let gcs_config = ClientConfig::default() - .with_auth() - .await - .context("Failed to create GCS client.")?; - let client = Client::new(gcs_config); - GcsInternalClient::new_with_client(config.bucket_name, client).await - } -} - -#[derive(Clone)] -pub struct GcsInternalClient { - // Bucket name. - pub bucket_name: String, - latest_metadata: Arc>, - latest_metadata_timestamp: Arc>>, - pub gcs_client: T, -} - -impl GcsInternalClient { - pub async fn new_with_client(bucket_name: String, gcs_client: T) -> anyhow::Result { - let res = Self { - bucket_name, - latest_metadata: Arc::new(Mutex::new(FileMetadata::default())), - latest_metadata_timestamp: Arc::new(Mutex::new(None)), - gcs_client, - }; - res.refresh_metadata_if_needed() - .await - .context("Failed to refresh metadata")?; - Ok(res) - } - - async fn refresh_metadata_if_needed(&self) -> Result<(), StorageReadError> { - let now = std::time::Instant::now(); - { - let latest_metadata_timestamp = self.latest_metadata_timestamp.lock().unwrap(); - if let Some(timestamp) = *latest_metadata_timestamp { - if now.duration_since(timestamp).as_secs() < METADATA_FILE_MAX_STALENESS_IN_SECS { - // The metadata is fresh enough. - return Ok(()); - } - } - } - let metadata = FileMetadata::from( - self.gcs_client - .download_object( - &GetObjectRequest { - bucket: self.bucket_name.clone(), - object: METADATA_FILE_NAME.to_string(), - ..Default::default() - }, - &Range::default(), - ) - .await?, - ); - { - let mut latest_metadata = self.latest_metadata.lock().unwrap(); - *latest_metadata = metadata; - let mut latest_metadata_timestamp = self.latest_metadata_timestamp.lock().unwrap(); - *latest_metadata_timestamp = Some(now); - } - Ok(()) - } -} - -impl From for StorageReadError { - fn from(err: google_cloud_storage::http::Error) -> Self { - match err { - Error::HttpClient(e) => StorageReadError::TransientError( - GCS_STORAGE_NAME, - anyhow::Error::new(e).context("Failed to download object due to network issue."), - ), - Error::Response(e) => match e.is_retriable() { - true => StorageReadError::TransientError( - GCS_STORAGE_NAME, - anyhow::Error::new(e).context("Failed to download object; it's transient."), - ), - false => StorageReadError::PermenantError( - GCS_STORAGE_NAME, - anyhow::Error::new(e).context("Failed to download object; it's permanent."), - ), - }, - Error::TokenSource(e) => StorageReadError::PermenantError( - GCS_STORAGE_NAME, - anyhow::anyhow!(e.to_string()) - .context("Failed to download object; authentication/token error."), - ), - } - } -} - -#[async_trait::async_trait] -impl StorageTransactionRead for GcsInternalClient { - async fn get_transactions( - &self, - batch_starting_version: u64, - _size_hint: Option, - ) -> Result { - let file_name = get_transactions_file_name(batch_starting_version); - let result = self - .gcs_client - .download_object( - &GetObjectRequest { - bucket: self.bucket_name.clone(), - object: file_name.clone(), - ..Default::default() - }, - &Range::default(), - ) - .await; - let file = match result { - Err(Error::Response(e)) if e.code == 404 => { - return Ok(StorageReadStatus::NotAvailableYet) - }, - Err(e) => Err(e)?, - _ => result?, - }; - let transactions_file: 
TransactionsFile = TransactionsFile::from(file); - let all_transactions: Vec = transactions_file.into(); - let transactions = all_transactions - .into_iter() - .skip((batch_starting_version % 1000) as usize) - .collect(); - Ok(StorageReadStatus::Ok(transactions)) - } - - async fn get_metadata(&self) -> Result { - self.refresh_metadata_if_needed().await?; - let mut access_metadata = AccessMetadata::default(); - { - let latest_metadata = self.latest_metadata.lock().unwrap(); - access_metadata.chain_id = latest_metadata.chain_id; - access_metadata.next_version = latest_metadata.version; - } - Ok(access_metadata) - } -} - -#[async_trait::async_trait] -pub trait GcsClientTrait: Send + Sync + Clone { - async fn download_object( - &self, - request: &GetObjectRequest, - range: &Range, - ) -> Result, Error>; -} - -#[async_trait::async_trait] -impl GcsClientTrait for google_cloud_storage::client::Client { - async fn download_object( - &self, - request: &GetObjectRequest, - range: &Range, - ) -> Result, Error> { - self.download_object(request, range).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use aptos_protos::transaction::v1::Transaction; - use prost::Message; - use std::sync::atomic::{AtomicU64, Ordering}; - #[derive(Debug)] - pub(crate) struct MockGcsClient { - // Transactions to be returned. - pub resps: Vec>, - pub reqs: Vec, - pub index: AtomicU64, - } - impl Clone for MockGcsClient { - fn clone(&self) -> Self { - MockGcsClient { - resps: self.resps.clone(), - reqs: self.reqs.clone(), - index: AtomicU64::new(0), - } - } - } - - #[async_trait::async_trait] - impl GcsClientTrait for MockGcsClient { - async fn download_object( - &self, - request: &GetObjectRequest, - _range: &Range, - ) -> Result, Error> { - let index = self.index.fetch_add(1, Ordering::SeqCst) as usize; - assert_eq!(self.reqs[index].object, request.object); - assert_eq!(self.reqs[index].bucket, request.bucket); - Ok(self.resps[index].clone()) - } - } - #[tokio::test] - async fn test_get_transactions() { - let serialized_metadata = serde_json::to_vec(&FileMetadata { - chain_id: 1, - file_folder_size: 1000, - version: 1000, - }) - .unwrap(); - - let mut transactions = Vec::new(); - for i in 0..1000 { - let transaction = Transaction { - version: i, - ..Transaction::default() - }; - transactions.push(transaction); - } - - let serialized_transactions = serde_json::to_vec(&TransactionsFile { - starting_version: 0, - transactions: transactions - .iter() - .map(|x| { - let mut buf = Vec::new(); - x.encode(&mut buf).unwrap(); - base64::encode(buf) - }) - .collect::>(), - }) - .unwrap(); - - let mock_gcs_client = MockGcsClient { - resps: vec![serialized_metadata, serialized_transactions], - reqs: vec![ - GetObjectRequest { - object: METADATA_FILE_NAME.to_string(), - bucket: "test1".to_string(), - ..Default::default() - }, - GetObjectRequest { - object: "files/0.json".to_string(), - bucket: "test1".to_string(), - ..Default::default() - }, - ], - index: AtomicU64::new(0), - }; - let gcs_client = GcsInternalClient::new_with_client("test1".to_string(), mock_gcs_client) - .await - .unwrap(); - - let get_transactions_resp = gcs_client.get_transactions(0, None).await.unwrap(); - - assert_eq!(get_transactions_resp, StorageReadStatus::Ok(transactions)); - } - - #[tokio::test] - async fn test_get_transactions_with_partial() { - let serialized_metadata = serde_json::to_vec(&FileMetadata { - chain_id: 1, - file_folder_size: 1000, - version: 1000, - }) - .unwrap(); - - let mut transactions = Vec::new(); - for i in 0..1000 { - let 
transaction = Transaction { - version: i, - ..Transaction::default() - }; - transactions.push(transaction); - } - - let serialized_transactions = serde_json::to_vec(&TransactionsFile { - starting_version: 0, - transactions: transactions - .iter() - .map(|x| { - let mut buf = Vec::new(); - x.encode(&mut buf).unwrap(); - base64::encode(buf) - }) - .collect::>(), - }) - .unwrap(); - - let mock_gcs_client = MockGcsClient { - resps: vec![serialized_metadata, serialized_transactions], - reqs: vec![ - GetObjectRequest { - object: METADATA_FILE_NAME.to_string(), - bucket: "test2".to_string(), - ..Default::default() - }, - GetObjectRequest { - object: "files/0.json".to_string(), - bucket: "test2".to_string(), - ..Default::default() - }, - ], - index: AtomicU64::new(0), - }; - let gcs_client = GcsInternalClient::new_with_client("test2".to_string(), mock_gcs_client) - .await - .unwrap(); - - let get_transactions_resp = gcs_client.get_transactions(500, None).await.unwrap(); - assert_eq!( - get_transactions_resp, - StorageReadStatus::Ok( - transactions - .into_iter() - .skip(500) - .collect::>() - ) - ); - } - - #[tokio::test] - async fn test_get_metadata() { - let serialized_metadata = serde_json::to_vec(&FileMetadata { - chain_id: 42, - file_folder_size: 1000, - version: 1000, - }) - .unwrap(); - - let mock_gcs_client = MockGcsClient { - resps: vec![serialized_metadata], - reqs: vec![GetObjectRequest { - object: METADATA_FILE_NAME.to_string(), - bucket: "test3".to_string(), - ..Default::default() - }], - index: AtomicU64::new(0), - }; - let gcs_client = GcsInternalClient::new_with_client("test3".to_string(), mock_gcs_client) - .await - .unwrap(); - - let get_metadata_resp = gcs_client.get_metadata().await.unwrap(); - - assert_eq!(get_metadata_resp.chain_id, 42); - assert_eq!(get_metadata_resp.next_version, 1000); - } - // TODO: add tests for GCS operation failures. -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory.rs deleted file mode 100644 index 4942890be5a9e..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory.rs +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - access_trait::{AccessMetadata, StorageReadError, StorageReadStatus, StorageTransactionRead}, - in_memory_storage::storage::{InMemoryStorageInternal, IN_MEMORY_STORAGE_SIZE_SOFT_LIMIT}, -}; -use anyhow::Context; -use aptos_protos::transaction::v1::Transaction; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -const IN_MEMORY_STORAGE_NAME: &str = "In Memory"; -const IN_MEMORY_STORAGE_READ_SIZE: usize = 1000; - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct InMemoryStorageClientConfig { - // The source of the transactions. - redis_address: String, -} - -#[derive(Clone)] -pub struct InMemoryStorageClient { - internal: Arc, -} - -impl InMemoryStorageClient { - // For each process, to avoid memory explosion, only create the client once and copy the reference - // to other threads. 
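- // Cloning this client only copies the Arc, so all clones share one
- // InMemoryStorageInternal and therefore one background Redis fetch task.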
- pub async fn new(redis_address: String) -> anyhow::Result { - let internal = InMemoryStorageInternal::new(redis_address) - .await - .context("Internal storage initialization failed.")?; - Ok(Self { - internal: Arc::new(internal), - }) - } -} - -#[async_trait::async_trait] -impl StorageTransactionRead for InMemoryStorageClient { - async fn get_transactions( - &self, - batch_starting_version: u64, - _size_hint: Option, - ) -> Result { - let current_metadata = self.get_metadata().await?; - - let lowest_available_version = current_metadata - .next_version - .saturating_sub(IN_MEMORY_STORAGE_SIZE_SOFT_LIMIT as u64); - if batch_starting_version < lowest_available_version { - // The requested version is too low. - return Ok(StorageReadStatus::NotFound); - } - let highest_version = std::cmp::min( - current_metadata.next_version, - batch_starting_version + IN_MEMORY_STORAGE_READ_SIZE as u64, - ); - - let mut transaction_refs = Vec::new(); - for version in batch_starting_version..highest_version { - let read_result = self.internal.transactions_map.get(&version); - match read_result { - Some(transaction_ref) => { - let transaction = transaction_ref.clone(); - transaction_refs.push(transaction); - }, - None => break, - } - } - let transactions: Vec = transaction_refs - .into_iter() - .map(|transaction_ref| (*transaction_ref).clone()) - .collect(); - match transactions.len() { - 0 => Ok(StorageReadStatus::NotFound), - _ => Ok(StorageReadStatus::Ok(transactions)), - } - } - - async fn get_metadata(&self) -> Result { - match self.internal.metadata.read() { - Ok(metadata) => { - match *metadata { - Some(ref metadata) => Ok(metadata.clone()), - // Metadata is not ready yet; needs retry. - None => Err(StorageReadError::TransientError( - IN_MEMORY_STORAGE_NAME, - anyhow::anyhow!("No metadata".to_string()), - )), - } - }, - Err(err) => Err(StorageReadError::PermenantError( - IN_MEMORY_STORAGE_NAME, - anyhow::anyhow!("Failed to read metadata: {:#}", err), - )), - } - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/mod.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/mod.rs deleted file mode 100644 index fd76af3fa4890..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -pub mod storage; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/storage.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/storage.rs deleted file mode 100644 index 8782e724bc4ba..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/in_memory_storage/storage.rs +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{access_trait::AccessMetadata, REDIS_CHAIN_ID, REDIS_ENDING_VERSION_EXCLUSIVE_KEY}; -use anyhow::Context; -use aptos_protos::transaction::v1::Transaction; -use dashmap::DashMap; -use prost::Message; -use redis::AsyncCommands; -use std::{ - sync::{Arc, RwLock}, - time::Duration, -}; - -// Shared data between reads and writes. -type ThreadSafeAccessMetadata = Arc>>; -type ThreadSafeInMemoryStorageStatus = Arc>>; -// Note: Arc is to avoid copying the transaction when operating on the map. -type TransactionMap = Arc>>; - -// Capacity of the in-memory storage. -pub const IN_MEMORY_STORAGE_SIZE_SOFT_LIMIT: usize = 100_000; -// Capacity of the in-memory storage. 
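- // The hard limit adds headroom above the soft limit: readers treat versions below
- // next_version minus the soft limit as evicted, while the background fetch task only
- // garbage-collects entries once the map grows past the hard limit.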
-const IN_MEMORY_STORAGE_SIZE_HARD_LIMIT: usize = 120_000; -// Redis fetch task interval in milliseconds. -const REDIS_FETCH_TASK_INTERVAL_IN_MILLIS: u64 = 10; -// Redis fetch MGET batch size. -const REDIS_FETCH_MGET_BATCH_SIZE: usize = 1000; - -// InMemoryStorage is the in-memory storage for transactions. -pub struct InMemoryStorageInternal { - pub transactions_map: TransactionMap, - pub metadata: ThreadSafeAccessMetadata, - pub storage_status: ThreadSafeInMemoryStorageStatus, - _cancellation_token_drop_guard: tokio_util::sync::DropGuard, -} - -impl InMemoryStorageInternal { - async fn new_with_connection( - redis_connection: C, - transaction_map_size: Option, - ) -> anyhow::Result - where - C: redis::aio::ConnectionLike + Send + Sync + Clone + 'static, - { - let redis_connection = Arc::new(redis_connection); - let transactions_map = Arc::new(DashMap::new()); - let transactions_map_clone = transactions_map.clone(); - let metadata = Arc::new(RwLock::new(None)); - let metadata_clone = metadata.clone(); - let cancellation_token = tokio_util::sync::CancellationToken::new(); - let cancellation_token_clone = cancellation_token.clone(); - let storage_status = Arc::new(RwLock::new(Ok(()))); - let storage_status_clone = storage_status.clone(); - tokio::task::spawn(async move { - let result = redis_fetch_task( - redis_connection, - transactions_map_clone, - metadata_clone, - cancellation_token_clone, - transaction_map_size, - ) - .await; - let mut storage_status = storage_status_clone.write().unwrap(); - *storage_status = result; - }); - Ok(Self { - transactions_map, - metadata, - _cancellation_token_drop_guard: cancellation_token.drop_guard(), - storage_status, - }) - } - - pub async fn new(redis_address: String) -> anyhow::Result { - let redis_client = - redis::Client::open(redis_address).context("Failed to open Redis client.")?; - let redis_connection = redis_client - .get_tokio_connection_manager() - .await - .context("Failed to get Redis connection.")?; - Self::new_with_connection(redis_connection, None).await - } -} - -/// redis_fetch_task fetches the transactions from Redis and updates the in-memory storage. -/// It's expected to be run in a separate thread. -async fn redis_fetch_task( - redis_connection: Arc, - transactions_map: Arc>>, - metadata: ThreadSafeAccessMetadata, - cancellation_token: tokio_util::sync::CancellationToken, - transaction_map_size: Option, -) -> anyhow::Result<()> -where - C: redis::aio::ConnectionLike + Send + Sync + Clone + 'static, -{ - let current_connection = redis_connection.clone(); - loop { - tokio::select! { - _ = cancellation_token.cancelled() => { - return Ok(()); - }, - _ = tokio::time::sleep(Duration::from_millis(REDIS_FETCH_TASK_INTERVAL_IN_MILLIS)) => { - // Continue. - }, - } - let start_time = std::time::Instant::now(); - let mut conn = current_connection.as_ref().clone(); - let redis_chain_id: u64 = conn - .get(REDIS_CHAIN_ID) - .await - .context("Failed to get the redis id")?; - let redis_ending_version_exclusive: u64 = conn - .get(REDIS_ENDING_VERSION_EXCLUSIVE_KEY) - .await - .context("Failed to get the ending version")?; - // The new metadata to be updated. - let new_metadata = AccessMetadata { - chain_id: redis_chain_id, - next_version: redis_ending_version_exclusive, - }; - - let transactions_map_size_hard_limit = - transaction_map_size.unwrap_or(IN_MEMORY_STORAGE_SIZE_HARD_LIMIT); - // 1. Determine the fetch size based on old metadata. 
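- // On the first pass (no metadata yet) this backfills at most the map's hard limit;
- // afterwards it fetches only the delta between the old and new ending versions.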
- let redis_fetch_size = match *metadata.read().unwrap() { - Some(ref current_metadata) => { - anyhow::ensure!( - current_metadata.chain_id == redis_chain_id, - "Chain ID mismatch." - ); - redis_ending_version_exclusive.saturating_sub(current_metadata.next_version) - as usize - }, - None => std::cmp::min( - transactions_map_size_hard_limit, - redis_ending_version_exclusive as usize, - ), - }; - // 2. Use MGET to fetch the transactions in batches. - let starting_version = redis_ending_version_exclusive - redis_fetch_size as u64; - let ending_version = redis_ending_version_exclusive; - // Order doesn't matter here; it'll be available in the map until metadata is updated. - let keys_batches: Vec> = (starting_version..ending_version) - .map(|version| version.to_string()) - .collect::>() - .chunks(REDIS_FETCH_MGET_BATCH_SIZE) - .map(|x| x.to_vec()) - .collect(); - for keys in keys_batches { - let redis_transactions: Vec = conn - .mget(keys) - .await - .context("Failed to MGET from redis.") - .expect("lskajdlfkjlaj"); - let transactions: Vec> = redis_transactions - .into_iter() - .map(|serialized_transaction| { - // TODO: leverage FROM to do conversion. - let serialized_transaction = base64::decode(serialized_transaction.as_bytes()) - .expect("Failed to decode base64."); - let transaction = Transaction::decode(serialized_transaction.as_slice()) - .expect("Failed to decode transaction protobuf from Redis."); - Arc::new(transaction) - }) - .collect(); - for transaction in transactions { - transactions_map.insert(transaction.version, transaction); - } - } - // 3. Update the metadata. - { - let mut current_metadata = metadata.write().unwrap(); - *current_metadata = Some(new_metadata.clone()); - } - if redis_fetch_size == 0 { - tracing::info!("Redis is not ready for current fetch. Wait."); - continue; - } - // Garbage collection. Note, this is *not a thread safe* operation; readers should - // return NOT_FOUND if the version is not found. - let current_size = transactions_map.len(); - let lowest_version = new_metadata.next_version - current_size as u64; - let count_of_transactions_to_remove = - current_size.saturating_sub(transactions_map_size_hard_limit); - (lowest_version..lowest_version + count_of_transactions_to_remove as u64).for_each( - |version| { - transactions_map.remove(&version); - }, - ); - tracing::info!( - redis_fetch_size = redis_fetch_size, - time_spent_in_seconds = start_time.elapsed().as_secs_f64(), - fetch_starting_version = new_metadata.next_version - redis_fetch_size as u64, - fetch_ending_version_inclusive = new_metadata.next_version - 1, - "Fetching transactions from Redis." - ); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use redis_test::{MockCmd, MockRedisConnection}; - - fn generate_redis_value_bulk(starting_version: u64, size: usize) -> redis::Value { - redis::Value::Bulk( - (starting_version..starting_version + size as u64) - .map(|e| { - let txn = Transaction { - version: e, - ..Default::default() - }; - let mut txn_buf = Vec::new(); - txn.encode(&mut txn_buf).unwrap(); - let encoded = base64::encode(txn_buf); - redis::Value::Data(encoded.as_bytes().to_vec()) - }) - .collect(), - ) - } - // This test is to start the in-memory storage with a empty Redis. 
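- // Expected outcome: metadata becomes available with chain_id 1 and next_version 0,
- // and no MGET is issued since there is nothing to fetch.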
- #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_redis_fetch_fresh() { - let mock_connection = MockRedisConnection::new(vec![ - MockCmd::new(redis::cmd("GET").arg(REDIS_CHAIN_ID), Ok(1)), - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(0), - ), - ]); - let in_memory_storage = InMemoryStorageInternal::new_with_connection(mock_connection, None) - .await - .unwrap(); - // Wait for the fetch task to finish. - tokio::time::sleep(std::time::Duration::from_millis( - REDIS_FETCH_TASK_INTERVAL_IN_MILLIS * 2, - )) - .await; - { - let metadata = in_memory_storage.metadata.read().unwrap(); - assert_eq!(metadata.as_ref().unwrap().chain_id, 1); - assert_eq!(metadata.as_ref().unwrap().next_version, 0); - } - } - - // This test is to start the in-memory storage with 1001 transactions in Redis. - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_redis_fetch() { - let first_batch = generate_redis_value_bulk(0, 1000); - let second_batch = generate_redis_value_bulk(1000, 1); - let keys = (0..1000) - .map(|version| version.to_string()) - .collect::>(); - let cmds = vec![ - MockCmd::new(redis::cmd("GET").arg(REDIS_CHAIN_ID), Ok(1)), - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(1001), - ), - MockCmd::new(redis::cmd("MGET").arg::>(keys), Ok(first_batch)), - MockCmd::new( - redis::cmd("MGET").arg::>(vec!["1000".to_string()]), - Ok(second_batch), - ), - ]; - let mock_connection = MockRedisConnection::new(cmds); - let in_memory_storage = InMemoryStorageInternal::new_with_connection(mock_connection, None) - .await - .unwrap(); - // Wait for the fetch task to finish. - tokio::time::sleep(std::time::Duration::from_millis( - REDIS_FETCH_TASK_INTERVAL_IN_MILLIS * 10, - )) - .await; - { - let metadata = in_memory_storage.metadata.read().unwrap(); - assert_eq!(metadata.as_ref().unwrap().chain_id, 1); - assert_eq!(metadata.as_ref().unwrap().next_version, 1001); - } - - assert_eq!(in_memory_storage.transactions_map.len(), 1001); - } - - // This test is to start the in-memory storage with 1000 transactions in Redis first - // and then 2000 transactions for the second batch. - // In-memory storage has size 500. - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_redis_fetch_with_eviction() { - let first_batch = generate_redis_value_bulk(0, 1000); - let second_batch = generate_redis_value_bulk(1000, 1000); - // Filling the empty in-memory storage. - let keys = (500..1000) - .map(|version| version.to_string()) - .collect::>(); - let second_keys = (1000..2000) - .map(|version| version.to_string()) - .collect::>(); - let cmds = vec![ - MockCmd::new(redis::cmd("GET").arg(REDIS_CHAIN_ID), Ok(1)), - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(1000), - ), - MockCmd::new(redis::cmd("MGET").arg::>(keys), Ok(first_batch)), - // Redis storage moves forward. - MockCmd::new(redis::cmd("GET").arg(REDIS_CHAIN_ID), Ok(1)), - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(2000), - ), - MockCmd::new( - redis::cmd("MGET").arg::>(second_keys), - Ok(second_batch), - ), - ]; - let mock_connection = MockRedisConnection::new(cmds); - let in_memory_storage = - InMemoryStorageInternal::new_with_connection(mock_connection, Some(500)) - .await - .unwrap(); - // Wait for the fetch task to finish. 
- tokio::time::sleep(std::time::Duration::from_millis( - REDIS_FETCH_TASK_INTERVAL_IN_MILLIS * 10, - )) - .await; - { - let metadata = in_memory_storage.metadata.read().unwrap(); - assert_eq!(metadata.as_ref().unwrap().chain_id, 1); - assert_eq!(metadata.as_ref().unwrap().next_version, 2000); - } - - assert_eq!(in_memory_storage.transactions_map.len(), 500); - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/lib.rs deleted file mode 100644 index 83006e7b08030..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/lib.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use aptos_protos::transaction::v1::Transaction; -use prost::Message; -use serde::{Deserialize, Serialize}; - -pub mod access_trait; -pub mod gcs; -pub mod in_memory; -pub mod in_memory_storage; -pub mod local_file; -pub mod redis; - -use crate::access_trait::{ - AccessMetadata, StorageReadError, StorageReadStatus, StorageTransactionRead, -}; - -#[enum_dispatch::enum_dispatch] -#[derive(Clone)] -pub enum StorageClient { - InMemory(in_memory::InMemoryStorageClient), - Redis(redis::RedisClient), - GCS(gcs::GcsClient), - MockClient(MockStorageClient), -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(tag = "storage_type")] -pub enum ReadOnlyStorageType { - InMemory(in_memory::InMemoryStorageClientConfig), - Redis(redis::RedisClientConfig), - GCS(gcs::GcsClientConfig), - LocalFile(local_file::LocalFileClientConfig), -} - -const REDIS_ENDING_VERSION_EXCLUSIVE_KEY: &str = "latest_version"; -const REDIS_CHAIN_ID: &str = "chain_id"; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] -struct FileMetadata { - pub chain_id: u64, - pub file_folder_size: u64, - pub version: u64, -} - -impl From> for FileMetadata { - fn from(bytes: Vec) -> Self { - serde_json::from_slice(bytes.as_slice()).expect("Failed to deserialize FileMetadata.") - } -} - -type Based64EncodedSerializedTransactionProtobuf = String; - -#[derive(Debug, Serialize, Deserialize)] -struct TransactionsFile { - pub transactions: Vec, - pub starting_version: u64, -} - -impl From> for TransactionsFile { - fn from(bytes: Vec) -> Self { - serde_json::from_slice(bytes.as_slice()).expect("Failed to deserialize Transactions file.") - } -} -impl From for Vec { - fn from(transactions_file: TransactionsFile) -> Self { - transactions_file - .transactions - .into_iter() - .map(|transaction| { - let bytes = base64::decode(transaction).expect("Failed to decode base64."); - Transaction::decode(bytes.as_slice()).expect("Failed to decode protobuf.") - }) - .collect() - } -} - -#[inline] -fn get_transactions_file_name(version: u64) -> String { - // This assumes that the transactions are stored in file of 1000 versions. 
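- // Rounds the version down to the nearest multiple of 1000: versions 0..=999 map to
- // files/0.json, and e.g. version 1234 maps to files/1000.json.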
- format!("files/{}.json", version / 1000 * 1000) -} - -pub struct MockStorageClient { - metadata: AccessMetadata, - transactions: Vec, -} - -impl MockStorageClient { - pub fn new(chain_id: u64, transactions: Vec) -> Self { - let next_version = transactions.last().unwrap().version + 1; - Self { - metadata: AccessMetadata { - chain_id, - next_version, - }, - transactions, - } - } -} - -impl Clone for MockStorageClient { - fn clone(&self) -> Self { - Self { - metadata: self.metadata.clone(), - transactions: self.transactions.clone(), - } - } -} - -#[async_trait::async_trait] -impl StorageTransactionRead for MockStorageClient { - async fn get_metadata(&self) -> Result { - Ok(self.metadata.clone()) - } - - async fn get_transactions( - &self, - start_version: u64, - _limit: Option, - ) -> Result { - let current_starting_version = self.transactions.first().unwrap().version; - if current_starting_version > start_version { - return Ok(StorageReadStatus::NotFound); - } - - let current_next_version = self.metadata.next_version; - if start_version >= current_next_version { - return Ok(StorageReadStatus::NotAvailableYet); - } - - return Ok(StorageReadStatus::Ok( - self.transactions - .iter() - .filter(|v| v.version >= start_version) - .cloned() - .collect(), - )); - } - - async fn is_storage_ready(&self) -> bool { - true - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/local_file.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/local_file.rs deleted file mode 100644 index 0083f080a261d..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/local_file.rs +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - access_trait::{AccessMetadata, StorageReadError, StorageReadStatus, StorageTransactionRead}, - get_transactions_file_name, FileMetadata, TransactionsFile, -}; -use aptos_protos::transaction::v1::Transaction; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; - -const LOCAL_FILE_STORAGE_NAME: &str = "Local File"; - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct LocalFileClientConfig { - // The absolute path to the folder that contains the transactions files. - path: String, -} - -#[derive(Clone)] -pub struct LocalFileClient { - pub file_path: PathBuf, -} - -impl LocalFileClient { - pub fn new(config: LocalFileClientConfig) -> anyhow::Result { - Ok(Self { - file_path: PathBuf::from(config.path), - }) - } -} - -impl From for StorageReadError { - fn from(err: std::io::Error) -> Self { - match err.kind() { - // Fetch an entry that is not set yet. - std::io::ErrorKind::NotFound => { - StorageReadError::PermenantError(LOCAL_FILE_STORAGE_NAME, anyhow::Error::new(err)) - }, - // Other errors are transient; let it retry. - _ => StorageReadError::TransientError(LOCAL_FILE_STORAGE_NAME, anyhow::Error::new(err)), - } - } -} - -#[async_trait::async_trait] -impl StorageTransactionRead for LocalFileClient { - async fn get_transactions( - &self, - batch_starting_version: u64, - _size_hint: Option, - ) -> Result { - let file_path = self - .file_path - .clone() - .join(get_transactions_file_name(batch_starting_version)); - let file = match tokio::fs::read(file_path.clone()).await { - Ok(file) => file, - Err(e) => { - match e.kind() { - std::io::ErrorKind::NotFound => { - // The file is not found. This is not an error. 
- return Ok(StorageReadStatus::NotFound); - }, - _ => { - return Err(StorageReadError::PermenantError( - LOCAL_FILE_STORAGE_NAME, - anyhow::anyhow!( - "Failed to find txns file '{}': {}", - file_path.display(), - e - ), - )); - }, - } - }, - }; - let transactions_file = TransactionsFile::from(file); - let all_transactions: Vec = transactions_file.into(); - let transactions = all_transactions - .into_iter() - .skip((batch_starting_version % 1000) as usize) - .collect::>(); - Ok(StorageReadStatus::Ok(transactions)) - } - - async fn get_metadata(&self) -> Result { - let file_path = self.file_path.clone().join("metadata.json"); - let metadata = FileMetadata::from(tokio::fs::read(file_path.clone()).await?); - Ok(AccessMetadata { - chain_id: metadata.chain_id, - next_version: metadata.version, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use aptos_protos::transaction::v1::Transaction; - use prost::Message; - use std::{ - fs::{create_dir, File}, - io::Write, - }; - fn create_transactions(starting_version: u64) -> Vec { - (starting_version..starting_version + 1000) - .map(|version| Transaction { - version, - ..Default::default() - }) - .collect() - } - - fn create_transactions_file(starting_version: u64) -> TransactionsFile { - TransactionsFile { - transactions: create_transactions(starting_version) - .into_iter() - .map(|transaction| { - let mut buf = Vec::new(); - transaction.encode(&mut buf).unwrap(); - base64::encode(buf) - }) - .collect(), - starting_version, - } - } - #[tokio::test] - async fn test_local_file_read_full_batch_successful() { - // Create a temp file. - let dir = tempfile::tempdir().unwrap(); - let metadata_path = dir.path().join("metadata.json"); - create_dir(dir.path().join("files")).unwrap(); - let transactions_file_path = dir.path().join("files/0.json"); - // Write some data to the file. - { - let mut metadata_file = File::create(&metadata_path).unwrap(); - let file_metadata = FileMetadata { - chain_id: 1, - file_folder_size: 1000, - version: 1000, - }; - write!( - metadata_file, - "{}", - serde_json::to_string(&file_metadata).unwrap() - ) - .unwrap(); - let mut transactions_file = File::create(&transactions_file_path).unwrap(); - let transactions_file_obj = create_transactions_file(0); - write!( - transactions_file, - "{}", - serde_json::to_string(&transactions_file_obj).unwrap() - ) - .unwrap(); - } - - let local_file_client = LocalFileClient::new(LocalFileClientConfig { - path: dir.path().to_path_buf().to_str().unwrap().to_string(), - }) - .unwrap(); - let transactions = local_file_client.get_transactions(0, None).await.unwrap(); - let access_metadata = local_file_client.get_metadata().await.unwrap(); - assert_eq!(access_metadata.chain_id, 1); - assert_eq!(access_metadata.next_version, 1000); - assert_eq!(transactions, StorageReadStatus::Ok(create_transactions(0))); - } - - #[tokio::test] - async fn test_local_file_read_partial_batch_successful() { - // Create a temp file. - let dir = tempfile::tempdir().unwrap(); - let metadata_path = dir.path().join("metadata.json"); - create_dir(dir.path().join("files")).unwrap(); - let transactions_file_path = dir.path().join("files/0.json"); - // Write some data to the file. 
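- // The fixture mirrors the on-disk layout the client expects: metadata.json at the
- // root and 1000-version batch files under files/.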
- { - let mut metadata_file = File::create(&metadata_path).unwrap(); - let file_metadata = FileMetadata { - chain_id: 1, - file_folder_size: 1000, - version: 1000, - }; - write!( - metadata_file, - "{}", - serde_json::to_string(&file_metadata).unwrap() - ) - .unwrap(); - let mut transactions_file = File::create(&transactions_file_path).unwrap(); - let transactions_file_obj = create_transactions_file(0); - write!( - transactions_file, - "{}", - serde_json::to_string(&transactions_file_obj).unwrap() - ) - .unwrap(); - } - - let local_file_client = LocalFileClient::new(LocalFileClientConfig { - path: dir.path().to_path_buf().to_str().unwrap().to_string(), - }) - .unwrap(); - let transactions = local_file_client.get_transactions(500, None).await.unwrap(); - let access_metadata = local_file_client.get_metadata().await.unwrap(); - assert_eq!(access_metadata.chain_id, 1); - assert_eq!(access_metadata.next_version, 1000); - let partial_transactions_file = (500..1000) - .map(|version| Transaction { - version, - ..Default::default() - }) - .collect::>(); - assert_eq!( - transactions, - StorageReadStatus::Ok(partial_transactions_file) - ); - } - - #[tokio::test] - async fn test_local_file_metadata_missing() { - // Create a temp file. - let dir = tempfile::tempdir().unwrap(); - let local_file_client = LocalFileClient::new(LocalFileClientConfig { - path: dir.path().to_path_buf().to_str().unwrap().to_string(), - }) - .unwrap(); - let access_metadata = local_file_client.get_metadata().await; - assert!(access_metadata.is_err()); - assert!(matches!( - access_metadata.unwrap_err(), - StorageReadError::PermenantError(LOCAL_FILE_STORAGE_NAME, _) - )); - } - - #[tokio::test] - async fn test_local_file_transactions_file_not_found() { - // Create a temp file. - let dir = tempfile::tempdir().unwrap(); - let metadata_path = dir.path().join("metadata.json"); - // Write some data to the file. - { - let mut metadata_file = File::create(&metadata_path).unwrap(); - let file_metadata = FileMetadata { - chain_id: 1, - file_folder_size: 1000, - // No transactions yet. 
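- // The version field is surfaced as next_version by get_metadata(), so 0 means
- // no batch file has been written.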
- version: 0, - }; - write!( - metadata_file, - "{}", - serde_json::to_string(&file_metadata).unwrap() - ) - .unwrap(); - } - - let local_file_client = LocalFileClient::new(LocalFileClientConfig { - path: dir.path().to_path_buf().to_str().unwrap().to_string(), - }) - .unwrap(); - let transactions = local_file_client.get_transactions(0, None).await; - let access_metadata = local_file_client.get_metadata().await.unwrap(); - - assert_eq!(access_metadata.chain_id, 1); - assert_eq!(access_metadata.next_version, 0); - assert!(transactions.is_ok()); - assert!(transactions.unwrap() == StorageReadStatus::NotFound); - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/redis.rs b/ecosystem/indexer-grpc/indexer-grpc-data-access/src/redis.rs deleted file mode 100644 index cd6b73f48199a..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-access/src/redis.rs +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - access_trait::{AccessMetadata, StorageReadError, StorageReadStatus, StorageTransactionRead}, - REDIS_CHAIN_ID, REDIS_ENDING_VERSION_EXCLUSIVE_KEY, -}; -use anyhow::Context; -use aptos_protos::transaction::v1::Transaction; -use prost::Message; -use redis::{aio::ConnectionLike, AsyncCommands, ErrorKind}; -use serde::{Deserialize, Serialize}; - -const REDIS_STORAGE_NAME: &str = "Redis"; -const DEFAULT_REDIS_MGET_BATCH_SIZE: usize = 1000; - -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct RedisClientConfig { - // The source of the transactions. - redis_address: String, -} - -pub type RedisClient = RedisClientInternal; - -impl RedisClient { - pub async fn new(config: RedisClientConfig) -> anyhow::Result { - let redis_client = - redis::Client::open(config.redis_address).context("Failed to create Redis client.")?; - let redis_connection = redis_client - .get_tokio_connection_manager() - .await - .context("Failed to create Redis connection.")?; - Ok(Self::new_with_connection(redis_connection)) - } -} - -#[derive(Clone)] -pub struct RedisClientInternal { - // Redis Connection. - pub redis_connection: C, -} - -impl RedisClientInternal { - pub fn new_with_connection(redis_connection: C) -> Self { - Self { redis_connection } - } -} - -impl From for StorageReadError { - fn from(err: redis::RedisError) -> Self { - match err.kind() { - // Fetch an entry that is not set yet. - ErrorKind::TypeError => { - StorageReadError::PermenantError(REDIS_STORAGE_NAME, anyhow::Error::new(err)) - }, - // Other errors are transient; let it retry. - _ => StorageReadError::TransientError(REDIS_STORAGE_NAME, anyhow::Error::new(err)), - } - } -} - -#[async_trait::async_trait] -impl StorageTransactionRead for RedisClientInternal { - async fn get_transactions( - &self, - batch_starting_version: u64, - size_hint: Option, - ) -> Result { - // Check the latest version of the cache. - let mut conn = self.redis_connection.clone(); - let redis_ending_version_exclusive: u64 = - conn.get(REDIS_ENDING_VERSION_EXCLUSIVE_KEY).await?; - if batch_starting_version >= redis_ending_version_exclusive { - return Ok(StorageReadStatus::NotAvailableYet); - } - - let fetch_size = match size_hint { - Some(size) => size, - None => DEFAULT_REDIS_MGET_BATCH_SIZE, - }; - let batch_ending_version_exclusive = std::cmp::min( - batch_starting_version + fetch_size as u64, - redis_ending_version_exclusive, - ); - // Use MGET to fetch the transactions in batches. 
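- // Each version is stored under its own key, so a single MGET bounded by the
- // size hint (default 1000 keys) covers the whole contiguous range.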
- let keys: Vec = (batch_starting_version..batch_ending_version_exclusive).collect(); - let result = conn.mget::, Vec>(keys).await; - match result { - Ok(serialized_transactions) => Ok(StorageReadStatus::Ok( - serialized_transactions - .into_iter() - .map(|serialized_transaction| { - Transaction::decode(serialized_transaction.as_bytes()) - .expect("Decode transaction failed.") - }) - .collect(), - )), - Err(err) => { - match err.kind() { - // If entries are evicted from the cache, Redis returns NIL, which is not String type. - // We treat this as NotFound. - ErrorKind::TypeError => Ok(StorageReadStatus::NotFound), - // Other errors are transient; let it retry. - _ => Err(StorageReadError::TransientError( - REDIS_STORAGE_NAME, - anyhow::Error::new(err), - )), - } - }, - } - } - - async fn get_metadata(&self) -> Result { - let mut conn = self.redis_connection.clone(); - let chain_id = conn.get(REDIS_CHAIN_ID).await?; - let next_version = conn.get(REDIS_ENDING_VERSION_EXCLUSIVE_KEY).await?; - Ok(AccessMetadata { - chain_id, - next_version, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use redis_test::{MockCmd, MockRedisConnection}; - - #[tokio::test] - async fn test_redis_metadata_fetch_success() { - let mock_connection = MockRedisConnection::new(vec![ - MockCmd::new(redis::cmd("GET").arg(REDIS_CHAIN_ID), Ok(1)), - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(1000), - ), - ]); - let redis_client = RedisClientInternal::new_with_connection(mock_connection.clone()); - let metadata = redis_client.get_metadata().await.unwrap(); - assert_eq!(metadata.chain_id, 1); - assert_eq!(metadata.next_version, 1000); - } - - #[tokio::test] - async fn test_redis_metadata_fetch_failure() { - let mock_connection = MockRedisConnection::new(vec![MockCmd::new( - redis::cmd("GET").arg(REDIS_CHAIN_ID), - Ok(redis::Value::Nil), - )]); - let redis_client = RedisClientInternal::new_with_connection(mock_connection.clone()); - let metadata = redis_client.get_metadata().await; - assert!(metadata.is_err()); - assert!(matches!( - metadata.unwrap_err(), - StorageReadError::PermenantError(REDIS_STORAGE_NAME, _) - )); - } - - #[tokio::test] - async fn test_redis_transactions_fetch_success() { - let transaction = Transaction { - version: 42, - ..Transaction::default() - }; - let values = redis::Value::Bulk(vec![redis::Value::Data(transaction.encode_to_vec())]); - let mock_connection = MockRedisConnection::new(vec![ - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(43), - ), - MockCmd::new(redis::cmd("MGET").arg(42), Ok(values)), - ]); - let redis_client = RedisClientInternal::new_with_connection(mock_connection.clone()); - let transactions = redis_client.get_transactions(42, Some(1)).await; - assert!(transactions.is_ok()); - let transactions = transactions.unwrap(); - assert_eq!(transactions, StorageReadStatus::Ok(vec![transaction])); - } - - #[tokio::test] - async fn test_redis_transactions_fetch_data_not_ready_yet() { - let mock_connection = MockRedisConnection::new(vec![MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(30), - )]); - let redis_client = RedisClientInternal::new_with_connection(mock_connection.clone()); - let transactions = redis_client.get_transactions(42, Some(1)).await; - assert!(transactions.is_ok()); - let transactions = transactions.unwrap(); - assert_eq!(transactions, StorageReadStatus::NotAvailableYet); - } - - #[tokio::test] - async fn test_redis_transactions_fetch_data_not_found() { - let 
transaction = Transaction { - version: 42, - ..Transaction::default() - }; - let values = redis::Value::Bulk(vec![ - redis::Value::Nil, - redis::Value::Data(transaction.encode_to_vec()), - ]); - let mock_connection = MockRedisConnection::new(vec![ - MockCmd::new( - redis::cmd("GET").arg(REDIS_ENDING_VERSION_EXCLUSIVE_KEY), - Ok(43), - ), - MockCmd::new(redis::cmd("MGET").arg(41).arg(42), Ok(values)), - ]); - let redis_client = RedisClientInternal::new_with_connection(mock_connection.clone()); - let transactions = redis_client.get_transactions(41, Some(2)).await; - assert!(transactions.is_ok()); - let transactions = transactions.unwrap(); - assert_eq!(transactions, StorageReadStatus::NotFound); - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index be15417d88727..c378e10000297 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -14,11 +14,8 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } -aptos-indexer-grpc-data-access = { workspace = true } aptos-indexer-grpc-server-framework = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } -# We introduce this only for sampling purpose. -aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/grpc_response_stream.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/grpc_response_stream.rs deleted file mode 100644 index 15a8db84de79e..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/grpc_response_stream.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::response_dispatcher::{GrpcResponseDispatcher, ResponseDispatcher}; -use aptos_indexer_grpc_data_access::StorageClient; -use aptos_protos::indexer::v1::TransactionsResponse; -use futures::Stream; -use tokio::sync::mpsc::channel; -use tonic::Status; - -/// GrpcResponseStream is a struct that provides a stream of responses to the gRPC server. -/// The response stream is backed by a channel that is filled by GrpcResponseGenerator in another thread. -/// TODO: Add generic support for other types of responses or server-side transformations. -pub struct GrpcResponseStream { - /// The channel for receiving responses from upstream clients. - inner: tokio_stream::wrappers::ReceiverStream>, -} - -impl GrpcResponseStream { - #[allow(dead_code)] - pub fn new( - starting_version: u64, - transaction_count: Option, - buffer_size: Option, - storages: &[StorageClient], - ) -> anyhow::Result { - let (channel_sender, channel_receiver) = channel(buffer_size.unwrap_or(12)); - let response_stream = Self { - inner: tokio_stream::wrappers::ReceiverStream::new(channel_receiver), - }; - let storages = storages.to_vec(); - // Start a separate thread to generate the response for the stream. 
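- // The spawned task owns the channel sender; once the dispatcher finishes or fails,
- // the sender is dropped and the receiver stream terminates.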
- tokio::spawn(async move { - let mut response_dispatcher = GrpcResponseDispatcher::new( - starting_version, - transaction_count, - channel_sender, - storages.as_slice(), - ); - match response_dispatcher.run().await { - Ok(_) => { - tracing::info!("Response dispatcher finished successfully."); - }, - Err(e) => { - tracing::error!("Response dispatcher failed: {}", e); - }, - } - }); - Ok(response_stream) - } -} - -impl Stream for GrpcResponseStream { - type Item = Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.get_mut(); - std::pin::Pin::new(&mut this.inner).poll_next(cx) - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs index cd268d1367a10..566941502a239 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs @@ -2,9 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 mod config; -mod grpc_response_stream; mod metrics; -mod response_dispatcher; mod service; pub use config::{IndexerGrpcDataServiceConfig, NonTlsConfig, SERVER_NAME}; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/grpc_response_dispatcher.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/grpc_response_dispatcher.rs deleted file mode 100644 index 63296d82ae594..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/grpc_response_dispatcher.rs +++ /dev/null @@ -1,405 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::response_dispatcher::ResponseDispatcher; -use aptos_indexer_grpc_data_access::{ - access_trait::{StorageReadError, StorageReadStatus, StorageTransactionRead}, - StorageClient, -}; -use aptos_indexer_grpc_utils::{chunk_transactions, constants::MESSAGE_SIZE_LIMIT}; -use aptos_logger::prelude::{sample, SampleRate}; -use aptos_protos::indexer::v1::TransactionsResponse; -use std::time::Duration; -use tokio::sync::mpsc::Sender; -use tonic::Status; - -// The server will retry to send the response to the client and give up after RESPONSE_CHANNEL_SEND_TIMEOUT. -// This is to prevent the server from being occupied by a slow client. -const RESPONSE_CHANNEL_SEND_TIMEOUT: Duration = Duration::from_secs(120); -// Number of retries for fetching responses from upstream. -const FETCH_RETRY_COUNT: usize = 100; -const RETRY_BACKOFF_IN_MS: u64 = 500; -const NOT_AVAILABLE_RETRY_BACKOFF_IN_MS: u64 = 10; -const WAIT_TIME_BEFORE_CLOUSING_IN_MS: u64 = 60_000; -const RESPONSE_DISPATCH_NAME: &str = "GrpcResponseDispatcher"; - -pub struct GrpcResponseDispatcher { - next_version_to_process: u64, - transaction_count: Option, - sender: Sender>, - storages: Vec, - sender_capacity: usize, -} - -impl GrpcResponseDispatcher { - // Fetches the next batch of responses from storage. - // This is a stateless function that only fetches from storage based on current state. - async fn fetch_from_storages(&self) -> Result, StorageReadError> { - if let Some(transaction_count) = self.transaction_count { - if transaction_count == 0 { - return Ok(vec![]); - } - } - // Loop to wait for the next storage to be available. 
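- // Storages are expected to be ordered from freshest to coldest. NotFound falls
- // through to the next storage; NotAvailableYet backs off briefly and rescans from
- // the first storage; NotFound at every storage, or NotFound followed by
- // NotAvailableYet, indicates a gap between storages and is fatal.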
-        let mut previous_storage_not_found = false;
-        loop {
-            if self.sender.is_closed() {
-                return Err(StorageReadError::PermenantError(
-                    RESPONSE_DISPATCH_NAME,
-                    anyhow::anyhow!("Sender is closed."),
-                ));
-            }
-            for storage in self.storages.as_slice() {
-                let metadata = storage.get_metadata().await?;
-                match storage
-                    .get_transactions(self.next_version_to_process, None)
-                    .await
-                {
-                    Ok(StorageReadStatus::Ok(transactions)) => {
-                        let responses = chunk_transactions(transactions, MESSAGE_SIZE_LIMIT);
-                        return Ok(responses
-                            .into_iter()
-                            .map(|transactions| TransactionsResponse {
-                                transactions,
-                                chain_id: Some(metadata.chain_id),
-                            })
-                            .collect());
-                    },
-                    Ok(StorageReadStatus::NotAvailableYet) => {
-                        // This is fatal; it means previous storage evicts the data before the current storage has it.
-                        if previous_storage_not_found {
-                            return Err(StorageReadError::PermenantError(
-                                RESPONSE_DISPATCH_NAME,
-                                anyhow::anyhow!("Gap detected between storages."),
-                            ));
-                        }
-                        // If the storage is not available yet, retry the storages.
-                        tokio::time::sleep(Duration::from_millis(
-                            NOT_AVAILABLE_RETRY_BACKOFF_IN_MS,
-                        ))
-                        .await;
-                        break;
-                    },
-                    Ok(StorageReadStatus::NotFound) => {
-                        // Continue to the next storage.
-                        previous_storage_not_found = true;
-                        continue;
-                    },
-                    Err(e) => {
-                        return Err(e);
-                    },
-                }
-            }
-
-            if previous_storage_not_found {
-                return Err(StorageReadError::PermenantError(
-                    RESPONSE_DISPATCH_NAME,
-                    anyhow::anyhow!("Gap detected between storages."),
-                ));
-            }
-        }
-    }
-
-    // Based on the response from fetch_from_storages, verify and dispatch the response, and update the state.
-    async fn fetch_internal(&mut self) -> Result<Vec<TransactionsResponse>, StorageReadError> {
-        // TODO: add retry to TransientError.
-        let responses = self.fetch_from_storages().await?;
-        // Verify no empty response.
-        if responses.iter().any(|v| v.transactions.is_empty()) {
-            return Err(StorageReadError::TransientError(
-                RESPONSE_DISPATCH_NAME,
-                anyhow::anyhow!("Empty responses from storages."),
-            ));
-        }
-
-        // Verify responses are consecutive and sequential.
-        let mut version = self.next_version_to_process;
-        for response in responses.iter() {
-            for transaction in response.transactions.iter() {
-                if transaction.version != version {
-                    return Err(StorageReadError::TransientError(
-                        RESPONSE_DISPATCH_NAME,
-                        anyhow::anyhow!("Version mismatch in response."),
-                    ));
-                }
-                // move to the next version.
-                version += 1;
-            }
-        }
-        let mut processed_responses = vec![];
-        if let Some(transaction_count) = self.transaction_count {
-            // If transactions_count is specified, truncate if necessary.
-            let mut current_transaction_count = 0;
-            for response in responses.into_iter() {
-                if current_transaction_count == transaction_count {
-                    break;
-                }
-                let current_response_size = response.transactions.len() as u64;
-                if current_transaction_count + current_response_size > transaction_count {
-                    let remaining_transaction_count = transaction_count - current_transaction_count;
-                    let truncated_transactions = response
-                        .transactions
-                        .into_iter()
-                        .take(remaining_transaction_count as usize)
-                        .collect();
-                    processed_responses.push(TransactionsResponse {
-                        transactions: truncated_transactions,
-                        chain_id: response.chain_id,
-                    });
-                    current_transaction_count += remaining_transaction_count;
-                } else {
-                    processed_responses.push(response);
-                    current_transaction_count += current_response_size;
-                }
-            }
-            self.transaction_count = Some(transaction_count - current_transaction_count);
-        } else {
-            // If not, continue to fetch.
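
The truncation branch above is a running budget: keep whole chunks while they fit, split the first chunk that overflows, then stop. In miniature, with Vec<Vec<u64>> standing in for Vec<TransactionsResponse> and an illustrative helper name:

    // Truncate chunked batches so the total item count equals `limit`.
    fn truncate_batches(batches: Vec<Vec<u64>>, limit: u64) -> Vec<Vec<u64>> {
        let mut out = Vec::new();
        let mut taken = 0u64;
        for batch in batches {
            if taken == limit {
                break; // budget already spent
            }
            let remaining = (limit - taken) as usize;
            if batch.len() > remaining {
                // Split the chunk that would overflow the budget.
                out.push(batch.into_iter().take(remaining).collect());
                taken = limit;
            } else {
                taken += batch.len() as u64;
                out.push(batch);
            }
        }
        out
    }

    fn main() {
        let batches = vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7]];
        assert_eq!(truncate_batches(batches, 5), vec![vec![0, 1, 2], vec![3, 4]]);
    }
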
-            processed_responses = responses;
-        }
-        let processed_transactions_count = processed_responses
-            .iter()
-            .map(|v| v.transactions.len())
-            .sum::<usize>() as u64;
-        self.next_version_to_process += processed_transactions_count;
-        Ok(processed_responses)
-    }
-}
-
-#[async_trait::async_trait]
-impl ResponseDispatcher for GrpcResponseDispatcher {
-    fn new(
-        starting_version: u64,
-        transaction_count: Option<u64>,
-        sender: Sender<Result<TransactionsResponse, Status>>,
-        storages: &[StorageClient],
-    ) -> Self {
-        let sender_capacity = sender.capacity();
-        Self {
-            next_version_to_process: starting_version,
-            transaction_count,
-            sender,
-            sender_capacity,
-            storages: storages.to_vec(),
-        }
-    }
-
-    async fn run(&mut self) -> anyhow::Result<()> {
-        loop {
-            match self.fetch_with_retries().await {
-                Ok(responses) => {
-                    if responses.is_empty() {
-                        break;
-                    }
-                    for response in responses {
-                        self.dispatch(Ok(response)).await?;
-                    }
-                },
-                Err(status) => {
-                    self.dispatch(Err(status)).await?;
-                    anyhow::bail!("Failed to fetch transactions from storages.");
-                },
-            }
-        }
-        if self.transaction_count.is_some() {
-            let start_time = std::time::Instant::now();
-            loop {
-                if start_time.elapsed().as_millis() > WAIT_TIME_BEFORE_CLOUSING_IN_MS as u128 {
-                    break;
-                }
-                if self.sender.capacity() == self.sender_capacity {
-                    break;
-                }
-                tokio::time::sleep(Duration::from_millis(1000)).await;
-            }
-        }
-        Ok(())
-    }
-
-    async fn fetch_with_retries(&mut self) -> anyhow::Result<Vec<TransactionsResponse>, Status> {
-        for _ in 0..FETCH_RETRY_COUNT {
-            match self.fetch_internal().await {
-                Ok(responses) => {
-                    return Ok(responses);
-                },
-                Err(StorageReadError::TransientError(s, _e)) => {
-                    tracing::warn!("Failed to fetch transactions from storage: {:#}", s);
-                    tokio::time::sleep(Duration::from_millis(RETRY_BACKOFF_IN_MS)).await;
-                    continue;
-                },
-                Err(StorageReadError::PermenantError(s, _e)) => Err(Status::internal(format!(
-                    "Failed to fetch transactions from storages, {:}",
-                    s
-                )))?,
-            }
-        }
-        Err(Status::internal(
-            "Failed to fetch transactions from storages.",
-        ))
-    }
-
-    async fn dispatch(
-        &mut self,
-        response: Result<TransactionsResponse, Status>,
-    ) -> anyhow::Result<()> {
-        let start_time = std::time::Instant::now();
-        match self
-            .sender
-            .send_timeout(response, RESPONSE_CHANNEL_SEND_TIMEOUT)
-            .await
-        {
-            Ok(_) => {},
-            Err(e) => {
-                tracing::warn!("Failed to send response to downstream: {:#}", e);
-                return Err(anyhow::anyhow!("Failed to send response to downstream."));
-            },
-        };
-        sample!(
-            SampleRate::Duration(Duration::from_secs(60)),
-            tracing::info!(
-                "[GrpcResponseDispatch] response waiting time in seconds: {}",
-                start_time.elapsed().as_secs_f64()
-            );
-        );
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use aptos_indexer_grpc_data_access::MockStorageClient;
-    use aptos_protos::transaction::v1::Transaction;
-    fn create_transactions(starting_version: u64, size: usize) -> Vec<Transaction> {
-        let mut transactions = vec![];
-        for i in 0..size {
-            transactions.push(Transaction {
-                version: starting_version + i as u64,
-                ..Default::default()
-            });
-        }
-        transactions
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_finite_stream() {
-        let (sender, mut receiver) = tokio::sync::mpsc::channel(100);
-        tokio::spawn(async move {
-            let first_storage_transactions = create_transactions(20, 100);
-            let second_storage_transactions = create_transactions(10, 20);
-            let third_storage_transactions = create_transactions(0, 15);
-            let storages = vec![
-                StorageClient::MockClient(MockStorageClient::new(1, first_storage_transactions)),
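
The dispatch method above leans on tokio's Sender::send_timeout so a client that stops reading cannot pin the server: once the bounded channel is full and stays full for RESPONSE_CHANNEL_SEND_TIMEOUT, the send fails and the stream is torn down. A small demonstration of that failure mode, assuming tokio with the sync and time features:

    use std::time::Duration;
    use tokio::sync::mpsc::{channel, error::SendTimeoutError};

    #[tokio::main]
    async fn main() {
        // Bounded channel: if the consumer stops draining, producers block.
        let (tx, mut rx) = channel::<u64>(1);
        tx.send(1).await.unwrap(); // fills the only slot; nobody is reading yet
        match tx.send_timeout(2, Duration::from_millis(50)).await {
            Err(SendTimeoutError::Timeout(_)) => {
                // A real dispatcher treats this as a slow client and bails out
                // instead of holding server resources indefinitely.
            }
            other => panic!("expected a timeout, got {:?}", other),
        }
        rx.close();
    }
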
-                StorageClient::MockClient(MockStorageClient::new(2, second_storage_transactions)),
-                StorageClient::MockClient(MockStorageClient::new(3, third_storage_transactions)),
-            ];
-            let mut dispatcher =
-                GrpcResponseDispatcher::new(0, Some(40), sender, storages.as_slice());
-            let run_result = dispatcher.run().await;
-            assert!(run_result.is_ok());
-        });
-
-        let mut transactions = vec![];
-        while let Some(response) = receiver.recv().await {
-            for transaction in response.unwrap().transactions {
-                transactions.push(transaction);
-            }
-        }
-        assert_eq!(transactions.len(), 40);
-        for (current_version, t) in transactions.into_iter().enumerate() {
-            assert_eq!(t.version, current_version as u64);
-        }
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_storages_gap() {
-        let (sender, mut receiver) = tokio::sync::mpsc::channel(100);
-        tokio::spawn(async move {
-            let first_storage_transactions = create_transactions(30, 100);
-            let second_storage_transactions = create_transactions(10, 10);
-            let storages = vec![
-                StorageClient::MockClient(MockStorageClient::new(1, first_storage_transactions)),
-                StorageClient::MockClient(MockStorageClient::new(2, second_storage_transactions)),
-            ];
-            let mut dispatcher =
-                GrpcResponseDispatcher::new(15, Some(30), sender, storages.as_slice());
-            let run_result = dispatcher.run().await;
-            assert!(run_result.is_err());
-        });
-
-        let first_response = receiver.recv().await.unwrap();
-        assert!(first_response.is_ok());
-        let transactions_response = first_response.unwrap();
-        assert!(transactions_response.transactions.len() == 5);
-        let second_response = receiver.recv().await.unwrap();
-        // Gap is detected.
-        assert!(second_response.is_err());
-    }
-
-    // This test is to make sure dispatch doesn't leak memory.
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_infinite_stream_with_client_closure() {
-        let (sender, mut receiver) = tokio::sync::mpsc::channel(100);
-        let task_result = tokio::spawn(async move {
-            let first_storage_transactions = create_transactions(20, 20);
-            let second_storage_transactions = create_transactions(10, 30);
-            let third_storage_transactions = create_transactions(0, 15);
-            let storages = vec![
-                StorageClient::MockClient(MockStorageClient::new(1, first_storage_transactions)),
-                StorageClient::MockClient(MockStorageClient::new(2, second_storage_transactions)),
-                StorageClient::MockClient(MockStorageClient::new(3, third_storage_transactions)),
-            ];
-            let mut dispatcher = GrpcResponseDispatcher::new(0, None, sender, storages.as_slice());
-            dispatcher.run().await
-        });
-        // Let the dispatcher run for 1 second.
-        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-        let first_peek = receiver.try_recv();
-        // transactions 0 - 15
-        assert!(first_peek.is_ok());
-        let first_response = first_peek.unwrap();
-        assert!(first_response.is_ok());
-        let transactions_response = first_response.unwrap();
-        assert!(transactions_response.transactions.len() == 15);
-        let second_peek = receiver.try_recv();
-        // transactions 15 - 40
-        assert!(second_peek.is_ok());
-        let second_response = second_peek.unwrap();
-        assert!(second_response.is_ok());
-        let transactions_response = second_response.unwrap();
-        assert!(transactions_response.transactions.len() == 25);
-        let third_peek = receiver.try_recv();
-        match third_peek {
-            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {},
-            _ => unreachable!("This is not possible."),
-        }
-        // Drop the receiver to close the channel.
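
Dropping the receiver is how this test simulates a client disconnect: the channel closes, every later send fails, and run() surfaces the failure as an error. The mechanism in isolation:

    #[tokio::main]
    async fn main() {
        let (tx, rx) = tokio::sync::mpsc::channel::<u64>(8);
        drop(rx); // simulate the gRPC client going away
        // send() on a closed channel returns the value back inside the error.
        assert!(tx.send(7).await.is_err());
        assert!(tx.is_closed());
    }
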
-        drop(receiver);
-        let task_result = task_result.await;
-
-        // The task should finish successfully.
-        assert!(task_result.is_ok());
-        let task_result = task_result.unwrap();
-        // The dispatcher thread should exit with error.
-        assert!(task_result.is_err());
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_not_found_in_all_storages() {
-        let (sender, mut receiver) = tokio::sync::mpsc::channel(100);
-        tokio::spawn(async move {
-            let first_storage_transactions = create_transactions(20, 100);
-            let storages = vec![StorageClient::MockClient(MockStorageClient::new(
-                1,
-                first_storage_transactions,
-            ))];
-            let mut dispatcher =
-                GrpcResponseDispatcher::new(0, Some(40), sender, storages.as_slice());
-            let run_result = dispatcher.run().await;
-            assert!(run_result.is_err());
-        });
-
-        let first_response = receiver.recv().await.unwrap();
-        assert!(first_response.is_err());
-    }
-}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/mod.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/mod.rs
deleted file mode 100644
index 92b6f8247810c..0000000000000
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/response_dispatcher/mod.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright © Aptos Foundation
-// SPDX-License-Identifier: Apache-2.0
-
-use aptos_indexer_grpc_data_access::StorageClient;
-use aptos_protos::indexer::v1::TransactionsResponse;
-use tokio::sync::mpsc::Sender;
-use tonic::Status;
-
-pub mod grpc_response_dispatcher;
-pub use grpc_response_dispatcher::*;
-
-/// ResponseDispatcher is a trait that defines the interface for dispatching responses into channel via provided sender.
-#[async_trait::async_trait]
-pub trait ResponseDispatcher {
-    fn new(
-        starting_version: u64,
-        transaction_count: Option<u64>,
-        sender: Sender<Result<TransactionsResponse, Status>>,
-        // Dispatcher is expected to fetch responses from these storages in order;
-        // if it fails to fetch from the first storage, it will try the second one, etc.
-        // StorageClient is expected to be *cheap to clone*.
-        storage_clients: &[StorageClient],
-    ) -> Self;
-    // Dispatch a single response to the channel.
-    async fn dispatch(
-        &mut self,
-        response: Result<TransactionsResponse, Status>,
-    ) -> anyhow::Result<()>;
-
-    // Fetch responses that need to be dispatched. TransactionsResponse might get chunked into multiple responses.
-    async fn fetch_with_retries(&mut self) -> anyhow::Result<Vec<TransactionsResponse>, Status>;
-
-    // Run the dispatcher in a loop: fetch -> dispatch.
-    async fn run(&mut self) -> anyhow::Result<()>;
-}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs
index 780d76eca8680..e3db9bd822b82 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs
@@ -178,7 +178,6 @@ fn spawn_update_task(
 {
     tokio::spawn(async move {
         let mut conn = conn.clone();
-        let mut current_time = std::time::Instant::now();
         loop {
             if cancellation_token.is_cancelled() {
                 tracing::info!("In-memory cache update task is cancelled.");
@@ -198,8 +197,6 @@ fn spawn_update_task(
                     .await;
                 continue;
             }
-            let redis_waiting_duration = current_time.elapsed().as_secs_f64();
-            let start_time = std::time::Instant::now();
             let end_version = std::cmp::min(
                 current_latest_version,
                 in_cache_latest_version + 10 * MAX_REDIS_FETCH_BATCH_SIZE as u64,
             );
@@ -209,7 +206,6 @@ fn spawn_update_task(
                 .await
                 .unwrap();
             // Ensure that transactions are ordered by version.
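
The check that follows asserts the fetched batch is gap-free: the transaction at index i must carry version in_cache_latest_version + i. As a stand-alone predicate (an illustrative helper, not part of the crate):

    // Check that a batch is gap-free and starts at `expected_start`,
    // the same invariant the cache update loop asserts before inserting.
    fn is_contiguous(versions: &[u64], expected_start: u64) -> bool {
        versions
            .iter()
            .enumerate()
            .all(|(i, v)| *v == expected_start + i as u64)
    }

    fn main() {
        assert!(is_contiguous(&[5, 6, 7], 5));
        assert!(!is_contiguous(&[5, 7, 8], 5)); // gap after 5
    }
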
-            let cache_processing_start_time = std::time::Instant::now();
             let mut newly_added_bytes = 0;
             for (ind, transaction) in transactions.iter().enumerate() {
                 if transaction.version != in_cache_latest_version + ind as u64 {
@@ -220,16 +216,6 @@ fn spawn_update_task(
             for transaction in transactions {
                 cache.insert(transaction.version, Arc::new(transaction));
             }
-            let processing_duration = start_time.elapsed().as_secs_f64();
-            tracing::info!(
-                redis_latest_version = current_latest_version,
-                in_memory_latest_version = in_cache_latest_version,
-                new_in_memory_latest_version = end_version,
-                processing_duration,
-                cache_processing_duration = cache_processing_start_time.elapsed().as_secs_f64(),
-                redis_waiting_duration,
-                "In-memory cache is updated"
-            );
             let mut current_cache_metadata = { *cache_metadata.read().await };
             current_cache_metadata.latest_version = end_version;
             current_cache_metadata.total_size_in_bytes += newly_added_bytes;
@@ -237,7 +223,6 @@ fn spawn_update_task(
             {
                 *cache_metadata.write().await = current_cache_metadata;
             }
-            current_time = std::time::Instant::now();
         }
     });
 }
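
The metadata update that survives this patch is a read-copy-write under tokio's RwLock: copy the struct out under the read lock, mutate the copy with no lock held, then publish it under the write lock so readers block only for the assignment. With a single updater task, as here, the unlocked window between read and write is safe; multiple writers would need to mutate under the write lock instead. A sketch of the pattern (CacheMetadata here is a simplified stand-in for the crate's type):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[derive(Clone, Copy, Debug)]
    struct CacheMetadata {
        latest_version: u64,
        total_size_in_bytes: u64,
    }

    #[tokio::main]
    async fn main() {
        let metadata = Arc::new(RwLock::new(CacheMetadata {
            latest_version: 100,
            total_size_in_bytes: 4_096,
        }));

        // Copy out under the read lock (held only for the copy)...
        let mut current = { *metadata.read().await };
        // ...mutate the copy without holding any lock...
        current.latest_version = 110;
        current.total_size_in_bytes += 512;
        // ...and publish under the write lock (held only for the assignment).
        {
            *metadata.write().await = current;
        }

        assert_eq!(metadata.read().await.latest_version, 110);
    }
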