Skip to content

Commit

Permalink
Merge pull request #2494 from EspressoSystems/bf/code-cleanup
Browse files Browse the repository at this point in the history
[DEAD CODE] Remove More Unused Code
  • Loading branch information
bfish713 authored Feb 1, 2024
2 parents 6c75f2f + 1248f94 commit e457776
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 476 deletions.
8 changes: 1 addition & 7 deletions crates/hotshot/examples/combined/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::infra::CombinedDARun;
use hotshot::traits::implementations::{CombinedCommChannel, MemoryStorage};
use hotshot_testing::state_types::TestTypes;
use hotshot_types::traits::node_implementation::{ChannelMaps, NodeImplementation, NodeType};
use hotshot_types::traits::node_implementation::NodeImplementation;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

Expand All @@ -22,12 +22,6 @@ impl NodeImplementation<TestTypes> for NodeImpl {
type Storage = MemoryStorage<TestTypes>;
type QuorumNetwork = QuorumNetwork;
type CommitteeNetwork = DANetwork;

fn new_channel_maps(
start_view: <TestTypes as NodeType>::Time,
) -> (ChannelMaps<TestTypes>, Option<ChannelMaps<TestTypes>>) {
(ChannelMaps::new(start_view), None)
}
}
/// convenience type alias
pub type ThisRun = CombinedDARun<TestTypes>;
8 changes: 1 addition & 7 deletions crates/hotshot/examples/libp2p/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::infra::Libp2pDARun;
use hotshot::traits::implementations::{Libp2pCommChannel, MemoryStorage};
use hotshot_testing::state_types::TestTypes;
use hotshot_types::traits::node_implementation::{ChannelMaps, NodeImplementation, NodeType};
use hotshot_types::traits::node_implementation::NodeImplementation;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

Expand All @@ -22,12 +22,6 @@ impl NodeImplementation<TestTypes> for NodeImpl {
type Storage = MemoryStorage<TestTypes>;
type QuorumNetwork = QuorumNetwork;
type CommitteeNetwork = DANetwork;

fn new_channel_maps(
start_view: <TestTypes as NodeType>::Time,
) -> (ChannelMaps<TestTypes>, Option<ChannelMaps<TestTypes>>) {
(ChannelMaps::new(start_view), None)
}
}
/// convenience type alias
pub type ThisRun = Libp2pDARun<TestTypes>;
8 changes: 1 addition & 7 deletions crates/hotshot/examples/webserver/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::infra::WebServerDARun;
use hotshot::traits::implementations::{MemoryStorage, WebCommChannel};
use hotshot_testing::state_types::TestTypes;
use hotshot_types::traits::node_implementation::{ChannelMaps, NodeImplementation, NodeType};
use hotshot_types::traits::node_implementation::NodeImplementation;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

Expand All @@ -22,12 +22,6 @@ impl NodeImplementation<TestTypes> for NodeImpl {
type Storage = MemoryStorage<TestTypes>;
type CommitteeNetwork = DANetwork;
type QuorumNetwork = QuorumNetwork;

fn new_channel_maps(
start_view: <TestTypes as NodeType>::Time,
) -> (ChannelMaps<TestTypes>, Option<ChannelMaps<TestTypes>>) {
(ChannelMaps::new(start_view), None)
}
}
/// convenience type alias
pub type ThisRun = WebServerDARun<TestTypes>;
161 changes: 7 additions & 154 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ use crate::{
traits::{NodeImplementation, Storage},
types::{Event, SystemContextHandle},
};
use async_compatibility_layer::{
art::{async_spawn, async_spawn_local},
channel::UnboundedSender,
};
use async_lock::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
use async_trait::async_trait;
use commit::Committable;
use custom_debug::Debug;
Expand All @@ -37,22 +34,17 @@ use hotshot_task::{
};
use hotshot_task_impls::{events::HotShotEvent, network::NetworkTaskKind};

#[cfg(feature = "hotshot-testing")]
use hotshot_types::traits::node_implementation::ChannelMaps;

