[data service] Add support for configuring in memory cache size (#13570)
banool committed Jun 7, 2024
1 parent a9fd65c commit e66f866
Showing 2 changed files with 76 additions and 10 deletions.
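
Because the new in_memory_cache_config field is marked #[serde(default)], existing data service configs keep deserializing unchanged; the section only needs to be spelled out when overriding the default sizes. As a minimal sketch (not part of this commit), assuming the config is supplied as YAML, that serde_yaml and anyhow are available as dependencies, and that the keys simply mirror the field names introduced in this commit, the new InMemoryCacheConfig section (which sits under the in_memory_cache_config key in the full service config) could be parsed and validated on its own like so:

use aptos_indexer_grpc_utils::in_memory_cache::InMemoryCacheConfig;

fn main() -> anyhow::Result<()> {
    // Hypothetical override of the new cache-size knobs; the byte values are
    // illustrative, not recommendations.
    let yaml = r#"
size_config:
  cache_target_size_bytes: 2000000000
  cache_eviction_trigger_size_bytes: 2500000000
"#;
    // serde fills in defaults for anything omitted, then we run the same
    // validation the data service performs at startup.
    let config: InMemoryCacheConfig = serde_yaml::from_str(yaml)?;
    config.validate()?;
    Ok(())
}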
@@ -5,7 +5,8 @@ use crate::service::RawDataServerWrapper;
use anyhow::{bail, Result};
use aptos_indexer_grpc_server_framework::RunnableConfig;
use aptos_indexer_grpc_utils::{
compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig, types::RedisUrl,
compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig,
in_memory_cache::InMemoryCacheConfig, types::RedisUrl,
};
use aptos_protos::{
indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET,
@@ -66,6 +67,8 @@ pub struct IndexerGrpcDataServiceConfig {
/// Support compressed cache data.
#[serde(default = "IndexerGrpcDataServiceConfig::default_enable_cache_compression")]
pub enable_cache_compression: bool,
#[serde(default)]
pub in_memory_cache_config: InMemoryCacheConfig,
/// Sender addresses to ignore. Transactions from these addresses will not be indexed.
#[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")]
pub sender_addresses_to_ignore: Vec<String>,
@@ -80,6 +83,7 @@ impl IndexerGrpcDataServiceConfig {
file_store_config: IndexerGrpcFileStoreConfig,
redis_read_replica_address: RedisUrl,
enable_cache_compression: bool,
in_memory_cache_config: InMemoryCacheConfig,
sender_addresses_to_ignore: Vec<String>,
) -> Self {
Self {
@@ -92,6 +96,7 @@ impl IndexerGrpcDataServiceConfig {
file_store_config,
redis_read_replica_address,
enable_cache_compression,
in_memory_cache_config,
sender_addresses_to_ignore,
}
}
@@ -117,6 +122,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
{
bail!("At least one of data_service_grpc_non_tls_config and data_service_grpc_tls_config must be set");
}
self.in_memory_cache_config.validate()?;
Ok(())
}

@@ -146,6 +152,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
// InMemoryCache.
let in_memory_cache =
aptos_indexer_grpc_utils::in_memory_cache::InMemoryCache::new_with_redis_connection(
self.in_memory_cache_config.clone(),
redis_conn,
cache_storage_format,
)
ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs (77 changes: 68 additions & 9 deletions)
@@ -8,21 +8,74 @@ use dashmap::DashMap;
use itertools::Itertools;
use prost::Message;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

// Internal lookup retry interval for in-memory cache.
const IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS: u64 = 10;
const IN_MEMORY_CACHE_GC_INTERVAL_MS: u64 = 100;
// Max cache size in bytes: 3 GB.
const IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES: u64 = 3_000_000_000;
// Eviction cache size in bytes: 3.5 GB. Evict the map to 3 GB.
const IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES: u64 = 3_500_000_000;
// Max cache entry TTL: 30 seconds.
// const MAX_IN_MEMORY_CACHE_ENTRY_TTL: u64 = 30;
// Warm-up cache entries. Pre-fetch the cache entries to warm up the cache.
const WARM_UP_CACHE_ENTRIES: u64 = 20_000;
const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;
pub const WARM_UP_CACHE_ENTRIES: u64 = 20_000;
pub const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;

/// Configuration for when we want to explicitly declare how large the cache should be.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
pub struct InMemoryCacheSizeConfig {
/// The target size of the cache in bytes.
cache_target_size_bytes: u64,
/// The maximum size of the cache in bytes before eviction is triggered, at which
/// point we reduce the size of the cache back to `cache_target_size_bytes`.
cache_eviction_trigger_size_bytes: u64,
}

impl Default for InMemoryCacheSizeConfig {
fn default() -> Self {
Self {
// 3 GB.
cache_target_size_bytes: 3_000_000_000,
// 3.5 GB.
cache_eviction_trigger_size_bytes: 3_500_000_000,
}
}
}

impl InMemoryCacheSizeConfig {
pub fn validate(&self) -> anyhow::Result<()> {
if self.cache_target_size_bytes == 0 {
return Err(anyhow::anyhow!("Cache target size must be greater than 0"));
}
if self.cache_eviction_trigger_size_bytes == 0 {
return Err(anyhow::anyhow!(
"Cache eviction trigger size must be greater than 0"
));
}
if self.cache_eviction_trigger_size_bytes < self.cache_target_size_bytes {
return Err(anyhow::anyhow!(
"Cache eviction trigger size must be greater than cache target size"
));
}
Ok(())
}
}

/// Configuration for the in memory cache.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
pub struct InMemoryCacheConfig {
size_config: InMemoryCacheSizeConfig,
}

impl InMemoryCacheConfig {
pub fn validate(&self) -> anyhow::Result<()> {
self.size_config.validate()
}
}

#[derive(Debug, Clone, Copy)]
struct CacheMetadata {
@@ -41,6 +94,7 @@ pub struct InMemoryCache {

impl InMemoryCache {
pub async fn new_with_redis_connection<C>(
cache_config: InMemoryCacheConfig,
conn: C,
storage_format: StorageFormat,
) -> anyhow::Result<Self>
@@ -68,6 +122,7 @@ impl InMemoryCache {
cancellation_token.clone(),
);
spawn_cleanup_task(
cache_config.size_config.clone(),
cache.clone(),
cache_metadata.clone(),
cancellation_token.clone(),
@@ -80,7 +135,7 @@
})
}

async fn latest_version(&self) -> u64 {
pub async fn latest_version(&self) -> u64 {
self.cache_metadata.read().await.latest_version
}

@@ -228,6 +283,7 @@ fn spawn_update_task<C>(
}

fn spawn_cleanup_task(
cache_size_config: InMemoryCacheSizeConfig,
cache: Arc<DashMap<u64, Arc<Transaction>>>,
cache_metadata: Arc<RwLock<CacheMetadata>>,
cancellation_token: tokio_util::sync::CancellationToken,
@@ -241,7 +297,7 @@ fn spawn_cleanup_task(
let mut current_cache_metadata = { *cache_metadata.read().await };
let should_evict = current_cache_metadata
.total_size_in_bytes
.saturating_sub(IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES)
.saturating_sub(cache_size_config.cache_eviction_trigger_size_bytes)
> 0;
if !should_evict {
tokio::time::sleep(std::time::Duration::from_millis(
@@ -253,7 +309,7 @@
let mut actual_bytes_removed = 0;
let mut bytes_to_remove = current_cache_metadata
.total_size_in_bytes
.saturating_sub(IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES);
.saturating_sub(cache_size_config.cache_target_size_bytes);
while bytes_to_remove > 0 {
let key_to_remove = current_cache_metadata.first_version;
let (_k, v) = cache
@@ -374,6 +430,7 @@ mod tests {
Ok(0),
)]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
@@ -401,6 +458,7 @@ mod tests {
),
]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
@@ -445,6 +503,7 @@ mod tests {
MockCmd::new(redis::cmd("GET").arg("latest_version"), Ok(2)),
]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
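For context on how the two sizes interact in spawn_cleanup_task above: nothing is evicted until the cache's total size exceeds cache_eviction_trigger_size_bytes, at which point the oldest versions are removed until the size falls back to cache_target_size_bytes. A standalone sketch of that threshold arithmetic, using the default 3 GB target and 3.5 GB trigger (illustrative only, not code from this commit):

/// Returns how many bytes a cleanup pass should evict for a given total
/// cache size, or 0 if the eviction trigger has not been crossed yet.
fn bytes_to_evict(total_size_in_bytes: u64, target: u64, trigger: u64) -> u64 {
    if total_size_in_bytes.saturating_sub(trigger) == 0 {
        return 0; // at or below the trigger: leave the cache alone
    }
    total_size_in_bytes.saturating_sub(target)
}

fn main() {
    // A 3.2 GB cache is left alone, while a 3.6 GB cache is trimmed by
    // 0.6 GB, back down to the 3 GB target.
    assert_eq!(bytes_to_evict(3_200_000_000, 3_000_000_000, 3_500_000_000), 0);
    assert_eq!(
        bytes_to_evict(3_600_000_000, 3_000_000_000, 3_500_000_000),
        600_000_000
    );
}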
