Skip to content

Commit

Permalink
Subscribe to subnets an epoch in advance (#1600)
Browse files Browse the repository at this point in the history
## Issue Addressed

N/A

## Proposed Changes

Subscribe to subnets an epoch in advance of the attestation slot instead of 4 slots in advance.
  • Loading branch information
pawanjay176 committed Sep 22, 2020
1 parent 7aceff4 commit a97ec31
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 430 deletions.
168 changes: 54 additions & 114 deletions beacon_node/network/src/attestation_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use futures::prelude::*;
use rand::seq::SliceRandom;
use slog::{crit, debug, error, o, trace, warn};
use slog::{debug, error, o, trace, warn};

use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{types::GossipKind, NetworkGlobals, SubnetDiscovery};
use eth2_libp2p::SubnetDiscovery;
use hashset_delay::HashSetDelay;
use rest_types::ValidatorSubscription;
use slot_clock::SlotClock;
Expand Down Expand Up @@ -66,17 +66,14 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// Queued events to return to the driving service.
events: VecDeque<AttServiceMessage>,

/// A collection of public network variables.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,

/// A reference to the beacon chain to process received attestations.
beacon_chain: Arc<BeaconChain<T>>,

/// The collection of currently subscribed random subnets mapped to their expiry deadline.
random_subnets: HashSetDelay<SubnetId>,

/// A collection of timeouts for when to subscribe to a shard subnet.
subscriptions: HashSetDelay<ExactSubnet>,
/// The collection of all currently subscribed subnets (long-lived **and** short-lived).
subscriptions: HashSet<SubnetId>,

/// A collection of timeouts for when to unsubscribe from a shard subnet.
unsubscriptions: HashSetDelay<ExactSubnet>,
Expand All @@ -100,11 +97,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> AttestationService<T> {
/* Public functions */

pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: &slog::Logger,
) -> Self {
pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
let log = log.new(o!("service" => "attestation_service"));

// calculate the random subnet duration from the spec constants
Expand All @@ -125,10 +118,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {

AttestationService {
events: VecDeque::with_capacity(10),
network_globals,
beacon_chain,
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
subscriptions: HashSetDelay::new(default_timeout),
subscriptions: HashSet::new(),
unsubscriptions: HashSetDelay::new(default_timeout),
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
known_validators: HashSetDelay::new(last_seen_val_timeout),
Expand All @@ -137,6 +129,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
}
}

/// Return count of all currently subscribed subnets (long-lived **and** short-lived).
///
/// This is the number of entries currently held in the `subscriptions` set. The method is
/// only compiled in test builds (`#[cfg(test)]`), so it is not available to non-test callers.
#[cfg(test)]
pub fn subscription_count(&self) -> usize {
self.subscriptions.len()
}

/// Processes a list of validator subscriptions.
///
/// This will:
Expand Down Expand Up @@ -321,40 +319,23 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.now()
.ok_or_else(|| "Could not get the current slot")?;

// Calculate the duration to the subscription event and the duration to the end event.
// Calculate the duration to the unsubscription event.
// There are two main cases. Attempting to subscribe to the current slot and all others.
let (duration_to_subscribe, expected_end_subscription_duration) = {
let duration_to_next_slot = self
.beacon_chain
let expected_end_subscription_duration = if current_slot >= exact_subnet.slot {
self.beacon_chain
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| "Unable to determine duration to next slot")?;
.ok_or_else(|| "Unable to determine duration to next slot")?
} else {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();

if current_slot >= exact_subnet.slot {
(Duration::from_secs(0), duration_to_next_slot)
} else {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
let advance_subscription_duration = slot_duration
.checked_div(ADVANCE_SUBSCRIBE_TIME)
.expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large");

// calculate the time to subscribe to the subnet
let duration_to_subscribe = self
.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot)
.ok_or_else(|| "Unable to determine duration to subscription slot")?
.checked_sub(advance_subscription_duration)
.unwrap_or_else(|| Duration::from_secs(0));

// the duration until we no longer need this subscription. We assume a single slot is
// sufficient.
let expected_end_subscription_duration = duration_to_subscribe
+ slot_duration
+ std::cmp::min(advance_subscription_duration, duration_to_next_slot);

(duration_to_subscribe, expected_end_subscription_duration)
}
// the duration until we no longer need this subscription. We assume a single slot is
// sufficient.
self.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot)
.ok_or_else(|| "Unable to determine duration to subscription slot")?
+ slot_duration
};

