Skip to content

Commit

Permalink
3966 Remove clone from Membership and wrap it in Arc<RwLock>>
Browse files Browse the repository at this point in the history
  • Loading branch information
pls148 committed Dec 17, 2024
1 parent f177ccd commit 5d4c2cd
Show file tree
Hide file tree
Showing 53 changed files with 679 additions and 352 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ name = "whitelist-push-cdn"
path = "push-cdn/whitelist-adapter.rs"

[dependencies]
async-lock = { workspace = true }
async-trait = { workspace = true }

cdn-broker = { workspace = true, features = ["global-permits"] }
Expand Down
13 changes: 8 additions & 5 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
time::Instant,
};

use async_lock::RwLock;
use async_trait::async_trait;
use cdn_broker::reexports::crypto::signature::KeyPair;
use chrono::Utc;
Expand Down Expand Up @@ -396,7 +397,7 @@ pub trait RunDa<
sk,
config.node_index,
config.config,
memberships,
Arc::new(RwLock::new(memberships)),
Arc::from(network),
initializer,
ConsensusMetricsValue::default(),
Expand Down Expand Up @@ -526,13 +527,15 @@ pub trait RunDa<
}
}
}
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let num_eligible_leaders = context
.hotshot
.memberships
.read()
.await
.committee_leaders(TYPES::View::genesis(), TYPES::Epoch::genesis())
.len();
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let total_num_views = usize::try_from(consensus.locked_view().u64()).unwrap();
// `failed_num_views` could include uncommitted views
let failed_num_views = total_num_views - num_successful_commits;
Expand Down Expand Up @@ -739,7 +742,7 @@ where
// Create the quorum membership from the list of known nodes
let all_nodes = config.config.known_nodes_with_stake.clone();
let da_nodes = config.config.known_da_nodes.clone();
let quorum_membership = TYPES::Membership::new(all_nodes, da_nodes);
let membership = Arc::new(RwLock::new(TYPES::Membership::new(all_nodes, da_nodes)));

// Derive the bind address
let bind_address =
Expand All @@ -748,7 +751,7 @@ where
// Create the Libp2p network
let libp2p_network = Libp2pNetwork::from_config(
config.clone(),
quorum_membership,
membership,
GossipConfig::default(),
RequestResponseConfig::default(),
bind_address,
Expand Down
25 changes: 17 additions & 8 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
pub network: Arc<I::Network>,

/// Memberships used by consensus
pub memberships: Arc<TYPES::Membership>,
pub memberships: Arc<RwLock<TYPES::Membership>>,

/// the metrics that the implementor is using.
metrics: Arc<ConsensusMetricsValue>,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
start_view: initializer.start_view,
start_epoch: initializer.start_epoch,
network,
memberships: Arc::new(memberships),
memberships,
metrics: Arc::clone(&consensus_metrics),
internal_event_stream: (internal_tx, internal_rx.deactivate()),
output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
Expand Down Expand Up @@ -507,6 +507,15 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
})?;

