diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index cf94c5cdd7e..59f63890a29 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -2,7 +2,7 @@ //! given time. It schedules subscriptions to shard subnets, requests peer discoveries and //! determines whether attestations should be aggregated and/or passed to the beacon node. -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -10,10 +10,10 @@ use std::time::{Duration, Instant}; use futures::prelude::*; use rand::seq::SliceRandom; -use slog::{crit, debug, error, o, trace, warn}; +use slog::{debug, error, o, trace, warn}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery}; +use eth2_libp2p::SubnetDiscovery; use hashset_delay::HashSetDelay; use rest_types::ValidatorSubscription; use slot_clock::SlotClock; @@ -66,17 +66,14 @@ pub struct AttestationService { /// Queued events to return to the driving service. events: VecDeque, - /// A collection of public network variables. - network_globals: Arc>, - /// A reference to the beacon chain to process received attestations. beacon_chain: Arc>, /// The collection of currently subscribed random subnets mapped to their expiry deadline. random_subnets: HashSetDelay, - /// A collection of timeouts for when to subscribe to a shard subnet. - subscriptions: HashSetDelay, + /// The collection of all currently subscribed subnets (long-lived **and** short-lived). + subscriptions: HashSet, /// A collection of timeouts for when to unsubscribe from a shard subnet. unsubscriptions: HashSetDelay, @@ -100,11 +97,7 @@ pub struct AttestationService { impl AttestationService { /* Public functions */ - pub fn new( - beacon_chain: Arc>, - network_globals: Arc>, - log: &slog::Logger, - ) -> Self { + pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { let log = log.new(o!("service" => "attestation_service")); // calculate the random subnet duration from the spec constants @@ -125,10 +118,9 @@ impl AttestationService { AttestationService { events: VecDeque::with_capacity(10), - network_globals, beacon_chain, random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)), - subscriptions: HashSetDelay::new(default_timeout), + subscriptions: HashSet::new(), unsubscriptions: HashSetDelay::new(default_timeout), aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout), @@ -137,6 +129,12 @@ impl AttestationService { } } + /// Return count of all currently subscribed subnets (long-lived **and** short-lived). + #[cfg(test)] + pub fn subscription_count(&self) -> usize { + self.subscriptions.len() + } + /// Processes a list of validator subscriptions. /// /// This will: @@ -321,40 +319,23 @@ impl AttestationService { .now() .ok_or_else(|| "Could not get the current slot")?; - // Calculate the duration to the subscription event and the duration to the end event. + // Calculate the duration to the unsubscription event. // There are two main cases. Attempting to subscribe to the current slot and all others. - let (duration_to_subscribe, expected_end_subscription_duration) = { - let duration_to_next_slot = self - .beacon_chain + let expected_end_subscription_duration = if current_slot >= exact_subnet.slot { + self.beacon_chain .slot_clock .duration_to_next_slot() - .ok_or_else(|| "Unable to determine duration to next slot")?; + .ok_or_else(|| "Unable to determine duration to next slot")? + } else { + let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - if current_slot >= exact_subnet.slot { - (Duration::from_secs(0), duration_to_next_slot) - } else { - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - let advance_subscription_duration = slot_duration - .checked_div(ADVANCE_SUBSCRIBE_TIME) - .expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large"); - - // calculate the time to subscribe to the subnet - let duration_to_subscribe = self - .beacon_chain - .slot_clock - .duration_to_slot(exact_subnet.slot) - .ok_or_else(|| "Unable to determine duration to subscription slot")? - .checked_sub(advance_subscription_duration) - .unwrap_or_else(|| Duration::from_secs(0)); - - // the duration until we no longer need this subscription. We assume a single slot is - // sufficient. - let expected_end_subscription_duration = duration_to_subscribe - + slot_duration - + std::cmp::min(advance_subscription_duration, duration_to_next_slot); - - (duration_to_subscribe, expected_end_subscription_duration) - } + // the duration until we no longer need this subscription. We assume a single slot is + // sufficient. + self.beacon_chain + .slot_clock + .duration_to_slot(exact_subnet.slot) + .ok_or_else(|| "Unable to determine duration to subscription slot")? + + slot_duration }; // Regardless of whether or not we have already subscribed to a subnet, track the expiration @@ -370,13 +351,12 @@ impl AttestationService { // in-active. This case is checked on the subscription event (see `handle_subscriptions`). // Return if we already have a subscription for this subnet_id and slot - if self.subscriptions.contains(&exact_subnet) { + if self.unsubscriptions.contains(&exact_subnet) { return Ok(()); } // We are not currently subscribed and have no waiting subscription, create one - self.subscriptions - .insert_at(exact_subnet.clone(), duration_to_subscribe); + self.handle_subscriptions(exact_subnet.clone()); // if there is an unsubscription event for the slot prior, we remove it to prevent // unsubscriptions immediately after the subscription. We also want to minimize @@ -437,35 +417,30 @@ impl AttestationService { }; for subnet_id in to_subscribe_subnets { - // remove this subnet from any immediate subscription/un-subscription events - self.subscriptions - .retain(|exact_subnet| exact_subnet.subnet_id != subnet_id); + // remove this subnet from any immediate un-subscription events self.unsubscriptions .retain(|exact_subnet| exact_subnet.subnet_id != subnet_id); // insert a new random subnet self.random_subnets.insert(subnet_id); - // if we are not already subscribed, then subscribe - let topic_kind = &GossipKind::Attestation(subnet_id); - - let already_subscribed = self - .network_globals - .gossipsub_subscriptions - .read() - .iter() - .any(|topic| topic.kind() == topic_kind); + // send discovery request + // Note: it's wasteful to send a DiscoverPeers request if we already have peers for this subnet. + // However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and + // this makes it easier to deterministically test the attestations service. + self.events + .push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet_id, + min_ttl: None, + }])); - if !already_subscribed { - // send a discovery request and a subscription - self.events - .push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { - subnet_id, - min_ttl: None, - }])); + // if we are not already subscribed, then subscribe + if !self.subscriptions.contains(&subnet_id) { + self.subscriptions.insert(subnet_id); self.events .push_back(AttServiceMessage::Subscribe(subnet_id)); } + // add the subnet to the ENR bitfield self.events.push_back(AttServiceMessage::EnrAdd(subnet_id)); } @@ -499,17 +474,10 @@ impl AttestationService { // we are also not un-subscribing from a subnet if the next slot requires us to be // subscribed. Therefore there could be the case that we are already still subscribed // to the required subnet. In which case we do not issue another subscription request. - let topic_kind = &GossipKind::Attestation(exact_subnet.subnet_id); - if self - .network_globals - .gossipsub_subscriptions - .read() - .iter() - .find(|topic| topic.kind() == topic_kind) - .is_none() - { + if !self.subscriptions.contains(&exact_subnet.subnet_id) { // we are not already subscribed debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64()); + self.subscriptions.insert(exact_subnet.subnet_id); self.events .push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id)); } @@ -528,10 +496,7 @@ impl AttestationService { debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64()); - // various logic checks - if self.subscriptions.contains(&exact_subnet) { - crit!(self.log, "Unsubscribing from a subnet in subscriptions"); - } + self.subscriptions.remove(&exact_subnet.subnet_id); self.events .push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id)); } @@ -581,33 +546,17 @@ impl AttestationService { &mut rand::thread_rng(), random_subnets_per_validator as usize, ); - let current_slot = self.beacon_chain.slot_clock.now().ok_or_else(|| { - warn!(self.log, "Could not get the current slot"); - })?; for subnet_id in to_remove_subnets { - // If a subscription is queued for two slots in the future, it's associated unsubscription - // will unsubscribe from the expired subnet. - // If there is no unsubscription for this subnet,slot it is safe to add one, without - // unsubscribing early from a required subnet - let subnet = ExactSubnet { - subnet_id: *subnet_id, - slot: current_slot + 2, - }; - if self.subscriptions.get(&subnet).is_none() { - // set an unsubscribe event - let duration_to_next_slot = self - .beacon_chain - .slot_clock - .duration_to_next_slot() - .ok_or_else(|| { - warn!(self.log, "Unable to determine duration to next slot"); - })?; - let slot_duration = self.beacon_chain.slot_clock.slot_duration(); - // Set the unsubscription timeout - let unsubscription_duration = duration_to_next_slot + slot_duration * 2; - self.unsubscriptions - .insert_at(subnet, unsubscription_duration); + // If there are no unsubscription events for `subnet_id`, we unsubscribe immediately. + if self + .unsubscriptions + .keys() + .find(|s| s.subnet_id == *subnet_id) + .is_none() + { + self.events + .push_back(AttServiceMessage::Unsubscribe(*subnet_id)); } // as the long lasting subnet subscription is being removed, remove the subnet_id from // the ENR bitfield @@ -632,15 +581,6 @@ impl Stream for AttestationService { self.waker = Some(cx.waker().clone()); } - // process any subscription events - match self.subscriptions.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet), - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for subnet subscription times"; "error"=> e); - } - Poll::Ready(None) | Poll::Pending => {} - } - // process any un-subscription events match self.unsubscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet), diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 16a8f9b0a6e..0a392727a6f 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -7,12 +7,6 @@ mod tests { events::NullEventHandler, migrate::NullMigrator, }; - use eth2_libp2p::discovery::{build_enr, Keypair}; - use eth2_libp2p::rpc::methods::MetaData; - use eth2_libp2p::types::EnrBitfield; - use eth2_libp2p::{ - discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery, - }; use futures::Stream; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use lazy_static::lazy_static; @@ -24,7 +18,7 @@ mod tests { use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; use tempfile::tempdir; - use types::{CommitteeIndex, EnrForkId, EthSpec, MinimalEthSpec}; + use types::{CommitteeIndex, EthSpec, MinimalEthSpec}; const SLOT_DURATION_MILLIS: u64 = 200; @@ -100,19 +94,7 @@ mod tests { let beacon_chain = CHAIN.chain.clone(); - let config = NetworkConfig::default(); - let enr_key = CombinedKey::from_libp2p(&Keypair::generate_secp256k1()).unwrap(); - let enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); - - // Default metadata - let meta_data = MetaData { - seq_number: 0, - attnets: EnrBitfield::::default(), - }; - - let network_globals: NetworkGlobals = - NetworkGlobals::new(enr, 0, 0, meta_data, vec![], &log); - AttestationService::new(beacon_chain, Arc::new(network_globals), &log) + AttestationService::new(beacon_chain, &log) } fn get_subscription( @@ -151,9 +133,9 @@ mod tests { // gets a number of events from the subscription service, or returns none if it times out after a number // of slots async fn get_events + Unpin>( - mut stream: S, - no_events: usize, - no_slots_before_timeout: u32, + stream: &mut S, + num_events: Option, + num_slots_before_timeout: u32, ) -> Vec { let mut events = Vec::new(); @@ -161,8 +143,10 @@ mod tests { loop { if let Some(result) = stream.next().await { events.push(result); - if events.len() == no_events { - return; + if let Some(num) = num_events { + if events.len() == num { + return; + } } } } @@ -171,7 +155,7 @@ mod tests { tokio::select! { _ = collect_stream_fut => {return events} _ = tokio::time::delay_for( - Duration::from_millis(SLOT_DURATION_MILLIS) * no_slots_before_timeout, + Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout, ) => { return events; } } } @@ -182,7 +166,7 @@ mod tests { let validator_index = 1; let committee_index = 1; let subscription_slot = 0; - let no_events_expected = 4; + let num_events_expected = 4; let committee_count = 1; // create the attestation service and subscriptions @@ -216,13 +200,16 @@ mod tests { .unwrap(), )]; - let events = get_events(attestation_service, no_events_expected, 1).await; + let events = get_events(&mut attestation_service, Some(num_events_expected), 1).await; assert_matches!( events[..3], [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] ); + + // Should be subscribed to 1 long lived and one short lived subnet. + assert_eq!(attestation_service.subscription_count(), 2); // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { + if events.len() == num_events_expected { assert_eq!(expected[..], events[3..]); } } @@ -233,7 +220,7 @@ mod tests { let validator_index = 1; let committee_index = 1; let subscription_slot = 0; - let no_events_expected = 5; + let num_events_expected = 5; let committee_count = 1; // create the attestation service and subscriptions @@ -269,253 +256,25 @@ mod tests { AttServiceMessage::Unsubscribe(subnet_id), ]; - let events = get_events(attestation_service, no_events_expected, 2).await; + let events = get_events(&mut attestation_service, Some(num_events_expected), 2).await; assert_matches!( events[..3], [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] ); - // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { - assert_eq!(expected[..], events[3..]); - } - } - - #[tokio::test] - async fn subscribe_five_slots_ahead() { - // subscription config - let validator_index = 1; - let committee_index = 1; - let subscription_slot = 5; - let no_events_expected = 4; - let committee_count = 1; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - - let subscriptions = vec![get_subscription( - validator_index, - committee_index, - current_slot + Slot::new(subscription_slot), - committee_count, - )]; - - // submit the subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - let min_ttl = Instant::now().checked_add( - attestation_service - .beacon_chain - .slot_clock - .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) - .unwrap(), - ); - - // just discover peers, don't subscribe yet - let subnet_id = SubnetId::compute_subnet::( - current_slot + Slot::new(subscription_slot), - committee_index, - committee_count, - &attestation_service.beacon_chain.spec, - ) - .unwrap(); - let expected = vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { - subnet_id, - min_ttl, - }])]; - - let events = get_events(attestation_service, no_events_expected, 1).await; - assert_matches!( - events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] - ); - // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { - assert_eq!(expected[..], events[3..]); - } - } - - #[tokio::test] - async fn subscribe_five_slots_ahead_wait_five_slots() { - // subscription config - let validator_index = 1; - let committee_index = 1; - let subscription_slot = 5; - let no_events_expected = 5; - let committee_count = 1; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - - let subscriptions = vec![get_subscription( - validator_index, - committee_index, - current_slot + Slot::new(subscription_slot), - committee_count, - )]; - - // submit the subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - let min_ttl = Instant::now().checked_add( - attestation_service - .beacon_chain - .slot_clock - .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) - .unwrap(), - ); - - // we should discover peers, wait, then subscribe - let subnet_id = SubnetId::compute_subnet::( - current_slot + Slot::new(subscription_slot), - committee_index, - committee_count, - &attestation_service.beacon_chain.spec, - ) - .unwrap(); - let expected = vec![ - AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { subnet_id, min_ttl }]), - AttServiceMessage::Subscribe(subnet_id), - ]; - - let events = get_events(attestation_service, no_events_expected, 5).await; - assert_matches!( - events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] - ); - // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { - assert_eq!(expected[..], events[3..]); - } - } - - #[tokio::test] - async fn subscribe_7_slots_ahead() { - // subscription config - let validator_index = 1; - let committee_index = 1; - let subscription_slot = 7; - let no_events_expected = 3; - let committee_count = 1; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - - let subscriptions = vec![get_subscription( - validator_index, - committee_index, - current_slot + Slot::new(subscription_slot), - committee_count, - )]; - - // submit the subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - // ten slots ahead is before our target peer discover time, so expect no messages - let expected: Vec = vec![]; - - let events = get_events(attestation_service, no_events_expected, 1).await; - - assert_matches!( - events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] - ); - // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { - assert_eq!(expected[..], events[3..]); - } - } - - #[tokio::test] - async fn subscribe_ten_slots_ahead_wait_five_slots() { - // subscription config - let validator_index = 1; - let committee_index = 1; - let subscription_slot = 10; - let no_events_expected = 4; - let committee_count = 1; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - let subscriptions = vec![get_subscription( - validator_index, - committee_index, - current_slot + Slot::new(subscription_slot), - committee_count, - )]; - - // submit the subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - let min_ttl = Instant::now().checked_add( - attestation_service - .beacon_chain - .slot_clock - .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) - .unwrap(), - ); - - let subnet_id = SubnetId::compute_subnet::( - current_slot + Slot::new(subscription_slot), - committee_index, - committee_count, - &attestation_service.beacon_chain.spec, - ) - .unwrap(); - - // expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range - let expected: Vec = - vec![AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery { - subnet_id, - min_ttl, - }])]; - - let events = get_events(attestation_service, no_events_expected, 5).await; - - assert_matches!( - events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3)] - ); + // Should be subscribed to only 1 long lived subnet after unsubscription. + assert_eq!(attestation_service.subscription_count(), 1); // if there are fewer events than expected, there's been a collision - if events.len() == no_events_expected { + if events.len() == num_events_expected { assert_eq!(expected[..], events[3..]); } } #[tokio::test] async fn subscribe_all_random_subnets() { - // subscribe 10 slots ahead so we do not produce any exact subnet messages + let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; - let subscription_count = 64; + let subscription_count = attestation_subnet_count; let committee_count = 1; // create the attestation service and subscriptions @@ -537,25 +296,34 @@ mod tests { .validator_subscriptions(subscriptions) .unwrap(); - let events = get_events(attestation_service, 192, 3).await; + let events = get_events(&mut attestation_service, None, 3).await; let mut discover_peer_count = 0; - let mut subscribe_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; - for event in events { + for event in &events { match event { AttServiceMessage::DiscoverPeers(_) => { discover_peer_count = discover_peer_count + 1 } - AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, + AttServiceMessage::Subscribe(_any_subnet) => {} AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, } } - assert_eq!(discover_peer_count, 64); - assert_eq!(subscribe_count, 64); + // The bulk discovery request length should be equal to validator_count + let bulk_discovery_event = events.last().unwrap(); + if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event { + assert_eq!(d.len(), attestation_subnet_count as usize); + } else { + panic!("Unexpected event {:?}", bulk_discovery_event); + } + + // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets + // and 1 `DiscoverPeer` request corresponding to bulk subnet discovery. + assert_eq!(discover_peer_count, subscription_count + 1); + assert_eq!(attestation_service.subscription_count(), 64); assert_eq!(enr_add_count, 64); assert_eq!(unexpected_msg_count, 0); // test completed successfully @@ -563,10 +331,10 @@ mod tests { #[tokio::test] async fn subscribe_all_random_subnets_plus_one() { - // subscribe 10 slots ahead so we do not produce any exact subnet messages + let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; // the 65th subscription should result in no more messages than the previous scenario - let subscription_count = 65; + let subscription_count = attestation_subnet_count + 1; let committee_count = 1; // create the attestation service and subscriptions @@ -588,62 +356,35 @@ mod tests { .validator_subscriptions(subscriptions) .unwrap(); - let events = get_events(attestation_service, 192, 3).await; + let events = get_events(&mut attestation_service, None, 3).await; let mut discover_peer_count = 0; - let mut subscribe_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; - for event in events { + for event in &events { match event { AttServiceMessage::DiscoverPeers(_) => { discover_peer_count = discover_peer_count + 1 } - AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, + AttServiceMessage::Subscribe(_any_subnet) => {} AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, } } - assert_eq!(discover_peer_count, 64); - assert_eq!(subscribe_count, 64); - assert_eq!(enr_add_count, 64); - assert_eq!(unexpected_msg_count, 0); - } - - #[tokio::test] - async fn test_discovery_peers_count() { - let subscription_slot = 10; - let validator_count = 32; - let committee_count = 1; - let expected_events = 97; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - - let subscriptions = get_subscriptions( - validator_count, - current_slot + subscription_slot, - committee_count, - ); - - // submit sthe subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - let events = get_events(attestation_service, expected_events, 3).await; - - let event = events.get(96); - if let Some(AttServiceMessage::DiscoverPeers(d)) = event { - assert_eq!(d.len(), validator_count as usize); + // The bulk discovery request length shouldn't exceed max attestation_subnet_count + let bulk_discovery_event = events.last().unwrap(); + if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event { + assert_eq!(d.len(), attestation_subnet_count as usize); } else { - panic!("Unexpected event {:?}", event); + panic!("Unexpected event {:?}", bulk_discovery_event); } + // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets + // and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery. + // For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity. + assert_eq!(discover_peer_count, 64 + 1); + assert_eq!(attestation_service.subscription_count(), 64); + assert_eq!(enr_add_count, 64); + assert_eq!(unexpected_msg_count, 0); } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e07ebb166ab..84f807007fa 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -149,8 +149,7 @@ impl NetworkService { )?; // attestation service - let attestation_service = - AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log); + let attestation_service = AttestationService::new(beacon_chain.clone(), &network_log); // create a timer for updating network metrics let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));