Merge branch 'develop' into keyao/block-header
shenkeyao committed Oct 30, 2023
2 parents 172c38f + 8ceb257 commit 4232453
Showing 21 changed files with 915 additions and 175 deletions.
299 changes: 201 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -73,7 +73,7 @@
 time = "0.3.30"
 toml = "0.7.8"
 tracing = "0.1.40"
 typenum = "1.17.0"
-libp2p = { package = "libp2p", version = "0.52.3", features = [
+libp2p = { package = "libp2p", version = "0.52.4", features = [
   "macros",
   "autonat",
   "deflate",
152 changes: 148 additions & 4 deletions crates/hotshot/src/traits/networking/web_server_network.rs
@@ -122,6 +122,15 @@ struct Inner<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> {
     /// Task map for quorum votes.
     vote_task_map:
         Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
+    /// Task map for VID votes.
+    vid_vote_task_map:
+        Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
+    /// Task map for VID certificates.
+    vid_cert_task_map:
+        Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
+    /// Task map for VID disperse data.
+    vid_disperse_task_map:
+        Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
     /// Task map for DACs.
     dac_task_map:
         Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
@@ -169,7 +178,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
             MessagePurpose::DAC => config::get_da_certificate_route(view_number),
             MessagePurpose::VidDisperse => config::get_vid_disperse_route(view_number), // like `Proposal`
             MessagePurpose::VidVote => config::get_vid_vote_route(view_number, vote_index), // like `Vote`
-            MessagePurpose::VidCert => config::get_vid_cert_route(view_number), // like `DAC`
+            MessagePurpose::VidCert => config::get_vid_certificate_route(view_number), // like `DAC`
         };

         if message_purpose == MessagePurpose::Data {
@@ -351,8 +360,10 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
                     // TODO ED Should add extra error checking here to make sure we are intending to cancel a task
                     ConsensusIntentEvent::CancelPollForVotes(event_view)
                     | ConsensusIntentEvent::CancelPollForProposal(event_view)
+                    | ConsensusIntentEvent::CancelPollForVIDVotes(event_view)
+                    | ConsensusIntentEvent::CancelPollForVIDCertificate(event_view)
                     | ConsensusIntentEvent::CancelPollForDAC(event_view)
                     | ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
+                    | ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
                     | ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
                         if view_number == event_view {
                             debug!("Shutting down polling task for view {}", event_view);
@@ -371,7 +382,9 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
                         }
                     }

-                    _ => unimplemented!(),
+                    _ => {
+                        unimplemented!()
+                    }
                 }
             }
             // Nothing on receiving channel
@@ -528,6 +541,9 @@ impl<
             tx_index: Arc::default(),
             proposal_task_map: Arc::default(),
             vote_task_map: Arc::default(),
+            vid_vote_task_map: Arc::default(),
+            vid_cert_task_map: Arc::default(),
+            vid_disperse_task_map: Arc::default(),
             dac_task_map: Arc::default(),
             view_sync_cert_task_map: Arc::default(),
             view_sync_vote_task_map: Arc::default(),
@@ -562,7 +578,7 @@ impl<
             MessagePurpose::DAC => config::post_da_certificate_route(*view_number),
             MessagePurpose::VidVote => config::post_vid_vote_route(*view_number),
             MessagePurpose::VidDisperse => config::post_vid_disperse_route(*view_number),
-            MessagePurpose::VidCert => config::post_vid_cert_route(*view_number),
+            MessagePurpose::VidCert => config::post_vid_certificate_route(*view_number),
         };

         let network_msg: SendMsg<M> = SendMsg {
@@ -822,6 +838,46 @@ impl<
                         .await;
                 }
             }
+            ConsensusIntentEvent::PollForVIDDisperse(view_number) => {
+                // Check if we already have a task for this (we shouldn't)
+
+                // Take a write lock since we will most likely need it; this can change to an upgradable read in the future
+                let mut task_map = self.inner.vid_disperse_task_map.write().await;
+                if let Entry::Vacant(e) = task_map.entry(view_number) {
+                    // create new task
+                    let (sender, receiver) = unbounded();
+                    e.insert(sender);
+
+                    async_spawn({
+                        let inner_clone = self.inner.clone();
+                        async move {
+                            if let Err(e) = inner_clone
+                                .poll_web_server(receiver, MessagePurpose::VidDisperse, view_number)
+                                .await
+                            {
+                                error!(
+                                    "Background receive VID disperse polling encountered an error: {:?}",
+                                    e
+                                );
+                            }
+                        }
+                    });
+                } else {
+                    error!("Somehow task already existed!");
+                }
+
+                // GC the VID disperse polling task from two views back
+                if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
+                    // Send task cancel message to old task
+
+                    // If task already exited we expect an error
+                    let _res = sender
+                        .send(ConsensusIntentEvent::CancelPollForVIDDisperse(
+                            view_number.wrapping_sub(2),
+                        ))
+                        .await;
+                }
+            }
             ConsensusIntentEvent::PollForCurrentProposal => {
                 // create new task
                 let (_, receiver) = unbounded();
@@ -878,6 +934,44 @@ impl<
                         .await;
                 }
             }
+            ConsensusIntentEvent::PollForVIDVotes(view_number) => {
+                let mut task_map = self.inner.vid_vote_task_map.write().await;
+                if let Entry::Vacant(e) = task_map.entry(view_number) {
+                    // create new task
+                    let (sender, receiver) = unbounded();
+                    e.insert(sender);
+                    async_spawn({
+                        let inner_clone = self.inner.clone();
+                        async move {
+                            if let Err(e) = inner_clone
+                                .poll_web_server(receiver, MessagePurpose::VidVote, view_number)
+                                .await
+                            {
+                                error!(
+                                    "Background receive VID vote polling encountered an error: {:?}",
+                                    e
+                                );
+                            }
+                        }
+                    });
+                } else {
+                    error!("Somehow task already existed!");
+                }
+
+                // GC the polling task from two views back
+                // TODO ED This won't work for vote collection; the last task may be more than 2 views old depending on the size of the network, so we will need to rely on the cancel task from consensus
+                if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
+                    // Send task cancel message to old task
+
+                    // If task already exited we expect an error
+                    let _res = sender
+                        .send(ConsensusIntentEvent::CancelPollForVIDVotes(
+                            view_number.wrapping_sub(2),
+                        ))
+                        .await;
+                }
+            }

             ConsensusIntentEvent::PollForDAC(view_number) => {
                 let mut task_map = self.inner.dac_task_map.write().await;
                 if let Entry::Vacant(e) = task_map.entry(view_number) {
@@ -914,6 +1008,43 @@ impl<
                         .await;
                 }
             }

+            ConsensusIntentEvent::PollForVIDCertificate(view_number) => {
+                let mut task_map = self.inner.vid_cert_task_map.write().await;
+                if let Entry::Vacant(e) = task_map.entry(view_number) {
+                    // create new task
+                    let (sender, receiver) = unbounded();
+                    e.insert(sender);
+                    async_spawn({
+                        let inner_clone = self.inner.clone();
+                        async move {
+                            if let Err(e) = inner_clone
+                                .poll_web_server(receiver, MessagePurpose::VidCert, view_number)
+                                .await
+                            {
+                                error!(
+                                    "Background receive VID certificate polling encountered an error: {:?}",
+                                    e
+                                );
+                            }
+                        }
+                    });
+                } else {
+                    error!("Somehow task already existed!");
+                }
+
+                // GC the VID certificate polling task from two views back
+                if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
+                    // Send task cancel message to old task
+
+                    // If task already exited we expect an error
+                    let _res = sender
+                        .send(ConsensusIntentEvent::CancelPollForVIDCertificate(
+                            view_number.wrapping_sub(2),
+                        ))
+                        .await;
+                }
+            }
             ConsensusIntentEvent::CancelPollForVotes(view_number) => {
                 let mut task_map = self.inner.vote_task_map.write().await;

@@ -927,6 +1058,19 @@ impl<
                 }
             }

+            ConsensusIntentEvent::CancelPollForVIDVotes(view_number) => {
+                let mut task_map = self.inner.vid_vote_task_map.write().await;
+
+                if let Some((_, sender)) = task_map.remove_entry(&(view_number)) {
+                    // Send task cancel message to old task
+
+                    // If task already exited we expect an error
+                    let _res = sender
+                        .send(ConsensusIntentEvent::CancelPollForVIDVotes(view_number))
+                        .await;
+                }
+            }

             ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
                 let mut task_map = self.inner.view_sync_cert_task_map.write().await;
                 if let Entry::Vacant(e) = task_map.entry(view_number) {
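All three new `PollForVID*` arms above share one lifecycle: take a write lock on the view-indexed task map, spawn a poller only if the entry for that view is vacant, and garbage-collect the poller from two views back by sending it a cancel event (a send error just means that task already exited). The following is a minimal sketch of that pattern, assuming tokio (feature "full") and toy stand-ins (`PollEvent`, a stub poller body) for `async_compatibility_layer`, `ConsensusIntentEvent`, and the real web-server polling loop.

```rust
use std::collections::{hash_map::Entry, HashMap};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

#[derive(Debug)]
enum PollEvent {
    // Tell the poller for this view to shut down.
    Cancel(u64),
}

#[tokio::main]
async fn main() {
    // One sender per view; the receiver lives inside the spawned poller.
    let mut task_map: HashMap<u64, UnboundedSender<PollEvent>> = HashMap::new();

    for view in 0u64..5 {
        if let Entry::Vacant(e) = task_map.entry(view) {
            let (sender, mut receiver) = unbounded_channel();
            e.insert(sender);
            // Stub poller: the real task loops polling the web server and
            // exits when it sees a cancel event for its own view.
            tokio::spawn(async move {
                if let Some(event) = receiver.recv().await {
                    println!("view {view}: {event:?}, shutting down");
                }
            });
        }

        // GC the poller from two views back; for views 0 and 1 the
        // wrapped key simply misses, exactly as in the code above.
        if let Some((_, old)) = task_map.remove_entry(&view.wrapping_sub(2)) {
            let _ = old.send(PollEvent::Cancel(view.wrapping_sub(2)));
        }
    }

    // Give the spawned stubs a moment to print before exiting.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```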
13 changes: 8 additions & 5 deletions crates/libp2p-networking/src/network/behaviours/dht/mod.rs
@@ -9,11 +9,13 @@
 mod cache;

 use async_compatibility_layer::art::async_block_on;
 use futures::channel::oneshot::Sender;
+use libp2p::kad::Behaviour as KademliaBehaviour;
+use libp2p::kad::Event as KademliaEvent;
 use libp2p::{
     kad::{
         /* handler::KademliaHandlerIn, */ store::MemoryStore, BootstrapError, BootstrapOk,
-        GetClosestPeersOk, GetRecordOk, GetRecordResult, Kademlia, KademliaEvent, Mode,
-        ProgressStep, PutRecordResult, QueryId, QueryResult, Quorum, Record,
+        GetClosestPeersOk, GetRecordOk, GetRecordResult, Mode, ProgressStep, PutRecordResult,
+        QueryId, QueryResult, Quorum, Record,
     },
     swarm::{NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm},
     Multiaddr,
@@ -52,7 +54,7 @@ pub struct DHTBehaviour {
     /// List of previously failed put requests
     queued_put_record_queries: VecDeque<KadPutQuery>,
     /// Kademlia behaviour
-    pub kadem: Kademlia<MemoryStore>,
+    pub kadem: KademliaBehaviour<MemoryStore>,
     /// State of bootstrapping
     pub bootstrap_state: Bootstrap,
     /// State of last random walk
@@ -114,7 +116,7 @@ impl DHTBehaviour {
     /// Create a new DHT behaviour
     #[must_use]
     pub async fn new(
-        mut kadem: Kademlia<MemoryStore>,
+        mut kadem: KademliaBehaviour<MemoryStore>,
         pid: PeerId,
         replication_factor: NonZeroUsize,
         cache_location: Option<String>,
@@ -540,7 +542,8 @@ pub enum DHTProgress {
 // 1. use of deprecated associated function `libp2p::libp2p_swarm::NetworkBehaviour::inject_event`: Implement `NetworkBehaviour::on_connection_handler_event` instead. The default implementation of this `inject_*` method delegates to it.

 impl NetworkBehaviour for DHTBehaviour {
-    type ConnectionHandler = <Kademlia<MemoryStore> as NetworkBehaviour>::ConnectionHandler;
+    type ConnectionHandler =
+        <KademliaBehaviour<MemoryStore> as NetworkBehaviour>::ConnectionHandler;

     type ToSwarm = DHTEvent;

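libp2p 0.52.4 renames `Kademlia`, `KademliaEvent`, and `KademliaConfig` to `kad::Behaviour`, `kad::Event`, and `kad::Config`. Aliasing the new names back to the old ones, as the imports above do, leaves the rest of the module untouched. A minimal sketch of constructing the renamed behaviour under that aliasing, assuming libp2p's "kad" feature:

```rust
use libp2p::identity::Keypair;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::{Behaviour as KademliaBehaviour, Config as KademliaConfig};
use libp2p::PeerId;

fn main() {
    let peer_id = PeerId::from(Keypair::generate_ed25519().public());
    // Same constructor as before the rename; only the type names changed.
    let _kadem: KademliaBehaviour<MemoryStore> = KademliaBehaviour::with_config(
        peer_id,
        MemoryStore::new(peer_id),
        KademliaConfig::default(),
    );
}
```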
8 changes: 4 additions & 4 deletions crates/libp2p-networking/src/network/mod.rs
@@ -38,9 +38,9 @@
 use std::{collections::HashSet, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
 use tracing::{info, instrument};

 #[cfg(async_executor_impl = "async-std")]
-use libp2p::dns::DnsConfig;
+use libp2p::dns::async_std::Transport as DnsTransport;
 #[cfg(async_executor_impl = "tokio")]
-use libp2p::dns::TokioDnsConfig as DnsConfig;
+use libp2p::dns::tokio::Transport as DnsTransport;
 #[cfg(async_executor_impl = "async-std")]
 use quic::async_std::Transport as QuicTransport;
 #[cfg(async_executor_impl = "tokio")]
@@ -211,12 +211,12 @@ pub async fn gen_transport(
     let dns_quic = {
         #[cfg(async_executor_impl = "async-std")]
         {
-            DnsConfig::system(quic_transport).await
+            DnsTransport::system(quic_transport).await
         }

         #[cfg(async_executor_impl = "tokio")]
         {
-            DnsConfig::system(quic_transport)
+            DnsTransport::system(quic_transport)
         }
     }
     .map_err(|e| NetworkError::TransportLaunch { source: e })?;
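The DNS change is the same kind of rename: the executor-specific wrappers move from `DnsConfig`/`TokioDnsConfig` to `dns::async_std::Transport` and `dns::tokio::Transport`, and only the async-std constructor remains `async`. A minimal sketch of the tokio variant, assuming libp2p's "dns", "tcp", and "tokio" features and using TCP as a stand-in for the node's QUIC inner transport:

```rust
use libp2p::dns::tokio::Transport as DnsTransport;
use libp2p::tcp;

fn main() -> std::io::Result<()> {
    // Inner transport to wrap; gen_transport wraps its QUIC transport instead.
    let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default());
    // Resolve /dns, /dns4, /dns6 multiaddr components via the system resolver
    // before dialing with the inner transport.
    let _dns_tcp = DnsTransport::system(tcp_transport)?;
    Ok(())
}
```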
27 changes: 17 additions & 10 deletions crates/libp2p-networking/src/network/node.rs
@@ -43,12 +43,12 @@ use libp2p::{
         Info as IdentifyInfo,
     },
     identity::Keypair,
-    kad::{store::MemoryStore, Kademlia, KademliaConfig},
+    kad::{store::MemoryStore, Behaviour, Config},
     request_response::{
         Behaviour as RequestResponse, Config as RequestResponseConfig, ProtocolSupport,
     },
-    swarm::{SwarmBuilder, SwarmEvent},
-    Multiaddr, Swarm,
+    swarm::SwarmEvent,
+    Multiaddr, Swarm, SwarmBuilder,
 };
 use libp2p_identity::PeerId;
 use rand::{prelude::SliceRandom, thread_rng};
@@ -236,7 +236,7 @@ impl NetworkNode {
         let identify = IdentifyBehaviour::new(identify_cfg);

         // - Build DHT needed for peer discovery
-        let mut kconfig = KademliaConfig::default();
+        let mut kconfig = Config::default();
         // 8 hours by default
         let record_republication_interval = config
             .republication_interval
@@ -252,7 +252,7 @@ impl NetworkNode {
             kconfig.set_replication_factor(factor);
         }

-        let kadem = Kademlia::with_config(peer_id, MemoryStore::new(peer_id), kconfig);
+        let kadem = Behaviour::with_config(peer_id, MemoryStore::new(peer_id), kconfig);

         let rrconfig = RequestResponseConfig::default();

@@ -275,12 +275,19 @@ impl NetworkNode {
            identify,
            DMBehaviour::new(request_response),
        );
-       let executor = Box::new(|fut| {
-           async_spawn(fut);
-       });

-       SwarmBuilder::with_executor(transport, network, peer_id, executor)
-           .dial_concurrency_factor(std::num::NonZeroU8::new(1).unwrap())
+       // build swarm
+       let swarm = SwarmBuilder::with_existing_identity(identity.clone());
+       #[cfg(async_executor_impl = "async-std")]
+       let swarm = swarm.with_async_std();
+       #[cfg(async_executor_impl = "tokio")]
+       let swarm = swarm.with_tokio();
+
+       swarm
+           .with_other_transport(|_| transport)
+           .unwrap()
+           .with_behaviour(|_| network)
+           .unwrap()
            .build()
    };
    for (peer, addr) in &config.to_connect_addrs {
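The deprecated `libp2p::swarm::SwarmBuilder::with_executor` gives way to the new top-level `libp2p::SwarmBuilder`, which starts from an identity and then selects the executor, transport, and behaviour in order. A minimal sketch of the new chain, assuming the builder's `with_tcp` helper and the "tokio", "tcp", "noise", "yamux", and "ping" features; a ping behaviour stands in for the node's composed `NetworkBehaviour`, while the node itself injects its QUIC transport through `with_other_transport` as in the diff above.

```rust
use libp2p::{identity::Keypair, noise, ping, tcp, yamux, SwarmBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let identity = Keypair::generate_ed25519();
    // Identity first, then executor, transport, behaviour, and build.
    let _swarm = SwarmBuilder::with_existing_identity(identity)
        .with_tokio()
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )?
        .with_behaviour(|_keypair| ping::Behaviour::default())?
        .build();
    Ok(())
}
```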
7 changes: 2 additions & 5 deletions crates/task-impls/src/consensus.rs
@@ -1244,11 +1244,9 @@ where
                 let view = cert.view_number;
                 self.vid_certs.insert(view, cert);

-                // TODO Make sure we aren't voting for an arbitrarily old round for no reason
                 if self.vote_if_able().await {
                     self.current_proposal = None;
                 }
-                // RM TODO: VOTING
             }

             HotShotEvent::ViewChange(new_view) => {
                 debug!("View Change event for view {}", *new_view);
@@ -1530,7 +1528,6 @@ pub fn consensus_event_filter<TYPES: NodeType, I: NodeImplementation<TYPES>>(
         | HotShotEvent::QuorumVoteRecv(_)
         | HotShotEvent::QCFormed(_)
         | HotShotEvent::DACRecv(_)
-        | HotShotEvent::VidCertRecv(_)
         | HotShotEvent::ViewChange(_)
         | HotShotEvent::SendPayloadCommitment(_)
         | HotShotEvent::Timeout(_)
6 changes: 1 addition & 5 deletions crates/task-impls/src/da.rs
@@ -294,11 +294,7 @@ where
                 }
             }
             HotShotEvent::DAVoteRecv(vote) => {
-                // warn!(
-                //     "DA vote recv, Main Task {:?}, key: {:?}",
-                //     vote.current_view,
-                //     self.committee_exchange.public_key()
-                // );
+                debug!("DA vote recv, Main Task {:?}", vote.current_view,);
                 // Check if we are the leader and the vote is from the sender.
                 let view = vote.current_view;
                 if !self.committee_exchange.is_leader(view) {