From 2549bcdea36f0a792cfc383e0ecff7b1694315f9 Mon Sep 17 00:00:00 2001 From: Ankur Dubey Date: Wed, 17 Jul 2024 17:30:00 +0400 Subject: [PATCH] Fix/redis batch import (#42) * fix: import model keys from redis cache in batches * fix: tests --- crates/routing-engine/src/indexer.rs | 5 ++++- crates/routing-engine/src/routing_engine.rs | 9 +++++++-- crates/routing-engine/src/settlement_engine.rs | 5 ++++- .../routing-engine/src/token_price/coingecko.rs | 7 +++++-- crates/storage/src/lib.rs | 5 ++++- crates/storage/src/redis_client.rs | 15 ++++++++++++--- 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/crates/routing-engine/src/indexer.rs b/crates/routing-engine/src/indexer.rs index 114565c..3ab32fe 100644 --- a/crates/routing-engine/src/indexer.rs +++ b/crates/routing-engine/src/indexer.rs @@ -352,7 +352,10 @@ mod tests { todo!() } - async fn get_all_key_values(&self) -> Result, RedisClientError> { + async fn get_all_key_values( + &self, + _: Option, + ) -> Result, RedisClientError> { todo!() } } diff --git a/crates/routing-engine/src/routing_engine.rs b/crates/routing-engine/src/routing_engine.rs index 6decb84..663546c 100644 --- a/crates/routing-engine/src/routing_engine.rs +++ b/crates/routing-engine/src/routing_engine.rs @@ -19,6 +19,8 @@ use crate::{ use crate::token_price::TokenPriceProvider; use crate::token_price::utils::{Errors, get_token_price}; +const FETCH_REDIS_KEYS_BATCH_SIZE: usize = 50; + /// (from_chain, to_chain, from_token, to_token) #[derive(Debug)] struct PathQuery(u32, u32, String, String); @@ -87,7 +89,7 @@ impl RoutingEngine { /// Refresh the cache from Redis pub async fn refresh_cache(&self) { - match self.redis_client.get_all_key_values().await { + match self.redis_client.get_all_key_values(Some(FETCH_REDIS_KEYS_BATCH_SIZE)).await { Ok(kv_pairs) => { info!("Refreshing cache from Redis."); let mut cache = self.cache.write().await; @@ -418,7 +420,10 @@ mod tests { unimplemented!() } - async fn get_all_key_values(&self) -> Result, RedisClientError> { + async fn get_all_key_values( + &self, + _: Option, + ) -> Result, RedisClientError> { unimplemented!() } } diff --git a/crates/routing-engine/src/settlement_engine.rs b/crates/routing-engine/src/settlement_engine.rs index 02a25fb..a94db56 100644 --- a/crates/routing-engine/src/settlement_engine.rs +++ b/crates/routing-engine/src/settlement_engine.rs @@ -488,7 +488,10 @@ mod tests { unimplemented!() } - async fn get_all_key_values(&self) -> Result, RedisClientError> { + async fn get_all_key_values( + &self, + _: Option, + ) -> Result, RedisClientError> { unimplemented!() } } diff --git a/crates/routing-engine/src/token_price/coingecko.rs b/crates/routing-engine/src/token_price/coingecko.rs index 321f127..461198b 100644 --- a/crates/routing-engine/src/token_price/coingecko.rs +++ b/crates/routing-engine/src/token_price/coingecko.rs @@ -144,10 +144,10 @@ mod tests { use async_trait::async_trait; use derive_more::Display; + use serial_test::serial; use thiserror::Error; use config::{Config, get_sample_config}; - use serial_test::serial; use storage::{KeyValueStore, RedisClientError}; use crate::CoingeckoClient; @@ -192,7 +192,10 @@ mod tests { unimplemented!() } - async fn get_all_key_values(&self) -> Result, RedisClientError> { + async fn get_all_key_values( + &self, + _: Option, + ) -> Result, RedisClientError> { unimplemented!() } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 0aea2b7..da6c74d 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -27,7 +27,10 @@ pub trait KeyValueStore: Debug + Send + Sync { async fn get_all_keys(&self) -> Result, RedisClientError>; - async fn get_all_key_values(&self) -> Result, RedisClientError>; + async fn get_all_key_values( + &self, + batch_size: Option, + ) -> Result, RedisClientError>; } #[async_trait] diff --git a/crates/storage/src/redis_client.rs b/crates/storage/src/redis_client.rs index 4b77c48..9b30d08 100644 --- a/crates/storage/src/redis_client.rs +++ b/crates/storage/src/redis_client.rs @@ -58,10 +58,19 @@ impl KeyValueStore for RedisClient { Ok(keys) } - async fn get_all_key_values(&self) -> Result, RedisClientError> { + async fn get_all_key_values( + &self, + batch_size: Option, + ) -> Result, RedisClientError> { info!("Fetching all key-value pairs"); let keys = self.get_all_keys().await?; - let values: Vec = self.connection.clone().mget(&keys).await?; + + let batch_size = batch_size.unwrap_or(keys.len()); + let mut values = Vec::new(); + for batch in keys.chunks(batch_size) { + values.extend(self.connection.clone().mget::<'_, _, Vec<_>>(batch).await?.into_iter()); + } + let kv_pairs = keys.into_iter().zip(values.into_iter()).collect(); Ok(kv_pairs) } @@ -167,7 +176,7 @@ mod tests { .unwrap(); // Fetch all key-values - let key_values = client.get_all_key_values().await.unwrap(); + let key_values = client.get_all_key_values(None).await.unwrap(); assert_eq!(key_values.get("key1").unwrap(), "value1"); assert_eq!(key_values.get("key2").unwrap(), "value2");