From 9c893ac19ba825ad63ee5b1154fa8fd080a48742 Mon Sep 17 00:00:00 2001 From: Daniel Porteous Date: Tue, 19 Sep 2023 13:53:45 +0100 Subject: [PATCH] [indexer-grpc] Overhaul indexer configuration, error handling --- .gitignore | 3 +- Cargo.lock | 2 + aptos-node/src/lib.rs | 134 ++++++----- aptos-node/src/tests.rs | 16 +- config/src/config/indexer_grpc_config.rs | 67 +++--- crates/aptos/src/node/mod.rs | 1 + .../indexer-grpc-cache-worker/Cargo.toml | 1 + .../indexer-grpc-cache-worker/src/lib.rs | 16 +- .../indexer-grpc-cache-worker/src/worker.rs | 93 +++++--- .../indexer-grpc-data-service/src/config.rs | 222 ++++++++++++++++++ .../indexer-grpc-data-service/src/lib.rs | 3 + .../indexer-grpc-data-service/src/main.rs | 176 +------------- .../indexer-grpc-data-service/src/service.rs | 30 ++- .../indexer-grpc-file-store/src/lib.rs | 16 +- .../indexer-grpc-file-store/src/processor.rs | 100 ++++---- .../indexer-grpc-fullnode/src/runtime.rs | 8 +- .../indexer-grpc-server-framework/src/lib.rs | 17 +- .../indexer-grpc-utils/Cargo.toml | 1 + .../indexer-grpc-utils/src/cache_operator.rs | 16 +- .../indexer-grpc-utils/src/lib.rs | 10 +- .../indexer-grpc-utils/src/types.rs | 62 +++++ 21 files changed, 587 insertions(+), 407 deletions(-) create mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/types.rs diff --git a/.gitignore b/.gitignore index 7bf7390cc7951b..d17443bf41bc0d 100644 --- a/.gitignore +++ b/.gitignore @@ -94,8 +94,9 @@ docker/compose/indexer-grpc/data-service-grpc-server.key .*\# \#*\# -# Aptos CLI files +# Aptos CLI / local testnet files .aptos +**/*.rdb # VSCode settings .vscode/ diff --git a/Cargo.lock b/Cargo.lock index 111b8dbce5ef68..e302c7673f83b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1896,6 +1896,7 @@ dependencies = [ "tokio", "tonic 0.10.0", "tracing", + "url", ] [[package]] @@ -2127,6 +2128,7 @@ dependencies = [ "tonic 0.10.0", "tracing", "tracing-subscriber", + "url", "warp", ] diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 7b7960f3e8d73d..3447f1a79755b9 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -29,8 +29,8 @@ use hex::{FromHex, FromHexError}; use rand::{rngs::StdRng, SeedableRng}; use std::{ fs, - io::{Read, Write}, - path::PathBuf, + io::Write, + path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -123,6 +123,7 @@ impl AptosNodeArgs { setup_test_environment_and_start_node( self.config, self.test_config_override, + None, self.test_dir, self.random_ports, self.lazy, @@ -231,6 +232,7 @@ pub fn start( pub fn setup_test_environment_and_start_node( config_path: Option, test_config_override_path: Option, + config: Option, test_dir: Option, random_ports: bool, enable_lazy_mode: bool, @@ -253,46 +255,22 @@ where let aptos_root_key_path = test_dir.join("mint.key"); // If there's already a config, use it. Otherwise create a test one. - let config = if validator_config_path.exists() { + let config = if let Some(config) = config { + config + } else if validator_config_path.exists() { NodeConfig::load_from_path(&validator_config_path) .map_err(|error| anyhow!("Unable to load config: {:?}", error))? } else { - // Create a test only config for a single validator node - let node_config = create_single_node_test_config( - config_path.clone(), - test_config_override_path.clone(), + // Create a test only config for a single validator node. 
+        create_single_node_test_config(
+            &config_path,
+            &test_config_override_path,
+            &test_dir,
+            random_ports,
             enable_lazy_mode,
-        )?;
-
-        // Build genesis and the validator node
-        let builder = aptos_genesis::builder::Builder::new(&test_dir, framework.clone())?
-            .with_init_config(Some(Arc::new(move |_, config, _| {
-                *config = node_config.clone();
-            })))
-            .with_init_genesis_config(Some(Arc::new(|genesis_config| {
-                genesis_config.allow_new_validators = true;
-                genesis_config.epoch_duration_secs = EPOCH_LENGTH_SECS;
-                genesis_config.recurring_lockup_duration_secs = 7200;
-            })))
-            .with_randomize_first_validator_ports(random_ports);
-        let (root_key, _genesis, genesis_waypoint, mut validators) = builder.build(rng)?;
-
-        // Write the mint key to disk
-        let serialized_keys = bcs::to_bytes(&root_key)?;
-        let mut key_file = fs::File::create(&aptos_root_key_path)?;
-        key_file.write_all(&serialized_keys)?;
-
-        // Build a waypoint file so that clients / docker can grab it easily
-        let waypoint_file_path = test_dir.join("waypoint.txt");
-        Write::write_all(
-            &mut fs::File::create(waypoint_file_path)?,
-            genesis_waypoint.to_string().as_bytes(),
-        )?;
-
-        aptos_config::config::sanitize_node_config(validators[0].config.override_config_mut())?;
-
-        // Return the validator config
-        validators[0].config.override_config().clone()
+            framework,
+            rng,
+        )?
     };
 
     // Prepare log file since we cannot automatically route logs to stderr
@@ -316,7 +294,7 @@ where
     println!("\tTest dir: {:?}", test_dir);
     println!("\tAptos root key path: {:?}", aptos_root_key_path);
     println!("\tWaypoint: {}", config.base.waypoint.genesis_waypoint());
-    println!("\tChainId: {}", ChainId::test());
+    println!("\tChainId: {}", ChainId::test().id());
     println!("\tREST API endpoint: http://{}", &config.api.address);
     println!(
         "\tMetrics endpoint: http://{}:{}/metrics",
@@ -327,9 +305,10 @@ where
         &config.full_node_networks[0].listen_address
     );
     if config.indexer_grpc.enabled {
-        if let Some(ref indexer_grpc_address) = config.indexer_grpc.address {
-            println!("\tIndexer gRPC endpoint: {}", indexer_grpc_address);
-        }
+        println!(
+            "\tIndexer gRPC node stream endpoint: {}",
+            config.indexer_grpc.address
+        );
     }
     if enable_lazy_mode {
         println!("\tLazy mode is enabled");
@@ -340,26 +319,31 @@ where
 }
 
 /// Creates a single node test config, with a few config tweaks to reduce
-/// the overhead of running the node on a local machine.
-fn create_single_node_test_config(
-    config_path: Option<PathBuf>,
-    test_config_override_path: Option<PathBuf>,
+/// the overhead of running the node on a local machine. It writes necessary
+/// configuration artifacts (e.g. the mint key) to disk.
+pub fn create_single_node_test_config<R>(
+    config_path: &Option<PathBuf>,
+    test_config_override_path: &Option<PathBuf>,
+    test_dir: &Path,
+    random_ports: bool,
     enable_lazy_mode: bool,
-) -> anyhow::Result<NodeConfig> {
+    framework: &ReleaseBundle,
+    rng: R,
+) -> anyhow::Result<NodeConfig>
+where
+    R: rand::RngCore + rand::CryptoRng,
+{
     let mut node_config = match test_config_override_path {
         // If a config override path was provided, merge it with the default config
         Some(test_config_override_path) => {
-            let mut contents = String::new();
-            fs::File::open(&test_config_override_path)
-                .map_err(|e| {
-                    anyhow!(
-                        "Unable to open config override file {:?}. Error: {}",
-                        test_config_override_path,
-                        e
-                    )
-                })?
-                .read_to_string(&mut contents)?;
-            let values = serde_yaml::from_str::<serde_yaml::Value>(&contents).map_err(|e| {
+            let reader = fs::File::open(test_config_override_path).map_err(|e| {
+                anyhow!(
+                    "Unable to open config override file {:?}.
Error: {}", + test_config_override_path, + e + ) + })?; + let values: serde_yaml::Value = serde_yaml::from_reader(&reader).map_err(|e| { anyhow!( "Unable to read config override file as YAML {:?}. Error: {}", test_config_override_path, @@ -433,7 +417,7 @@ fn create_single_node_test_config( // If a config path was provided, use that as the template if let Some(config_path) = config_path { - node_config = NodeConfig::load_config(&config_path).map_err(|e| { + node_config = NodeConfig::load_config(config_path).map_err(|e| { anyhow!( "Unable to load config from path: {:?}. Error: {:?}", config_path, @@ -455,6 +439,38 @@ fn create_single_node_test_config( node_config.consensus.quorum_store_poll_time_ms = 3_600_000; } + // The validator builder puts the first node in the 0 directory + let aptos_root_key_path = test_dir.join("mint.key"); + + // Build genesis and the validator node + let builder = aptos_genesis::builder::Builder::new(test_dir, framework.clone())? + .with_init_config(Some(Arc::new(move |_, config, _| { + *config = node_config.clone(); + }))) + .with_init_genesis_config(Some(Arc::new(|genesis_config| { + genesis_config.allow_new_validators = true; + genesis_config.epoch_duration_secs = EPOCH_LENGTH_SECS; + genesis_config.recurring_lockup_duration_secs = 7200; + }))) + .with_randomize_first_validator_ports(random_ports); + let (root_key, _genesis, genesis_waypoint, mut validators) = builder.build(rng)?; + + // Write the mint key to disk + let serialized_keys = bcs::to_bytes(&root_key)?; + let mut key_file = fs::File::create(aptos_root_key_path)?; + key_file.write_all(&serialized_keys)?; + + // Build a waypoint file so that clients / docker can grab it easily + let waypoint_file_path = test_dir.join("waypoint.txt"); + Write::write_all( + &mut fs::File::create(waypoint_file_path)?, + genesis_waypoint.to_string().as_bytes(), + )?; + + aptos_config::config::sanitize_node_config(&mut validators[0].config)?; + + let node_config = validators[0].config.clone(); + Ok(node_config) } diff --git a/aptos-node/src/tests.rs b/aptos-node/src/tests.rs index dd170ddd5578ad..ac2666e453d199 100644 --- a/aptos-node/src/tests.rs +++ b/aptos-node/src/tests.rs @@ -8,6 +8,7 @@ use aptos_infallible::RwLock; use aptos_storage_interface::{DbReader, DbReaderWriter, DbWriter}; use aptos_temppath::TempPath; use aptos_types::{chain_id::ChainId, waypoint::Waypoint}; +use rand::SeedableRng; use std::{fs, sync::Arc}; /// A mock database implementing DbReader and DbWriter @@ -48,10 +49,11 @@ fn test_aptos_vm_does_not_have_test_natives() { aptos_vm::natives::assert_no_test_natives(crate::utils::ERROR_MSG_BAD_FEATURE_FLAGS) } +// This test confirms that the overriding behavior works as intended. 
#[test] fn test_create_single_node_test_config() { // create a test config override and merge it with the default config - // this will get cleaned up by the tempdir when it goes out of scope + // this will get cleaned up by the tempdir when it goes out of scope let test_dir = aptos_temppath::TempPath::new().as_ref().to_path_buf(); fs::DirBuilder::new() .recursive(true) @@ -82,8 +84,16 @@ fn test_create_single_node_test_config() { // merge it let default_node_config = NodeConfig::get_default_validator_config(); - let merged_config = - create_single_node_test_config(None, Some(config_override_path), false).unwrap(); + let merged_config = create_single_node_test_config( + &None, + &Some(config_override_path), + &test_dir, + false, + false, + aptos_cached_packages::head_release_bundle(), + rand::rngs::StdRng::from_entropy(), + ) + .unwrap(); // overriden configs assert!(merged_config.storage.enable_indexer); diff --git a/config/src/config/indexer_grpc_config.rs b/config/src/config/indexer_grpc_config.rs index 79645d31c65c8a..09fcc2588b8526 100644 --- a/config/src/config/indexer_grpc_config.rs +++ b/config/src/config/indexer_grpc_config.rs @@ -6,33 +6,48 @@ use crate::config::{ }; use aptos_types::chain_id::ChainId; use serde::{Deserialize, Serialize}; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; // Useful indexer defaults -const DEFAULT_ADDRESS: &str = "0.0.0.0:50051"; -const DEFAULT_OUTPUT_BATCH_SIZE: u16 = 100; -const DEFAULT_PROCESSOR_BATCH_SIZE: u16 = 1000; const DEFAULT_PROCESSOR_TASK_COUNT: u16 = 20; +const DEFAULT_PROCESSOR_BATCH_SIZE: u16 = 1000; +const DEFAULT_OUTPUT_BATCH_SIZE: u16 = 100; +pub const DEFAULT_GRPC_STREAM_PORT: u16 = 50051; -#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[serde(default, deny_unknown_fields)] pub struct IndexerGrpcConfig { pub enabled: bool, - /// The address that the grpc server will listen on - #[serde(default, skip_serializing_if = "Option::is_none")] - pub address: Option, + /// The address that the grpc server will listen on. + pub address: SocketAddr, /// Number of processor tasks to fan out - #[serde(default, skip_serializing_if = "Option::is_none")] - pub processor_task_count: Option, + pub processor_task_count: u16, /// Number of transactions each processor will process - #[serde(default, skip_serializing_if = "Option::is_none")] - pub processor_batch_size: Option, + pub processor_batch_size: u16, /// Number of transactions returned in a single stream response - #[serde(default, skip_serializing_if = "Option::is_none")] - pub output_batch_size: Option, + pub output_batch_size: u16, +} + +// Reminder, #[serde(default)] on IndexerGrpcConfig means that the default values for +// fields will come from this Default impl, unless the field has a specific +// #[serde(default)] on it (which none of the above do). 
+impl Default for IndexerGrpcConfig { + fn default() -> Self { + Self { + enabled: false, + address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + DEFAULT_GRPC_STREAM_PORT, + )), + processor_task_count: DEFAULT_PROCESSOR_TASK_COUNT, + processor_batch_size: DEFAULT_PROCESSOR_BATCH_SIZE, + output_batch_size: DEFAULT_OUTPUT_BATCH_SIZE, + } + } } impl ConfigSanitizer for IndexerGrpcConfig { @@ -41,28 +56,18 @@ impl ConfigSanitizer for IndexerGrpcConfig { _node_type: NodeType, _chain_id: ChainId, ) -> Result<(), Error> { - let indexer_grpc_config = &mut node_config.indexer_grpc; + let sanitizer_name = Self::get_sanitizer_name(); - // If the indexer is not enabled, we don't need to do anything - if !indexer_grpc_config.enabled { + if !node_config.indexer_grpc.enabled { return Ok(()); } - // Set appropriate defaults - indexer_grpc_config.address = indexer_grpc_config - .address - .clone() - .or_else(|| Some(DEFAULT_ADDRESS.into())); - indexer_grpc_config.processor_task_count = indexer_grpc_config - .processor_task_count - .or(Some(DEFAULT_PROCESSOR_TASK_COUNT)); - indexer_grpc_config.processor_batch_size = indexer_grpc_config - .processor_batch_size - .or(Some(DEFAULT_PROCESSOR_BATCH_SIZE)); - indexer_grpc_config.output_batch_size = indexer_grpc_config - .output_batch_size - .or(Some(DEFAULT_OUTPUT_BATCH_SIZE)); - + if !node_config.storage.enable_indexer { + return Err(Error::ConfigSanitizerFailed( + sanitizer_name, + "storage.enable_indexer must be true if indexer_grpc.enabled is true".to_string(), + )); + } Ok(()) } } diff --git a/crates/aptos/src/node/mod.rs b/crates/aptos/src/node/mod.rs index 22d6ff10a17fb3..8a86c600006688 100644 --- a/crates/aptos/src/node/mod.rs +++ b/crates/aptos/src/node/mod.rs @@ -1150,6 +1150,7 @@ impl CliCommand<()> for RunLocalTestnet { let result = aptos_node::setup_test_environment_and_start_node( config_path, self.test_config_override, + None, Some(test_dir_copy), false, false, diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/Cargo.toml index 2b3ca437063b8b..91074b24a95d6f 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/Cargo.toml @@ -35,6 +35,7 @@ serde_yaml = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] aptos-config = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/lib.rs index 55820d580a76df..3bdb9f18882d7c 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/lib.rs @@ -4,18 +4,19 @@ pub mod metrics; pub mod worker; -use anyhow::{Ok, Result}; +use anyhow::{Context, Result}; use aptos_indexer_grpc_server_framework::RunnableConfig; -use aptos_indexer_grpc_utils::config::IndexerGrpcFileStoreConfig; +use aptos_indexer_grpc_utils::{config::IndexerGrpcFileStoreConfig, types::RedisUrl}; use serde::{Deserialize, Serialize}; +use url::Url; use worker::Worker; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct IndexerGrpcCacheWorkerConfig { - pub fullnode_grpc_address: String, + pub fullnode_grpc_address: Url, pub file_store_config: IndexerGrpcFileStoreConfig, - pub redis_main_instance_address: String, + pub redis_main_instance_address: RedisUrl, } #[async_trait::async_trait] 
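For reference, the stricter config types above (a real Url and the RedisUrl newtype instead of bare strings) mean a malformed worker config now fails at deserialization time rather than deep inside the worker. A minimal sketch of that behavior — the field values are hypothetical, and it assumes the RedisUrl newtype added to indexer-grpc-utils later in this patch plus the url crate's serde feature:

    use aptos_indexer_grpc_utils::types::RedisUrl;
    use serde::Deserialize;
    use url::Url;

    #[derive(Debug, Deserialize)]
    struct ExampleWorkerConfig {
        fullnode_grpc_address: Url,
        redis_main_instance_address: RedisUrl,
    }

    fn main() -> anyhow::Result<()> {
        let yaml = "fullnode_grpc_address: \"http://127.0.0.1:50051\"\n\
                    redis_main_instance_address: \"redis://127.0.0.1:6379\"\n";
        // Well-formed config: both URLs parse and the redis:// scheme check passes.
        let config: ExampleWorkerConfig = serde_yaml::from_str(yaml)?;
        println!("{:?}", config);

        // A non-redis scheme is rejected by RedisUrl's Deserialize impl.
        let bad_yaml = yaml.replace("redis://", "http://");
        assert!(serde_yaml::from_str::<ExampleWorkerConfig>(&bad_yaml).is_err());
        Ok(())
    }
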
@@ -26,12 +27,13 @@ impl RunnableConfig for IndexerGrpcCacheWorkerConfig { self.redis_main_instance_address.clone(), self.file_store_config.clone(), ) - .await; - worker.run().await; + .await + .context("Failed to create cache worker")?; + worker.run().await?; Ok(()) } fn get_server_name(&self) -> String { - "idxcache".to_string() + "idxcachewrkr".to_string() } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs index e905456de00826..f1ac2254034388 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs @@ -5,6 +5,7 @@ use crate::metrics::{ ERROR_COUNT, LATEST_PROCESSED_VERSION, PROCESSED_BATCH_SIZE, PROCESSED_LATENCY_IN_SECS, PROCESSED_VERSIONS_COUNT, }; +use anyhow::{bail, Context, Result}; use aptos_indexer_grpc_utils::{ cache_operator::CacheOperator, config::IndexerGrpcFileStoreConfig, @@ -13,6 +14,7 @@ use aptos_indexer_grpc_utils::{ FileStoreMetadata, FileStoreOperator, GcsFileStoreOperator, LocalFileStoreOperator, }, time_diff_since_pb_timestamp_in_secs, + types::RedisUrl, }; use aptos_moving_average::MovingAverage; use aptos_protos::internal::fullnode::v1::{ @@ -22,6 +24,7 @@ use aptos_protos::internal::fullnode::v1::{ use futures::{self, StreamExt}; use prost::Message; use tracing::{error, info}; +use url::Url; type ChainID = u32; type StartingVersion = u64; @@ -30,7 +33,7 @@ pub struct Worker { /// Redis client. redis_client: redis::Client, /// Fullnode grpc address. - fullnode_grpc_address: String, + fullnode_grpc_address: Url, /// File store config file_store: IndexerGrpcFileStoreConfig, } @@ -58,17 +61,22 @@ pub(crate) enum GrpcDataStatus { impl Worker { pub async fn new( - fullnode_grpc_address: String, - redis_main_instance_address: String, + fullnode_grpc_address: Url, + redis_main_instance_address: RedisUrl, file_store: IndexerGrpcFileStoreConfig, - ) -> Self { - let redis_client = redis::Client::open(format!("redis://{}", redis_main_instance_address)) - .expect("Create redis client failed."); - Self { + ) -> Result { + let redis_client = redis::Client::open(redis_main_instance_address.0.clone()) + .with_context(|| { + format!( + "Failed to create redis client for {}", + redis_main_instance_address + ) + })?; + Ok(Self { redis_client, file_store, - fullnode_grpc_address: format!("http://{}", fullnode_grpc_address), - } + fullnode_grpc_address, + }) } /// The main loop of the worker is: @@ -79,14 +87,15 @@ impl Worker { /// * If metadata is not present and cache is not empty, crash. /// * If metadata is present, start from file store version. /// 4. Process the streaming response. - pub async fn run(&mut self) { + // TODO: Use the ! return type when it is stable. + pub async fn run(&mut self) -> Result<()> { // Re-connect if lost. loop { let conn = self .redis_client .get_tokio_connection_manager() .await - .expect("Get redis connection failed."); + .context("Get redis connection failed.")?; let mut rpc_client = create_grpc_client(self.fullnode_grpc_address.clone()).await; // 1. Fetch metadata. @@ -121,10 +130,15 @@ impl Worker { let response = rpc_client .get_transactions_from_node(request) .await - .unwrap(); + .with_context(|| { + format!( + "Failed to get transactions from node at starting version {}", + starting_version + ) + })?; // 3&4. Infinite streaming until error happens. Either stream ends or worker crashes. 
-        process_streaming_response(conn, file_store_metadata, response.into_inner()).await;
+        process_streaming_response(conn, file_store_metadata, response.into_inner()).await?;
         }
     }
 }
@@ -132,7 +146,7 @@ impl Worker {
 async fn process_transactions_from_node_response(
     response: TransactionsFromNodeResponse,
     cache_operator: &mut CacheOperator<redis::aio::ConnectionManager>,
-) -> anyhow::Result<GrpcDataStatus> {
+) -> Result<GrpcDataStatus> {
     match response.response.unwrap() {
         Response::Status(status) => {
             match StatusType::try_from(status.r#type).expect("[Indexer Cache] Invalid status type.")
@@ -155,9 +169,12 @@ async fn process_transactions_from_node_response(
         },
         Response::Data(data) => {
             let transaction_len = data.transactions.len();
-            let start_version = data.transactions.first().unwrap().version;
-            let first_transaction_pb_timestamp =
-                data.transactions.first().unwrap().timestamp.clone();
+            let first_transaction = data
+                .transactions
+                .first()
+                .context("There were unexpectedly no transactions in the response")?;
+            let start_version = first_transaction.version;
+            let first_transaction_pb_timestamp = first_transaction.timestamp.clone();
             let transactions = data
                 .transactions
                 .into_iter()
@@ -168,11 +185,11 @@ async fn process_transactions_from_node_response(
                     };
                     let mut encoded_proto_data = vec![];
                     tx.encode(&mut encoded_proto_data)
-                        .expect("Encode transaction failed.");
+                        .context("Encode transaction failed.")?;
                     let base64_encoded_proto_data = base64::encode(encoded_proto_data);
-                    (tx.version, base64_encoded_proto_data, timestamp_in_seconds)
+                    Ok((tx.version, base64_encoded_proto_data, timestamp_in_seconds))
                 })
-                .collect::<Vec<_>>();
+                .collect::<Result<Vec<_>>>()?;
 
             // Push to cache.
             match cache_operator.update_cache_transactions(transactions).await {
@@ -181,7 +198,7 @@ async fn process_transactions_from_node_response(
                     ERROR_COUNT
                         .with_label_values(&["failed_to_update_cache_version"])
                         .inc();
-                    anyhow::bail!("Update cache with version failed: {}", e);
+                    bail!("Update cache with version failed: {}", e);
                 },
             }
             if let Some(ref txn_time) = first_transaction_pb_timestamp {
@@ -199,34 +216,34 @@ async fn process_transactions_from_node_response(
 async fn setup_cache_with_init_signal(
     conn: redis::aio::ConnectionManager,
     init_signal: TransactionsFromNodeResponse,
-) -> (
+) -> Result<(
     CacheOperator<redis::aio::ConnectionManager>,
     ChainID,
     StartingVersion,
-) {
+)> {
     let (fullnode_chain_id, starting_version) =
-        match init_signal.response.expect("Response type not exists.") {
+        match init_signal.response.expect("Response type does not exist.") {
             Response::Status(status_frame) => {
                 match StatusType::try_from(status_frame.r#type).expect("Invalid status type.") {
                     StatusType::Init => (init_signal.chain_id, status_frame.start_version),
                     _ => {
-                        panic!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
+                        bail!("[Indexer Cache] Streaming error: first frame is not INIT signal.");
                     },
                 }
             },
             _ => {
-                panic!("[Indexer Cache] Streaming error: first frame is not siganl frame.");
+                bail!("[Indexer Cache] Streaming error: first frame is not a signal frame.");
            },
        };
     let mut cache_operator = CacheOperator::new(conn);
-    cache_operator.cache_setup_if_needed().await;
+    cache_operator.cache_setup_if_needed().await?;
     cache_operator
         .update_or_verify_chain_id(fullnode_chain_id as u64)
         .await
-        .expect("[Indexer Cache] Chain id mismatch between cache and fullnode.");
+        .context("[Indexer Cache] Chain id mismatch between cache and fullnode.")?;
 
-    (cache_operator, fullnode_chain_id, starting_version)
+    Ok((cache_operator, fullnode_chain_id, starting_version))
 }
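The conversions above all follow one pattern from the anyhow crate: expect/panic! become context/bail!, so the failure reaches the caller as an anyhow::Error instead of unwinding the thread. A small self-contained illustration of the idiom (the function and messages are hypothetical):

    use anyhow::{bail, Context, Result};

    fn parse_port(raw: &str) -> Result<u16> {
        // .context() replaces .expect(): it attaches a message while converting
        // the underlying error into anyhow::Error.
        let port: u16 = raw.parse().context("Port is not a valid u16")?;
        if port == 0 {
            // bail!() replaces panic!(): an early return of Err(anyhow!(...)).
            bail!("Port 0 is not allowed");
        }
        Ok(port)
    }

    fn main() {
        assert!(parse_port("6379").is_ok());
        assert!(parse_port("not-a-port").is_err());
    }
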
 // Infinite streaming processing. Retry if error happens; crash if fatal.
@@ -235,25 +252,27 @@ async fn process_streaming_response(
     conn: redis::aio::ConnectionManager,
     file_store_metadata: Option<FileStoreMetadata>,
     mut resp_stream: impl futures_core::Stream<Item = Result<TransactionsFromNodeResponse, tonic::Status>>
         + std::marker::Unpin,
-) {
+) -> Result<()> {
     let mut tps_calculator = MovingAverage::new(10_000);
     let mut transaction_count = 0;
     // 3. Set up the cache operator with init signal.
     let init_signal = match resp_stream.next().await {
         Some(Ok(r)) => r,
         _ => {
-            panic!("[Indexer Cache] Streaming error: no response.");
+            bail!("[Indexer Cache] Streaming error: no response.");
         },
     };
     let (mut cache_operator, fullnode_chain_id, starting_version) =
-        setup_cache_with_init_signal(conn, init_signal).await;
+        setup_cache_with_init_signal(conn, init_signal)
+            .await
+            .context("Failed to setup cache")?;
 
     // It's required to start the worker with the same version as file store.
     if let Some(file_store_metadata) = file_store_metadata {
         if file_store_metadata.version != starting_version {
-            panic!("[Indexer Cache] File store version mismatch with fullnode.");
+            bail!("[Indexer Cache] File store version mismatch with fullnode.");
         }
         if file_store_metadata.chain_id != fullnode_chain_id as u64 {
-            panic!("[Indexer Cache] Chain id mismatch between file store and fullnode.");
+            bail!("[Indexer Cache] Chain id mismatch between file store and fullnode.");
         }
     }
     let mut current_version = starting_version;
@@ -323,7 +342,7 @@ async fn process_streaming_response(
                     cache_operator
                         .update_cache_latest_version(transaction_count, current_version)
                         .await
-                        .unwrap();
+                        .context("Failed to update the latest version in the cache")?;
                     transaction_count = 0;
                     info!(
                         current_version = current_version,
@@ -343,4 +362,8 @@ async fn process_streaming_response(
             },
         }
     }
+
+    // It is expected that we get to this point; the upstream server disconnects
+    // clients after 5 minutes.
+    Ok(())
 }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
new file mode 100644
index 00000000000000..4c3168dbfc13a0
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
@@ -0,0 +1,222 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::service::RawDataServerWrapper;
+use anyhow::{bail, Result};
+use aptos_indexer_grpc_server_framework::RunnableConfig;
+use aptos_indexer_grpc_utils::{config::IndexerGrpcFileStoreConfig, types::RedisUrl};
+use aptos_protos::{
+    indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET,
+    transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
+    util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
+};
+use serde::{Deserialize, Serialize};
+use std::{collections::HashSet, net::SocketAddr};
+use tonic::{
+    codec::CompressionEncoding,
+    codegen::InterceptedService,
+    metadata::{Ascii, MetadataValue},
+    transport::Server,
+    Request, Status,
+};
+
+pub const SERVER_NAME: &str = "idxdatasvc";
+
+// Default max response channel size.
+const DEFAULT_MAX_RESPONSE_CHANNEL_SIZE: usize = 3;
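That channel size bounds how many responses a stream buffers before the sender waits; combined with the send timeout used in service.rs further down, a slow client makes the send fail instead of buffering forever. A standalone sketch of that backpressure behavior (the capacity mirrors DEFAULT_MAX_RESPONSE_CHANNEL_SIZE; the timeout value is arbitrary):

    use std::time::Duration;
    use tokio::sync::mpsc::{channel, error::SendTimeoutError};

    #[tokio::main]
    async fn main() {
        // Capacity 3, mirroring DEFAULT_MAX_RESPONSE_CHANNEL_SIZE.
        let (tx, mut rx) = channel::<u64>(3);
        for i in 0..3 {
            tx.send(i).await.unwrap(); // fills the buffer
        }
        // No one is reading, so this send times out instead of waiting forever.
        let result = tx.send_timeout(3, Duration::from_millis(50)).await;
        assert!(matches!(result, Err(SendTimeoutError::Timeout(3))));
        // Once the consumer drains a slot, sending succeeds again.
        assert_eq!(rx.recv().await, Some(0));
        tx.send_timeout(3, Duration::from_millis(50)).await.unwrap();
    }
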
+// HTTP2 ping interval and timeout.
+// This can help the server garbage collect dead connections.
+// tonic server: https://docs.rs/tonic/latest/tonic/transport/server/struct.Server.html#method.http2_keepalive_interval
+const HTTP2_PING_INTERVAL_DURATION: std::time::Duration = std::time::Duration::from_secs(60);
+const HTTP2_PING_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct TlsConfig {
+    /// The address for the TLS GRPC server to listen on.
+    pub data_service_grpc_listen_address: SocketAddr,
+    pub cert_path: String,
+    pub key_path: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct NonTlsConfig {
+    /// The address for the non-TLS GRPC server to listen on.
+    pub data_service_grpc_listen_address: SocketAddr,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct IndexerGrpcDataServiceConfig {
+    /// If given, we will run a server that uses TLS.
+    pub data_service_grpc_tls_config: Option<TlsConfig>,
+    /// If given, we will run a server that does not use TLS.
+    pub data_service_grpc_non_tls_config: Option<NonTlsConfig>,
+    /// The size of the response channel, i.e. how many responses can be buffered.
+    #[serde(default = "IndexerGrpcDataServiceConfig::default_data_service_response_channel_size")]
+    pub data_service_response_channel_size: usize,
+    /// A list of auth tokens that are allowed to access the service.
+    pub whitelisted_auth_tokens: Vec<String>,
+    /// If set, don't check for auth tokens.
+    #[serde(default)]
+    pub disable_auth_check: bool,
+    /// File store config.
+    pub file_store_config: IndexerGrpcFileStoreConfig,
+    /// Redis read replica address.
+    pub redis_read_replica_address: RedisUrl,
+}
+
+impl IndexerGrpcDataServiceConfig {
+    pub fn new(
+        data_service_grpc_tls_config: Option<TlsConfig>,
+        data_service_grpc_non_tls_config: Option<NonTlsConfig>,
+        data_service_response_channel_size: Option<usize>,
+        whitelisted_auth_tokens: Vec<String>,
+        disable_auth_check: bool,
+        file_store_config: IndexerGrpcFileStoreConfig,
+        redis_read_replica_address: RedisUrl,
+    ) -> Self {
+        Self {
+            data_service_grpc_tls_config,
+            data_service_grpc_non_tls_config,
+            data_service_response_channel_size: data_service_response_channel_size
+                .unwrap_or_else(Self::default_data_service_response_channel_size),
+            whitelisted_auth_tokens,
+            disable_auth_check,
+            file_store_config,
+            redis_read_replica_address,
+        }
+    }
+
+    pub const fn default_data_service_response_channel_size() -> usize {
+        DEFAULT_MAX_RESPONSE_CHANNEL_SIZE
+    }
+}
+
+#[async_trait::async_trait]
+impl RunnableConfig for IndexerGrpcDataServiceConfig {
+    fn validate(&self) -> Result<()> {
+        if self.disable_auth_check && !self.whitelisted_auth_tokens.is_empty() {
+            bail!("disable_auth_check is set but whitelisted_auth_tokens is not empty");
+        }
+        if !self.disable_auth_check && self.whitelisted_auth_tokens.is_empty() {
+            bail!("disable_auth_check is not set but whitelisted_auth_tokens is empty");
+        }
+        if self.data_service_grpc_non_tls_config.is_none()
+            && self.data_service_grpc_tls_config.is_none()
+        {
+            bail!("At least one of data_service_grpc_non_tls_config and data_service_grpc_tls_config must be set");
+        }
+        Ok(())
+    }
+
+    async fn run(&self) -> Result<()> {
+        let token_set = build_auth_token_set(self.whitelisted_auth_tokens.clone());
+        let disable_auth_check = self.disable_auth_check;
+        let authentication_inceptor =
+            move |req: Request<()>| -> std::result::Result<Request<()>, Status> {
+                if disable_auth_check {
+                    return std::result::Result::Ok(req);
+                }
+                let metadata = req.metadata();
+                if let Some(token) =
metadata.get(aptos_indexer_grpc_utils::constants::GRPC_AUTH_TOKEN_HEADER) + { + if token_set.contains(token) { + std::result::Result::Ok(req) + } else { + Err(Status::unauthenticated("Invalid token")) + } + } else { + Err(Status::unauthenticated("Missing token")) + } + }; + let reflection_service = tonic_reflection::server::Builder::configure() + // Note: It is critical that the file descriptor set is registered for every + // file that the top level API proto depends on recursively. If you don't, + // compilation will still succeed but reflection will fail at runtime. + // + // TODO: Add a test for this / something in build.rs, this is a big footgun. + .register_encoded_file_descriptor_set(INDEXER_V1_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET) + .build() + .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?; + + // Add authentication interceptor. + let server = RawDataServerWrapper::new( + self.redis_read_replica_address.clone(), + self.file_store_config.clone(), + self.data_service_response_channel_size, + )?; + let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(server) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip); + let svc_with_interceptor = InterceptedService::new(svc, authentication_inceptor); + + let svc_with_interceptor_clone = svc_with_interceptor.clone(); + let reflection_service_clone = reflection_service.clone(); + + let mut tasks = vec![]; + if let Some(config) = &self.data_service_grpc_non_tls_config { + let listen_address = config.data_service_grpc_listen_address; + tracing::info!( + grpc_address = listen_address.to_string().as_str(), + "[data service] starting gRPC server with non-TLS." + ); + tasks.push(tokio::spawn(async move { + Server::builder() + .http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION)) + .http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION)) + .add_service(svc_with_interceptor_clone) + .add_service(reflection_service_clone) + .serve(listen_address) + .await + .map_err(|e| anyhow::anyhow!(e)) + })); + } + if let Some(config) = &self.data_service_grpc_tls_config { + let listen_address = config.data_service_grpc_listen_address; + let cert = tokio::fs::read(config.cert_path.clone()).await?; + let key = tokio::fs::read(config.key_path.clone()).await?; + let identity = tonic::transport::Identity::from_pem(cert, key); + tracing::info!( + grpc_address = listen_address.to_string().as_str(), + "[Data Service] Starting gRPC server with TLS." + ); + tasks.push(tokio::spawn(async move { + Server::builder() + .http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION)) + .http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION)) + .tls_config(tonic::transport::ServerTlsConfig::new().identity(identity))? + .add_service(svc_with_interceptor) + .add_service(reflection_service) + .serve(listen_address) + .await + .map_err(|e| anyhow::anyhow!(e)) + })); + } + + if tasks.is_empty() { + return Err(anyhow::anyhow!("No grpc config provided")); + } + + futures::future::try_join_all(tasks).await?; + Ok(()) + } + + fn get_server_name(&self) -> String { + SERVER_NAME.to_string() + } +} + +/// Build a set of whitelisted auth tokens. Invalid tokens are ignored. 
+pub fn build_auth_token_set(whitelisted_auth_tokens: Vec) -> HashSet> { + whitelisted_auth_tokens + .into_iter() + .map(|token| token.parse::>()) + .filter_map(Result::ok) + .collect::>() +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs index c4c2ee138f6a7f..5e27df871b3ac3 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/lib.rs @@ -1,5 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +mod config; pub mod metrics; pub mod service; + +pub use config::{IndexerGrpcDataServiceConfig, NonTlsConfig, SERVER_NAME}; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs index e05859fe06e02f..04f273c28d493b 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs @@ -2,181 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use aptos_indexer_grpc_data_service::service::RawDataServerWrapper; -use aptos_indexer_grpc_server_framework::{RunnableConfig, ServerArgs}; -use aptos_indexer_grpc_utils::config::IndexerGrpcFileStoreConfig; -use aptos_protos::{ - indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET, - transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET, - util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, -}; +use aptos_indexer_grpc_data_service::IndexerGrpcDataServiceConfig; +use aptos_indexer_grpc_server_framework::ServerArgs; use clap::Parser; -use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, net::ToSocketAddrs}; -use tonic::{ - codec::CompressionEncoding, - codegen::InterceptedService, - metadata::{Ascii, MetadataValue}, - transport::Server, - Request, Status, -}; - -// HTTP2 ping interval and timeout. -// This can help server to garbage collect dead connections. -// tonic server: https://docs.rs/tonic/latest/tonic/transport/server/struct.Server.html#method.http2_keepalive_interval -const HTTP2_PING_INTERVAL_DURATION: std::time::Duration = std::time::Duration::from_secs(60); -const HTTP2_PING_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(10); - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct TlsConfig { - // TLS config. - pub data_service_grpc_listen_address: String, - pub cert_path: String, - pub key_path: String, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct NonTlsConfig { - pub data_service_grpc_listen_address: String, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct IndexerGrpcDataServiceConfig { - // The address for TLS and non-TLS gRPC server to listen on. - pub data_service_grpc_tls_config: Option, - pub data_service_grpc_non_tls_config: Option, - // The size of the response channel that response can be buffered. - pub data_service_response_channel_size: Option, - // A list of auth tokens that are allowed to access the service. - pub whitelisted_auth_tokens: Vec, - // File store config. - pub file_store_config: IndexerGrpcFileStoreConfig, - // Redis read replica address. 
- pub redis_read_replica_address: String, -} - -#[async_trait::async_trait] -impl RunnableConfig for IndexerGrpcDataServiceConfig { - async fn run(&self) -> Result<()> { - let token_set = build_auth_token_set(self.whitelisted_auth_tokens.clone()); - let authentication_inceptor = - move |req: Request<()>| -> std::result::Result, Status> { - let metadata = req.metadata(); - if let Some(token) = - metadata.get(aptos_indexer_grpc_utils::constants::GRPC_AUTH_TOKEN_HEADER) - { - if token_set.contains(token) { - std::result::Result::Ok(req) - } else { - Err(Status::unauthenticated("Invalid token")) - } - } else { - Err(Status::unauthenticated("Missing token")) - } - }; - let reflection_service = tonic_reflection::server::Builder::configure() - // Note: It is critical that the file descriptor set is registered for every - // file that the top level API proto depends on recursively. If you don't, - // compilation will still succeed but reflection will fail at runtime. - // - // TODO: Add a test for this / something in build.rs, this is a big footgun. - .register_encoded_file_descriptor_set(INDEXER_V1_FILE_DESCRIPTOR_SET) - .register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET) - .register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET) - .build() - .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?; - - // Add authentication interceptor. - let server = RawDataServerWrapper::new( - self.redis_read_replica_address.clone(), - self.file_store_config.clone(), - self.data_service_response_channel_size, - ); - let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(server) - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip); - let svc_with_interceptor = InterceptedService::new(svc, authentication_inceptor); - - let svc_with_interceptor_clone = svc_with_interceptor.clone(); - let reflection_service_clone = reflection_service.clone(); - - let mut tasks = vec![]; - if self.data_service_grpc_non_tls_config.is_some() { - let config = self.data_service_grpc_non_tls_config.clone().unwrap(); - let grpc_address = config - .data_service_grpc_listen_address - .to_socket_addrs() - .map_err(|e| anyhow::anyhow!(e))? - .next() - .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?; - tracing::info!( - grpc_address = grpc_address.to_string().as_str(), - "[Data Service] Starting gRPC server with non-TLS." - ); - tasks.push(tokio::spawn(async move { - Server::builder() - .http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION)) - .http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION)) - .add_service(svc_with_interceptor_clone) - .add_service(reflection_service_clone) - .serve(grpc_address) - .await - .map_err(|e| anyhow::anyhow!(e)) - })); - } - if self.data_service_grpc_tls_config.is_some() { - let config = self.data_service_grpc_tls_config.clone().unwrap(); - let grpc_address = config - .data_service_grpc_listen_address - .to_socket_addrs() - .map_err(|e| anyhow::anyhow!(e))? - .next() - .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?; - - let cert = tokio::fs::read(config.cert_path.clone()).await?; - let key = tokio::fs::read(config.key_path.clone()).await?; - let identity = tonic::transport::Identity::from_pem(cert, key); - tracing::info!( - grpc_address = grpc_address.to_string().as_str(), - "[Data Service] Starting gRPC server with TLS." 
- ); - tasks.push(tokio::spawn(async move { - Server::builder() - .http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION)) - .http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION)) - .tls_config(tonic::transport::ServerTlsConfig::new().identity(identity))? - .add_service(svc_with_interceptor) - .add_service(reflection_service) - .serve(grpc_address) - .await - .map_err(|e| anyhow::anyhow!(e)) - })); - } - - if tasks.is_empty() { - return Err(anyhow::anyhow!("No grpc config provided")); - } - - futures::future::try_join_all(tasks).await?; - Ok(()) - } - - fn get_server_name(&self) -> String { - "idxdata".to_string() - } -} - -/// Build a set of whitelisted auth tokens. Invalid tokens are ignored. -pub fn build_auth_token_set(whitelisted_auth_tokens: Vec) -> HashSet> { - whitelisted_auth_tokens - .into_iter() - .map(|token| token.parse::>()) - .filter_map(Result::ok) - .collect::>() -} #[tokio::main] async fn main() -> Result<()> { diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 7768b57da4b163..cbc73007559938 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -6,6 +6,7 @@ use crate::metrics::{ PROCESSED_LATENCY_IN_SECS, PROCESSED_LATENCY_IN_SECS_ALL, PROCESSED_VERSIONS_COUNT, SHORT_CONNECTION_COUNT, }; +use anyhow::Context; use aptos_indexer_grpc_utils::{ build_protobuf_encoded_transaction_wrappers, cache_operator::{CacheBatchGetStatus, CacheOperator}, @@ -15,7 +16,9 @@ use aptos_indexer_grpc_utils::{ BLOB_STORAGE_SIZE, GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER, MESSAGE_SIZE_LIMIT, }, file_store_operator::{FileStoreOperator, GcsFileStoreOperator, LocalFileStoreOperator}, - time_diff_since_pb_timestamp_in_secs, EncodedTransactionWithVersion, + time_diff_since_pb_timestamp_in_secs, + types::RedisUrl, + EncodedTransactionWithVersion, }; use aptos_logger::prelude::{sample, SampleRate}; use aptos_moving_average::MovingAverage; @@ -49,9 +52,6 @@ const AHEAD_OF_CACHE_RETRY_SLEEP_DURATION_MS: u64 = 50; // TODO(larry): fix all errors treated as transient errors. const TRANSIENT_DATA_ERROR_RETRY_SLEEP_DURATION_MS: u64 = 1000; -// Default max response channel size. -const DEFAULT_MAX_RESPONSE_CHANNEL_SIZE: usize = 3; - // The server will retry to send the response to the client and give up after RESPONSE_CHANNEL_SEND_TIMEOUT. // This is to prevent the server from being occupied by a slow client. 
const RESPONSE_CHANNEL_SEND_TIMEOUT: Duration = Duration::from_secs(120); @@ -65,23 +65,24 @@ const REQUEST_HEADER_APTOS_API_KEY_NAME: &str = "x-aptos-api-key-name"; pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, - pub data_service_response_channel_size: Option, + pub data_service_response_channel_size: usize, } impl RawDataServerWrapper { pub fn new( - redis_address: String, + redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, - data_service_response_channel_size: Option, - ) -> Self { - Self { + data_service_response_channel_size: usize, + ) -> anyhow::Result { + Ok(Self { redis_client: Arc::new( - redis::Client::open(format!("redis://{}", redis_address)) - .expect("Create redis client failed."), + redis::Client::open(redis_address.0.clone()).with_context(|| { + format!("Failed to create redis client for {}", redis_address) + })?, ), file_store_config, data_service_response_channel_size, - } + }) } } @@ -123,10 +124,7 @@ impl RawData for RawDataServerWrapper { let transactions_count = request.transactions_count; // Response channel to stream the data to the client. - let (tx, rx) = channel( - self.data_service_response_channel_size - .unwrap_or(DEFAULT_MAX_RESPONSE_CHANNEL_SIZE), - ); + let (tx, rx) = channel(self.data_service_response_channel_size); let mut current_version = match &request.starting_version { Some(version) => *version, None => { diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/lib.rs index e44896e3f001df..e24cc1c0104596 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/lib.rs @@ -4,9 +4,9 @@ pub mod metrics; pub mod processor; -use anyhow::Result; +use anyhow::{Context, Result}; use aptos_indexer_grpc_server_framework::RunnableConfig; -use aptos_indexer_grpc_utils::config::IndexerGrpcFileStoreConfig; +use aptos_indexer_grpc_utils::{config::IndexerGrpcFileStoreConfig, types::RedisUrl}; use processor::Processor; use serde::{Deserialize, Serialize}; @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] pub struct IndexerGrpcFileStoreWorkerConfig { pub file_store_config: IndexerGrpcFileStoreConfig, - pub redis_main_instance_address: String, + pub redis_main_instance_address: RedisUrl, } #[async_trait::async_trait] @@ -23,12 +23,14 @@ impl RunnableConfig for IndexerGrpcFileStoreWorkerConfig { let mut processor = Processor::new( self.redis_main_instance_address.clone(), self.file_store_config.clone(), - ); - processor.run().await; - Ok(()) + ) + .await + .context("Failed to create processor for file store worker")?; + processor.run().await?; + Err(anyhow::anyhow!("File store processor exited unexpectedly")) } fn get_server_name(&self) -> String { - "idxfile".to_string() + "idxfilestore".to_string() } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs index a66c9d19b0d8a2..73d03d5ae7192a 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs @@ -2,12 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::{LATEST_PROCESSED_VERSION, PROCESSED_VERSIONS_COUNT}; +use anyhow::{bail, Context, Result}; use aptos_indexer_grpc_utils::{ build_protobuf_encoded_transaction_wrappers, cache_operator::{CacheBatchGetStatus, 
CacheOperator},
     config::IndexerGrpcFileStoreConfig,
     constants::BLOB_STORAGE_SIZE,
     file_store_operator::{FileStoreOperator, GcsFileStoreOperator, LocalFileStoreOperator},
+    types::RedisUrl,
     EncodedTransactionWithVersion,
 };
 use aptos_moving_average::MovingAverage;
@@ -19,43 +21,30 @@ const AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS: u64 = 100;
 
 /// Processor tails the data in cache and stores the data in file store.
 pub struct Processor {
-    cache_operator: Option<CacheOperator<redis::aio::ConnectionManager>>,
-    file_store_processor: Option<Box<dyn FileStoreOperator>>,
-    cache_chain_id: Option<u64>,
-    redis_main_instance_address: String,
-    file_store_config: IndexerGrpcFileStoreConfig,
+    cache_operator: CacheOperator<redis::aio::ConnectionManager>,
+    file_store_operator: Box<dyn FileStoreOperator>,
+    cache_chain_id: u64,
 }
 
 impl Processor {
-    pub fn new(
-        redis_main_instance_address: String,
+    pub async fn new(
+        redis_main_instance_address: RedisUrl,
         file_store_config: IndexerGrpcFileStoreConfig,
-    ) -> Self {
-        Self {
-            cache_operator: None,
-            file_store_processor: None,
-            cache_chain_id: None,
-            redis_main_instance_address,
-            file_store_config,
-        }
-    }
-
-    /// Init the processor, including creating the redis connection and file store operator.
-    async fn init(&mut self) {
+    ) -> Result<Self> {
         // Connection to redis is a hard dependency for file store processor.
-        let conn = redis::Client::open(format!("redis://{}", self.redis_main_instance_address))
-            .expect("Create redis client failed.")
+        let conn = redis::Client::open(redis_main_instance_address.0)
+            .context("Create redis client failed.")?
             .get_tokio_connection_manager()
             .await
-            .expect("Create redis connection failed.");
+            .context("Create redis connection failed.")?;
         let mut cache_operator = CacheOperator::new(conn);
-        let chain_id = cache_operator
+        let cache_chain_id = cache_operator
             .get_chain_id()
             .await
-            .expect("Get chain id failed.");
+            .context("Get chain id failed.")?;
 
-        let file_store_operator: Box<dyn FileStoreOperator> = match &self.file_store_config {
+        let file_store_operator: Box<dyn FileStoreOperator> = match &file_store_config {
             IndexerGrpcFileStoreConfig::GcsFileStore(gcs_file_store) => {
                 Box::new(GcsFileStoreOperator::new(
                     gcs_file_store.gcs_file_store_bucket_name.clone(),
@@ -70,24 +59,23 @@ impl Processor {
         };
         file_store_operator.verify_storage_bucket_existence().await;
 
-        self.cache_operator = Some(cache_operator);
-        self.file_store_processor = Some(file_store_operator);
-        self.cache_chain_id = Some(chain_id);
+        Ok(Self {
+            cache_operator,
+            file_store_operator,
+            cache_chain_id,
+        })
     }
 
     // Starts the processing.
-    pub async fn run(&mut self) {
-        self.init().await;
-        let cache_chain_id = self.cache_chain_id.unwrap();
+    pub async fn run(&mut self) -> Result<()> {
+        let cache_chain_id = self.cache_chain_id;
 
-        // If file store and cache chain id don't match, panic.
+        // If file store and cache chain id don't match, return an error.
         let metadata = self
-            .file_store_processor
-            .as_mut()
-            .unwrap()
+            .file_store_operator
             .create_default_file_store_metadata_if_absent(cache_chain_id)
             .await
-            .unwrap();
+            .context("Metadata did not match.")?;
 
         // This implements a two-cursor approach:
         // * One cursor is to track the current cache version.
         // * The other is to track the current file store version.
@@ -104,18 +92,16 @@ impl Processor {
             // 0. Data verification.
             // File store version has to be a multiple of BLOB_STORAGE_SIZE.
             if current_file_store_version % BLOB_STORAGE_SIZE as u64 != 0 {
-                panic!("File store version is not a multiple of BLOB_STORAGE_SIZE.");
+                bail!("File store version is not a multiple of BLOB_STORAGE_SIZE.");
             }
 
             let batch_get_result = self
                 .cache_operator
-                .as_mut()
-                .unwrap()
                 .batch_get_encoded_proto_data(current_cache_version)
                 .await;
 
             let batch_get_result =
-                fullnode_grpc_status_handling(batch_get_result, current_cache_version);
+                fullnode_grpc_status_handling(batch_get_result, current_cache_version)?;
 
             let current_transactions = match batch_get_result {
                 Some(transactions) => transactions,
@@ -147,12 +133,10 @@ impl Processor {
                 let process_size = transactions_buffer.len() / BLOB_STORAGE_SIZE * BLOB_STORAGE_SIZE;
                 let current_batch = transactions_buffer.drain(..process_size).collect();
 
-                self.file_store_processor
-                    .as_mut()
-                    .unwrap()
+                self.file_store_operator
                     .upload_transactions(cache_chain_id, current_batch)
                     .await
-                    .unwrap();
+                    .context("Uploading transactions to file store failed.")?;
                 PROCESSED_VERSIONS_COUNT.inc_by(process_size as u64);
                 tps_calculator.tick_now(process_size as u64);
                 info!(
@@ -169,19 +153,19 @@ impl Processor {
 fn fullnode_grpc_status_handling(
     fullnode_rpc_status: anyhow::Result<CacheBatchGetStatus>,
     batch_start_version: u64,
-) -> Option<Vec<EncodedTransactionWithVersion>> {
+) -> Result<Option<Vec<EncodedTransactionWithVersion>>> {
     match fullnode_rpc_status {
-        Ok(CacheBatchGetStatus::Ok(encoded_transactions)) => Some(
+        Ok(CacheBatchGetStatus::Ok(encoded_transactions)) => Ok(Some(
             build_protobuf_encoded_transaction_wrappers(encoded_transactions, batch_start_version),
-        ),
-        Ok(CacheBatchGetStatus::NotReady) => None,
+        )),
+        Ok(CacheBatchGetStatus::NotReady) => Ok(None),
         Ok(CacheBatchGetStatus::EvictedFromCache) => {
-            panic!(
-                "[indexer file]Cache evicted from cache. For file store worker, this is not expected."
+            bail!(
+                "[indexer file] Data was evicted from the cache. For the file store worker, this is not expected."
             );
         },
         Err(err) => {
-            panic!("Batch get encoded proto data failed: {}", err);
+            bail!("Batch get encoded proto data failed: {}", err);
         },
     }
 }
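The drain logic above only ever uploads whole BLOB_STORAGE_SIZE-sized batches: integer division floors the buffer length to the nearest multiple, and the remainder stays buffered for the next iteration. The same arithmetic in isolation (the constant value here is illustrative):

    fn main() {
        const BLOB_STORAGE_SIZE: usize = 1000;
        let mut transactions_buffer: Vec<u64> = (0..2345).collect();
        // Floor the length to a multiple of BLOB_STORAGE_SIZE.
        let process_size = transactions_buffer.len() / BLOB_STORAGE_SIZE * BLOB_STORAGE_SIZE;
        let current_batch: Vec<u64> = transactions_buffer.drain(..process_size).collect();
        assert_eq!(current_batch.len(), 2000); // two full blobs are uploaded
        assert_eq!(transactions_buffer.len(), 345); // the remainder is carried over
    }
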
@@ -195,25 +179,27 @@ mod tests {
         let fullnode_rpc_status: anyhow::Result<CacheBatchGetStatus> =
             Ok(CacheBatchGetStatus::NotReady);
         let batch_start_version = 0;
-        assert!(fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version).is_none());
+        assert!(
+            fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version)
+                .unwrap()
+                .is_none()
+        );
     }
 
     #[test]
-    #[should_panic]
     fn verify_the_grpc_status_handling_evicted_from_cache() {
         let fullnode_rpc_status: anyhow::Result<CacheBatchGetStatus> =
             Ok(CacheBatchGetStatus::EvictedFromCache);
         let batch_start_version = 0;
-        fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version);
+        assert!(fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version).is_err());
     }
 
     #[test]
-    #[should_panic]
     fn verify_the_grpc_status_handling_error() {
         let fullnode_rpc_status: anyhow::Result<CacheBatchGetStatus> =
             Err(anyhow::anyhow!("Error"));
         let batch_start_version = 0;
-        fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version);
+        assert!(fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version).is_err());
     }
 
     #[test]
@@ -228,7 +214,7 @@ mod tests {
         let fullnode_rpc_status: anyhow::Result<CacheBatchGetStatus> =
             Ok(CacheBatchGetStatus::Ok(transactions));
         let actual_transactions =
-            fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version);
+            fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version).unwrap();
         assert!(actual_transactions.is_some());
         let actual_transactions = actual_transactions.unwrap();
         assert_eq!(actual_transactions, transactions_with_version);
diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
index 09b744cf96e251..47d31e901f6362 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
@@ -55,10 +55,10 @@ pub fn bootstrap(
     let node_config = config.clone();
 
     // We have defaults for these, so they are always set.
-    let processor_task_count = node_config.indexer_grpc.processor_task_count.unwrap();
-    let processor_batch_size = node_config.indexer_grpc.processor_batch_size.unwrap();
-    let output_batch_size = node_config.indexer_grpc.output_batch_size.unwrap();
-    let address = node_config.indexer_grpc.address.clone().unwrap();
+    let processor_task_count = node_config.indexer_grpc.processor_task_count;
+    let processor_batch_size = node_config.indexer_grpc.processor_batch_size;
+    let output_batch_size = node_config.indexer_grpc.output_batch_size;
+    let address = node_config.indexer_grpc.address;
 
     runtime.spawn(async move {
         let context = Arc::new(Context::new(chain_id, db, mp_sender, node_config));
diff --git a/ecosystem/indexer-grpc/indexer-grpc-server-framework/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-server-framework/src/lib.rs
index 211cd6dd623c64..3594365add9b18 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-server-framework/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-server-framework/src/lib.rs
@@ -27,6 +27,9 @@ impl ServerArgs {
         setup_logging();
         setup_panic_handler();
         let config = load::<GenericConfig<T>>(&self.config_path)?;
+        config
+            .validate()
+            .context("Config did not pass validation")?;
         run_server_with_config(config).await
     }
 }
@@ -75,6 +78,10 @@ impl<T> RunnableConfig for GenericConfig<T>
 where
     T: RunnableConfig,
 {
+    fn validate(&self) -> Result<()> {
+        self.server_config.validate()
+    }
+ async fn run(&self) -> Result<()> { self.server_config.run().await } @@ -87,7 +94,15 @@ where /// RunnableConfig is a trait that all services must implement for their configuration. #[async_trait::async_trait] pub trait RunnableConfig: DeserializeOwned + Send + Sync + 'static { + // Validate the config. + fn validate(&self) -> Result<()> { + Ok(()) + } + + // Run something based on the config. async fn run(&self) -> Result<()>; + + // Get the server name. fn get_server_name(&self) -> String; } @@ -109,7 +124,7 @@ pub struct CrashInfo { /// Invoke to ensure process exits on a thread panic. /// -/// Tokio's default behavior is to catch panics and ignore them. Invoking this function will +/// Tokio's default behavior is to catch panics and ignore them. Invoking this function will /// ensure that all subsequent thread panics (even Tokio threads) will report the /// details/backtrace and then exit. pub fn setup_panic_handler() { diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml index c1234f1dce6f06..b88da0f164ca4b 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml @@ -39,4 +39,5 @@ toml = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +url = { workspace = true } warp = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs index 418187d51c6519..c0823695a2609d 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs @@ -118,21 +118,21 @@ impl CacheOperator { } // Set up the cache if needed. - pub async fn cache_setup_if_needed(&mut self) -> bool { + pub async fn cache_setup_if_needed(&mut self) -> anyhow::Result { let version_inserted: bool = redis::cmd("SET") .arg(CACHE_KEY_LATEST_VERSION) .arg(CACHE_DEFAULT_LATEST_VERSION_NUMBER) .arg("NX") .query_async(&mut self.conn) .await - .expect("Redis latest_version check failed."); + .context("Redis latest_version check failed.")?; if version_inserted { tracing::info!( initialized_latest_version = CACHE_DEFAULT_LATEST_VERSION_NUMBER, "Cache latest version is initialized." ); } - version_inserted + Ok(version_inserted) } // Update the chain id in cache if missing; otherwise, verify the chain id. @@ -144,7 +144,7 @@ impl CacheOperator { .arg(chain_id) .invoke_async(&mut self.conn) .await - .expect("Redis chain id update/verification failed."); + .context("Redis chain id update/verification failed.")?; if result != 1 { anyhow::bail!("Chain id is not correct."); } @@ -255,11 +255,11 @@ impl CacheOperator { .arg(version) .invoke_async(&mut self.conn) .await - .expect("Redis latest version update failed.") + .context("Redis latest version update failed.")? { 2 => { tracing::error!(version=version, "Redis latest version update failed. 
The version is beyond the next expected version.");
-                panic!("version is not right.");
+                Err(anyhow::anyhow!("Version is not right."))
             },
             _ => Ok(()),
         }
     }
@@ -308,7 +308,7 @@ mod tests {
 
         let mut cache_operator: CacheOperator<MockRedisConnection> =
             CacheOperator::new(mock_connection);
-        assert!(cache_operator.cache_setup_if_needed().await);
+        assert!(cache_operator.cache_setup_if_needed().await.unwrap());
     }
 
     #[tokio::test]
@@ -324,7 +324,7 @@ mod tests {
 
         let mut cache_operator: CacheOperator<MockRedisConnection> =
             CacheOperator::new(mock_connection);
-        assert!(!cache_operator.cache_setup_if_needed().await);
+        assert!(!cache_operator.cache_setup_if_needed().await.unwrap());
     }
     // Cache coverage status tests.
     #[tokio::test]
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs
index 2dc1516cdd0344..26e97ad17462b7 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs
@@ -5,29 +5,31 @@ pub mod cache_operator;
 pub mod config;
 pub mod constants;
 pub mod file_store_operator;
+pub mod types;
 
 use aptos_protos::{
     internal::fullnode::v1::fullnode_data_client::FullnodeDataClient, transaction::v1::Transaction,
     util::timestamp::Timestamp,
 };
 use prost::Message;
+use url::Url;
 
 pub type GrpcClientType = FullnodeDataClient<tonic::transport::Channel>;
 
 /// Create a gRPC client with exponential backoff.
-pub async fn create_grpc_client(address: String) -> GrpcClientType {
+pub async fn create_grpc_client(address: Url) -> GrpcClientType {
     backoff::future::retry(backoff::ExponentialBackoff::default(), || async {
-        match FullnodeDataClient::connect(address.clone()).await {
+        match FullnodeDataClient::connect(address.to_string()).await {
             Ok(client) => {
                 tracing::info!(
-                    address = address.clone(),
+                    address = address.to_string(),
                     "[Indexer Cache] Connected to indexer gRPC server."
                 );
                 Ok(client)
             },
             Err(e) => {
                 tracing::error!(
-                    address = address.clone(),
+                    address = address.to_string(),
                     "[Indexer Cache] Failed to connect to indexer gRPC server: {}",
                     e
                 );
diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/types.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/types.rs
new file mode 100644
index 00000000000000..6cf24da14754d7
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/types.rs
@@ -0,0 +1,62 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use serde::{Deserialize, Serialize};
+use std::{
+    fmt::{Display, Formatter},
+    ops::Deref,
+    str::FromStr,
+};
+use url::Url;
+
+/// A URL that only allows the redis:// scheme.
+#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
+pub struct RedisUrl(pub Url);
+
+impl FromStr for RedisUrl {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let url = Url::parse(s)?;
+        if url.scheme() != "redis" {
+            return Err(anyhow::anyhow!("Invalid scheme: {}", url.scheme()));
+        }
+        Ok(RedisUrl(url))
+    }
+}
+
+impl<'de> Deserialize<'de> for RedisUrl {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let url = Url::deserialize(deserializer)?;
+        if url.scheme() != "redis" {
+            return Err(serde::de::Error::custom(format!(
+                "Invalid scheme: {}",
+                url.scheme()
+            )));
+        }
+        Ok(Self(url))
+    }
+}
+
+impl Deref for RedisUrl {
+    type Target = Url;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<RedisUrl> for Url {
+    fn from(redis_url: RedisUrl) -> Self {
+        redis_url.0
+    }
+}
+
+impl Display for RedisUrl {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
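Finally, a quick usage sketch of the RedisUrl newtype above, mirroring how the workers in this patch construct their Redis clients (it assumes the redis and anyhow crates as used elsewhere in the patch):

    use aptos_indexer_grpc_utils::types::RedisUrl;
    use std::str::FromStr;

    fn main() -> anyhow::Result<()> {
        // Only redis:// URLs parse successfully.
        let url = RedisUrl::from_str("redis://127.0.0.1:6379")?;
        assert!(RedisUrl::from_str("http://127.0.0.1:6379").is_err());

        // Deref gives access to the inner url::Url...
        assert_eq!(url.scheme(), "redis");
        // ...and the workers pass the inner Url straight to the redis client,
        // as in redis::Client::open(redis_main_instance_address.0.clone()) above.
        let _client = redis::Client::open(url.0.clone())?;
        Ok(())
    }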