Skip to content

Commit

Permalink
Merge pull request #1907 from EspressoSystems/bf/cherrypick-webserver…
Browse files Browse the repository at this point in the history
…-catchup

Cherry-pick webserver catchup endpoint
  • Loading branch information
bfish713 authored Oct 13, 2023
2 parents b5c6026 + 30bae78 commit a5f311c
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 12 deletions.
33 changes: 32 additions & 1 deletion crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
while self.running.load(Ordering::Relaxed) {
let endpoint = match message_purpose {
MessagePurpose::Proposal => config::get_proposal_route(view_number),
MessagePurpose::CurrentProposal => config::get_recent_proposal_route(),
MessagePurpose::Vote => config::get_vote_route(view_number, vote_index),
MessagePurpose::Data => config::get_transactions_route(tx_index),
MessagePurpose::Internal => unimplemented!(),
Expand Down Expand Up @@ -221,6 +222,15 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
// }
// }
}
MessagePurpose::CurrentProposal => {
// Only pushing the first proposal since we will soon only be allowing 1 proposal per view
self.broadcast_poll_queue
.write()
.await
.push(deserialized_messages[0].clone());

return Ok(());
}
MessagePurpose::Vote => {
// error!(
// "Received {} votes from web server for view {} is da {}",
Expand Down Expand Up @@ -500,7 +510,9 @@ impl<
MessagePurpose::Proposal => config::post_proposal_route(*view_number),
MessagePurpose::Vote => config::post_vote_route(*view_number),
MessagePurpose::Data => config::post_transactions_route(),
MessagePurpose::Internal => return Err(WebServerNetworkError::EndpointError),
MessagePurpose::Internal | MessagePurpose::CurrentProposal => {
return Err(WebServerNetworkError::EndpointError)
}
MessagePurpose::ViewSyncProposal => {
// error!("Posting view sync proposal route is: {}", config::post_view_sync_proposal_route(*view_number));
config::post_view_sync_proposal_route(*view_number)
Expand Down Expand Up @@ -783,6 +795,25 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForCurrentProposal => {
// create new task
let (_, receiver) = unbounded();

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::CurrentProposal, 1)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
}
ConsensusIntentEvent::PollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
7 changes: 6 additions & 1 deletion crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,12 @@ where
// }
self.cur_view = new_view;
self.current_proposal = None;

if new_view == TYPES::Time::new(1) {
self.quorum_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.await;
}
// Start polling for proposals for the new view
self.quorum_exchange
.network()
Expand Down
3 changes: 2 additions & 1 deletion crates/testing/src/spinning_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl SpinningTaskDescription {
state.late_start.remove(&idx.try_into().unwrap())
{
tracing::error!("Spinning up node late");
node.run_tasks().await;
let handle = node.run_tasks().await;
handle.hotshot.start_consensus().await;
}
}
UpDown::Down => {
Expand Down
5 changes: 0 additions & 5 deletions crates/testing/src/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ where
}
}
}
assert!(
late_start_nodes.len()
<= self.launcher.metadata.total_nodes - self.launcher.metadata.start_nodes,
"Test wants to late start too many nodes."
);

self.add_nodes(self.launcher.metadata.total_nodes, &late_start_nodes)
.await;
Expand Down
65 changes: 62 additions & 3 deletions crates/testing/tests/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn test_catchup() {
metadata.total_nodes = 20;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), catchup_nodes)],
node_changes: vec![(Duration::from_millis(1400), catchup_nodes)],
};

metadata.completion_task_description =
Expand All @@ -51,6 +51,7 @@ async fn test_catchup() {
check_leaf: true,
..Default::default()
};
metadata.overall_safety_properties.num_failed_views = 2;

metadata
.gen_launcher::<SequencingTestTypes, SequencingMemoryImpl>()
Expand All @@ -59,6 +60,64 @@ async fn test_catchup() {
.await;
}

/// Catchup test run against the web-server networking implementation (`SequencingWebImpl`).
///
/// Twenty nodes are configured, and the spinning task is told to bring node 18 `Up` 2.5 seconds
/// into the run. The run is bounded by a 100 second time-based completion task, leaves are
/// checked, and at most one failed view is tolerated.
#[cfg(test)]
#[cfg_attr(
    async_executor_impl = "tokio",
    tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_catchup_web() {
    use std::time::Duration;

    use hotshot_testing::{
        completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription},
        node_types::{SequencingTestTypes, SequencingWebImpl},
        overall_safety_task::OverallSafetyPropertiesDescription,
        spinning_task::{ChangeNode, SpinningTaskDescription, UpDown},
        test_builder::{TestMetadata, TimingData},
    };

    async_compatibility_layer::logging::setup_logging();
    async_compatibility_layer::logging::setup_backtrace();

    let view_timing = TimingData {
        next_view_timeout: 1000,
        ..Default::default()
    };
    let mut metadata = TestMetadata::default();

    // The single node change: node 18 is brought up after the run has started.
    let late_node = ChangeNode {
        idx: 18,
        updown: UpDown::Up,
    };
    let bring_up_after = Duration::from_millis(2500);

    metadata.timing_data = view_timing;
    metadata.start_nodes = 20;
    metadata.total_nodes = 20;

    metadata.spinning_properties = SpinningTaskDescription {
        node_changes: vec![(bring_up_after, vec![late_node])],
    };

    metadata.completion_task_description =
        CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
            TimeBasedCompletionTaskDescription {
                duration: Duration::from_secs(100),
            },
        );

    metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
        check_leaf: true,
        // only allow for the view which the catchup node hasn't started to fail
        num_failed_views: 1,
        ..Default::default()
    };

    let launcher = metadata.gen_launcher::<SequencingTestTypes, SequencingWebImpl>();
    launcher.launch().run_test().await;
}

