Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Stability] Poll for latest view sync cert #2246

Merged
merged 3 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ local-ip-address = "0.5.6"
dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" }
derive_more = "0.99.17"
portpicker = "0.1.1"
lru = "0.12.1"

tracing = { workspace = true }
typenum = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub async fn add_consensus_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
};
consensus_state
.quorum_network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;
let filter = FilterEvent(Arc::new(consensus_event_filter));
let consensus_name = "Consensus Task";
Expand Down
111 changes: 98 additions & 13 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use hotshot_types::{
},
};
use hotshot_web_server::{self, config};
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use surf_disco::Url;

use hotshot_types::traits::network::ViewMessage;
Expand All @@ -53,6 +57,15 @@ impl<TYPES: NodeType> WebCommChannel<TYPES> {
}
}

/// Computes a 64-bit digest of `t` with the standard library's `DefaultHasher`.
///
/// Callers in this file use the digest as an LRU-cache key so that messages
/// which were already seen are not queued a second time.
///
/// # Note
///
/// `DefaultHasher` is used instead of a cryptographic hash function such as
/// SHA-256 because of an `AsRef` requirement.
fn hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Feed the value into the hasher, then read out the accumulated state.
    Hash::hash(t, &mut hasher);
    Hasher::finish(&hasher)
}

