diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 71460c361f1fa..6824343676579 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -5,7 +5,8 @@ use crate::service::RawDataServerWrapper; use anyhow::{bail, Result}; use aptos_indexer_grpc_server_framework::RunnableConfig; use aptos_indexer_grpc_utils::{ - compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig, types::RedisUrl, + compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig, + in_memory_cache::InMemoryCacheConfig, types::RedisUrl, }; use aptos_protos::{ indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET, @@ -66,6 +67,8 @@ pub struct IndexerGrpcDataServiceConfig { /// Support compressed cache data. #[serde(default = "IndexerGrpcDataServiceConfig::default_enable_cache_compression")] pub enable_cache_compression: bool, + #[serde(default)] + pub in_memory_cache_config: InMemoryCacheConfig, /// Sender addresses to ignore. Transactions from these addresses will not be indexed. #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] pub sender_addresses_to_ignore: Vec, @@ -80,6 +83,7 @@ impl IndexerGrpcDataServiceConfig { file_store_config: IndexerGrpcFileStoreConfig, redis_read_replica_address: RedisUrl, enable_cache_compression: bool, + in_memory_cache_config: InMemoryCacheConfig, sender_addresses_to_ignore: Vec, ) -> Self { Self { @@ -92,6 +96,7 @@ impl IndexerGrpcDataServiceConfig { file_store_config, redis_read_replica_address, enable_cache_compression, + in_memory_cache_config, sender_addresses_to_ignore, } } @@ -117,6 +122,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { { bail!("At least one of data_service_grpc_non_tls_config and data_service_grpc_tls_config must be set"); } + self.in_memory_cache_config.validate()?; Ok(()) } @@ -146,6 +152,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { // InMemoryCache. let in_memory_cache = aptos_indexer_grpc_utils::in_memory_cache::InMemoryCache::new_with_redis_connection( + self.in_memory_cache_config.clone(), redis_conn, cache_storage_format, ) diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs index e3db9bd822b82..6a19e0c23bbb6 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs @@ -8,21 +8,74 @@ use dashmap::DashMap; use itertools::Itertools; use prost::Message; use redis::AsyncCommands; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::RwLock; // Internal lookup retry interval for in-memory cache. const IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS: u64 = 10; const IN_MEMORY_CACHE_GC_INTERVAL_MS: u64 = 100; -// Max cache size in bytes: 3 GB. -const IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES: u64 = 3_000_000_000; -// Eviction cache size in bytes: 3.5 GB. Evict the map to 3 GB. -const IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES: u64 = 3_500_000_000; // Max cache entry TTL: 30 seconds. // const MAX_IN_MEMORY_CACHE_ENTRY_TTL: u64 = 30; // Warm-up cache entries. Pre-fetch the cache entries to warm up the cache. -const WARM_UP_CACHE_ENTRIES: u64 = 20_000; -const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500; +pub const WARM_UP_CACHE_ENTRIES: u64 = 20_000; +pub const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500; + +/// Configuration for when we want to explicitly declare how large the cache should be. +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(default)] +pub struct InMemoryCacheSizeConfig { + /// The maximum size of the cache in bytes. + cache_target_size_bytes: u64, + /// The maximum size of the cache in bytes before eviction is triggered, at which + /// point we reduce the size of the cache back to `cache_target_size_bytes`. + cache_eviction_trigger_size_bytes: u64, +} + +impl Default for InMemoryCacheSizeConfig { + fn default() -> Self { + Self { + // 3 GB. + cache_target_size_bytes: 3_000_000_000, + // 3.5 GB. + cache_eviction_trigger_size_bytes: 3_500_000_000, + } + } +} + +impl InMemoryCacheSizeConfig { + pub fn validate(&self) -> anyhow::Result<()> { + if self.cache_target_size_bytes == 0 { + return Err(anyhow::anyhow!("Cache target size must be greater than 0")); + } + if self.cache_eviction_trigger_size_bytes == 0 { + return Err(anyhow::anyhow!( + "Cache eviction trigger size must be greater than 0" + )); + } + if self.cache_eviction_trigger_size_bytes < self.cache_target_size_bytes { + return Err(anyhow::anyhow!( + "Cache eviction trigger size must be greater than cache target size" + )); + } + Ok(()) + } +} + +/// Configuration for the in memory cache. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(default)] +pub struct InMemoryCacheConfig { + size_config: InMemoryCacheSizeConfig, +} + +impl InMemoryCacheConfig { + pub fn validate(&self) -> anyhow::Result<()> { + self.size_config.validate() + } +} #[derive(Debug, Clone, Copy)] struct CacheMetadata { @@ -41,6 +94,7 @@ pub struct InMemoryCache { impl InMemoryCache { pub async fn new_with_redis_connection( + cache_config: InMemoryCacheConfig, conn: C, storage_format: StorageFormat, ) -> anyhow::Result @@ -68,6 +122,7 @@ impl InMemoryCache { cancellation_token.clone(), ); spawn_cleanup_task( + cache_config.size_config.clone(), cache.clone(), cache_metadata.clone(), cancellation_token.clone(), @@ -80,7 +135,7 @@ impl InMemoryCache { }) } - async fn latest_version(&self) -> u64 { + pub async fn latest_version(&self) -> u64 { self.cache_metadata.read().await.latest_version } @@ -228,6 +283,7 @@ fn spawn_update_task( } fn spawn_cleanup_task( + cache_size_config: InMemoryCacheSizeConfig, cache: Arc>>, cache_metadata: Arc>, cancellation_token: tokio_util::sync::CancellationToken, @@ -241,7 +297,7 @@ fn spawn_cleanup_task( let mut current_cache_metadata = { *cache_metadata.read().await }; let should_evict = current_cache_metadata .total_size_in_bytes - .saturating_sub(IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES) + .saturating_sub(cache_size_config.cache_eviction_trigger_size_bytes) > 0; if !should_evict { tokio::time::sleep(std::time::Duration::from_millis( @@ -253,7 +309,7 @@ fn spawn_cleanup_task( let mut actual_bytes_removed = 0; let mut bytes_to_remove = current_cache_metadata .total_size_in_bytes - .saturating_sub(IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES); + .saturating_sub(cache_size_config.cache_target_size_bytes); while bytes_to_remove > 0 { let key_to_remove = current_cache_metadata.first_version; let (_k, v) = cache @@ -374,6 +430,7 @@ mod tests { Ok(0), )]); let in_memory_cache = InMemoryCache::new_with_redis_connection( + InMemoryCacheConfig::default(), mock_connection.clone(), StorageFormat::Base64UncompressedProto, ) @@ -401,6 +458,7 @@ mod tests { ), ]); let in_memory_cache = InMemoryCache::new_with_redis_connection( + InMemoryCacheConfig::default(), mock_connection.clone(), StorageFormat::Base64UncompressedProto, ) @@ -445,6 +503,7 @@ mod tests { MockCmd::new(redis::cmd("GET").arg("latest_version"), Ok(2)), ]); let in_memory_cache = InMemoryCache::new_with_redis_connection( + InMemoryCacheConfig::default(), mock_connection.clone(), StorageFormat::Base64UncompressedProto, )