Skip to content

Commit

Permalink
Remove subscriptions Hashset
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Sep 9, 2020
1 parent 95bfe26 commit 93f6cbf
Showing 1 changed file with 4 additions and 13 deletions.
17 changes: 4 additions & 13 deletions beacon_node/network/src/attestation_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -75,9 +75,6 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
random_subnets: HashSetDelay<SubnetId>,

/// A collection of timeouts for when to subscribe to a shard subnet.
subscriptions: HashSet<ExactSubnet>,

/// A collection of timeouts for when to unsubscribe from a shard subnet.
unsubscriptions: HashSetDelay<ExactSubnet>,

Expand Down Expand Up @@ -128,7 +125,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
network_globals,
beacon_chain,
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
subscriptions: HashSet::new(),
unsubscriptions: HashSetDelay::new(default_timeout),
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
known_validators: HashSetDelay::new(last_seen_val_timeout),
Expand Down Expand Up @@ -361,12 +357,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).

// Return if we already have a subscription for this subnet_id and slot
if self.subscriptions.contains(&exact_subnet) {
if self.unsubscriptions.contains(&exact_subnet) {
return Ok(());
}

// We are not currently subscribed and have no waiting subscription, create one
self.subscriptions.insert(exact_subnet.clone());
self.handle_subscriptions(exact_subnet.clone());

// if there is an unsubscription event for the slot prior, we remove it to prevent
Expand Down Expand Up @@ -428,9 +423,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
};

for subnet_id in to_subscribe_subnets {
// remove this subnet from any immediate subscription/un-subscription events
self.subscriptions
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
// remove this subnet from any immediate un-subscription events
self.unsubscriptions
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);

Expand Down Expand Up @@ -519,8 +512,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {

debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64());

// Remove the subscription from set of subscribed exact_subnets
self.subscriptions.remove(&exact_subnet);
self.events
.push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id));
}
Expand Down Expand Up @@ -583,7 +574,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subnet_id: *subnet_id,
slot: current_slot + 2,
};
if self.subscriptions.get(&subnet).is_none() {
if self.unsubscriptions.get(&subnet).is_none() {
// set an unsubscribe event
let duration_to_next_slot = self
.beacon_chain
Expand Down

0 comments on commit 93f6cbf

Please sign in to comment.