/// The web server network state
#[derive(Clone, Debug)]
pub struct WebServerNetwork<TYPES: NodeType> {
Expand Down Expand Up @@ -205,9 +218,10 @@ struct Inner<TYPES: NodeType> {
view_sync_vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for transactions
txn_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task polling for current propsal
current_proposal_task:
Arc<RwLock<Option<UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task polling for latest quorum proposal
latest_quorum_proposal_task: Arc<RwLock<Option<TaskChannel<TYPES::SignatureKey>>>>,
/// Task polling for latest view sync proposal
latest_view_sync_proposal_task: Arc<RwLock<Option<TaskChannel<TYPES::SignatureKey>>>>,
}

impl<TYPES: NodeType> Inner<TYPES> {
Expand All @@ -221,6 +235,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
) -> Result<(), NetworkError> {
let mut vote_index = 0;
let mut tx_index = 0;
let mut seen_proposals = LruCache::new(NonZeroUsize::new(100).unwrap());
rob-maron marked this conversation as resolved.
Show resolved Hide resolved

if message_purpose == MessagePurpose::Data {
tx_index = *self.tx_index.read().await;
Expand All @@ -230,7 +245,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
while self.running.load(Ordering::Relaxed) {
let endpoint = match message_purpose {
MessagePurpose::Proposal => config::get_proposal_route(view_number),
MessagePurpose::CurrentProposal => config::get_recent_proposal_route(),
MessagePurpose::LatestQuorumProposal => config::get_latest_quorum_proposal_route(),
rob-maron marked this conversation as resolved.
Show resolved Hide resolved
MessagePurpose::LatestViewSyncProposal => {
config::get_latest_view_sync_proposal_route()
}
MessagePurpose::Vote => config::get_vote_route(view_number, vote_index),
MessagePurpose::Data => config::get_transactions_route(tx_index),
MessagePurpose::Internal => unimplemented!(),
Expand Down Expand Up @@ -292,7 +310,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
// }
// }
}
MessagePurpose::CurrentProposal => {
MessagePurpose::LatestQuorumProposal => {
// Only pushing the first proposal since we will soon only be allowing 1 proposal per view
self.broadcast_poll_queue
.write()
Expand All @@ -301,6 +319,20 @@ impl<TYPES: NodeType> Inner<TYPES> {

return Ok(());
}
MessagePurpose::LatestViewSyncProposal => {
let mut broadcast_poll_queue =
self.broadcast_poll_queue.write().await;

for cert in &deserialized_messages {
let hash = hash(&cert);
if seen_proposals.put(hash, ()).is_none() {
broadcast_poll_queue.push(cert.clone());
}
}

// additional sleep to reduce load on web server
async_sleep(Duration::from_millis(300)).await;
}
MessagePurpose::Vote => {
// error!(
// "Received {} votes from web server for view {} is da {}",
Expand Down Expand Up @@ -374,7 +406,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
// TODO ED Need to add vote indexing to web server for view sync certs
for cert in &deserialized_messages {
vote_index += 1;
broadcast_poll_queue.push(cert.clone());
let hash = hash(cert);
if seen_proposals.put(hash, ()).is_none() {
broadcast_poll_queue.push(cert.clone());
}
}
}

Expand Down Expand Up @@ -420,6 +455,10 @@ impl<TYPES: NodeType> Inner<TYPES> {
}
}

ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => {
return Ok(());
}

_ => {
unimplemented!()
}
Expand Down Expand Up @@ -499,7 +538,7 @@ pub struct SendMsg<M: NetworkMsg> {
}

/// A message being received from the web server
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Hash)]
#[serde(bound(deserialize = ""))]
pub struct RecvMsg<M: NetworkMsg> {
/// The optional message being received
Expand Down Expand Up @@ -571,7 +610,8 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
view_sync_cert_task_map: Arc::default(),
view_sync_vote_task_map: Arc::default(),
txn_task_map: Arc::default(),
current_proposal_task: Arc::default(),
latest_quorum_proposal_task: Arc::default(),
latest_view_sync_proposal_task: Arc::default(),
});

inner.connected.store(true, Ordering::Relaxed);
Expand All @@ -593,7 +633,9 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
MessagePurpose::Proposal => config::post_proposal_route(*view_number),
MessagePurpose::Vote => config::post_vote_route(*view_number),
MessagePurpose::Data => config::post_transactions_route(),
MessagePurpose::Internal | MessagePurpose::CurrentProposal => {
MessagePurpose::Internal
| MessagePurpose::LatestQuorumProposal
| MessagePurpose::LatestViewSyncProposal => {
return Err(WebServerNetworkError::EndpointError)
}
MessagePurpose::ViewSyncProposal => {
Expand Down Expand Up @@ -907,8 +949,8 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse)
.await;
}
ConsensusIntentEvent::PollForCurrentProposal => {
let mut proposal_task = self.inner.current_proposal_task.write().await;
ConsensusIntentEvent::PollForLatestQuorumProposal => {
let mut proposal_task = self.inner.latest_quorum_proposal_task.write().await;
if proposal_task.is_none() {
// create new task
let (sender, receiver) = unbounded();
Expand All @@ -918,20 +960,52 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::CurrentProposal, 1)
.poll_web_server(receiver, MessagePurpose::LatestQuorumProposal, 1)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
let mut proposal_task = inner_clone.current_proposal_task.write().await;
let mut proposal_task =
inner_clone.latest_quorum_proposal_task.write().await;
*proposal_task = None;
}
});
}
}
ConsensusIntentEvent::PollForLatestViewSyncProposal => {
let mut latest_view_sync_proposal_task =
self.inner.latest_view_sync_proposal_task.write().await;
if latest_view_sync_proposal_task.is_none() {
// create new task
let (sender, receiver) = unbounded();
*latest_view_sync_proposal_task = Some(sender);

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(
receiver,
MessagePurpose::LatestViewSyncProposal,
1,
)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
let mut latest_view_sync_proposal_task =
inner_clone.latest_view_sync_proposal_task.write().await;
*latest_view_sync_proposal_task = None;
}
});
}
}
ConsensusIntentEvent::PollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down Expand Up @@ -1005,6 +1079,17 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
}

ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => {
let mut latest_view_sync_proposal_task =
self.inner.latest_view_sync_proposal_task.write().await;

if let Some(thing) = latest_view_sync_proposal_task.take() {
let _res = thing
.send(ConsensusIntentEvent::CancelPollForLatestViewSyncProposal)
.await;
}
}

ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
let mut task_map = self.inner.view_sync_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
19 changes: 16 additions & 3 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ impl<
*view_number + 1,
))
.await;

// Poll for future view sync certificates
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForLatestViewSyncProposal)
.await;

// Spawn replica task
let next_view = *view_number + 1;
// Subscribe to the view after we are leader since we know we won't propose in the next view if we are leader.
Expand All @@ -521,7 +527,7 @@ impl<
// Also subscribe to the latest view for the same reason. The GC will remove the above poll
// in the case that one doesn't resolve but this one does.
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;

self.network
Expand Down Expand Up @@ -829,6 +835,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
))
.await;

