From ba6bdab347a036bea8ef9f90c7d3e77a739679d7 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 14 Oct 2024 21:20:26 +0530 Subject: [PATCH 01/27] chore: changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35b21a83e9..8184c14bd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods. +### Fixed + +- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Fetches transactions during sync phase from any node that can provide it instead of just 1. + ## [Version 0.39.0] ### Added From 5a51e0c009a2ed6c38e1ad645338d63c2b53c5d3 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 01:38:07 +0530 Subject: [PATCH 02/27] test: arc mutex on cachedview, reset every 10 sec --- crates/services/p2p/src/service.rs | 128 +++++++++++++++++++++++++---- 1 file changed, 112 insertions(+), 16 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0be..e20af0d6ff 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -38,7 +38,10 @@ use fuel_core_services::{ SyncProcessor, TraceErr, }; -use fuel_core_storage::transactional::AtomicView; +use fuel_core_storage::{ + transactional::AtomicView, + Result as StorageResult, +}; use fuel_core_types::{ blockchain::SealedBlockHeader, fuel_tx::{ @@ -78,10 +81,14 @@ use libp2p::{ PeerId, }; use std::{ + collections::HashMap, fmt::Debug, future::Future, ops::Range, - sync::Arc, + sync::{ + Arc, + Mutex, + }, }; use tokio::{ sync::{ @@ -389,7 +396,10 @@ pub struct UninitializedTask { /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. 
-pub struct Task { +pub struct Task +where + V: AtomicView, +{ chain_id: ChainId, response_timeout: Duration, p2p_service: P, @@ -410,6 +420,11 @@ pub struct Task { heartbeat_max_time_since_last: Duration, next_check_time: Instant, heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig, + // cached view + cached_view: Arc>>, + // milliseconds wait time between cache reset + cache_reset_interval: Duration, + next_cache_reset_time: Instant, } #[derive(Default, Clone)] @@ -442,7 +457,7 @@ impl UninitializedTask { } } -impl Task { +impl Task { fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() @@ -485,12 +500,63 @@ impl Task { } } +struct CachedView { + inner: V, + sealed_block_headers: HashMap, Vec>, + transactions_on_blocks: HashMap, Vec>, +} + +impl CachedView { + fn new(inner: V) -> Self { + Self { + inner, + sealed_block_headers: HashMap::new(), + transactions_on_blocks: HashMap::new(), + } + } +} + +impl CachedView { + fn get_sealed_headers( + &mut self, + block_height_range: Range, + ) -> StorageResult>> { + if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { + Ok(Some(headers.clone())) + } else { + let headers = self.inner.get_sealed_headers(block_height_range.clone())?; + if let Some(headers) = &headers { + self.sealed_block_headers + .insert(block_height_range, headers.clone()); + } + Ok(headers) + } + } + + fn get_transactions( + &mut self, + block_height_range: Range, + ) -> StorageResult>> { + if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { + Ok(Some(transactions.clone())) + } else { + let transactions = self.inner.get_transactions(block_height_range.clone())?; + if let Some(transactions) = &transactions { + self.transactions_on_blocks + .insert(block_height_range, transactions.clone()); + } + Ok(transactions) + } + } +} + impl Task where P: TaskP2PService + 'static, V: AtomicView + 'static, V::LatestView: P2pDb, T: TxPool + 'static, + B: Send, { fn update_metrics(&self, update_fn: U) where @@ -530,8 +596,9 @@ where max_len: usize, ) -> anyhow::Result<()> where - DbLookUpFn: - Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, + DbLookUpFn: Fn(&mut CachedView, Range) -> anyhow::Result> + + Send + + 'static, ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, R: Send + 'static, @@ -557,18 +624,22 @@ where return Ok(()); } - let view = self.view_provider.latest_view()?; - let result = self.db_heavy_task_processor.try_spawn(move || { - if instant.elapsed() > timeout { - tracing::warn!("Request timed out"); - return; - } + let view = self.cached_view.clone(); + let result = self.db_heavy_task_processor.try_spawn({ + move || { + if instant.elapsed() > timeout { + tracing::warn!("Request timed out"); + return; + } - let response = db_lookup(&view, range.clone()).ok().flatten(); + let mut view = view.lock().expect("Cached view lock poisoned"); - let _ = response_channel - .try_send(task_request(response, request_id)) - .trace_err("Failed to send response to the request channel"); + let response = db_lookup(&mut view, range.clone()).ok().flatten(); + + let _ = response_channel + .try_send(task_request(response, request_id)) + .trace_err("Failed to send response to the request channel"); + } }); if result.is_err() { @@ -781,6 +852,11 @@ where 
AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?; let request_sender = broadcast.request_sender.clone(); + let cache_reset_interval = Duration::from_millis(10_000); + let next_cache_reset_time = Instant::now() + .checked_add(cache_reset_interval) + .expect("The cache reset interval should be small enough to do frequently"); + let task = Task { chain_id, response_timeout, @@ -800,6 +876,9 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, + cached_view: Arc::new(Mutex::new(CachedView::new(view))), + cache_reset_interval, + next_cache_reset_time, }; Ok(task) } @@ -947,6 +1026,14 @@ where } self.next_check_time += self.heartbeat_check_interval; } + _ = tokio::time::sleep_until(self.next_cache_reset_time) => { + should_continue = true; + let mut view = self.cached_view.lock().expect("Cached view lock poisoned"); + // we could just call .clear() on the internal hashmaps + let latest_view = self.view_provider.latest_view()?; + *view = CachedView::new(latest_view); + self.next_cache_reset_time += self.cache_reset_interval; + } } tracing::debug!("P2P task is finished"); @@ -1577,6 +1664,9 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), + cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cache_reset_interval: Duration::from_secs(0), + next_cache_reset_time: Instant::now(), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); @@ -1667,6 +1757,9 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), + cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cache_reset_interval: Duration::from_secs(0), + next_cache_reset_time: Instant::now(), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); @@ -1729,6 +1822,9 @@ pub mod tests { heartbeat_max_time_since_last: Default::default(), next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), + cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cache_reset_interval: Duration::from_secs(0), + next_cache_reset_time: Instant::now(), }; let mut watcher = StateWatcher::started(); // End of initialization From 8079739342246764ddc195bbc1ef0ec03d4a1535 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 01:40:25 +0530 Subject: [PATCH 03/27] fix: err --- crates/services/p2p/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index e20af0d6ff..91efa76be5 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1028,7 +1028,7 @@ where } _ = tokio::time::sleep_until(self.next_cache_reset_time) => { should_continue = true; - let mut view = self.cached_view.lock().expect("Cached view lock poisoned"); + let mut view = self.cached_view.lock().map_err(|e| anyhow!("Failed to lock cached view: {:?}", e))?; // we could just call .clear() on the internal hashmaps let latest_view = self.view_provider.latest_view()?; *view = CachedView::new(latest_view); From 19a32f312690c73b68ac12815d5fd9abb13c5320 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 
2024 03:24:54 +0530 Subject: [PATCH 04/27] fix(p2p): use dashmap, remove mutex, remove instantiation with view --- Cargo.lock | 15 ++++++ crates/services/p2p/Cargo.toml | 1 + crates/services/p2p/src/service.rs | 77 ++++++++++++++---------------- 3 files changed, 53 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0df5ae8096..59ab0e4571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,6 +2214,20 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3530,6 +3544,7 @@ dependencies = [ "anyhow", "async-trait", "ctor", + "dashmap", "fuel-core-chain-config", "fuel-core-metrics", "fuel-core-p2p", diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 718e3bbf0d..806e3496f1 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -13,6 +13,7 @@ description = "Fuel client networking" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +dashmap = "6.1.0" fuel-core-chain-config = { workspace = true } fuel-core-metrics = { workspace = true } # TODO make this a feature fuel-core-services = { workspace = true, features = ["sync-processor"] } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 91efa76be5..08fd1e1d79 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -81,13 +81,11 @@ use libp2p::{ PeerId, }; use std::{ - collections::HashMap, fmt::Debug, future::Future, ops::Range, sync::{ Arc, - Mutex, }, }; use tokio::{ @@ -104,6 +102,7 @@ use tokio::{ Instant, }, }; +use dashmap::DashMap; use tracing::warn; const CHANNEL_SIZE: usize = 1024 * 10; @@ -396,10 +395,7 @@ pub struct UninitializedTask { /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. 
-pub struct Task -where - V: AtomicView, -{ +pub struct Task { chain_id: ChainId, response_timeout: Duration, p2p_service: P, @@ -421,7 +417,7 @@ where next_check_time: Instant, heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig, // cached view - cached_view: Arc>>, + cached_view: Arc, // milliseconds wait time between cache reset cache_reset_interval: Duration, next_cache_reset_time: Instant, @@ -500,31 +496,35 @@ impl Task { } } -struct CachedView { - inner: V, - sealed_block_headers: HashMap, Vec>, - transactions_on_blocks: HashMap, Vec>, +struct CachedView { + sealed_block_headers: DashMap, Vec>, + transactions_on_blocks: DashMap, Vec>, } -impl CachedView { - fn new(inner: V) -> Self { +impl CachedView{ + fn new() -> Self { Self { - inner, - sealed_block_headers: HashMap::new(), - transactions_on_blocks: HashMap::new(), + sealed_block_headers: DashMap::new(), + transactions_on_blocks: DashMap::new(), } } + + fn clear(&self) { + self.sealed_block_headers.clear(); + self.transactions_on_blocks.clear(); + } } -impl CachedView { - fn get_sealed_headers( - &mut self, +impl CachedView { + fn get_sealed_headers( + &self, + view: &V, block_height_range: Range, - ) -> StorageResult>> { + ) -> StorageResult>> where V: P2pDb { if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { Ok(Some(headers.clone())) } else { - let headers = self.inner.get_sealed_headers(block_height_range.clone())?; + let headers = view.get_sealed_headers(block_height_range.clone())?; if let Some(headers) = &headers { self.sealed_block_headers .insert(block_height_range, headers.clone()); @@ -533,14 +533,15 @@ impl CachedView { } } - fn get_transactions( - &mut self, + fn get_transactions( + &self, + view: &V, block_height_range: Range, - ) -> StorageResult>> { + ) -> StorageResult>> where V: P2pDb { if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { Ok(Some(transactions.clone())) } else { - let transactions = self.inner.get_transactions(block_height_range.clone())?; + let transactions = view.get_transactions(block_height_range.clone())?; if let Some(transactions) = &transactions { self.transactions_on_blocks .insert(block_height_range, transactions.clone()); @@ -596,7 +597,7 @@ where max_len: usize, ) -> anyhow::Result<()> where - DbLookUpFn: Fn(&mut CachedView, Range) -> anyhow::Result> + DbLookUpFn: Fn(&V::LatestView, &Arc, Range) -> anyhow::Result> + Send + 'static, ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, @@ -624,17 +625,16 @@ where return Ok(()); } - let view = self.cached_view.clone(); + let view = self.view_provider.latest_view()?; let result = self.db_heavy_task_processor.try_spawn({ + let cached_view = self.cached_view.clone(); move || { if instant.elapsed() > timeout { tracing::warn!("Request timed out"); return; } - let mut view = view.lock().expect("Cached view lock poisoned"); - - let response = db_lookup(&mut view, range.clone()).ok().flatten(); + let response = db_lookup(&view, &cached_view, range.clone()).ok().flatten(); let _ = response_channel .try_send(task_request(response, request_id)) @@ -660,7 +660,7 @@ where range, request_id, ResponseMessage::Transactions, - |view, range| view.get_transactions(range).map_err(anyhow::Error::from), + |view, cached_view, range| cached_view.get_transactions(view, range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, request_id, @@ -678,7 +678,7 @@ where range, request_id, ResponseMessage::SealedHeaders, - |view, range| 
view.get_sealed_headers(range).map_err(anyhow::Error::from), + |view, cached_view, range| cached_view.get_sealed_headers(view, range).map_err(anyhow::Error::from), |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, request_id, @@ -876,7 +876,7 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, - cached_view: Arc::new(Mutex::new(CachedView::new(view))), + cached_view: Arc::new(CachedView::new()), cache_reset_interval, next_cache_reset_time, }; @@ -1028,10 +1028,7 @@ where } _ = tokio::time::sleep_until(self.next_cache_reset_time) => { should_continue = true; - let mut view = self.cached_view.lock().map_err(|e| anyhow!("Failed to lock cached view: {:?}", e))?; - // we could just call .clear() on the internal hashmaps - let latest_view = self.view_provider.latest_view()?; - *view = CachedView::new(latest_view); + self.cached_view.clear(); self.next_cache_reset_time += self.cache_reset_interval; } } @@ -1664,7 +1661,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cached_view: Arc::new(CachedView::new()), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; @@ -1757,7 +1754,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cached_view: Arc::new(CachedView::new()), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; @@ -1822,7 +1819,7 @@ pub mod tests { heartbeat_max_time_since_last: Default::default(), next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), - cached_view: Arc::new(Mutex::new(CachedView::new(FakeDB))), + cached_view: Arc::new(CachedView::new()), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; From 805a8665886b4696c39f37dda00b04c47c765e6f Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 03:26:55 +0530 Subject: [PATCH 05/27] fix: fmt --- crates/services/p2p/src/service.rs | 33 +++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 08fd1e1d79..f97102e08c 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -27,6 +27,7 @@ use crate::{ }, }; use anyhow::anyhow; +use dashmap::DashMap; use fuel_core_metrics::p2p_metrics::set_blocks_requested; use fuel_core_services::{ stream::BoxStream, @@ -84,9 +85,7 @@ use std::{ fmt::Debug, future::Future, ops::Range, - sync::{ - Arc, - }, + sync::Arc, }; use tokio::{ sync::{ @@ -102,7 +101,6 @@ use tokio::{ Instant, }, }; -use dashmap::DashMap; use tracing::warn; const CHANNEL_SIZE: usize = 1024 * 10; @@ -501,7 +499,7 @@ struct CachedView { transactions_on_blocks: DashMap, Vec>, } -impl CachedView{ +impl CachedView { fn new() -> Self { Self { sealed_block_headers: DashMap::new(), @@ -520,7 +518,10 @@ impl CachedView { &self, view: &V, block_height_range: Range, - ) -> StorageResult>> where V: P2pDb { + ) -> StorageResult>> + where + V: P2pDb, + { if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { Ok(Some(headers.clone())) } else { @@ -537,7 +538,10 @@ impl CachedView { &self, 
view: &V, block_height_range: Range, - ) -> StorageResult>> where V: P2pDb { + ) -> StorageResult>> + where + V: P2pDb, + { if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { Ok(Some(transactions.clone())) } else { @@ -634,7 +638,8 @@ where return; } - let response = db_lookup(&view, &cached_view, range.clone()).ok().flatten(); + let response = + db_lookup(&view, &cached_view, range.clone()).ok().flatten(); let _ = response_channel .try_send(task_request(response, request_id)) @@ -660,7 +665,11 @@ where range, request_id, ResponseMessage::Transactions, - |view, cached_view, range| cached_view.get_transactions(view, range).map_err(anyhow::Error::from), + |view, cached_view, range| { + cached_view + .get_transactions(view, range) + .map_err(anyhow::Error::from) + }, |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, request_id, @@ -678,7 +687,11 @@ where range, request_id, ResponseMessage::SealedHeaders, - |view, cached_view, range| cached_view.get_sealed_headers(view, range).map_err(anyhow::Error::from), + |view, cached_view, range| { + cached_view + .get_sealed_headers(view, range) + .map_err(anyhow::Error::from) + }, |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, request_id, From c5db7a6c1dd29922c11a44deb1b2fc23c9f72aa2 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:39:14 +0530 Subject: [PATCH 06/27] chore: add metrics to cache hits/misses --- crates/metrics/src/p2p_metrics.rs | 26 +++++++++++++++++++++++ crates/services/p2p/src/service.rs | 34 +++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/crates/metrics/src/p2p_metrics.rs b/crates/metrics/src/p2p_metrics.rs index a139cfb6ee..b535bcccb4 100644 --- a/crates/metrics/src/p2p_metrics.rs +++ b/crates/metrics/src/p2p_metrics.rs @@ -8,16 +8,22 @@ use std::sync::OnceLock; pub struct P2PMetrics { pub unique_peers: Counter, pub blocks_requested: Gauge, + pub p2p_req_res_cache_hits: Counter, + pub p2p_req_res_cache_misses: Counter, } impl P2PMetrics { fn new() -> Self { let unique_peers = Counter::default(); let blocks_requested = Gauge::default(); + let p2p_req_res_cache_hits = Counter::default(); + let p2p_req_res_cache_misses = Counter::default(); let metrics = P2PMetrics { unique_peers, blocks_requested, + p2p_req_res_cache_hits, + p2p_req_res_cache_misses, }; let mut registry = global_registry().registry.lock(); @@ -33,6 +39,18 @@ impl P2PMetrics { metrics.blocks_requested.clone() ); + registry.register( + "P2p_Req_Res_Cache_Hits", + "A Counter which keeps track of the number of cache hits for the p2p req/res protocol", + metrics.p2p_req_res_cache_hits.clone() + ); + + registry.register( + "P2p_Req_Res_Cache_Misses", + "A Counter which keeps track of the number of cache misses for the p2p req/res protocol", + metrics.p2p_req_res_cache_misses.clone() + ); + metrics } } @@ -50,3 +68,11 @@ pub fn increment_unique_peers() { pub fn set_blocks_requested(count: usize) { p2p_metrics().blocks_requested.set(count as i64); } + +pub fn increment_p2p_req_res_cache_hits() { + p2p_metrics().p2p_req_res_cache_hits.inc(); +} + +pub fn increment_p2p_req_res_cache_misses() { + p2p_metrics().p2p_req_res_cache_misses.inc(); +} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index f97102e08c..c9c46b02c8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -28,7 +28,11 @@ use crate::{ }; use anyhow::anyhow; use 
dashmap::DashMap; -use fuel_core_metrics::p2p_metrics::set_blocks_requested; +use fuel_core_metrics::p2p_metrics::{ + increment_p2p_req_res_cache_hits, + increment_p2p_req_res_cache_misses, + set_blocks_requested, +}; use fuel_core_services::{ stream::BoxStream, AsyncProcessor, @@ -497,13 +501,15 @@ impl Task { struct CachedView { sealed_block_headers: DashMap, Vec>, transactions_on_blocks: DashMap, Vec>, + metrics: bool, } impl CachedView { - fn new() -> Self { + fn new(metrics: bool) -> Self { Self { sealed_block_headers: DashMap::new(), transactions_on_blocks: DashMap::new(), + metrics, } } @@ -511,9 +517,16 @@ impl CachedView { self.sealed_block_headers.clear(); self.transactions_on_blocks.clear(); } -} -impl CachedView { + fn update_metrics(&self, update_fn: U) + where + U: FnOnce(), + { + if self.metrics { + update_fn() + } + } + fn get_sealed_headers( &self, view: &V, @@ -523,8 +536,10 @@ impl CachedView { V: P2pDb, { if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { + self.update_metrics(|| increment_p2p_req_res_cache_hits()); Ok(Some(headers.clone())) } else { + self.update_metrics(|| increment_p2p_req_res_cache_misses()); let headers = view.get_sealed_headers(block_height_range.clone())?; if let Some(headers) = &headers { self.sealed_block_headers @@ -543,8 +558,10 @@ impl CachedView { V: P2pDb, { if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { + self.update_metrics(|| increment_p2p_req_res_cache_hits()); Ok(Some(transactions.clone())) } else { + self.update_metrics(|| increment_p2p_req_res_cache_misses()); let transactions = view.get_transactions(block_height_range.clone())?; if let Some(transactions) = &transactions { self.transactions_on_blocks @@ -833,6 +850,7 @@ where heartbeat_max_time_since_last, database_read_threads, tx_pool_threads, + metrics, .. 
} = config; @@ -889,7 +907,7 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, - cached_view: Arc::new(CachedView::new()), + cached_view: Arc::new(CachedView::new(metrics)), cache_reset_interval, next_cache_reset_time, }; @@ -1674,7 +1692,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(CachedView::new()), + cached_view: Arc::new(CachedView::new(false)), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; @@ -1767,7 +1785,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(CachedView::new()), + cached_view: Arc::new(CachedView::new(false)), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; @@ -1832,7 +1850,7 @@ pub mod tests { heartbeat_max_time_since_last: Default::default(), next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), - cached_view: Arc::new(CachedView::new()), + cached_view: Arc::new(CachedView::new(false)), cache_reset_interval: Duration::from_secs(0), next_cache_reset_time: Instant::now(), }; From b4763be554e6e08fef16efa08b395e65e0756cdf Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:48:53 +0530 Subject: [PATCH 07/27] fix: clippy --- crates/services/p2p/src/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c9c46b02c8..6fdd1b087a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -536,10 +536,10 @@ impl CachedView { V: P2pDb, { if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { - self.update_metrics(|| increment_p2p_req_res_cache_hits()); + self.update_metrics(increment_p2p_req_res_cache_hits); Ok(Some(headers.clone())) } else { - self.update_metrics(|| increment_p2p_req_res_cache_misses()); + self.update_metrics(increment_p2p_req_res_cache_misses); let headers = view.get_sealed_headers(block_height_range.clone())?; if let Some(headers) = &headers { self.sealed_block_headers @@ -558,10 +558,10 @@ impl CachedView { V: P2pDb, { if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { - self.update_metrics(|| increment_p2p_req_res_cache_hits()); + self.update_metrics(increment_p2p_req_res_cache_hits); Ok(Some(transactions.clone())) } else { - self.update_metrics(|| increment_p2p_req_res_cache_misses()); + self.update_metrics(increment_p2p_req_res_cache_misses); let transactions = view.get_transactions(block_height_range.clone())?; if let Some(transactions) = &transactions { self.transactions_on_blocks From 5b03fa0a45a2aa13a9e6ff02471afc455a896557 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 15 Oct 2024 21:02:05 +0530 Subject: [PATCH 08/27] chore: refactor cached_view into own module --- crates/services/p2p/src/cached_view.rs | 278 +++++++++++++++++++++++++ crates/services/p2p/src/lib.rs | 2 + crates/services/p2p/src/service.rs | 87 +------- 3 files changed, 283 insertions(+), 84 deletions(-) create mode 100644 crates/services/p2p/src/cached_view.rs diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs new file mode 100644 index 
0000000000..fb6831c43b --- /dev/null +++ b/crates/services/p2p/src/cached_view.rs @@ -0,0 +1,278 @@ +use crate::ports::P2pDb; +use dashmap::DashMap; +use fuel_core_metrics::p2p_metrics::{ + increment_p2p_req_res_cache_hits, + increment_p2p_req_res_cache_misses, +}; +use fuel_core_storage::Result as StorageResult; +use fuel_core_types::{ + blockchain::SealedBlockHeader, + services::p2p::Transactions, +}; +use std::ops::Range; + +pub struct CachedView { + sealed_block_headers: DashMap, Vec>, + transactions_on_blocks: DashMap, Vec>, + metrics: bool, +} + +impl CachedView { + pub fn new(metrics: bool) -> Self { + Self { + sealed_block_headers: DashMap::new(), + transactions_on_blocks: DashMap::new(), + metrics, + } + } + + pub fn clear(&self) { + self.sealed_block_headers.clear(); + self.transactions_on_blocks.clear(); + } + + fn update_metrics(&self, update_fn: U) + where + U: FnOnce(), + { + if self.metrics { + update_fn() + } + } + + pub(crate) fn get_sealed_headers( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { + self.update_metrics(increment_p2p_req_res_cache_hits); + Ok(Some(headers.clone())) + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + let headers = view.get_sealed_headers(block_height_range.clone())?; + if let Some(headers) = &headers { + self.sealed_block_headers + .insert(block_height_range, headers.clone()); + } + Ok(headers) + } + } + + pub(crate) fn get_transactions( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { + self.update_metrics(increment_p2p_req_res_cache_hits); + Ok(Some(transactions.clone())) + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + let transactions = view.get_transactions(block_height_range.clone())?; + if let Some(transactions) = &transactions { + self.transactions_on_blocks + .insert(block_height_range, transactions.clone()); + } + Ok(transactions) + } + } +} + +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use fuel_core_types::blockchain::consensus::Genesis; + use std::sync::Arc; + use tokio::sync::Notify; + + struct FakeDb { + sender: Arc, + values: bool, + } + + #[inline] + fn default_sealed_headers(range: Range) -> Vec { + vec![SealedBlockHeader::default(); range.len()] + } + + #[inline] + fn default_transactions(range: Range) -> Vec { + vec![Transactions::default(); range.len()] + } + + impl P2pDb for FakeDb { + fn get_sealed_headers( + &self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let headers = default_sealed_headers(range); + Ok(Some(headers)) + } + + fn get_transactions( + &self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let transactions = default_transactions(range); + Ok(Some(transactions)) + } + + fn get_genesis(&self) -> StorageResult { + self.sender.notify_waiters(); + Ok(Genesis::default()) + } + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + let block_height_range = 0..10; + let sealed_headers = vec![SealedBlockHeader::default()]; + cached_view + .sealed_block_headers + 
.insert(block_height_range.clone(), sealed_headers.clone()); + + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let sealed_headers = default_sealed_headers(block_height_range.clone()); + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + let block_height_range = 0..10; + let transactions = default_transactions(block_height_range.clone()); + cached_view + .transactions_on_blocks + .insert(block_height_range.clone(), transactions.clone()); + + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let transactions = default_transactions(block_height_range.clone()); + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(false); + + // when + let notified = sender.notified(); + let block_height_range = 0..10; + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_none()); + } +} diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index fbd82c2545..375eeb7351 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -16,6 +16,8 @@ pub mod request_response; pub mod service; mod utils; +mod cached_view; + pub use gossipsub::config as gossipsub_config; pub use heartbeat::Config; diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 6fdd1b087a..89f097f9ec 100644 --- 
a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,4 +1,5 @@ use crate::{ + cached_view::CachedView, codecs::postcard::PostcardCodec, config::{ Config, @@ -27,12 +28,7 @@ use crate::{ }, }; use anyhow::anyhow; -use dashmap::DashMap; -use fuel_core_metrics::p2p_metrics::{ - increment_p2p_req_res_cache_hits, - increment_p2p_req_res_cache_misses, - set_blocks_requested, -}; +use fuel_core_metrics::p2p_metrics::set_blocks_requested; use fuel_core_services::{ stream::BoxStream, AsyncProcessor, @@ -43,10 +39,7 @@ use fuel_core_services::{ SyncProcessor, TraceErr, }; -use fuel_core_storage::{ - transactional::AtomicView, - Result as StorageResult, -}; +use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ blockchain::SealedBlockHeader, fuel_tx::{ @@ -498,80 +491,6 @@ impl Task { } } -struct CachedView { - sealed_block_headers: DashMap, Vec>, - transactions_on_blocks: DashMap, Vec>, - metrics: bool, -} - -impl CachedView { - fn new(metrics: bool) -> Self { - Self { - sealed_block_headers: DashMap::new(), - transactions_on_blocks: DashMap::new(), - metrics, - } - } - - fn clear(&self) { - self.sealed_block_headers.clear(); - self.transactions_on_blocks.clear(); - } - - fn update_metrics(&self, update_fn: U) - where - U: FnOnce(), - { - if self.metrics { - update_fn() - } - } - - fn get_sealed_headers( - &self, - view: &V, - block_height_range: Range, - ) -> StorageResult>> - where - V: P2pDb, - { - if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(headers.clone())) - } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let headers = view.get_sealed_headers(block_height_range.clone())?; - if let Some(headers) = &headers { - self.sealed_block_headers - .insert(block_height_range, headers.clone()); - } - Ok(headers) - } - } - - fn get_transactions( - &self, - view: &V, - block_height_range: Range, - ) -> StorageResult>> - where - V: P2pDb, - { - if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(transactions.clone())) - } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let transactions = view.get_transactions(block_height_range.clone())?; - if let Some(transactions) = &transactions { - self.transactions_on_blocks - .insert(block_height_range, transactions.clone()); - } - Ok(transactions) - } - } -} - impl Task where P: TaskP2PService + 'static, From 64222101e1ace05b7bf4c3b23f44303651f1e0bd Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 16 Oct 2024 11:39:24 +0530 Subject: [PATCH 09/27] chore: retain time based clear, but cache is now on a per block basis instead of a per-range basis --- crates/services/p2p/src/cached_view.rs | 107 +++++++++++++++++-------- 1 file changed, 74 insertions(+), 33 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index fb6831c43b..4e65f2309b 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -12,8 +12,10 @@ use fuel_core_types::{ use std::ops::Range; pub struct CachedView { - sealed_block_headers: DashMap, Vec>, - transactions_on_blocks: DashMap, Vec>, + // map block height to sealed block header + sealed_block_headers: DashMap, + // map block height to transactions + transactions_on_blocks: DashMap, metrics: bool, } @@ -48,17 +50,32 @@ impl CachedView 
{ where V: P2pDb, { - if let Some(headers) = self.sealed_block_headers.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(headers.clone())) + let mut headers = Vec::new(); + let mut missing_ranges = block_height_range.clone(); + for block_height in block_height_range.clone() { + if let Some(header) = self.sealed_block_headers.get(&block_height) { + self.update_metrics(increment_p2p_req_res_cache_hits); + headers.push(header.clone()); + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + // for the first block not in the cache, start a new range + missing_ranges.start = block_height; + break; + } + } + + if missing_ranges.is_empty() { + Ok(Some(headers)) } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let headers = view.get_sealed_headers(block_height_range.clone())?; - if let Some(headers) = &headers { - self.sealed_block_headers - .insert(block_height_range, headers.clone()); + let missing_headers = view.get_sealed_headers(missing_ranges.clone())?; + if let Some(missing_headers) = &missing_headers { + for header in missing_headers.iter() { + self.sealed_block_headers + .insert((*header.entity.height()).into(), header.clone()); + headers.push(header.clone()); + } } - Ok(headers) + Ok(missing_headers) } } @@ -70,17 +87,32 @@ impl CachedView { where V: P2pDb, { - if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) { - self.update_metrics(increment_p2p_req_res_cache_hits); - Ok(Some(transactions.clone())) + let mut transactions = Vec::new(); + let mut missing_ranges = block_height_range.clone(); + for block_height in block_height_range.clone() { + if let Some(cached_tx) = self.transactions_on_blocks.get(&block_height) { + self.update_metrics(increment_p2p_req_res_cache_hits); + transactions.push(cached_tx.clone()); + } else { + self.update_metrics(increment_p2p_req_res_cache_misses); + // for the first block not in the cache, start a new range + missing_ranges.start = block_height; + break; + } + } + if missing_ranges.is_empty() { + Ok(Some(transactions)) } else { - self.update_metrics(increment_p2p_req_res_cache_misses); - let transactions = view.get_transactions(block_height_range.clone())?; - if let Some(transactions) = &transactions { - self.transactions_on_blocks - .insert(block_height_range, transactions.clone()); + let missing_transactions = view.get_transactions(missing_ranges.clone())?; + if let Some(missing_transactions) = &missing_transactions { + for (block_height, transactions) in + missing_ranges.zip(missing_transactions.iter()) + { + self.transactions_on_blocks + .insert(block_height, transactions.clone()); + } } - Ok(transactions) + Ok(missing_transactions) } } } @@ -148,11 +180,15 @@ mod tests { }; let cached_view = CachedView::new(false); - let block_height_range = 0..10; - let sealed_headers = vec![SealedBlockHeader::default()]; - cached_view - .sealed_block_headers - .insert(block_height_range.clone(), sealed_headers.clone()); + let block_height_range = 0..100; + let sealed_headers = default_sealed_headers(block_height_range.clone()); + for (block_height, header) in + block_height_range.clone().zip(sealed_headers.iter()) + { + cached_view + .sealed_block_headers + .insert(block_height, header.clone()); + } let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) @@ -172,7 +208,7 @@ mod tests { // when let notified = sender.notified(); - let block_height_range = 0..10; + let block_height_range = 0..100; let sealed_headers = 
default_sealed_headers(block_height_range.clone()); let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) @@ -195,7 +231,7 @@ mod tests { // when let notified = sender.notified(); - let block_height_range = 0..10; + let block_height_range = 0..100; let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) .unwrap(); @@ -214,11 +250,16 @@ mod tests { }; let cached_view = CachedView::new(false); - let block_height_range = 0..10; + let block_height_range = 0..100; let transactions = default_transactions(block_height_range.clone()); - cached_view - .transactions_on_blocks - .insert(block_height_range.clone(), transactions.clone()); + + for (block_height, transactions) in + block_height_range.clone().zip(transactions.iter()) + { + cached_view + .transactions_on_blocks + .insert(block_height, transactions.clone()); + } let result = cached_view .get_transactions(&db, block_height_range.clone()) @@ -241,7 +282,7 @@ mod tests { // when let notified = sender.notified(); - let block_height_range = 0..10; + let block_height_range = 0..100; let transactions = default_transactions(block_height_range.clone()); let result = cached_view .get_transactions(&db, block_height_range.clone()) @@ -266,7 +307,7 @@ mod tests { // when let notified = sender.notified(); - let block_height_range = 0..10; + let block_height_range = 0..100; let result = cached_view .get_transactions(&db, block_height_range.clone()) .unwrap(); From 2b2a8fb8cfecfbe23a4e828b66980122e5aedebd Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 16 Oct 2024 12:15:23 +0530 Subject: [PATCH 10/27] fix: metrics logging and clearing --- crates/services/p2p/src/cached_view.rs | 51 +++++++++++++------------- crates/services/p2p/src/service.rs | 1 + 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index 4e65f2309b..d08e93437d 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -54,10 +54,8 @@ impl CachedView { let mut missing_ranges = block_height_range.clone(); for block_height in block_height_range.clone() { if let Some(header) = self.sealed_block_headers.get(&block_height) { - self.update_metrics(increment_p2p_req_res_cache_hits); headers.push(header.clone()); } else { - self.update_metrics(increment_p2p_req_res_cache_misses); // for the first block not in the cache, start a new range missing_ranges.start = block_height; break; @@ -65,18 +63,20 @@ impl CachedView { } if missing_ranges.is_empty() { - Ok(Some(headers)) - } else { - let missing_headers = view.get_sealed_headers(missing_ranges.clone())?; - if let Some(missing_headers) = &missing_headers { - for header in missing_headers.iter() { - self.sealed_block_headers - .insert((*header.entity.height()).into(), header.clone()); - headers.push(header.clone()); - } + self.update_metrics(increment_p2p_req_res_cache_hits); + return Ok(Some(headers)) + } + + self.update_metrics(increment_p2p_req_res_cache_misses); + let missing_headers = view.get_sealed_headers(missing_ranges.clone())?; + if let Some(missing_headers) = &missing_headers { + for header in missing_headers.iter() { + self.sealed_block_headers + .insert((*header.entity.height()).into(), header.clone()); + headers.push(header.clone()); } - Ok(missing_headers) } + Ok(missing_headers) } pub(crate) fn get_transactions( @@ -91,29 +91,30 @@ impl CachedView { let mut missing_ranges = block_height_range.clone(); for 
block_height in block_height_range.clone() { if let Some(cached_tx) = self.transactions_on_blocks.get(&block_height) { - self.update_metrics(increment_p2p_req_res_cache_hits); transactions.push(cached_tx.clone()); } else { - self.update_metrics(increment_p2p_req_res_cache_misses); // for the first block not in the cache, start a new range missing_ranges.start = block_height; break; } } + if missing_ranges.is_empty() { - Ok(Some(transactions)) - } else { - let missing_transactions = view.get_transactions(missing_ranges.clone())?; - if let Some(missing_transactions) = &missing_transactions { - for (block_height, transactions) in - missing_ranges.zip(missing_transactions.iter()) - { - self.transactions_on_blocks - .insert(block_height, transactions.clone()); - } + self.update_metrics(increment_p2p_req_res_cache_hits); + return Ok(Some(transactions)) + } + + self.update_metrics(increment_p2p_req_res_cache_misses); + let transactions = view.get_transactions(missing_ranges.clone())?; + if let Some(transactions) = &transactions { + for (block_height, transactions_per_block) in + missing_ranges.zip(transactions.iter()) + { + self.transactions_on_blocks + .insert(block_height, transactions_per_block.clone()); } - Ok(missing_transactions) } + Ok(transactions) } } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 89f097f9ec..116afd58aa 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -978,6 +978,7 @@ where } _ = tokio::time::sleep_until(self.next_cache_reset_time) => { should_continue = true; + tracing::debug!("Resetting req/res protocol cache"); self.cached_view.clear(); self.next_cache_reset_time += self.cache_reset_interval; } From f597ac9db0602caed7e20eaeafed0a1bd2e96ca7 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 28 Oct 2024 11:33:12 +0100 Subject: [PATCH 11/27] fix: fmt and clippy --- crates/services/p2p/src/service.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index e20be28283..9f415f81b3 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -539,13 +539,14 @@ where max_len: usize, ) -> anyhow::Result<()> where - DbLookUpFn: - Fn(&V::LatestView, &Arc, Range) -> anyhow::Result> + Send + 'static, + DbLookUpFn: Fn(&V::LatestView, &Arc, Range) -> anyhow::Result> + + Send + + 'static, ResponseSenderFn: Fn(Result) -> V2ResponseMessage + Send + 'static, TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + Send - + 'static. 
+ + 'static, R: Send + 'static, { let instant = Instant::now(); @@ -579,12 +580,12 @@ where return; } - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Add new error code - let response = db_lookup(&view, &cached_view, range.clone()) - .ok() - .flatten() - .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 + // Add new error code + let response = db_lookup(&view, &cached_view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = response_channel .try_send(task_request(response, request_id)) From 25515ee176ddf319b17307ade585a076acf7f99a Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Tue, 29 Oct 2024 15:08:05 +0530 Subject: [PATCH 12/27] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mårten Blankfors --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 422113c3bc..1895f448d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,7 +43,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed -- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Fetches transactions during sync phase from any node that can provide it instead of just 1. +- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Cache p2p responses to serve without roundtrip to db. ## [Version 0.39.0] From e481e6aca9692720933c4666fc01a58e35dbf2eb Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Tue, 29 Oct 2024 17:42:26 +0530 Subject: [PATCH 13/27] Update crates/services/p2p/src/cached_view.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mårten Blankfors --- crates/services/p2p/src/cached_view.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index d08e93437d..d7608faccf 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -10,12 +10,11 @@ use fuel_core_types::{ services::p2p::Transactions, }; use std::ops::Range; +type BlockHeight = u32; pub struct CachedView { - // map block height to sealed block header - sealed_block_headers: DashMap, - // map block height to transactions - transactions_on_blocks: DashMap, + sealed_block_headers: DashMap, + transactions_on_blocks: DashMap, metrics: bool, } From 5ad509354d91f94dcf56713f71350138905819f0 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:57:56 +0100 Subject: [PATCH 14/27] fix: make fetch generic --- crates/services/p2p/src/cached_view.rs | 95 +++++++++++++------------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index d7608faccf..24b3e314bc 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -41,41 +41,64 @@ impl CachedView { } } - pub(crate) fn get_sealed_headers( + fn get_from_cache_or_db( &self, + cache: &DashMap, view: &V, - block_height_range: Range, - ) -> StorageResult>> + range: Range, + fetch_fn: F, + ) -> StorageResult>> where V: P2pDb, + T: Clone, + F: Fn(&V, Range) -> StorageResult>>, { - let mut headers = Vec::new(); - let mut missing_ranges = 
block_height_range.clone(); - for block_height in block_height_range.clone() { - if let Some(header) = self.sealed_block_headers.get(&block_height) { - headers.push(header.clone()); + let mut items = Vec::new(); + let mut missing_start = None; + + for height in range.clone() { + if let Some(item) = cache.get(&height) { + items.push(item.clone()); } else { - // for the first block not in the cache, start a new range - missing_ranges.start = block_height; + missing_start = Some(height); break; } } - if missing_ranges.is_empty() { + if missing_start.is_none() { self.update_metrics(increment_p2p_req_res_cache_hits); - return Ok(Some(headers)) + return Ok(Some(items)); } + let missing_range = missing_start.unwrap()..range.end; + self.update_metrics(increment_p2p_req_res_cache_misses); - let missing_headers = view.get_sealed_headers(missing_ranges.clone())?; - if let Some(missing_headers) = &missing_headers { - for header in missing_headers.iter() { - self.sealed_block_headers - .insert((*header.entity.height()).into(), header.clone()); - headers.push(header.clone()); + if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { + for (height, item) in missing_range.zip(fetched_items.iter()) { + cache.insert(height, item.clone()); + items.push(item.clone()); } + + return Ok(Some(items)); } - Ok(missing_headers) + + Ok(None) + } + + pub(crate) fn get_sealed_headers( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + self.get_from_cache_or_db( + &self.sealed_block_headers, + view, + block_height_range, + V::get_sealed_headers, + ) } pub(crate) fn get_transactions( @@ -86,34 +109,12 @@ impl CachedView { where V: P2pDb, { - let mut transactions = Vec::new(); - let mut missing_ranges = block_height_range.clone(); - for block_height in block_height_range.clone() { - if let Some(cached_tx) = self.transactions_on_blocks.get(&block_height) { - transactions.push(cached_tx.clone()); - } else { - // for the first block not in the cache, start a new range - missing_ranges.start = block_height; - break; - } - } - - if missing_ranges.is_empty() { - self.update_metrics(increment_p2p_req_res_cache_hits); - return Ok(Some(transactions)) - } - - self.update_metrics(increment_p2p_req_res_cache_misses); - let transactions = view.get_transactions(missing_ranges.clone())?; - if let Some(transactions) = &transactions { - for (block_height, transactions_per_block) in - missing_ranges.zip(transactions.iter()) - { - self.transactions_on_blocks - .insert(block_height, transactions_per_block.clone()); - } - } - Ok(transactions) + self.get_from_cache_or_db( + &self.transactions_on_blocks, + view, + block_height_range, + V::get_transactions, + ) } } From f16cb2f0435fa3b478ae615ea4ba908acdabbf2e Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 09:42:52 +0100 Subject: [PATCH 15/27] fix: use let-else pattern --- crates/services/p2p/src/cached_view.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index 24b3e314bc..ff21eb2515 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -65,12 +65,12 @@ impl CachedView { } } - if missing_start.is_none() { + let Some(missing_start) = missing_start else { self.update_metrics(increment_p2p_req_res_cache_hits); return Ok(Some(items)); - } + }; - let missing_range = missing_start.unwrap()..range.end; + let missing_range = 
missing_start..range.end; self.update_metrics(increment_p2p_req_res_cache_misses); if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { From 1630e2bd21be59f0ffd48e585f7a6ad0dbd4189f Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 14:04:06 +0100 Subject: [PATCH 16/27] fix: add test --- crates/services/p2p/src/service.rs | 66 +++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 9f415f81b3..e47454dbac 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -877,6 +877,12 @@ where _ = watcher.while_started() => { should_continue = false; }, + _ = tokio::time::sleep_until(self.next_cache_reset_time) => { + should_continue = true; + tracing::debug!("Resetting req/res protocol cache"); + self.cached_view.clear(); + self.next_cache_reset_time += self.cache_reset_interval; + }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { let _ = self.p2p_service.update_block_height(latest_block_height); @@ -999,12 +1005,6 @@ where } self.next_check_time += self.heartbeat_check_interval; } - _ = tokio::time::sleep_until(self.next_cache_reset_time) => { - should_continue = true; - tracing::debug!("Resetting req/res protocol cache"); - self.cached_view.clear(); - self.next_cache_reset_time += self.cache_reset_interval; - } } tracing::debug!("P2P task is finished"); @@ -1810,4 +1810,58 @@ pub mod tests { .expect("Should process the block height even under p2p pressure"); } } + + #[tokio::test] + async fn cached_view__is_reset_after_interval_passed() { + // given + let p2p_service = FakeP2PService { + peer_info: vec![], + next_event_stream: Box::pin(futures::stream::pending()), + }; + let (request_sender, request_receiver) = mpsc::channel(100); + + let (report_sender, _) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: report_sender, + }; + + let cache_reset_interval = Duration::from_millis(100); + let next_cache_reset_time = Instant::now(); + let mut task = Task { + chain_id: Default::default(), + response_timeout: Default::default(), + p2p_service, + view_provider: FakeDB, + tx_pool: FakeTxPool, + next_block_height: FakeBlockImporter.next_block_height(), + request_receiver, + request_sender, + db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), + tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), + broadcast, + max_headers_per_request: 0, + max_txs_per_request: 100, + heartbeat_check_interval: Duration::from_secs(10), + heartbeat_max_avg_interval: Default::default(), + heartbeat_max_time_since_last: Default::default(), + next_check_time: Instant::now(), + heartbeat_peer_reputation_config: Default::default(), + cached_view: Arc::new(CachedView::new(false)), + cache_reset_interval, + next_cache_reset_time, + }; + let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); + let mut watcher = StateWatcher::from(watch_receiver); + + // when + task.run(&mut watcher).await.unwrap(); + + // then + // we raise the sleep factor to ensure that the cache is reset + tokio::time::sleep(cache_reset_interval * 2).await; + watch_sender.send(State::Stopped).unwrap(); + + // if this was changed, we can be sure that the cache was reset + assert_ne!(task.next_cache_reset_time, next_cache_reset_time); + } } From 09d7bd2d4b3ac60034683130376a2268e2752e0f Mon Sep 17 
00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 16:50:25 +0100 Subject: [PATCH 17/27] fix: call run multiple times until heartbeat report is available --- crates/services/p2p/src/service.rs | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index e47454dbac..cfc98d0632 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1643,12 +1643,11 @@ pub mod tests { let mut watcher = StateWatcher::from(watch_receiver); // when - task.run(&mut watcher).await.unwrap(); - - // then let (report_peer_id, report, reporting_service) = - report_receiver.recv().await.unwrap(); + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher) + .await; + // then watch_sender.send(State::Stopped).unwrap(); assert_eq!( @@ -1736,12 +1735,12 @@ pub mod tests { let mut watcher = StateWatcher::from(watch_receiver); // when - task.run(&mut watcher).await.unwrap(); - - // then + // we run this in a loop to ensure that the task is run until it reports let (report_peer_id, report, reporting_service) = - report_receiver.recv().await.unwrap(); + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher) + .await; + // then watch_sender.send(State::Stopped).unwrap(); assert_eq!( @@ -1755,6 +1754,20 @@ pub mod tests { assert_eq!(reporting_service, "p2p"); } + async fn wait_until_report_received( + report_receiver: &mut Receiver<(FuelPeerId, AppScore, String)>, + task: &mut Task, + mut watcher: &mut StateWatcher, + ) -> (FuelPeerId, AppScore, String) { + loop { + task.run(&mut watcher).await.unwrap(); + if let Some((peer_id, recv_report, service)) = report_receiver.try_recv().ok() + { + return (peer_id, recv_report, service); + } + } + } + #[tokio::test] async fn should_process_all_imported_block_under_infinite_events_from_p2p() { // Given From 3972a47b3338c29cae73baeb358ac7f0cd47c92b Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:07:35 +0100 Subject: [PATCH 18/27] fix: clippy --- crates/services/p2p/src/service.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index cfc98d0632..5820e4aa1e 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1757,12 +1757,11 @@ pub mod tests { async fn wait_until_report_received( report_receiver: &mut Receiver<(FuelPeerId, AppScore, String)>, task: &mut Task, - mut watcher: &mut StateWatcher, + watcher: &mut StateWatcher, ) -> (FuelPeerId, AppScore, String) { loop { - task.run(&mut watcher).await.unwrap(); - if let Some((peer_id, recv_report, service)) = report_receiver.try_recv().ok() - { + task.run(watcher).await.unwrap(); + if let Ok((peer_id, recv_report, service)) = report_receiver.try_recv() { return (peer_id, recv_report, service); } } From 69de0711afae9c9edcee7bfd8de2057b00950e3b Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:48:08 +0100 Subject: [PATCH 19/27] fix: dont allow cache reset interval to swallow runtime of other tasks --- crates/services/p2p/src/service.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 5820e4aa1e..c84fbc9923 100644 --- a/crates/services/p2p/src/service.rs +++ 
b/crates/services/p2p/src/service.rs @@ -1636,16 +1636,19 @@ pub mod tests { next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(0), + cache_reset_interval: Duration::from_secs(10), next_cache_reset_time: Instant::now(), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); // when - let (report_peer_id, report, reporting_service) = - wait_until_report_received(&mut report_receiver, &mut task, &mut watcher) - .await; + let (report_peer_id, report, reporting_service) = tokio::time::timeout( + Duration::from_secs(1), + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher), + ) + .await + .unwrap(); // then watch_sender.send(State::Stopped).unwrap(); @@ -1728,7 +1731,7 @@ pub mod tests { next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(0), + cache_reset_interval: Duration::from_secs(10), next_cache_reset_time: Instant::now(), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); @@ -1736,9 +1739,12 @@ pub mod tests { // when // we run this in a loop to ensure that the task is run until it reports - let (report_peer_id, report, reporting_service) = - wait_until_report_received(&mut report_receiver, &mut task, &mut watcher) - .await; + let (report_peer_id, report, reporting_service) = tokio::time::timeout( + Duration::from_secs(1), + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher), + ) + .await + .unwrap(); // then watch_sender.send(State::Stopped).unwrap(); @@ -1806,7 +1812,7 @@ pub mod tests { next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(0), + cache_reset_interval: Duration::from_secs(10), next_cache_reset_time: Instant::now(), }; let mut watcher = StateWatcher::started(); From 7a0a7762d7312423ac226d1fd085fcd4ddf2266c Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:41:12 +0530 Subject: [PATCH 20/27] chore: visibility of CachedView MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Rafał Chabowski <88321181+rafal-ch@users.noreply.github.com> --- crates/services/p2p/src/cached_view.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index ff21eb2515..215866e793 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -12,7 +12,7 @@ use fuel_core_types::{ use std::ops::Range; type BlockHeight = u32; -pub struct CachedView { +pub(super) struct CachedView { sealed_block_headers: DashMap, transactions_on_blocks: DashMap, metrics: bool, @@ -27,7 +27,7 @@ impl CachedView { } } - pub fn clear(&self) { + pub(super) fn clear(&self) { self.sealed_block_headers.clear(); self.transactions_on_blocks.clear(); } From d897cba69cac59d5c517d1987a2fa0a3af7e2c84 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:29:12 +0700 Subject: [PATCH 21/27] chore: todos --- crates/services/p2p/src/cached_view.rs | 2 ++ 1 file 
changed, 2 insertions(+) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index 215866e793..809b9f3314 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -58,6 +58,7 @@ impl CachedView { for height in range.clone() { if let Some(item) = cache.get(&height) { + // TODO(2436): replace with cheap Arc clone items.push(item.clone()); } else { missing_start = Some(height); @@ -76,6 +77,7 @@ impl CachedView { if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { for (height, item) in missing_range.zip(fetched_items.iter()) { cache.insert(height, item.clone()); + // TODO(2436): replace with cheap Arc clone items.push(item.clone()); } From b677c01cf301b433f1a26630020b888a9eb80e21 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:58:43 +0530 Subject: [PATCH 22/27] fix: nit --- crates/services/p2p/src/service.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c84fbc9923..8819d99bc9 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -450,7 +450,12 @@ impl UninitializedTask { } } -impl Task { +impl Task +where + P: TaskP2PService, + V: AtomicView, + B: Broadcast, +{ fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() From 496071f27230210e9ef448245faf22e0f6b2c885 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 18 Nov 2024 11:38:59 +0530 Subject: [PATCH 23/27] chore: with lru cache --- crates/services/p2p/src/cached_view.rs | 162 +++++++++++++++++++++---- crates/services/p2p/src/service.rs | 104 ++++------------ 2 files changed, 158 insertions(+), 108 deletions(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index 809b9f3314..b042ee5b9c 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -9,29 +9,82 @@ use fuel_core_types::{ blockchain::SealedBlockHeader, services::p2p::Transactions, }; -use std::ops::Range; +use std::{ + collections::VecDeque, + hash::Hash, + ops::Range, + sync::{ + Arc, + Mutex, + }, +}; + type BlockHeight = u32; +struct LruCache { + cache: DashMap>, + order: Mutex>, + capacity: usize, +} + +impl LruCache +where + K: Eq + Hash + Clone, +{ + fn new(capacity: usize) -> Self { + Self { + cache: DashMap::new(), + order: Mutex::new(VecDeque::new()), + capacity, + } + } + + fn insert(&self, key: K, value: V) { + let mut order = self.order.lock().expect("Poisoned lock"); + + if self.cache.len() >= self.capacity { + if let Some(least_used) = order.pop_front() { + self.cache.remove(&least_used); + } + } + + self.cache.insert(key.clone(), Arc::new(value)); + + // Update the access order. + order.retain(|k| k != &key); + order.push_back(key); + } + + fn get(&self, key: &K) -> Option> { + let mut order = self.order.lock().expect("Poisoned lock"); + + if let Some(value) = self.cache.get(key) { + // Update the access order. 
+ order.retain(|k| k != key); + order.push_back(key.clone()); + + Some(Arc::clone(&value)) + } else { + None + } + } +} + pub(super) struct CachedView { - sealed_block_headers: DashMap, - transactions_on_blocks: DashMap, + sealed_block_headers: LruCache, + transactions_on_blocks: LruCache, metrics: bool, } impl CachedView { - pub fn new(metrics: bool) -> Self { + pub fn new(capacity: usize, metrics: bool) -> Self { Self { - sealed_block_headers: DashMap::new(), - transactions_on_blocks: DashMap::new(), + sealed_block_headers: LruCache::new(capacity), + transactions_on_blocks: LruCache::new(capacity), metrics, } } - pub(super) fn clear(&self) { - self.sealed_block_headers.clear(); - self.transactions_on_blocks.clear(); - } - fn update_metrics(&self, update_fn: U) where U: FnOnce(), @@ -43,11 +96,11 @@ impl CachedView { fn get_from_cache_or_db( &self, - cache: &DashMap, + cache: &LruCache, view: &V, range: Range, fetch_fn: F, - ) -> StorageResult>> + ) -> StorageResult>>> where V: P2pDb, T: Clone, @@ -58,7 +111,6 @@ impl CachedView { for height in range.clone() { if let Some(item) = cache.get(&height) { - // TODO(2436): replace with cheap Arc clone items.push(item.clone()); } else { missing_start = Some(height); @@ -77,8 +129,7 @@ impl CachedView { if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { for (height, item) in missing_range.zip(fetched_items.iter()) { cache.insert(height, item.clone()); - // TODO(2436): replace with cheap Arc clone - items.push(item.clone()); + items.push(item.clone().into()); } return Ok(Some(items)); @@ -91,7 +142,7 @@ impl CachedView { &self, view: &V, block_height_range: Range, - ) -> StorageResult>> + ) -> StorageResult>>> where V: P2pDb, { @@ -107,7 +158,7 @@ impl CachedView { &self, view: &V, block_height_range: Range, - ) -> StorageResult>> + ) -> StorageResult>>> where V: P2pDb, { @@ -181,10 +232,11 @@ mod tests { sender: sender.clone(), values: true, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); let block_height_range = 0..100; let sealed_headers = default_sealed_headers(block_height_range.clone()); + let sealed_headers_heap = sealed_headers.iter().cloned().map(Arc::new).collect(); for (block_height, header) in block_height_range.clone().zip(sealed_headers.iter()) { @@ -196,7 +248,7 @@ mod tests { let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) .unwrap(); - assert_eq!(result, Some(sealed_headers)); + assert_eq!(result, Some(sealed_headers_heap)); } #[tokio::test] @@ -207,19 +259,20 @@ mod tests { sender: sender.clone(), values: true, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); // when let notified = sender.notified(); let block_height_range = 0..100; let sealed_headers = default_sealed_headers(block_height_range.clone()); + let sealed_headers_heap = sealed_headers.iter().cloned().map(Arc::new).collect(); let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) .unwrap(); // then notified.await; - assert_eq!(result, Some(sealed_headers)); + assert_eq!(result, Some(sealed_headers_heap)); } #[tokio::test] @@ -230,7 +283,7 @@ mod tests { sender: sender.clone(), values: false, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); // when let notified = sender.notified(); @@ -251,7 +304,7 @@ mod tests { sender: sender.clone(), values: true, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); let 
block_height_range = 0..100; let transactions = default_transactions(block_height_range.clone()); @@ -281,7 +334,7 @@ mod tests { sender: sender.clone(), values: true, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); // when let notified = sender.notified(); @@ -306,7 +359,7 @@ mod tests { sender: sender.clone(), values: false, }; - let cached_view = CachedView::new(false); + let cached_view = CachedView::new(10, false); // when let notified = sender.notified(); @@ -319,4 +372,61 @@ mod tests { notified.await; assert!(result.is_none()); } + + #[tokio::test] + async fn cached_view__when_lru_is_full_it_makes_call_to_db() { + // given + let cache_capacity = 10; + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(cache_capacity, false); + + // when + let block_height_range = 0..u32::try_from(cache_capacity).unwrap() + 1; + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + let notified = sender.notified(); + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_some()); + } + + #[tokio::test] + async fn cached_view__when_lru_is_partially_full_it_does_not_make_call_to_db() { + // given + let cache_capacity = 100; + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(cache_capacity, false); + + // when + let block_height_range = 0..10; + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + let notified = sender.notified(); + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + assert!( + tokio::time::timeout(std::time::Duration::from_millis(50), notified) + .await + .is_err() + ) + } } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 8819d99bc9..9836f6058c 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -82,7 +82,10 @@ use libp2p::{ use std::{ fmt::Debug, future::Future, - ops::Range, + ops::{ + Deref, + Range, + }, sync::Arc, }; use tokio::{ @@ -415,9 +418,6 @@ pub struct Task { heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig, // cached view cached_view: Arc, - // milliseconds wait time between cache reset - cache_reset_interval: Duration, - next_cache_reset_time: Instant, } #[derive(Default, Clone)] @@ -623,6 +623,11 @@ where cached_view .get_transactions(view, range) .map_err(anyhow::Error::from) + .map(|txs| { + txs.map(|txs| { + txs.into_iter().map(|tx| tx.deref().clone()).collect() + }) + }) }, |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, @@ -645,6 +650,14 @@ where cached_view .get_sealed_headers(view, range) .map_err(anyhow::Error::from) + .map(|headers| { + headers.map(|headers| { + headers + .into_iter() + .map(|header| header.deref().clone()) + .collect() + }) + }) }, |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, @@ -830,11 +843,6 @@ where AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?; let request_sender = broadcast.request_sender.clone(); - let cache_reset_interval = Duration::from_millis(10_000); - let next_cache_reset_time = Instant::now() - .checked_add(cache_reset_interval) - .expect("The cache reset interval should be small enough to do frequently"); - let task = Task { 
chain_id, response_timeout, @@ -854,9 +862,7 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, - cached_view: Arc::new(CachedView::new(metrics)), - cache_reset_interval, - next_cache_reset_time, + cached_view: Arc::new(CachedView::new(1_000, metrics)), }; Ok(task) } @@ -882,12 +888,6 @@ where _ = watcher.while_started() => { should_continue = false; }, - _ = tokio::time::sleep_until(self.next_cache_reset_time) => { - should_continue = true; - tracing::debug!("Resetting req/res protocol cache"); - self.cached_view.clear(); - self.next_cache_reset_time += self.cache_reset_interval; - }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { let _ = self.p2p_service.update_block_height(latest_block_height); @@ -1038,7 +1038,7 @@ pub struct SharedState { reserved_peers_broadcast: broadcast::Sender, /// Used for communicating with the `Task`. request_sender: mpsc::Sender, - /// Sender of p2p blopck height data + /// Sender of p2p block height data block_height_broadcast: broadcast::Sender, /// Max txs per request max_txs_per_request: usize, @@ -1640,9 +1640,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(10), - next_cache_reset_time: Instant::now(), + cached_view: Arc::new(CachedView::new(100, false)), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); @@ -1735,9 +1733,7 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), - cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(10), - next_cache_reset_time: Instant::now(), + cached_view: Arc::new(CachedView::new(100, false)), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); @@ -1816,9 +1812,7 @@ pub mod tests { heartbeat_max_time_since_last: Default::default(), next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), - cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval: Duration::from_secs(10), - next_cache_reset_time: Instant::now(), + cached_view: Arc::new(CachedView::new(100, false)), }; let mut watcher = StateWatcher::started(); // End of initialization @@ -1833,58 +1827,4 @@ pub mod tests { .expect("Should process the block height even under p2p pressure"); } } - - #[tokio::test] - async fn cached_view__is_reset_after_interval_passed() { - // given - let p2p_service = FakeP2PService { - peer_info: vec![], - next_event_stream: Box::pin(futures::stream::pending()), - }; - let (request_sender, request_receiver) = mpsc::channel(100); - - let (report_sender, _) = mpsc::channel(100); - let broadcast = FakeBroadcast { - peer_reports: report_sender, - }; - - let cache_reset_interval = Duration::from_millis(100); - let next_cache_reset_time = Instant::now(); - let mut task = Task { - chain_id: Default::default(), - response_timeout: Default::default(), - p2p_service, - view_provider: FakeDB, - tx_pool: FakeTxPool, - next_block_height: FakeBlockImporter.next_block_height(), - request_receiver, - request_sender, - db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), - 
tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), - broadcast, - max_headers_per_request: 0, - max_txs_per_request: 100, - heartbeat_check_interval: Duration::from_secs(10), - heartbeat_max_avg_interval: Default::default(), - heartbeat_max_time_since_last: Default::default(), - next_check_time: Instant::now(), - heartbeat_peer_reputation_config: Default::default(), - cached_view: Arc::new(CachedView::new(false)), - cache_reset_interval, - next_cache_reset_time, - }; - let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); - let mut watcher = StateWatcher::from(watch_receiver); - - // when - task.run(&mut watcher).await.unwrap(); - - // then - // we raise the sleep factor to ensure that the cache is reset - tokio::time::sleep(cache_reset_interval * 2).await; - watch_sender.send(State::Stopped).unwrap(); - - // if this was changed, we can be sure that the cache was reset - assert_ne!(task.next_cache_reset_time, next_cache_reset_time); - } } From 1bdcf0094f89ce4d9ddc76174d6fef9cff0b097b Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 18 Nov 2024 18:53:26 +0530 Subject: [PATCH 24/27] fix: replace dashmap with quick_cache --- Cargo.lock | 28 +++++------ crates/services/p2p/Cargo.toml | 2 +- crates/services/p2p/src/cached_view.rs | 70 +++----------------------- 3 files changed, 22 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36f2d1f96e..c380de1136 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,20 +2214,6 @@ dependencies = [ "syn 2.0.85", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.6.0" @@ -3570,7 +3556,6 @@ dependencies = [ "anyhow", "async-trait", "ctor", - "dashmap", "fuel-core-chain-config", "fuel-core-metrics", "fuel-core-p2p", @@ -3589,6 +3574,7 @@ dependencies = [ "prometheus-client", "quick-protobuf", "quick-protobuf-codec 0.3.1", + "quick_cache", "rand", "rayon", "serde", @@ -7364,6 +7350,18 @@ dependencies = [ "unsigned-varint 0.8.0", ] +[[package]] +name = "quick_cache" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d7c94f8935a9df96bb6380e8592c70edf497a643f94bd23b2f76b399385dbf4" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.5", + "parking_lot", +] + [[package]] name = "quinn" version = "0.11.5" diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index aeab90bc95..da5e55d3f9 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -13,7 +13,6 @@ description = "Fuel client networking" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -dashmap = "6.1.0" fuel-core-chain-config = { workspace = true } fuel-core-metrics = { workspace = true } # TODO make this a feature fuel-core-services = { workspace = true, features = ["sync-processor"] } @@ -42,6 +41,7 @@ libp2p = { version = "0.53.2", default-features = false, features = [ libp2p-mplex = "0.41.0" postcard = { workspace = true, features = ["use-std"] } prometheus-client = { workspace = true } +quick_cache = "0.6.9" quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.0" rand = { workspace = true } diff --git 
a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index b042ee5b9c..b641c5a1eb 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -1,5 +1,4 @@ use crate::ports::P2pDb; -use dashmap::DashMap; use fuel_core_metrics::p2p_metrics::{ increment_p2p_req_res_cache_hits, increment_p2p_req_res_cache_misses, @@ -9,78 +8,25 @@ use fuel_core_types::{ blockchain::SealedBlockHeader, services::p2p::Transactions, }; +use quick_cache::sync::Cache; use std::{ - collections::VecDeque, - hash::Hash, ops::Range, - sync::{ - Arc, - Mutex, - }, + sync::Arc, }; type BlockHeight = u32; -struct LruCache { - cache: DashMap>, - order: Mutex>, - capacity: usize, -} - -impl LruCache -where - K: Eq + Hash + Clone, -{ - fn new(capacity: usize) -> Self { - Self { - cache: DashMap::new(), - order: Mutex::new(VecDeque::new()), - capacity, - } - } - - fn insert(&self, key: K, value: V) { - let mut order = self.order.lock().expect("Poisoned lock"); - - if self.cache.len() >= self.capacity { - if let Some(least_used) = order.pop_front() { - self.cache.remove(&least_used); - } - } - - self.cache.insert(key.clone(), Arc::new(value)); - - // Update the access order. - order.retain(|k| k != &key); - order.push_back(key); - } - - fn get(&self, key: &K) -> Option> { - let mut order = self.order.lock().expect("Poisoned lock"); - - if let Some(value) = self.cache.get(key) { - // Update the access order. - order.retain(|k| k != key); - order.push_back(key.clone()); - - Some(Arc::clone(&value)) - } else { - None - } - } -} - pub(super) struct CachedView { - sealed_block_headers: LruCache, - transactions_on_blocks: LruCache, + sealed_block_headers: Cache, + transactions_on_blocks: Cache, metrics: bool, } impl CachedView { pub fn new(capacity: usize, metrics: bool) -> Self { Self { - sealed_block_headers: LruCache::new(capacity), - transactions_on_blocks: LruCache::new(capacity), + sealed_block_headers: Cache::new(capacity), + transactions_on_blocks: Cache::new(capacity), metrics, } } @@ -96,7 +42,7 @@ impl CachedView { fn get_from_cache_or_db( &self, - cache: &LruCache, + cache: &Cache, view: &V, range: Range, fetch_fn: F, @@ -111,7 +57,7 @@ impl CachedView { for height in range.clone() { if let Some(item) = cache.get(&height) { - items.push(item.clone()); + items.push(item.clone().into()); } else { missing_start = Some(height); break; From 099fe4fe839bdd27008dfb543ba8665b26c75657 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 18 Nov 2024 18:56:48 +0530 Subject: [PATCH 25/27] fix: unnecessary clone --- crates/services/p2p/src/cached_view.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index b641c5a1eb..1a16bfe1aa 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -57,7 +57,7 @@ impl CachedView { for height in range.clone() { if let Some(item) = cache.get(&height) { - items.push(item.clone().into()); + items.push(item.into()); } else { missing_start = Some(height); break; From 0b328c259e038bb64dfda7191330a3086fdeada2 Mon Sep 17 00:00:00 2001 From: green Date: Mon, 18 Nov 2024 09:12:25 -0500 Subject: [PATCH 26/27] Removed useless `Arc` --- crates/services/p2p/Cargo.toml | 2 +- crates/services/p2p/src/cached_view.rs | 23 +++++++++-------------- crates/services/p2p/src/service.rs | 18 +----------------- 3 files changed, 11 insertions(+), 32 deletions(-) diff --git 
a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index da5e55d3f9..158f16f836 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -41,9 +41,9 @@ libp2p = { version = "0.53.2", default-features = false, features = [ libp2p-mplex = "0.41.0" postcard = { workspace = true, features = ["use-std"] } prometheus-client = { workspace = true } -quick_cache = "0.6.9" quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.0" +quick_cache = "0.6.9" rand = { workspace = true } rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs index 1a16bfe1aa..1bef869ca0 100644 --- a/crates/services/p2p/src/cached_view.rs +++ b/crates/services/p2p/src/cached_view.rs @@ -9,10 +9,7 @@ use fuel_core_types::{ services::p2p::Transactions, }; use quick_cache::sync::Cache; -use std::{ - ops::Range, - sync::Arc, -}; +use std::ops::Range; type BlockHeight = u32; @@ -46,7 +43,7 @@ impl CachedView { view: &V, range: Range, fetch_fn: F, - ) -> StorageResult>>> + ) -> StorageResult>> where V: P2pDb, T: Clone, @@ -57,7 +54,7 @@ impl CachedView { for height in range.clone() { if let Some(item) = cache.get(&height) { - items.push(item.into()); + items.push(item); } else { missing_start = Some(height); break; @@ -73,9 +70,9 @@ impl CachedView { self.update_metrics(increment_p2p_req_res_cache_misses); if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { - for (height, item) in missing_range.zip(fetched_items.iter()) { + for (height, item) in missing_range.zip(fetched_items.into_iter()) { cache.insert(height, item.clone()); - items.push(item.clone().into()); + items.push(item); } return Ok(Some(items)); @@ -88,7 +85,7 @@ impl CachedView { &self, view: &V, block_height_range: Range, - ) -> StorageResult>>> + ) -> StorageResult>> where V: P2pDb, { @@ -104,7 +101,7 @@ impl CachedView { &self, view: &V, block_height_range: Range, - ) -> StorageResult>>> + ) -> StorageResult>> where V: P2pDb, { @@ -182,7 +179,6 @@ mod tests { let block_height_range = 0..100; let sealed_headers = default_sealed_headers(block_height_range.clone()); - let sealed_headers_heap = sealed_headers.iter().cloned().map(Arc::new).collect(); for (block_height, header) in block_height_range.clone().zip(sealed_headers.iter()) { @@ -194,7 +190,7 @@ mod tests { let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) .unwrap(); - assert_eq!(result, Some(sealed_headers_heap)); + assert_eq!(result, Some(sealed_headers)); } #[tokio::test] @@ -211,14 +207,13 @@ mod tests { let notified = sender.notified(); let block_height_range = 0..100; let sealed_headers = default_sealed_headers(block_height_range.clone()); - let sealed_headers_heap = sealed_headers.iter().cloned().map(Arc::new).collect(); let result = cached_view .get_sealed_headers(&db, block_height_range.clone()) .unwrap(); // then notified.await; - assert_eq!(result, Some(sealed_headers_heap)); + assert_eq!(result, Some(sealed_headers)); } #[tokio::test] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 9836f6058c..e08071ad4a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -82,10 +82,7 @@ use libp2p::{ use std::{ fmt::Debug, future::Future, - ops::{ - Deref, - Range, - }, + ops::Range, sync::Arc, }; use tokio::{ @@ -623,11 +620,6 @@ where cached_view .get_transactions(view, range) .map_err(anyhow::Error::from) - .map(|txs| { - 
txs.map(|txs| { - txs.into_iter().map(|tx| tx.deref().clone()).collect() - }) - }) }, |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, @@ -650,14 +642,6 @@ where cached_view .get_sealed_headers(view, range) .map_err(anyhow::Error::from) - .map(|headers| { - headers.map(|headers| { - headers - .into_iter() - .map(|header| header.deref().clone()) - .collect() - }) - }) }, |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, From ccb42d5534c42a75840bbd844e1b379b5b7824bb Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Mon, 18 Nov 2024 09:17:51 -0500 Subject: [PATCH 27/27] Update crates/services/p2p/src/service.rs --- crates/services/p2p/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index e08071ad4a..d8aa7c170a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -846,7 +846,7 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, - cached_view: Arc::new(CachedView::new(1_000, metrics)), + cached_view: Arc::new(CachedView::new(614 * 10, metrics)), }; Ok(task) }