diff --git a/crates/hotshot/src/traits/networking/web_server_network.rs b/crates/hotshot/src/traits/networking/web_server_network.rs index e0001b94ab..54c8fa158b 100644 --- a/crates/hotshot/src/traits/networking/web_server_network.rs +++ b/crates/hotshot/src/traits/networking/web_server_network.rs @@ -161,6 +161,7 @@ impl Inner { while self.running.load(Ordering::Relaxed) { let endpoint = match message_purpose { MessagePurpose::Proposal => config::get_proposal_route(view_number), + MessagePurpose::CurrentProposal => config::get_recent_proposal_route(), MessagePurpose::Vote => config::get_vote_route(view_number, vote_index), MessagePurpose::Data => config::get_transactions_route(tx_index), MessagePurpose::Internal => unimplemented!(), @@ -221,6 +222,15 @@ impl Inner { // } // } } + MessagePurpose::CurrentProposal => { + // Only pushing the first proposal since we will soon only be allowing 1 proposal per view + self.broadcast_poll_queue + .write() + .await + .push(deserialized_messages[0].clone()); + + return Ok(()); + } MessagePurpose::Vote => { // error!( // "Received {} votes from web server for view {} is da {}", @@ -500,7 +510,9 @@ impl< MessagePurpose::Proposal => config::post_proposal_route(*view_number), MessagePurpose::Vote => config::post_vote_route(*view_number), MessagePurpose::Data => config::post_transactions_route(), - MessagePurpose::Internal => return Err(WebServerNetworkError::EndpointError), + MessagePurpose::Internal | MessagePurpose::CurrentProposal => { + return Err(WebServerNetworkError::EndpointError) + } MessagePurpose::ViewSyncProposal => { // error!("Posting view sync proposal route is: {}", config::post_view_sync_proposal_route(*view_number)); config::post_view_sync_proposal_route(*view_number) @@ -783,6 +795,25 @@ impl< .await; } } + ConsensusIntentEvent::PollForCurrentProposal => { + // create new task + let (_, receiver) = unbounded(); + + async_spawn({ + let inner_clone = self.inner.clone(); + async move { + if let Err(e) = inner_clone + .poll_web_server(receiver, MessagePurpose::CurrentProposal, 1) + .await + { + error!( + "Background receive proposal polling encountered an error: {:?}", + e + ); + } + } + }); + } ConsensusIntentEvent::PollForVotes(view_number) => { let mut task_map = self.inner.vote_task_map.write().await; if let Entry::Vacant(e) = task_map.entry(view_number) { diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 12d7313075..cea584f915 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -494,7 +494,12 @@ where // } self.cur_view = new_view; self.current_proposal = None; - + if new_view == TYPES::Time::new(1) { + self.quorum_exchange + .network() + .inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal) + .await; + } // Start polling for proposals for the new view self.quorum_exchange .network() diff --git a/crates/testing/src/spinning_task.rs b/crates/testing/src/spinning_task.rs index c8ef07d51c..a9d9e5d586 100644 --- a/crates/testing/src/spinning_task.rs +++ b/crates/testing/src/spinning_task.rs @@ -113,7 +113,8 @@ impl SpinningTaskDescription { state.late_start.remove(&idx.try_into().unwrap()) { tracing::error!("Spinning up node late"); - node.run_tasks().await; + let handle = node.run_tasks().await; + handle.hotshot.start_consensus().await; } } UpDown::Down => { diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 57c82d24e9..8c57183eaa 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -89,11 +89,6 @@ where } } } - assert!( - late_start_nodes.len() - <= self.launcher.metadata.total_nodes - self.launcher.metadata.start_nodes, - "Test wants to late start too many nodes." - ); self.add_nodes(self.launcher.metadata.total_nodes, &late_start_nodes) .await; diff --git a/crates/testing/tests/catchup.rs b/crates/testing/tests/catchup.rs index 2cbc6c8a16..2415f4318c 100644 --- a/crates/testing/tests/catchup.rs +++ b/crates/testing/tests/catchup.rs @@ -38,7 +38,7 @@ async fn test_catchup() { metadata.total_nodes = 20; metadata.spinning_properties = SpinningTaskDescription { - node_changes: vec![(Duration::new(1, 0), catchup_nodes)], + node_changes: vec![(Duration::from_millis(1400), catchup_nodes)], }; metadata.completion_task_description = @@ -51,6 +51,7 @@ async fn test_catchup() { check_leaf: true, ..Default::default() }; + metadata.overall_safety_properties.num_failed_views = 2; metadata .gen_launcher::() @@ -59,6 +60,64 @@ async fn test_catchup() { .await; } +#[cfg(test)] +#[cfg_attr( + async_executor_impl = "tokio", + tokio::test(flavor = "multi_thread", worker_threads = 2) +)] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_catchup_web() { + use std::time::Duration; + + use hotshot_testing::{ + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + node_types::{SequencingTestTypes, SequencingWebImpl}, + overall_safety_task::OverallSafetyPropertiesDescription, + spinning_task::{ChangeNode, SpinningTaskDescription, UpDown}, + test_builder::{TestMetadata, TimingData}, + }; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let timing_data = TimingData { + next_view_timeout: 1000, + ..Default::default() + }; + let mut metadata = TestMetadata::default(); + let catchup_nodes = vec![ChangeNode { + idx: 18, + updown: UpDown::Up, + }]; + + metadata.timing_data = timing_data; + metadata.start_nodes = 20; + metadata.total_nodes = 20; + + metadata.spinning_properties = SpinningTaskDescription { + node_changes: vec![(Duration::from_millis(2500), catchup_nodes)], + }; + + metadata.completion_task_description = + CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_millis(100000), + }, + ); + metadata.overall_safety_properties = OverallSafetyPropertiesDescription { + check_leaf: true, + ..Default::default() + }; + + // only alow for the view which the catchup node hasn't started to fail + metadata.overall_safety_properties.num_failed_views = 1; + + metadata + .gen_launcher::() + .launch() + .run_test() + .await; +} + /// Test that one node catches up and has sucessful views after coming back #[cfg(test)] #[cfg_attr( @@ -94,13 +153,13 @@ async fn test_catchup_one_node() { metadata.total_nodes = 20; metadata.spinning_properties = SpinningTaskDescription { - node_changes: vec![(Duration::new(1, 0), catchup_nodes)], + node_changes: vec![(Duration::from_millis(400), catchup_nodes)], }; metadata.completion_task_description = CompletionTaskDescription::TimeBasedCompletionTaskBuilder( TimeBasedCompletionTaskDescription { - duration: Duration::from_millis(100000), + duration: Duration::from_millis(20000), }, ); metadata.overall_safety_properties = OverallSafetyPropertiesDescription { diff --git a/crates/testing/tests/timeout.rs b/crates/testing/tests/timeout.rs index 7ebeddd577..f8963c9d52 100644 --- a/crates/testing/tests/timeout.rs +++ b/crates/testing/tests/timeout.rs @@ -30,7 +30,7 @@ async fn test_timeout() { metadata.timing_data = timing_data; metadata.spinning_properties = SpinningTaskDescription { - node_changes: vec![(Duration::new(1, 0), dead_nodes)], + node_changes: vec![(Duration::new(0, 5000), dead_nodes)], }; // TODO ED Add safety task, etc to confirm TCs are being formed diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 89d2864aa9..c499309a43 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -57,6 +57,8 @@ pub struct Messages>(pub Vec *view_number, + ConsensusIntentEvent::PollForCurrentProposal => 1, } } } diff --git a/crates/web_server/api.toml b/crates/web_server/api.toml index 5f749bcdb0..cc610fc9c9 100644 --- a/crates/web_server/api.toml +++ b/crates/web_server/api.toml @@ -11,6 +11,13 @@ DOC = """ Return the proposal for a given view number """ +# GET the proposal for a view, where the view is passed as an argument +[route.getrecentproposal] +PATH = ["proposal/"] +DOC = """ +Return the proposal for the most recent view the server has +""" + # POST a proposal, where the view is passed as an argument [route.postproposal] PATH = ["proposal/:view_number"] diff --git a/crates/web_server/src/config.rs b/crates/web_server/src/config.rs index 1da2781c55..c06f365bd3 100644 --- a/crates/web_server/src/config.rs +++ b/crates/web_server/src/config.rs @@ -17,6 +17,10 @@ pub fn post_proposal_route(view_number: u64) -> String { format!("api/proposal/{view_number}") } +pub fn get_recent_proposal_route() -> String { + "api/proposal".to_string() +} + pub fn get_da_certificate_route(view_number: u64) -> String { format!("api/certificate/{view_number}") } diff --git a/crates/web_server/src/lib.rs b/crates/web_server/src/lib.rs index ac2730b448..e29ce036e8 100644 --- a/crates/web_server/src/lib.rs +++ b/crates/web_server/src/lib.rs @@ -33,6 +33,8 @@ struct WebServerState { da_certificates: HashMap)>, /// view for oldest proposals in memory oldest_proposal: u64, + /// view for the most recent proposal to help nodes catchup + recent_proposal: u64, /// view for teh oldest DA certificate oldest_certificate: u64, @@ -74,6 +76,7 @@ impl WebServerState { num_txns: 0, oldest_vote: 0, oldest_proposal: 0, + recent_proposal: 0, oldest_certificate: 0, shutdown: None, stake_table: Vec::new(), @@ -101,6 +104,7 @@ impl WebServerState { /// Trait defining methods needed for the `WebServerState` pub trait WebServerDataSource { fn get_proposal(&self, view_number: u64) -> Result>>, Error>; + fn get_recent_proposal(&self) -> Result>>, Error>; fn get_view_sync_proposal( &self, view_number: u64, @@ -156,6 +160,10 @@ impl WebServerDataSource for WebServerState { } } + fn get_recent_proposal(&self) -> Result>>, Error> { + self.get_proposal(self.recent_proposal) + } + fn get_view_sync_proposal( &self, view_number: u64, @@ -316,6 +324,10 @@ impl WebServerDataSource for WebServerState { fn post_proposal(&mut self, view_number: u64, mut proposal: Vec) -> Result<(), Error> { debug!("Received proposal for view {}", view_number); + if view_number > self.recent_proposal { + self.recent_proposal = view_number; + } + // Only keep proposal history for MAX_VIEWS number of view if self.proposals.len() >= MAX_VIEWS { self.proposals.remove(&self.oldest_proposal); @@ -495,6 +507,9 @@ where } .boxed() })? + .get("getrecentproposal", |_req, state| { + async move { state.get_recent_proposal() }.boxed() + })? .get("getviewsyncproposal", |req, state| { async move { let view_number: u64 = req.integer_param("view_number")?;