diff --git a/Cargo.lock b/Cargo.lock index e5a83abd0a..33dba1e525 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,6 +2755,7 @@ dependencies = [ "libp2p-identity", "libp2p-networking", "local-ip-address", + "lru", "portpicker", "rand 0.8.5", "rand_chacha 0.3.1", diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 2483dbe8bd..2cb0b08fc0 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -116,6 +116,7 @@ local-ip-address = "0.5.6" dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" } derive_more = "0.99.17" portpicker = "0.1.1" +lru = "0.12.1" tracing = { workspace = true } typenum = { workspace = true } diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index dd903d7741..b47dfa99a2 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -244,7 +244,7 @@ pub async fn add_consensus_task>( }; consensus_state .quorum_network - .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) + .inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal) .await; let filter = FilterEvent(Arc::new(consensus_event_filter)); let consensus_name = "Consensus Task"; diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index 7d4d1294cf..ef35b9a393 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -26,7 +26,11 @@ use hotshot_types::{ }, }; use hotshot_web_server::{self, config}; +use lru::LruCache; use serde::{Deserialize, Serialize}; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::num::NonZeroUsize; use surf_disco::Url; use hotshot_types::traits::network::ViewMessage; @@ -53,6 +57,15 @@ impl WebCommChannel { } } +/// # Note +/// +/// This function uses `DefaultHasher` instead of cryptographic hash functions like SHA-256 because `DefaultHasher` can hash a value directly from a reference, while SHA-256 requires owning the value or copying/cloning it. +fn hash(t: &T) -> u64 { + let mut s = DefaultHasher::new(); + t.hash(&mut s); + s.finish() +} + /// The web server network state #[derive(Clone, Debug)] pub struct WebServerNetwork { @@ -205,9 +218,10 @@ struct Inner { view_sync_vote_task_map: Arc>>, /// Task map for transactions txn_task_map: Arc>>, - /// Task polling for current propsal - current_proposal_task: - Arc>>>>, + /// Task polling for latest quorum propsal + latest_quorum_proposal_task: Arc>>>, + /// Task polling for latest view sync proposal + latest_view_sync_proposal_task: Arc>>>, } impl Inner { @@ -221,6 +235,7 @@ impl Inner { ) -> Result<(), NetworkError> { let mut vote_index = 0; let mut tx_index = 0; + let mut seen_proposals = LruCache::new(NonZeroUsize::new(100).unwrap()); if message_purpose == MessagePurpose::Data { tx_index = *self.tx_index.read().await; @@ -230,7 +245,10 @@ impl Inner { while self.running.load(Ordering::Relaxed) { let endpoint = match message_purpose { MessagePurpose::Proposal => config::get_proposal_route(view_number), - MessagePurpose::CurrentProposal => config::get_recent_proposal_route(), + MessagePurpose::LatestQuorumProposal => config::get_latest_quorum_proposal_route(), + MessagePurpose::LatestViewSyncProposal => { + config::get_latest_view_sync_proposal_route() + } MessagePurpose::Vote => config::get_vote_route(view_number, vote_index), MessagePurpose::Data => config::get_transactions_route(tx_index), MessagePurpose::Internal => unimplemented!(), @@ -292,7 +310,7 @@ impl Inner { // } // } } - MessagePurpose::CurrentProposal => { + MessagePurpose::LatestQuorumProposal => { // Only pushing the first proposal since we will soon only be allowing 1 proposal per view self.broadcast_poll_queue .write() @@ -301,6 +319,20 @@ impl Inner { return Ok(()); } + MessagePurpose::LatestViewSyncProposal => { + let mut broadcast_poll_queue = + self.broadcast_poll_queue.write().await; + + for cert in &deserialized_messages { + let hash = hash(&cert); + if seen_proposals.put(hash, ()).is_none() { + broadcast_poll_queue.push(cert.clone()); + } + } + + // additional sleep to reduce load on web server + async_sleep(Duration::from_millis(300)).await; + } MessagePurpose::Vote => { // error!( // "Received {} votes from web server for view {} is da {}", @@ -374,7 +406,10 @@ impl Inner { // TODO ED Need to add vote indexing to web server for view sync certs for cert in &deserialized_messages { vote_index += 1; - broadcast_poll_queue.push(cert.clone()); + let hash = hash(cert); + if seen_proposals.put(hash, ()).is_none() { + broadcast_poll_queue.push(cert.clone()); + } } } @@ -420,6 +455,10 @@ impl Inner { } } + ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => { + return Ok(()); + } + _ => { unimplemented!() } @@ -499,7 +538,7 @@ pub struct SendMsg { } /// A message being received from the web server -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash)] #[serde(bound(deserialize = ""))] pub struct RecvMsg { /// The optional message being received @@ -571,7 +610,8 @@ impl WebServerNetwork { view_sync_cert_task_map: Arc::default(), view_sync_vote_task_map: Arc::default(), txn_task_map: Arc::default(), - current_proposal_task: Arc::default(), + latest_quorum_proposal_task: Arc::default(), + latest_view_sync_proposal_task: Arc::default(), }); inner.connected.store(true, Ordering::Relaxed); @@ -593,7 +633,9 @@ impl WebServerNetwork { MessagePurpose::Proposal => config::post_proposal_route(*view_number), MessagePurpose::Vote => config::post_vote_route(*view_number), MessagePurpose::Data => config::post_transactions_route(), - MessagePurpose::Internal | MessagePurpose::CurrentProposal => { + MessagePurpose::Internal + | MessagePurpose::LatestQuorumProposal + | MessagePurpose::LatestViewSyncProposal => { return Err(WebServerNetworkError::EndpointError) } MessagePurpose::ViewSyncProposal => { @@ -907,8 +949,8 @@ impl ConnectedNetwork, TYPES::Signatur .prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse) .await; } - ConsensusIntentEvent::PollForCurrentProposal => { - let mut proposal_task = self.inner.current_proposal_task.write().await; + ConsensusIntentEvent::PollForLatestQuorumProposal => { + let mut proposal_task = self.inner.latest_quorum_proposal_task.write().await; if proposal_task.is_none() { // create new task let (sender, receiver) = unbounded(); @@ -918,7 +960,7 @@ impl ConnectedNetwork, TYPES::Signatur let inner_clone = self.inner.clone(); async move { if let Err(e) = inner_clone - .poll_web_server(receiver, MessagePurpose::CurrentProposal, 1) + .poll_web_server(receiver, MessagePurpose::LatestQuorumProposal, 1) .await { warn!( @@ -926,12 +968,44 @@ impl ConnectedNetwork, TYPES::Signatur e ); } - let mut proposal_task = inner_clone.current_proposal_task.write().await; + let mut proposal_task = + inner_clone.latest_quorum_proposal_task.write().await; *proposal_task = None; } }); } } + ConsensusIntentEvent::PollForLatestViewSyncProposal => { + let mut latest_view_sync_proposal_task = + self.inner.latest_view_sync_proposal_task.write().await; + if latest_view_sync_proposal_task.is_none() { + // create new task + let (sender, receiver) = unbounded(); + *latest_view_sync_proposal_task = Some(sender); + + async_spawn({ + let inner_clone = self.inner.clone(); + async move { + if let Err(e) = inner_clone + .poll_web_server( + receiver, + MessagePurpose::LatestViewSyncProposal, + 1, + ) + .await + { + warn!( + "Background receive proposal polling encountered an error: {:?}", + e + ); + } + let mut latest_view_sync_proposal_task = + inner_clone.latest_view_sync_proposal_task.write().await; + *latest_view_sync_proposal_task = None; + } + }); + } + } ConsensusIntentEvent::PollForVotes(view_number) => { let mut task_map = self.inner.vote_task_map.write().await; if let Entry::Vacant(e) = task_map.entry(view_number) { @@ -1005,6 +1079,17 @@ impl ConnectedNetwork, TYPES::Signatur } } + ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => { + let mut latest_view_sync_proposal_task = + self.inner.latest_view_sync_proposal_task.write().await; + + if let Some(thing) = latest_view_sync_proposal_task.take() { + let _res = thing + .send(ConsensusIntentEvent::CancelPollForLatestViewSyncProposal) + .await; + } + } + ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => { let mut task_map = self.inner.view_sync_cert_task_map.write().await; if let Entry::Vacant(e) = task_map.entry(view_number) { diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs index d31e9b6219..911451aca2 100644 --- a/crates/task-impls/src/view_sync.rs +++ b/crates/task-impls/src/view_sync.rs @@ -502,6 +502,12 @@ impl< *view_number + 1, )) .await; + + // Poll for future view sync certificates + self.network + .inject_consensus_info(ConsensusIntentEvent::PollForLatestViewSyncProposal) + .await; + // Spawn replica task let next_view = *view_number + 1; // Subscribe to the view after we are leader since we know we won't propose in the next view if we are leader. @@ -521,7 +527,7 @@ impl< // Also subscribe to the latest view for the same reason. The GC will remove the above poll // in the case that one doesn't resolve but this one does. self.network - .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) + .inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal) .await; self.network @@ -829,6 +835,13 @@ impl, A: ConsensusApi + )) .await; + // Cancel poll for future view sync certificates + self.network + .inject_consensus_info( + ConsensusIntentEvent::CancelPollForLatestViewSyncProposal, + ) + .await; + self.phase = last_seen_certificate; if certificate.get_data().relay > self.relay { @@ -895,9 +908,9 @@ impl, A: ConsensusApi + if let Some(timeout_task) = self.timeout_task.take() { cancel_task(timeout_task).await; } - // Keep tyring to get a more recent proposal to catch up to + // Keep trying to get a more recent proposal to catch up to self.network - .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) + .inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal) .await; self.relay += 1; match self.phase { diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 6513413358..a4bb68907f 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -60,8 +60,10 @@ pub struct Messages(pub Vec>); pub enum MessagePurpose { /// Message with a quorum proposal. Proposal, - /// Message with most recent proposal the server has - CurrentProposal, + /// Message with most recent quorum proposal the server has + LatestQuorumProposal, + /// Message with most recent view sync proposal the server has + LatestViewSyncProposal, /// Message with a quorum vote. Vote, /// Message with a view sync vote. diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 3a8e754aae..df1bea4bbd 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -146,8 +146,10 @@ pub enum ConsensusIntentEvent { PollForProposal(u64), /// Poll for VID disperse data for a particular view PollForVIDDisperse(u64), - /// Poll for the most recent proposal the webserver has - PollForCurrentProposal, + /// Poll for the most recent quorum proposal the webserver has + PollForLatestQuorumProposal, + /// Poll for the most recent view sync proposal the webserver has + PollForLatestViewSyncProposal, /// Poll for a DAC for a particular view PollForDAC(u64), /// Poll for view sync votes starting at a particular view @@ -172,6 +174,8 @@ pub enum ConsensusIntentEvent { CancelPollForVIDDisperse(u64), /// Cancel polling for transactions CancelPollForTransactions(u64), + /// Cancel polling for most recent view sync proposal + CancelPollForLatestViewSyncProposal, } impl ConsensusIntentEvent { @@ -194,7 +198,9 @@ impl ConsensusIntentEvent { | ConsensusIntentEvent::PollForTransactions(view_number) | ConsensusIntentEvent::CancelPollForTransactions(view_number) | ConsensusIntentEvent::PollFutureLeader(view_number, _) => *view_number, - ConsensusIntentEvent::PollForCurrentProposal => 1, + ConsensusIntentEvent::PollForLatestQuorumProposal + | ConsensusIntentEvent::PollForLatestViewSyncProposal + | ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => 1, } } } diff --git a/crates/web_server/api.toml b/crates/web_server/api.toml index 2453ca4d3f..224f786d7d 100644 --- a/crates/web_server/api.toml +++ b/crates/web_server/api.toml @@ -19,9 +19,16 @@ DOC = """ Return the VID disperse data for a given view number """ -# GET the proposal for a view, where the view is passed as an argument -[route.getrecentproposal] -PATH = ["proposal/recent"] +# GET the latest quorum proposal +[route.get_latest_quorum_proposal] +PATH = ["proposal/latest"] +DOC = """ +Return the proposal for the most recent view the server has +""" + +# GET the latest quorum proposal +[route.get_latest_view_sync_proposal] +PATH = ["view_sync_proposal/latest"] DOC = """ Return the proposal for the most recent view the server has """ diff --git a/crates/web_server/src/config.rs b/crates/web_server/src/config.rs index 4e5bf6f1db..4b6378527c 100644 --- a/crates/web_server/src/config.rs +++ b/crates/web_server/src/config.rs @@ -17,8 +17,12 @@ pub fn post_proposal_route(view_number: u64) -> String { format!("api/proposal/{view_number}") } -pub fn get_recent_proposal_route() -> String { - "api/proposal/recent".to_string() +pub fn get_latest_quorum_proposal_route() -> String { + "api/proposal/latest".to_string() +} + +pub fn get_latest_view_sync_proposal_route() -> String { + "api/view_sync_proposal/latest".to_string() } pub fn get_da_certificate_route(view_number: u64) -> String { diff --git a/crates/web_server/src/lib.rs b/crates/web_server/src/lib.rs index 636b08c70c..0ca7575c81 100644 --- a/crates/web_server/src/lib.rs +++ b/crates/web_server/src/lib.rs @@ -34,7 +34,10 @@ struct WebServerState { /// view for oldest proposals in memory oldest_proposal: u64, /// view for the most recent proposal to help nodes catchup - recent_proposal: u64, + latest_quorum_proposal: u64, + /// view for the most recent view sync proposal + latest_view_sync_proposal: u64, + /// view for teh oldest DA certificate oldest_certificate: u64, @@ -88,7 +91,8 @@ impl WebServerState { num_txns: 0, oldest_vote: 0, oldest_proposal: 0, - recent_proposal: 0, + latest_quorum_proposal: 0, + latest_view_sync_proposal: 0, oldest_certificate: 0, shutdown: None, stake_table: Vec::new(), @@ -129,7 +133,8 @@ impl WebServerState { /// Trait defining methods needed for the `WebServerState` pub trait WebServerDataSource { fn get_proposal(&self, view_number: u64) -> Result>>, Error>; - fn get_recent_proposal(&self) -> Result>>, Error>; + fn get_latest_quorum_proposal(&self) -> Result>>, Error>; + fn get_latest_view_sync_proposal(&self) -> Result>>, Error>; fn get_view_sync_proposal( &self, view_number: u64, @@ -214,8 +219,12 @@ impl WebServerDataSource for WebServerState { } } - fn get_recent_proposal(&self) -> Result>>, Error> { - self.get_proposal(self.recent_proposal) + fn get_latest_quorum_proposal(&self) -> Result>>, Error> { + self.get_proposal(self.latest_quorum_proposal) + } + + fn get_latest_view_sync_proposal(&self) -> Result>>, Error> { + self.get_view_sync_proposal(self.latest_view_sync_proposal, 0) } fn get_view_sync_proposal( @@ -433,12 +442,8 @@ impl WebServerDataSource for WebServerState { fn post_proposal(&mut self, view_number: u64, mut proposal: Vec) -> Result<(), Error> { info!("Received proposal for view {}", view_number); - if view_number > self.recent_proposal { - self.recent_proposal = view_number; - } - - if view_number > self.recent_proposal { - self.recent_proposal = view_number; + if view_number > self.latest_quorum_proposal { + self.latest_quorum_proposal = view_number; } // Only keep proposal history for MAX_VIEWS number of view @@ -480,6 +485,10 @@ impl WebServerDataSource for WebServerState { view_number: u64, proposal: Vec, ) -> Result<(), Error> { + if view_number > self.latest_view_sync_proposal { + self.latest_view_sync_proposal = view_number; + } + // Only keep proposal history for MAX_VIEWS number of view if self.view_sync_proposals.len() >= MAX_VIEWS { self.view_sync_proposals @@ -672,8 +681,11 @@ where } .boxed() })? - .get("getrecentproposal", |_req, state| { - async move { state.get_recent_proposal() }.boxed() + .get("get_latest_quorum_proposal", |_req, state| { + async move { state.get_latest_quorum_proposal() }.boxed() + })? + .get("get_latest_view_sync_proposal", |_req, state| { + async move { state.get_latest_view_sync_proposal() }.boxed() })? .get("getviewsyncproposal", |req, state| { async move {