spawn(async move {
let memberships_da_committee_members = api
.memberships
.read()
.await
.da_committee_members(view_number, TYPES::Epoch::new(1))
.iter()
.cloned()
.collect();

join! {
// TODO We should have a function that can return a network error if there is one
// but first we'd need to ensure our network implementations can support that
Expand All @@ -518,7 +527,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
api
.network.da_broadcast_message(
serialized_message,
api.memberships.da_committee_members(view_number, TYPES::Epoch::new(1)).iter().cloned().collect(),
memberships_da_committee_members,
BroadcastDelay::None,
),
api
Expand Down Expand Up @@ -603,7 +612,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
node_id: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -766,7 +775,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand All @@ -782,7 +791,7 @@ where
private_key.clone(),
nonce,
config.clone(),
memberships.clone(),
Arc::clone(&memberships),
Arc::clone(&network),
initializer.clone(),
metrics.clone(),
Expand Down
13 changes: 7 additions & 6 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
) {
let state = NetworkResponseState::<TYPES>::new(
handle.hotshot.consensus(),
(*handle.hotshot.memberships).clone().into(),
Arc::clone(&handle.hotshot.memberships),
handle.public_key().clone(),
handle.private_key().clone(),
handle.hotshot.id,
Expand Down Expand Up @@ -190,7 +190,7 @@ pub fn add_network_event_task<
>(
handle: &mut SystemContextHandle<TYPES, I, V>,
network: Arc<NET>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
network,
Expand Down Expand Up @@ -321,7 +321,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -516,16 +516,17 @@ where
/// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
let network = Arc::clone(&handle.network);
let memberships = Arc::clone(&handle.memberships);

self.add_network_event_task(handle, Arc::clone(&network), (*handle.memberships).clone());
self.add_network_event_task(handle, network, memberships);
}

/// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
fn add_network_event_task(
&self,
handle: &mut SystemContextHandle<TYPES, I, V>,
channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
add_network_event_task(handle, channel, membership);
}
Expand Down Expand Up @@ -563,6 +564,6 @@ pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
add_network_event_task(
handle,
Arc::clone(&handle.network),
(*handle.memberships).clone(),
Arc::clone(&handle.memberships),
);
}
22 changes: 11 additions & 11 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(handle.hotshot.consensus()),
view: handle.cur_view().await,
delay: handle.hotshot.config.data_request_delay,
membership: (*handle.hotshot.memberships).clone(),
membership: Arc::clone(&handle.hotshot.memberships),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
id: handle.hotshot.id,
Expand All @@ -78,7 +78,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
quorum_membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
vote_collectors: BTreeMap::default(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
Expand All @@ -99,7 +99,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
network: Arc::clone(&handle.hotshot.network),
vote_collector: None.into(),
public_key: handle.public_key().clone(),
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
network: Arc::clone(&handle.hotshot.network),
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
id: handle.hotshot.id,
Expand All @@ -144,7 +144,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
Self {
consensus: OuterConsensus::new(handle.hotshot.consensus()),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
network: Arc::clone(&handle.hotshot.network),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
Expand All @@ -169,7 +169,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view,
next_view: cur_view,
cur_epoch: handle.cur_epoch().await,
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
num_timeouts_tracked: 0,
Expand All @@ -196,7 +196,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(handle.hotshot.consensus()),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
instance_state: handle.hotshot.instance_state(),
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
latest_voted_view: handle.cur_view().await,
vote_dependencies: BTreeMap::new(),
network: Arc::clone(&handle.hotshot.network),
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
drb_computations: DrbComputations::new(),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
id: handle.hotshot.id,
Expand All @@ -265,7 +265,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
proposal_dependencies: BTreeMap::new(),
consensus: OuterConsensus::new(consensus),
instance_state: handle.hotshot.instance_state(),
quorum_membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
storage: Arc::clone(&handle.storage),
Expand All @@ -292,7 +292,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(consensus),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
quorum_membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
timeout: handle.hotshot.config.next_view_timeout,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
storage: Arc::clone(&handle.storage),
Expand All @@ -316,7 +316,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
private_key: handle.private_key().clone(),
instance_state: handle.hotshot.instance_state(),
network: Arc::clone(&handle.hotshot.network),
membership: (*handle.hotshot.memberships).clone().into(),
membership: Arc::clone(&handle.hotshot.memberships),
vote_collectors: BTreeMap::default(),
timeout_vote_collectors: BTreeMap::default(),
cur_view: handle.cur_view().await,
Expand Down
4 changes: 2 additions & 2 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl<T: NodeType> Libp2pNetwork<T> {
#[allow(clippy::too_many_arguments)]
pub async fn from_config(
mut config: NetworkConfig<T::SignatureKey>,
quorum_membership: T::Membership,
membership: Arc<RwLock<T::Membership>>,
gossip_config: GossipConfig,
request_response_config: RequestResponseConfig,
bind_address: Multiaddr,
Expand Down Expand Up @@ -421,7 +421,7 @@ impl<T: NodeType> Libp2pNetwork<T> {

// Set the auth message and stake table
config_builder
.stake_table(Some(quorum_membership))
.stake_table(Some(membership))
.auth_message(Some(auth_message));

// The replication factor is the minimum of [the default and 2/3 the number of nodes]
Expand Down
11 changes: 8 additions & 3 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct SystemContextHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
pub network: Arc<I::Network>,

/// Memberships used by consensus
pub memberships: Arc<TYPES::Membership>,
pub memberships: Arc<RwLock<TYPES::Membership>>,

/// Number of blocks in an epoch, zero means there are no epochs
pub epoch_height: u64,
Expand Down Expand Up @@ -156,7 +156,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
signed_proposal_request.commit().as_ref(),
)?;

let mem = (*self.memberships).clone();
let mem = Arc::clone(&self.memberships);
let receiver = self.internal_event_stream.1.activate_cloned();
let sender = self.internal_event_stream.0.clone();
let epoch_height = self.epoch_height;
Expand Down Expand Up @@ -187,10 +187,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = hs_event.as_ref()
{
// Make sure that the quorum_proposal is valid
if let Err(err) = quorum_proposal.validate_signature(&mem, epoch_height) {
let mem_reader = mem.read().await;
if let Err(err) = quorum_proposal.validate_signature(&mem_reader, epoch_height)
{
tracing::warn!("Invalid Proposal Received after Request. Err {:?}", err);
continue;
}
drop(mem_reader);
let proposed_leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
let commit = proposed_leaf.commit();
if commit == leaf_commitment {
Expand Down Expand Up @@ -326,6 +329,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
) -> Result<TYPES::SignatureKey> {
self.hotshot
.memberships
.read()
.await
.leader(view_number, epoch_number)
.context("Failed to lookup leader")
}
Expand Down
1 change: 1 addition & 0 deletions crates/libp2p-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ hotshot-example-types = { path = "../example-types" }

[dependencies]
anyhow = { workspace = true }
async-lock = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
Expand Down
Loading

0 comments on commit 5d4c2cd

Please sign in to comment.