Skip to content

Commit

Permalink
[Stability] Poll for latest view sync cert (#2246)
Browse files Browse the repository at this point in the history
* add polling for latest view sync cert

* update comment

* add cancelling to view change event
  • Loading branch information
rob-maron authored Dec 18, 2023
1 parent c9bf7b6 commit 251a3e6
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ local-ip-address = "0.5.6"
dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" }
derive_more = "0.99.17"
portpicker = "0.1.1"
lru = "0.12.1"

tracing = { workspace = true }
typenum = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub async fn add_consensus_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
};
consensus_state
.quorum_network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;
let filter = FilterEvent(Arc::new(consensus_event_filter));
let consensus_name = "Consensus Task";
Expand Down
111 changes: 98 additions & 13 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use hotshot_types::{
},
};
use hotshot_web_server::{self, config};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use surf_disco::Url;

use hotshot_types::traits::network::ViewMessage;
Expand All @@ -53,6 +57,15 @@ impl<TYPES: NodeType> WebCommChannel<TYPES> {
}
}

/// # Note
///
/// This function uses `DefaultHasher` instead of cryptographic hash functions like SHA-256 because of an `AsRef` requirement.
fn hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}

/// The web server network state
#[derive(Clone, Debug)]
pub struct WebServerNetwork<TYPES: NodeType> {
Expand Down Expand Up @@ -205,9 +218,10 @@ struct Inner<TYPES: NodeType> {
view_sync_vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for transactions
txn_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task polling for current propsal
current_proposal_task:
Arc<RwLock<Option<UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task polling for latest quorum propsal
latest_quorum_proposal_task: Arc<RwLock<Option<TaskChannel<TYPES::SignatureKey>>>>,
/// Task polling for latest view sync proposal
latest_view_sync_proposal_task: Arc<RwLock<Option<TaskChannel<TYPES::SignatureKey>>>>,
}

impl<TYPES: NodeType> Inner<TYPES> {
Expand All @@ -221,6 +235,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
) -> Result<(), NetworkError> {
let mut vote_index = 0;
let mut tx_index = 0;
let mut seen_proposals = LruCache::new(NonZeroUsize::new(100).unwrap());

if message_purpose == MessagePurpose::Data {
tx_index = *self.tx_index.read().await;
Expand All @@ -230,7 +245,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
while self.running.load(Ordering::Relaxed) {
let endpoint = match message_purpose {
MessagePurpose::Proposal => config::get_proposal_route(view_number),
MessagePurpose::CurrentProposal => config::get_recent_proposal_route(),
MessagePurpose::LatestQuorumProposal => config::get_latest_quorum_proposal_route(),
MessagePurpose::LatestViewSyncProposal => {
config::get_latest_view_sync_proposal_route()
}
MessagePurpose::Vote => config::get_vote_route(view_number, vote_index),
MessagePurpose::Data => config::get_transactions_route(tx_index),
MessagePurpose::Internal => unimplemented!(),
Expand Down Expand Up @@ -292,7 +310,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
// }
// }
}
MessagePurpose::CurrentProposal => {
MessagePurpose::LatestQuorumProposal => {
// Only pushing the first proposal since we will soon only be allowing 1 proposal per view
self.broadcast_poll_queue
.write()
Expand All @@ -301,6 +319,20 @@ impl<TYPES: NodeType> Inner<TYPES> {

return Ok(());
}
MessagePurpose::LatestViewSyncProposal => {
let mut broadcast_poll_queue =
self.broadcast_poll_queue.write().await;

for cert in &deserialized_messages {
let hash = hash(&cert);
if seen_proposals.put(hash, ()).is_none() {
broadcast_poll_queue.push(cert.clone());
}
}

// additional sleep to reduce load on web server
async_sleep(Duration::from_millis(300)).await;
}
MessagePurpose::Vote => {
// error!(
// "Received {} votes from web server for view {} is da {}",
Expand Down Expand Up @@ -374,7 +406,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
// TODO ED Need to add vote indexing to web server for view sync certs
for cert in &deserialized_messages {
vote_index += 1;
broadcast_poll_queue.push(cert.clone());
let hash = hash(cert);
if seen_proposals.put(hash, ()).is_none() {
broadcast_poll_queue.push(cert.clone());
}
}
}

Expand Down Expand Up @@ -420,6 +455,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
}
}

ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => {
return Ok(());
}

_ => {
unimplemented!()
}
Expand Down Expand Up @@ -499,7 +538,7 @@ pub struct SendMsg<M: NetworkMsg> {
}

/// A message being received from the web server
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash)]
#[serde(bound(deserialize = ""))]
pub struct RecvMsg<M: NetworkMsg> {
/// The optional message being received
Expand Down Expand Up @@ -571,7 +610,8 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
view_sync_cert_task_map: Arc::default(),
view_sync_vote_task_map: Arc::default(),
txn_task_map: Arc::default(),
current_proposal_task: Arc::default(),
latest_quorum_proposal_task: Arc::default(),
latest_view_sync_proposal_task: Arc::default(),
});

inner.connected.store(true, Ordering::Relaxed);
Expand All @@ -593,7 +633,9 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
MessagePurpose::Proposal => config::post_proposal_route(*view_number),
MessagePurpose::Vote => config::post_vote_route(*view_number),
MessagePurpose::Data => config::post_transactions_route(),
MessagePurpose::Internal | MessagePurpose::CurrentProposal => {
MessagePurpose::Internal
| MessagePurpose::LatestQuorumProposal
| MessagePurpose::LatestViewSyncProposal => {
return Err(WebServerNetworkError::EndpointError)
}
MessagePurpose::ViewSyncProposal => {
Expand Down Expand Up @@ -907,8 +949,8 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse)
.await;
}
ConsensusIntentEvent::PollForCurrentProposal => {
let mut proposal_task = self.inner.current_proposal_task.write().await;
ConsensusIntentEvent::PollForLatestQuorumProposal => {
let mut proposal_task = self.inner.latest_quorum_proposal_task.write().await;
if proposal_task.is_none() {
// create new task
let (sender, receiver) = unbounded();
Expand All @@ -918,20 +960,52 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::CurrentProposal, 1)
.poll_web_server(receiver, MessagePurpose::LatestQuorumProposal, 1)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
let mut proposal_task = inner_clone.current_proposal_task.write().await;
let mut proposal_task =
inner_clone.latest_quorum_proposal_task.write().await;
*proposal_task = None;
}
});
}
}
ConsensusIntentEvent::PollForLatestViewSyncProposal => {
let mut latest_view_sync_proposal_task =
self.inner.latest_view_sync_proposal_task.write().await;
if latest_view_sync_proposal_task.is_none() {
// create new task
let (sender, receiver) = unbounded();
*latest_view_sync_proposal_task = Some(sender);

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(
receiver,
MessagePurpose::LatestViewSyncProposal,
1,
)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
let mut latest_view_sync_proposal_task =
inner_clone.latest_view_sync_proposal_task.write().await;
*latest_view_sync_proposal_task = None;
}
});
}
}
ConsensusIntentEvent::PollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down Expand Up @@ -1005,6 +1079,17 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
}

ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => {
let mut latest_view_sync_proposal_task =
self.inner.latest_view_sync_proposal_task.write().await;

if let Some(thing) = latest_view_sync_proposal_task.take() {
let _res = thing
.send(ConsensusIntentEvent::CancelPollForLatestViewSyncProposal)
.await;
}
}

ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
let mut task_map = self.inner.view_sync_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
27 changes: 24 additions & 3 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ impl<