// Regardless of whether or not we have already subscribed to a subnet, track the expiration
Expand All @@ -370,13 +351,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).

// Return if we already have a subscription for this subnet_id and slot
if self.subscriptions.contains(&exact_subnet) {
if self.unsubscriptions.contains(&exact_subnet) {
return Ok(());
}

// We are not currently subscribed and have no waiting subscription, create one
self.subscriptions
.insert_at(exact_subnet.clone(), duration_to_subscribe);
self.handle_subscriptions(exact_subnet.clone());

// if there is an unsubscription event for the slot prior, we remove it to prevent
// unsubscriptions immediately after the subscription. We also want to minimize
Expand Down Expand Up @@ -437,35 +417,30 @@ impl<T: BeaconChainTypes> AttestationService<T> {
};

for subnet_id in to_subscribe_subnets {
// remove this subnet from any immediate subscription/un-subscription events
self.subscriptions
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
// remove this subnet from any immediate un-subscription events
self.unsubscriptions
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);

// insert a new random subnet
self.random_subnets.insert(subnet_id);

// if we are not already subscribed, then subscribe
let topic_kind = &GossipKind::Attestation(subnet_id);

let already_subscribed = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.any(|topic| topic.kind() == topic_kind);
// send discovery request
// Note: it's wasteful to send a DiscoverPeers request if we already have peers for this subnet.
// However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and
// this makes it easier to deterministically test the attestations service.
self.events
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
min_ttl: None,
}]));

if !already_subscribed {
// send a discovery request and a subscription
self.events
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
min_ttl: None,
}]));
// if we are not already subscribed, then subscribe
if !self.subscriptions.contains(&subnet_id) {
self.subscriptions.insert(subnet_id);
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
}

// add the subnet to the ENR bitfield
self.events.push_back(AttServiceMessage::EnrAdd(subnet_id));
}
Expand Down Expand Up @@ -499,17 +474,10 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// we are also not un-subscribing from a subnet if the next slot requires us to be
// subscribed. Therefore there could be the case that we are already still subscribed
// to the required subnet. In which case we do not issue another subscription request.
let topic_kind = &GossipKind::Attestation(exact_subnet.subnet_id);
if self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.find(|topic| topic.kind() == topic_kind)
.is_none()
{
if !self.subscriptions.contains(&exact_subnet.subnet_id) {
// we are not already subscribed
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64());
self.subscriptions.insert(exact_subnet.subnet_id);
self.events
.push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id));
}
Expand All @@ -528,10 +496,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {

debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64());

// various logic checks
if self.subscriptions.contains(&exact_subnet) {
crit!(self.log, "Unsubscribing from a subnet in subscriptions");
}
self.subscriptions.remove(&exact_subnet.subnet_id);
self.events
.push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id));
}
Expand Down Expand Up @@ -581,33 +546,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
&mut rand::thread_rng(),
random_subnets_per_validator as usize,
);
let current_slot = self.beacon_chain.slot_clock.now().ok_or_else(|| {
warn!(self.log, "Could not get the current slot");
})?;

for subnet_id in to_remove_subnets {
// If a subscription is queued for two slots in the future, its associated unsubscription
// will unsubscribe from the expired subnet.
// If there is no unsubscription for this subnet,slot it is safe to add one, without
// unsubscribing early from a required subnet
let subnet = ExactSubnet {
subnet_id: *subnet_id,
slot: current_slot + 2,
};
if self.subscriptions.get(&subnet).is_none() {
// set an unsubscribe event
let duration_to_next_slot = self
.beacon_chain
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| {
warn!(self.log, "Unable to determine duration to next slot");
})?;
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// Set the unsubscription timeout
let unsubscription_duration = duration_to_next_slot + slot_duration * 2;
self.unsubscriptions
.insert_at(subnet, unsubscription_duration);
// If there are no unsubscription events for `subnet_id`, we unsubscribe immediately.
if self
.unsubscriptions
.keys()
.find(|s| s.subnet_id == *subnet_id)
.is_none()
{
self.events
.push_back(AttServiceMessage::Unsubscribe(*subnet_id));
}
// as the long lasting subnet subscription is being removed, remove the subnet_id from
// the ENR bitfield
Expand All @@ -632,15 +581,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
self.waker = Some(cx.waker().clone());
}

// process any subscription events
match self.subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_subscriptions(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet subscription times"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}

// process any un-subscription events
match self.unsubscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
Expand Down
Loading

0 comments on commit a97ec31

Please sign in to comment.