// Cancel poll for future view sync certificates
self.network
.inject_consensus_info(
ConsensusIntentEvent::CancelPollForLatestViewSyncProposal,
)
.await;

self.phase = last_seen_certificate;

if certificate.get_data().relay > self.relay {
Expand Down Expand Up @@ -895,9 +908,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
if let Some(timeout_task) = self.timeout_task.take() {
cancel_task(timeout_task).await;
}
// Keep tyring to get a more recent proposal to catch up to
// Keep trying to get a more recent proposal to catch up to
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.inject_consensus_info(ConsensusIntentEvent::PollForLatestQuorumProposal)
.await;
self.relay += 1;
match self.phase {
Expand Down
6 changes: 4 additions & 2 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ pub struct Messages<TYPES: NodeType>(pub Vec<Message<TYPES>>);
pub enum MessagePurpose {
/// Message with a quorum proposal.
Proposal,
/// Message with most recent proposal the server has
CurrentProposal,
/// Message with most recent quorum proposal the server has
LatestQuorumProposal,
/// Message with most recent view sync proposal the server has
LatestViewSyncProposal,
/// Message with a quorum vote.
Vote,
/// Message with a view sync vote.
Expand Down
12 changes: 9 additions & 3 deletions crates/types/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,10 @@ pub enum ConsensusIntentEvent<K: SignatureKey> {
PollForProposal(u64),
/// Poll for VID disperse data for a particular view
PollForVIDDisperse(u64),
/// Poll for the most recent proposal the webserver has
PollForCurrentProposal,
/// Poll for the most recent quorum proposal the webserver has
PollForLatestQuorumProposal,
/// Poll for the most recent view sync proposal the webserver has
PollForLatestViewSyncProposal,
/// Poll for a DAC for a particular view
PollForDAC(u64),
/// Poll for view sync votes starting at a particular view
Expand All @@ -172,6 +174,8 @@ pub enum ConsensusIntentEvent<K: SignatureKey> {
CancelPollForVIDDisperse(u64),
/// Cancel polling for transactions
CancelPollForTransactions(u64),
/// Cancel polling for most recent view sync proposal
CancelPollForLatestViewSyncProposal,
}

impl<K: SignatureKey> ConsensusIntentEvent<K> {
Expand All @@ -194,7 +198,9 @@ impl<K: SignatureKey> ConsensusIntentEvent<K> {
| ConsensusIntentEvent::PollForTransactions(view_number)
| ConsensusIntentEvent::CancelPollForTransactions(view_number)
| ConsensusIntentEvent::PollFutureLeader(view_number, _) => *view_number,
ConsensusIntentEvent::PollForCurrentProposal => 1,
ConsensusIntentEvent::PollForLatestQuorumProposal
| ConsensusIntentEvent::PollForLatestViewSyncProposal
| ConsensusIntentEvent::CancelPollForLatestViewSyncProposal => 1,
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions crates/web_server/api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ DOC = """
Return the VID disperse data for a given view number
"""

# GET the proposal for a view, where the view is passed as an argument
[route.getrecentproposal]
PATH = ["proposal/recent"]
# GET the latest quorum proposal
[route.get_latest_quorum_proposal]
PATH = ["proposal/latest"]
DOC = """
Return the proposal for the most recent view the server has
"""

# GET the latest view sync proposal
[route.get_latest_view_sync_proposal]
PATH = ["view_sync_proposal/latest"]
DOC = """
Return the most recent view sync proposal the server has
"""
Expand Down
8 changes: 6 additions & 2 deletions crates/web_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ pub fn post_proposal_route(view_number: u64) -> String {
format!("api/proposal/{view_number}")
}

pub fn get_recent_proposal_route() -> String {
"api/proposal/recent".to_string()
/// Returns the route used to fetch the latest quorum proposal from the web server.
pub fn get_latest_quorum_proposal_route() -> String {
    String::from("api/proposal/latest")
}

/// Returns the route used to fetch the latest view sync proposal from the web server.
pub fn get_latest_view_sync_proposal_route() -> String {
    String::from("api/view_sync_proposal/latest")
}

pub fn get_da_certificate_route(view_number: u64) -> String {
Expand Down
Loading