/// Test that one node catches up and has successful views after coming back
#[cfg(test)]
#[cfg_attr(
Expand Down Expand Up @@ -94,13 +153,13 @@ async fn test_catchup_one_node() {
metadata.total_nodes = 20;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), catchup_nodes)],
node_changes: vec![(Duration::from_millis(400), catchup_nodes)],
};

metadata.completion_task_description =
CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_millis(100000),
duration: Duration::from_millis(20000),
},
);
metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn test_timeout() {
metadata.timing_data = timing_data;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), dead_nodes)],
node_changes: vec![(Duration::new(0, 5000), dead_nodes)],
};

// TODO ED Add safety task, etc to confirm TCs are being formed
Expand Down
2 changes: 2 additions & 0 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub struct Messages<TYPES: NodeType, I: NodeImplementation<TYPES>>(pub Vec<Messa
pub enum MessagePurpose {
/// Message with a quorum proposal.
Proposal,
/// Message with most recent proposal the server has
CurrentProposal,
/// Message with a quorum vote.
Vote,
/// Message with a view sync vote.
Expand Down
3 changes: 3 additions & 0 deletions crates/types/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub enum ConsensusIntentEvent {
PollForVotes(u64),
/// Poll for a proposal for a particular view
PollForProposal(u64),
/// Poll for the most recent proposal the webserver has
PollForCurrentProposal,
/// Poll for a DAC for a particular view
PollForDAC(u64),
/// Poll for view sync votes starting at a particular view
Expand Down Expand Up @@ -174,6 +176,7 @@ impl ConsensusIntentEvent {
| ConsensusIntentEvent::PollForViewSyncCertificate(view_number)
| ConsensusIntentEvent::PollForTransactions(view_number)
| ConsensusIntentEvent::CancelPollForTransactions(view_number) => *view_number,
ConsensusIntentEvent::PollForCurrentProposal => 1,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/web_server/api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ DOC = """
Return the proposal for a given view number
"""

# GET the most recent proposal the server has; no view number is passed
[route.getrecentproposal]
PATH = ["proposal/"]
DOC = """
Return the proposal for the most recent view the server has
"""

# POST a proposal, where the view is passed as an argument
[route.postproposal]
PATH = ["proposal/:view_number"]
Expand Down
4 changes: 4 additions & 0 deletions crates/web_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub fn post_proposal_route(view_number: u64) -> String {
format!("api/proposal/{view_number}")
}

/// Route used to GET the most recent proposal held by the web server.
///
/// Unlike the per-view proposal routes, this path carries no view number.
pub fn get_recent_proposal_route() -> String {
    String::from("api/proposal")
}

/// Builds the certificate route for `view_number`, i.e. `api/certificate/<view_number>`.
pub fn get_da_certificate_route(view_number: u64) -> String {
    format!("api/certificate/{}", view_number)
}
Expand Down
15 changes: 15 additions & 0 deletions crates/web_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct WebServerState<KEY> {
da_certificates: HashMap<u64, (String, Vec<u8>)>,
/// view for oldest proposals in memory
oldest_proposal: u64,
/// view for the most recent proposal to help nodes catchup
recent_proposal: u64,
/// view for the oldest DA certificate
oldest_certificate: u64,

Expand Down Expand Up @@ -74,6 +76,7 @@ impl<KEY: SignatureKey + 'static> WebServerState<KEY> {
num_txns: 0,
oldest_vote: 0,
oldest_proposal: 0,
recent_proposal: 0,
oldest_certificate: 0,
shutdown: None,
stake_table: Vec::new(),
Expand Down Expand Up @@ -101,6 +104,7 @@ impl<KEY: SignatureKey + 'static> WebServerState<KEY> {
/// Trait defining methods needed for the `WebServerState`
pub trait WebServerDataSource<KEY> {
fn get_proposal(&self, view_number: u64) -> Result<Option<Vec<Vec<u8>>>, Error>;
fn get_recent_proposal(&self) -> Result<Option<Vec<Vec<u8>>>, Error>;
fn get_view_sync_proposal(
&self,
view_number: u64,
Expand Down Expand Up @@ -156,6 +160,10 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
}
}

/// Looks up the proposal for the view currently stored in `recent_proposal`.
///
/// This is a thin wrapper over `get_proposal`, so its result and errors are exactly those of
/// `get_proposal` for that view.
fn get_recent_proposal(&self) -> Result<Option<Vec<Vec<u8>>>, Error> {
    let latest_view = self.recent_proposal;
    self.get_proposal(latest_view)
}

fn get_view_sync_proposal(
&self,
view_number: u64,
Expand Down Expand Up @@ -316,6 +324,10 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
fn post_proposal(&mut self, view_number: u64, mut proposal: Vec<u8>) -> Result<(), Error> {
debug!("Received proposal for view {}", view_number);

if view_number > self.recent_proposal {
self.recent_proposal = view_number;
}

// Only keep proposal history for MAX_VIEWS number of views
if self.proposals.len() >= MAX_VIEWS {
self.proposals.remove(&self.oldest_proposal);
Expand Down Expand Up @@ -495,6 +507,9 @@ where
}
.boxed()
})?
.get("getrecentproposal", |_req, state| {
async move { state.get_recent_proposal() }.boxed()
})?
.get("getviewsyncproposal", |req, state| {
async move {
let view_number: u64 = req.integer_param("view_number")?;
Expand Down

0 comments on commit a5f311c

Please sign in to comment.