use hotshot_types::{
consensus::{Consensus, ConsensusMetricsValue, View, ViewInner, ViewQueue},
consensus::{Consensus, ConsensusMetricsValue, View, ViewInner},
data::Leaf,
error::StorageSnafu,
event::EventType,
message::{
DataMessage, InternalTrigger, Message, MessageKind, ProcessedGeneralConsensusMessage,
},
message::{DataMessage, Message, MessageKind},
simple_certificate::QuorumCertificate,
traits::{
consensus_api::ConsensusApi,
network::{CommunicationChannel, NetworkError},
node_implementation::{ConsensusTime, NodeType, SendToTasks},
network::CommunicationChannel,
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
states::ValidatedState,
storage::StoredView,
Expand All @@ -69,7 +61,7 @@ use std::{
time::Duration,
};
use tasks::add_vid_task;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, info, instrument, trace};

// -- Rexports
// External
Expand Down Expand Up @@ -148,11 +140,6 @@ pub struct SystemContextInner<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// The hotstuff implementation
consensus: Arc<RwLock<Consensus<TYPES>>>,

/// Channels for sending/recv-ing proposals and votes for quorum and committee exchanges, the
/// latter of which is only applicable for sequencing consensus.
#[cfg(feature = "hotshot-testing")]
channel_maps: (ChannelMaps<TYPES>, Option<ChannelMaps<TYPES>>),

// global_registry: GlobalRegistry,
/// Access to the output event stream.
output_event_stream: ChannelStream<Event<TYPES>>,
Expand Down Expand Up @@ -253,7 +240,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
let inner: Arc<SystemContextInner<TYPES, I>> = Arc::new(SystemContextInner {
id: nonce,
#[cfg(feature = "hotshot-testing")]
channel_maps: I::new_channel_maps(start_view),
consensus,
public_key,
private_key,
Expand All @@ -279,44 +265,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
.await;
}

/// Marks a given view number as timed out. This should be called a fixed period after a round is started.
///
/// If the round has already ended then this function will essentially be a no-op. Otherwise `run_round` will return shortly after this function is called.
/// # Panics
/// Panics if the current view is not in the channel map
#[instrument(
skip_all,
fields(id = self.inner.id, view = *current_view),
name = "Timeout consensus tasks",
level = "warn"
)]
pub async fn timeout_view(
&self,
current_view: TYPES::Time,
send_replica: UnboundedSender<ProcessedGeneralConsensusMessage<TYPES>>,
send_next_leader: Option<UnboundedSender<ProcessedGeneralConsensusMessage<TYPES>>>,
) {
let msg = ProcessedGeneralConsensusMessage::<TYPES>::InternalTrigger(
InternalTrigger::Timeout(current_view),
);
if let Some(chan) = send_next_leader {
if chan.send(msg.clone()).await.is_err() {
debug!("Error timing out next leader task");
}
};
// NOTE this should always exist
if send_replica.send(msg).await.is_err() {
debug!("Error timing out replica task");
};
}

/// Emit an external event
// A copypasta of `ConsensusApi::send_event`
// TODO: remove with https://github.com/EspressoSystems/HotShot/issues/2407
async fn send_external_event(&self, event: Event<TYPES>) {
debug!(?event, "send_external_event");
self.inner.output_event_stream.publish(event).await;

}

/// Publishes a transaction asynchronously to the network
Expand Down Expand Up @@ -457,112 +411,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
Ok((handle, internal_event_stream))
}

/// Send a broadcast message.
///
/// This is an alias for `hotshot.inner.networking.broadcast_message(msg.into())`.
///
/// # Errors
///
/// Will return any errors that the underlying `broadcast_message` can return.
// this clippy lint is silly. This is async by requirement of the trait.
#[allow(clippy::unused_async)]
pub async fn send_broadcast_message(
&self,
kind: impl Into<MessageKind<TYPES>>,
) -> std::result::Result<(), NetworkError> {
let inner = self.inner.clone();
let pk = self.inner.public_key.clone();
let kind = kind.into();

async_spawn_local(async move {
if inner
.networks
.quorum_network
.broadcast_message(
Message {
version: PROGRAM_PROTOCOL_VERSION,
sender: pk,
kind,
},
// TODO this is morally wrong
&inner.memberships.quorum_membership.clone(),
)
.await
.is_err()
{
warn!("Failed to broadcast message");
};
});
Ok(())
}

/// Send a direct message to a given recipient.
///
/// This is an alias for `hotshot.inner.networking.message_node(msg.into(), recipient)`.
///
/// # Errors
///
/// Will return any errors that the underlying `message_node` can return.
pub async fn send_direct_message(
&self,
kind: impl Into<MessageKind<TYPES>>,
recipient: TYPES::SignatureKey,
) -> std::result::Result<(), NetworkError> {
self.inner
.networks
.quorum_network
.direct_message(
Message {
version: PROGRAM_PROTOCOL_VERSION,
sender: self.inner.public_key.clone(),
kind: kind.into(),
},
recipient,
)
.await?;
Ok(())
}

/// return the timeout for a view for `self`
#[must_use]
pub fn get_next_view_timeout(&self) -> u64 {
self.inner.config.next_view_timeout
}

/// given a view number and a upgradable read lock on a channel map, inserts entry into map if it
/// doesn't exist, or creates entry. Then returns a clone of the entry
pub async fn create_or_obtain_chan_from_read(
view_num: TYPES::Time,
channel_map: RwLockUpgradableReadGuard<'_, SendToTasks<TYPES>>,
) -> ViewQueue<TYPES> {
// check if we have the entry
// if we don't, insert
if let Some(vq) = channel_map.channel_map.get(&view_num) {
vq.clone()
} else {
let mut channel_map =
RwLockUpgradableReadGuard::<'_, SendToTasks<TYPES>>::upgrade(channel_map).await;
let new_view_queue = ViewQueue::default();
let vq = new_view_queue.clone();
// NOTE: the read lock is held until all other read locks are DROPPED and
// the read lock may be turned into a write lock.
// This means that the `channel_map` will not change. So we don't need
// to check again to see if a channel was added

channel_map.channel_map.insert(view_num, new_view_queue);
vq
}
}

/// given a view number and a write lock on a channel map, inserts entry into map if it
/// doesn't exist, or creates entry. Then returns a clone of the entry
#[allow(clippy::unused_async)] // async for API compatibility reasons
pub async fn create_or_obtain_chan_from_write(
view_num: TYPES::Time,
mut channel_map: RwLockWriteGuard<'_, SendToTasks<TYPES>>,
) -> ViewQueue<TYPES> {
channel_map.channel_map.entry(view_num).or_default().clone()
}
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
Expand Down
Loading

0 comments on commit e457776

Please sign in to comment.