// Garbage collect old tasks
// We could put this into a separate async task, but that would require making several fields on ViewSyncTaskState thread-safe and harm readability. In the common case this will have zero tasks to clean up.
// cancel poll for votes
self.network
.inject_consensus_info(
ConsensusIntentEvent::CancelPollForLatestViewSyncProposal,
)
.await;

// run GC
for i in *self.last_garbage_collected_view..*self.current_view {
if let Some((_key, replica_task_info)) =
self.replica_task_map.remove_entry(&TYPES::Time::new(i))
Expand Down Expand Up @@ -502,6 +510,12 @@ impl<
*view_number + 1,
))
.await;

// Poll for future view sync certificates
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForLatestViewSyncProposal)
.await;

// Spawn replica task
let next_view = *view_number + 1;
// Subscribe to the view after we are leader since we know we won't propose in the next view if we are leader.
Expand All @@ -521,7 +535,7 @@ impl<
// Also subscribe to the latest view for the same reason. The GC will remove the above poll
// in the case that one doesn't resolve but this one does.
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;

self.network
Expand Down Expand Up @@ -829,6 +843,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;

// Cancel poll for future view sync certificates
self.network
.inject_consensus_info(
ConsensusIntentEvent::CancelPollForLatestViewSyncProposal,
)
.await;

