Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick 01f3324b49313d2e849a651ef065b2fb7dcd8486 onto aptos-indexer-grpc-v1.5 #13606

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::service::RawDataServerWrapper;
use anyhow::{bail, Result};
use aptos_indexer_grpc_server_framework::RunnableConfig;
use aptos_indexer_grpc_utils::{
compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig, types::RedisUrl,
compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig,
in_memory_cache::InMemoryCacheConfig, types::RedisUrl,
};
use aptos_protos::{
indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET,
Expand Down Expand Up @@ -66,6 +67,8 @@ pub struct IndexerGrpcDataServiceConfig {
/// Support compressed cache data.
#[serde(default = "IndexerGrpcDataServiceConfig::default_enable_cache_compression")]
pub enable_cache_compression: bool,
#[serde(default)]
pub in_memory_cache_config: InMemoryCacheConfig,
/// Sender addresses to ignore. Transactions from these addresses will not be indexed.
#[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")]
pub sender_addresses_to_ignore: Vec<String>,
Expand All @@ -80,6 +83,7 @@ impl IndexerGrpcDataServiceConfig {
file_store_config: IndexerGrpcFileStoreConfig,
redis_read_replica_address: RedisUrl,
enable_cache_compression: bool,
in_memory_cache_config: InMemoryCacheConfig,
sender_addresses_to_ignore: Vec<String>,
) -> Self {
Self {
Expand All @@ -92,6 +96,7 @@ impl IndexerGrpcDataServiceConfig {
file_store_config,
redis_read_replica_address,
enable_cache_compression,
in_memory_cache_config,
sender_addresses_to_ignore,
}
}
Expand All @@ -117,6 +122,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
{
bail!("At least one of data_service_grpc_non_tls_config and data_service_grpc_tls_config must be set");
}
self.in_memory_cache_config.validate()?;
Ok(())
}

Expand Down Expand Up @@ -146,6 +152,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
// InMemoryCache.
let in_memory_cache =
aptos_indexer_grpc_utils::in_memory_cache::InMemoryCache::new_with_redis_connection(
self.in_memory_cache_config.clone(),
redis_conn,
cache_storage_format,
)
Expand Down
77 changes: 68 additions & 9 deletions ecosystem/indexer-grpc/indexer-grpc-utils/src/in_memory_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,74 @@ use dashmap::DashMap;
use itertools::Itertools;
use prost::Message;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

// Internal lookup retry interval for in-memory cache.
const IN_MEMORY_CACHE_LOOKUP_RETRY_INTERVAL_MS: u64 = 10;
const IN_MEMORY_CACHE_GC_INTERVAL_MS: u64 = 100;
// Max cache size in bytes: 3 GB.
const IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES: u64 = 3_000_000_000;
// Eviction cache size in bytes: 3.5 GB. Evict the map to 3 GB.
const IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES: u64 = 3_500_000_000;
// Max cache entry TTL: 30 seconds.
// const MAX_IN_MEMORY_CACHE_ENTRY_TTL: u64 = 30;
// Warm-up cache entries. Pre-fetch the cache entries to warm up the cache.
const WARM_UP_CACHE_ENTRIES: u64 = 20_000;
const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;
pub const WARM_UP_CACHE_ENTRIES: u64 = 20_000;
pub const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;

/// Configuration for when we want to explicitly declare how large the cache should be.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
pub struct InMemoryCacheSizeConfig {
/// The maximum size of the cache in bytes.
cache_target_size_bytes: u64,
/// The maximum size of the cache in bytes before eviction is triggered, at which
/// point we reduce the size of the cache back to `cache_target_size_bytes`.
cache_eviction_trigger_size_bytes: u64,
}

impl Default for InMemoryCacheSizeConfig {
fn default() -> Self {
Self {
// 3 GB.
cache_target_size_bytes: 3_000_000_000,
// 3.5 GB.
cache_eviction_trigger_size_bytes: 3_500_000_000,
}
}
}

impl InMemoryCacheSizeConfig {
pub fn validate(&self) -> anyhow::Result<()> {
if self.cache_target_size_bytes == 0 {
return Err(anyhow::anyhow!("Cache target size must be greater than 0"));
}
if self.cache_eviction_trigger_size_bytes == 0 {
return Err(anyhow::anyhow!(
"Cache eviction trigger size must be greater than 0"
));
}
if self.cache_eviction_trigger_size_bytes < self.cache_target_size_bytes {
return Err(anyhow::anyhow!(
"Cache eviction trigger size must be greater than cache target size"
));
}
Ok(())
}
}

/// Configuration for the in memory cache.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
pub struct InMemoryCacheConfig {
size_config: InMemoryCacheSizeConfig,
}

impl InMemoryCacheConfig {
pub fn validate(&self) -> anyhow::Result<()> {
self.size_config.validate()
}
}

#[derive(Debug, Clone, Copy)]
struct CacheMetadata {
Expand All @@ -41,6 +94,7 @@ pub struct InMemoryCache {

impl InMemoryCache {
pub async fn new_with_redis_connection<C>(
cache_config: InMemoryCacheConfig,
conn: C,
storage_format: StorageFormat,
) -> anyhow::Result<Self>
Expand Down Expand Up @@ -68,6 +122,7 @@ impl InMemoryCache {
cancellation_token.clone(),
);
spawn_cleanup_task(
cache_config.size_config.clone(),
cache.clone(),
cache_metadata.clone(),
cancellation_token.clone(),
Expand All @@ -80,7 +135,7 @@ impl InMemoryCache {
})
}

async fn latest_version(&self) -> u64 {
pub async fn latest_version(&self) -> u64 {
self.cache_metadata.read().await.latest_version
}

Expand Down Expand Up @@ -228,6 +283,7 @@ fn spawn_update_task<C>(
}

fn spawn_cleanup_task(
cache_size_config: InMemoryCacheSizeConfig,
cache: Arc<DashMap<u64, Arc<Transaction>>>,
cache_metadata: Arc<RwLock<CacheMetadata>>,
cancellation_token: tokio_util::sync::CancellationToken,
Expand All @@ -241,7 +297,7 @@ fn spawn_cleanup_task(
let mut current_cache_metadata = { *cache_metadata.read().await };
let should_evict = current_cache_metadata
.total_size_in_bytes
.saturating_sub(IN_MEMORY_CACHE_EVICTION_TRIGGER_SIZE_IN_BYTES)
.saturating_sub(cache_size_config.cache_eviction_trigger_size_bytes)
> 0;
if !should_evict {
tokio::time::sleep(std::time::Duration::from_millis(
Expand All @@ -253,7 +309,7 @@ fn spawn_cleanup_task(
let mut actual_bytes_removed = 0;
let mut bytes_to_remove = current_cache_metadata
.total_size_in_bytes
.saturating_sub(IN_MEMORY_CACHE_TARGET_MAX_CAPACITY_IN_BYTES);
.saturating_sub(cache_size_config.cache_target_size_bytes);
while bytes_to_remove > 0 {
let key_to_remove = current_cache_metadata.first_version;
let (_k, v) = cache
Expand Down Expand Up @@ -374,6 +430,7 @@ mod tests {
Ok(0),
)]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
Expand Down Expand Up @@ -401,6 +458,7 @@ mod tests {
),
]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
Expand Down Expand Up @@ -445,6 +503,7 @@ mod tests {
MockCmd::new(redis::cmd("GET").arg("latest_version"), Ok(2)),
]);
let in_memory_cache = InMemoryCache::new_with_redis_connection(
InMemoryCacheConfig::default(),
mock_connection.clone(),
StorageFormat::Base64UncompressedProto,
)
Expand Down
Loading