3966 Remove clone from Membership and wrap it in Arc<RwLock> #3976

Merged · 1 commit · Dec 19, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/examples/Cargo.toml
@@ -78,6 +78,7 @@ name = "whitelist-push-cdn"
path = "push-cdn/whitelist-adapter.rs"

[dependencies]
async-lock = { workspace = true }
async-trait = { workspace = true }

cdn-broker = { workspace = true, features = ["global-permits"] }
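
The new async-lock dependency provides the async-aware RwLock used throughout this PR: its read() and write() methods return futures, so the lock can be taken inside async tasks without blocking the executor. A minimal sketch of the sharing pattern, with illustrative names that are not part of this diff:

use std::sync::Arc;
use async_lock::RwLock;

struct Membership {
    nodes: Vec<u64>,
}

// Reading through the shared handle: `read()` is a future, so callers await it
// and get a shared read guard.
async fn committee_size(membership: &Arc<RwLock<Membership>>) -> usize {
    membership.read().await.nodes.len()
}

async fn demo() {
    let membership = Arc::new(RwLock::new(Membership { nodes: vec![1, 2, 3] }));
    // Cloning the Arc is cheap; the Membership itself is never cloned.
    let shared = Arc::clone(&membership);
    assert_eq!(committee_size(&shared).await, 3);
}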
66 changes: 38 additions & 28 deletions crates/examples/infra/mod.rs
@@ -15,6 +15,7 @@ use std::{
time::Instant,
};

use async_lock::RwLock;
use async_trait::async_trait;
use cdn_broker::reexports::crypto::signature::KeyPair;
use chrono::Utc;
@@ -350,13 +351,17 @@ pub trait RunDa<
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> Self;

/// Initializes the genesis state and HotShot instance; does not start HotShot consensus
/// # Panics if it cannot generate a genesis block, fails to initialize HotShot, or cannot
/// get the anchored view
/// Note: sequencing leaf does not have state, so does not return state
async fn initialize_state_and_hotshot(&self) -> SystemContextHandle<TYPES, NODE, V> {
async fn initialize_state_and_hotshot(
&self,
membership: Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> SystemContextHandle<TYPES, NODE, V> {
let initializer =
hotshot::HotShotInitializer::<TYPES>::from_genesis::<V>(TestInstanceState::default())
.await
@@ -371,20 +376,6 @@

let network = self.network();

let all_nodes = if cfg!(feature = "fixed-leader-election") {
let mut vec = config.config.known_nodes_with_stake.clone();
vec.truncate(config.config.fixed_leader_for_gpuvid);
vec
} else {
config.config.known_nodes_with_stake.clone()
};

let da_nodes = config.config.known_da_nodes.clone();

// Create the quorum membership from all nodes, specifying the committee
// as the known da nodes
let memberships = <TYPES as NodeType>::Membership::new(all_nodes, da_nodes);

let marketplace_config = MarketplaceConfig {
auction_results_provider: TestAuctionResultsProvider::<TYPES>::default().into(),
// TODO: we need to pass a valid fallback builder url here somehow
@@ -396,7 +387,7 @@
sk,
config.node_index,
config.config,
memberships,
membership,
Arc::from(network),
initializer,
ConsensusMetricsValue::default(),
@@ -526,13 +517,15 @@
}
}
}
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let num_eligible_leaders = context
.hotshot
.memberships
.read()
.await
.committee_leaders(TYPES::View::genesis(), TYPES::Epoch::genesis())
.len();
let consensus_lock = context.hotshot.consensus();
let consensus = consensus_lock.read().await;
let total_num_views = usize::try_from(consensus.locked_view().u64()).unwrap();
// `failed_num_views` could include uncommitted views
let failed_num_views = total_num_views - num_successful_commits;
@@ -622,6 +615,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
_libp2p_advertise_address: Option<String>,
_membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> PushCdnDaRun<TYPES> {
// Convert to the Push-CDN-compatible type
let keypair = KeyPair {
@@ -708,6 +702,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> Libp2pDaRun<TYPES> {
// Extrapolate keys for ease of use
let public_key = &validator_config.public_key;
@@ -736,19 +731,14 @@ where
.to_string()
};

// Create the quorum membership from the list of known nodes
let all_nodes = config.config.known_nodes_with_stake.clone();
let da_nodes = config.config.known_da_nodes.clone();
let quorum_membership = TYPES::Membership::new(all_nodes, da_nodes);

// Derive the bind address
let bind_address =
derive_libp2p_multiaddr(&bind_address).expect("failed to derive bind address");

// Create the Libp2p network
let libp2p_network = Libp2pNetwork::from_config(
config.clone(),
quorum_membership,
Arc::clone(membership),
GossipConfig::default(),
RequestResponseConfig::default(),
bind_address,
@@ -820,6 +810,7 @@ where
config: NetworkConfig<TYPES::SignatureKey>,
validator_config: ValidatorConfig<TYPES::SignatureKey>,
libp2p_advertise_address: Option<String>,
membership: &Arc<RwLock<<TYPES as NodeType>::Membership>>,
) -> CombinedDaRun<TYPES> {
// Initialize our Libp2p network
let libp2p_network: Libp2pDaRun<TYPES> = <Libp2pDaRun<TYPES> as RunDa<
@@ -831,6 +822,7 @@
config.clone(),
validator_config.clone(),
libp2p_advertise_address.clone(),
membership,
)
.await;

@@ -844,6 +836,7 @@
config.clone(),
validator_config.clone(),
libp2p_advertise_address,
membership,
)
.await;

@@ -878,6 +871,7 @@ where
}
}

#[allow(clippy::too_many_lines)]
/// Main entry point for validators
/// # Panics
/// if unable to get the local ip address
@@ -974,11 +968,27 @@ pub async fn main_entry_point<
.join(",")
);

let all_nodes = if cfg!(feature = "fixed-leader-election") {
let mut vec = run_config.config.known_nodes_with_stake.clone();
vec.truncate(run_config.config.fixed_leader_for_gpuvid);
vec
} else {
run_config.config.known_nodes_with_stake.clone()
};
let membership = Arc::new(RwLock::new(<TYPES as NodeType>::Membership::new(
all_nodes,
run_config.config.known_da_nodes.clone(),
)));

info!("Initializing networking");
let run =
RUNDA::initialize_networking(run_config.clone(), validator_config, args.advertise_address)
.await;
let hotshot = run.initialize_state_and_hotshot().await;
let run = RUNDA::initialize_networking(
run_config.clone(),
validator_config,
args.advertise_address,
&membership,
)
.await;
let hotshot = run.initialize_state_and_hotshot(membership).await;

if let Some(task) = builder_task {
task.start(Box::new(hotshot.event_stream()));
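
Taken together, the changes to this file follow one ownership pattern: the membership is built once in main_entry_point, wrapped in Arc<RwLock>, lent to networking setup by reference, and handed to HotShot initialization by value. A simplified sketch under placeholder types (the real code uses the NodeType generics shown above):

use std::sync::Arc;
use async_lock::RwLock;

struct Membership;
struct Network;
struct Handle;

async fn initialize_networking(membership: &Arc<RwLock<Membership>>) -> Network {
    // Networking keeps only a cheap clone of the shared handle.
    let _own_copy = Arc::clone(membership);
    Network
}

async fn initialize_state_and_hotshot(membership: Arc<RwLock<Membership>>) -> Handle {
    // HotShot takes ownership of one Arc to the same shared membership.
    let _ = membership;
    Handle
}

async fn main_entry_point() -> Handle {
    // Built exactly once; every consumer sees the same instance.
    let membership = Arc::new(RwLock::new(Membership));
    let _network = initialize_networking(&membership).await;
    initialize_state_and_hotshot(membership).await
}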
25 changes: 17 additions & 8 deletions crates/hotshot/src/lib.rs
@@ -108,7 +108,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
pub network: Arc<I::Network>,

/// Memberships used by consensus
pub memberships: Arc<TYPES::Membership>,
pub memberships: Arc<RwLock<TYPES::Membership>>,

/// the metrics that the implementor is using.
metrics: Arc<ConsensusMetricsValue>,
@@ -199,7 +199,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
@@ -252,7 +252,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
@@ -365,7 +365,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
start_view: initializer.start_view,
start_epoch: initializer.start_epoch,
network,
memberships: Arc::new(memberships),
memberships,
metrics: Arc::clone(&consensus_metrics),
internal_event_stream: (internal_tx, internal_rx.deactivate()),
output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
@@ -512,6 +512,15 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
})?;

spawn(async move {
let memberships_da_committee_members = api
.memberships
.read()
.await
.da_committee_members(view_number, epoch)
.iter()
.cloned()
.collect();

join! {
// TODO We should have a function that can return a network error if there is one
// but first we'd need to ensure our network implementations can support that
@@ -523,7 +532,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
api
.network.da_broadcast_message(
serialized_message,
api.memberships.da_committee_members(view_number, epoch).iter().cloned().collect(),
memberships_da_committee_members,
BroadcastDelay::None,
),
api
@@ -608,7 +617,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
node_id: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
@@ -771,7 +780,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
@@ -787,7 +796,7 @@
private_key.clone(),
nonce,
config.clone(),
memberships.clone(),
Arc::clone(&memberships),
Arc::clone(&network),
initializer.clone(),
metrics.clone(),
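
The da_broadcast_message change above reflects a common pattern with the new lock: take the read guard briefly, clone out the recipient set, and let the guard drop before the network sends are awaited, so the membership lock is never held across slow I/O. A hedged, self-contained sketch with placeholder types and a stand-in broadcast function:

use std::{collections::BTreeSet, sync::Arc};
use async_lock::RwLock;

struct Membership {
    da_committee: BTreeSet<u64>,
}

// Stand-in for the real network broadcast call.
async fn da_broadcast(_recipients: BTreeSet<u64>) {}

async fn broadcast_to_da(membership: Arc<RwLock<Membership>>) {
    // The read guard lives only for this statement; the set is cloned out of it.
    let recipients = membership.read().await.da_committee.clone();
    // By the time the (potentially slow) send runs, the lock is already released.
    da_broadcast(recipients).await;
}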
13 changes: 7 additions & 6 deletions crates/hotshot/src/tasks/mod.rs
@@ -82,7 +82,7 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
) {
let state = NetworkResponseState::<TYPES>::new(
handle.hotshot.consensus(),
(*handle.hotshot.memberships).clone().into(),
Arc::clone(&handle.hotshot.memberships),
handle.public_key().clone(),
handle.private_key().clone(),
handle.hotshot.id,
@@ -190,7 +190,7 @@ pub fn add_network_event_task<
>(
handle: &mut SystemContextHandle<TYPES, I, V>,
network: Arc<NET>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
network,
@@ -321,7 +321,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: TYPES::Membership,
memberships: Arc<RwLock<TYPES::Membership>>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
@@ -516,16 +516,17 @@ where
/// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
let network = Arc::clone(&handle.network);
let memberships = Arc::clone(&handle.memberships);

self.add_network_event_task(handle, Arc::clone(&network), (*handle.memberships).clone());
self.add_network_event_task(handle, network, memberships);
}

/// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
fn add_network_event_task(
&self,
handle: &mut SystemContextHandle<TYPES, I, V>,
channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
membership: TYPES::Membership,
membership: Arc<RwLock<TYPES::Membership>>,
) {
add_network_event_task(handle, channel, membership);
}
@@ -563,6 +564,6 @@ pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
add_network_event_task(
handle,
Arc::clone(&handle.network),
(*handle.memberships).clone(),
Arc::clone(&handle.memberships),
);
}
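
As a final illustration of the task wiring: replacing (*handle.memberships).clone() with Arc::clone(&handle.memberships) means each task now bumps a reference count instead of deep-copying the membership (including its stake table). Sketch with placeholder types, not the real handle API:

use std::sync::Arc;
use async_lock::RwLock;

struct Membership {
    stake_table: Vec<u64>, // potentially large
}

fn spawn_task_with(_membership: Arc<RwLock<Membership>>) {}

fn add_tasks(shared: &Arc<RwLock<Membership>>) {
    // Before this PR, the Membership itself was cloned for each task;
    // now only the Arc's reference count is incremented.
    spawn_task_with(Arc::clone(shared));
    spawn_task_with(Arc::clone(shared));
}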