self.phase = last_seen_certificate;

if certificate.get_data().relay > self.relay {
Expand Down Expand Up @@ -895,9 +916,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}
// Keep tyring to get a more recent proposal to catch up to
// Keep trying to get a more recent proposal to catch up to
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;
self.relay += 1;
match self.phase {
Expand Down
6 changes: 4 additions & 2 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
pub enum MessagePurpose {
/// Message with a quorum proposal.
Proposal,
/// Message with most recent proposal the server has
CurrentProposal,
/// Message with most recent quorum proposal the server has
LatestQuorumProposal,
/// Message with most recent view sync proposal the server has
LatestViewSyncProposal,
/// Message with a quorum vote.
Vote,
/// Message with a view sync vote.
Expand Down
12 changes: 9 additions & 3 deletions crates/types/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ pub enum ConsensusIntentEvent<K: SignatureKey> {
PollForProposal(u64),
/// Poll for VID disperse data for a particular view
PollForVIDDisperse(u64),
/// Poll for the most recent proposal the webserver has
PollForCurrentProposal,
/// Poll for the most recent quorum proposal the webserver has
PollForLatestQuorumProposal,
/// Poll for the most recent view sync proposal the webserver has
PollForLatestViewSyncProposal,
/// Poll for a DAC for a particular view
PollForDAC(u64),
/// Poll for view sync votes starting at a particular view
Expand All @@ -172,6 +174,8 @@ pub enum ConsensusIntentEvent<K: SignatureKey> {
CancelPollForVIDDisperse(u64),
/// Cancel polling for transactions
CancelPollForTransactions(u64),
/// Cancel polling for most recent view sync proposal
CancelPollForLatestViewSyncProposal,
}

impl<K: SignatureKey> ConsensusIntentEvent<K> {
Expand All @@ -194,7 +198,9 @@ impl<K: SignatureKey> ConsensusIntentEvent<K> {
| ConsensusIntentEvent::PollForTransactions(view_number)
| ConsensusIntentEvent::CancelPollForTransactions(view_number)
| ConsensusIntentEvent::PollFutureLeader(view_number, _) => *view_number,
ConsensusIntentEvent::PollForCurrentProposal => 1,
ConsensusIntentEvent::PollForLatestQuorumProposal
| ConsensusIntentEvent::PollForLatestViewSyncProposal
| ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => 1,
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions crates/web_server/api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ DOC = """
Return the VID disperse data for a given view number
"""

# GET the proposal for a view, where the view is passed as an argument
[route.getrecentproposal]
PATH = ["proposal/recent"]
# GET the latest quorum proposal
[route.get_latest_quorum_proposal]
PATH = ["proposal/latest"]
DOC = """
Return the proposal for the most recent view the server has
"""

# GET the latest quorum proposal
[route.get_latest_view_sync_proposal]
PATH = ["view_sync_proposal/latest"]
DOC = """
Return the proposal for the most recent view the server has
"""
Expand Down
8 changes: 6 additions & 2 deletions crates/web_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ pub fn post_proposal_route(view_number: u64) -> String {
format!("api/proposal/{view_number}")
}

pub fn get_recent_proposal_route() -> String {
"api/proposal/recent".to_string()
pub fn get_latest_quorum_proposal_route() -> String {
"api/proposal/latest".to_string()
}

pub fn get_latest_view_sync_proposal_route() -> String {
"api/view_sync_proposal/latest".to_string()
}

pub fn get_da_certificate_route(view_number: u64) -> String {
Expand Down
Loading

0 comments on commit 251a3e6

Please sign in to comment.