diff --git a/config/src/lib.rs b/config/src/lib.rs index 330526ec5..2cd5e93fe 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -13,7 +13,7 @@ use crypto::{traits::EncodeDecodeBase64, PublicKey}; use multiaddr::Multiaddr; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, fs::{self, OpenOptions}, io::{BufWriter, Write as _}, net::SocketAddr, @@ -452,6 +452,44 @@ impl Committee { .collect() } + /// Return all the network addresses in the committee. + fn get_all_network_addresses(&self) -> HashSet<&Multiaddr> { + self.authorities + .values() + .flat_map(|authority| { + std::iter::once(&authority.primary.primary_to_primary) + .chain(std::iter::once(&authority.primary.worker_to_primary)) + .chain( + authority + .workers + .values() + .map(|address| &address.transactions), + ) + .chain( + authority + .workers + .values() + .map(|address| &address.worker_to_worker), + ) + .chain( + authority + .workers + .values() + .map(|address| &address.primary_to_worker), + ) + }) + .collect() + } + + /// Return the network addresses that are present in the current committee but that are absent + /// from the new committee (provided as argument). + pub fn network_diff<'a>(&'a self, other: &'a Self) -> HashSet<&Multiaddr> { + self.get_all_network_addresses() + .difference(&other.get_all_network_addresses()) + .cloned() + .collect() + } + /// Update the networking information of some of the primaries. The arguments are a full vector of /// authorities which Public key and Stake must match the one stored in the current Committee. Any discrepancy /// will generate no update and return a vector of errors. diff --git a/network/src/primary.rs b/network/src/primary.rs index 493167068..025580043 100644 --- a/network/src/primary.rs +++ b/network/src/primary.rs @@ -73,6 +73,15 @@ impl PrimaryNetwork { } } } + + pub fn cleanup<'a, I>(&mut self, to_remove: I) + where + I: IntoIterator, + { + for address in to_remove { + self.clients.remove(address); + } + } } impl BaseNetwork for PrimaryNetwork { @@ -183,6 +192,15 @@ impl PrimaryToWorkerNetwork { m.set_network_available_tasks(self.executor.available_capacity() as i64, None); } } + + pub fn cleanup<'a, I>(&mut self, to_remove: I) + where + I: IntoIterator, + { + for address in to_remove { + self.clients.remove(address); + } + } } impl Default for PrimaryToWorkerNetwork { diff --git a/network/src/worker.rs b/network/src/worker.rs index e0321878a..4f0f56efc 100644 --- a/network/src/worker.rs +++ b/network/src/worker.rs @@ -74,6 +74,15 @@ impl WorkerNetwork { } } } + + pub fn cleanup<'a, I>(&mut self, to_remove: I) + where + I: IntoIterator, + { + for address in to_remove { + self.clients.remove(address); + } + } } impl BaseNetwork for WorkerNetwork { diff --git a/node/src/restarter.rs b/node/src/restarter.rs index bb7598acd..79d5e1b40 100644 --- a/node/src/restarter.rs +++ b/node/src/restarter.rs @@ -106,6 +106,9 @@ impl NodeRestarter { join_all(worker_cancel_handles).await; tracing::debug!("Committee reconfiguration message successfully sent"); + // Cleanup the network. + worker_network.cleanup(committee.network_diff(&new_committee)); + // Wait for the components to shut down. join_all(handles.drain(..)).await; tracing::debug!("All tasks successfully exited"); diff --git a/primary/src/block_remover.rs b/primary/src/block_remover.rs index bd0f6f284..82d6db069 100644 --- a/primary/src/block_remover.rs +++ b/primary/src/block_remover.rs @@ -285,9 +285,11 @@ impl BlockRemover { let message = self.rx_reconfigure.borrow().clone(); match message { ReconfigureNotification::NewEpoch(new_committee)=> { + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::UpdateCommittee(new_committee)=> { + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::Shutdown => return diff --git a/primary/src/block_synchronizer/mod.rs b/primary/src/block_synchronizer/mod.rs index 3d0399a1f..fdd4d2082 100644 --- a/primary/src/block_synchronizer/mod.rs +++ b/primary/src/block_synchronizer/mod.rs @@ -316,9 +316,13 @@ impl BlockSynchronizer { let message = self.rx_reconfigure.borrow().clone(); match message { ReconfigureNotification::NewEpoch(new_committee)=> { + self.primary_network.cleanup(self.committee.network_diff(&new_committee)); + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::UpdateCommittee(new_committee)=> { + self.primary_network.cleanup(self.committee.network_diff(&new_committee)); + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::Shutdown => return diff --git a/primary/src/block_waiter.rs b/primary/src/block_waiter.rs index 93d5ef08b..84869c808 100644 --- a/primary/src/block_waiter.rs +++ b/primary/src/block_waiter.rs @@ -334,9 +334,11 @@ impl BlockWaiter { + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::UpdateCommittee(new_committee)=> { + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; } ReconfigureNotification::Shutdown => return diff --git a/primary/src/core.rs b/primary/src/core.rs index 6840fb3cd..54751244f 100644 --- a/primary/src/core.rs +++ b/primary/src/core.rs @@ -532,13 +532,20 @@ impl Core { /// Update the committee and cleanup internal state. fn change_epoch(&mut self, committee: Committee) { - self.committee = committee; + // Cleanup the network. + self.network + .cleanup(self.committee.network_diff(&committee)); + // Cleanup internal state. self.last_voted.clear(); self.processing.clear(); self.certificates_aggregators.clear(); self.cancel_handlers.clear(); + // Update the committee + self.committee = committee; + + // Cleanup the synchronizer self.synchronizer.update_genesis(&self.committee); } @@ -594,6 +601,10 @@ impl Core { self.change_epoch(new_committee); }, ReconfigureNotification::UpdateCommittee(new_committee) => { + // Cleanup the network. + self.network.cleanup(self.committee.network_diff(&new_committee)); + + // Update the committee. self.committee = new_committee; }, ReconfigureNotification::Shutdown => return diff --git a/primary/src/header_waiter.rs b/primary/src/header_waiter.rs index 00fb0f44f..54ddb3f3c 100644 --- a/primary/src/header_waiter.rs +++ b/primary/src/header_waiter.rs @@ -137,6 +137,11 @@ impl HeaderWaiter { /// Update the committee and cleanup internal state. fn change_epoch(&mut self, committee: Committee) { + self.primary_network + .cleanup(self.committee.network_diff(&committee)); + self.worker_network + .cleanup(self.committee.network_diff(&committee)); + self.committee = committee; self.pending.clear(); @@ -331,6 +336,8 @@ impl HeaderWaiter { self.change_epoch(new_committee); }, ReconfigureNotification::UpdateCommittee(new_committee) => { + self.primary_network.cleanup(self.committee.network_diff(&new_committee)); + self.worker_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; }, ReconfigureNotification::Shutdown => return diff --git a/primary/src/helper.rs b/primary/src/helper.rs index 0f546440a..b108ff518 100644 --- a/primary/src/helper.rs +++ b/primary/src/helper.rs @@ -119,9 +119,11 @@ impl Helper { let message = self.rx_committee.borrow().clone(); match message { ReconfigureNotification::NewEpoch(new_committee) => { + self.primary_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; }, ReconfigureNotification::UpdateCommittee(new_committee) => { + self.primary_network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; }, ReconfigureNotification::Shutdown => return diff --git a/primary/src/state_handler.rs b/primary/src/state_handler.rs index 9c73d49d8..4f11e3800 100644 --- a/primary/src/state_handler.rs +++ b/primary/src/state_handler.rs @@ -99,6 +99,9 @@ impl StateHandler { Some(message) = self.rx_reconfigure.recv() => { let shutdown = match &message { ReconfigureNotification::NewEpoch(committee) => { + // Cleanup the network. + self.worker_network.cleanup(self.committee.load().network_diff(committee)); + // Update the committee. self.committee.swap(Arc::new(committee.clone())); @@ -109,6 +112,10 @@ impl StateHandler { false }, ReconfigureNotification::UpdateCommittee(committee) => { + // Cleanup the network. + self.worker_network.cleanup(self.committee.load().network_diff(committee)); + + // Update the committee. self.committee.swap(Arc::new(committee.clone())); tracing::debug!("Committee updated to {}", self.committee); diff --git a/worker/src/helper.rs b/worker/src/helper.rs index c42a8bcd2..8f0eba3df 100644 --- a/worker/src/helper.rs +++ b/worker/src/helper.rs @@ -113,9 +113,11 @@ impl Helper { let message = self.rx_reconfigure.borrow().clone(); match message { ReconfigureNotification::NewEpoch(new_committee) => { + self.network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; }, ReconfigureNotification::UpdateCommittee(new_committee) => { + self.network.cleanup(self.committee.network_diff(&new_committee)); self.committee = new_committee; }, diff --git a/worker/src/quorum_waiter.rs b/worker/src/quorum_waiter.rs index fdd027b7a..67df13ebc 100644 --- a/worker/src/quorum_waiter.rs +++ b/worker/src/quorum_waiter.rs @@ -116,15 +116,12 @@ impl QuorumWaiter { result.expect("Committee channel dropped"); let message = self.rx_reconfigure.borrow().clone(); match message { - ReconfigureNotification::NewEpoch(new_committee) => { - self.committee = new_committee; - tracing::debug!("Dropping batch: committee updated to {}", self.committee); - break; // Don't wait for acknowledgements. - }, - ReconfigureNotification::UpdateCommittee(new_committee) => { - self.committee = new_committee; - tracing::debug!("Dropping batch: committee updated to {}", self.committee); - break; // Don't wait for acknowledgements. + ReconfigureNotification::NewEpoch(new_committee) + | ReconfigureNotification::UpdateCommittee(new_committee) => { + self.network.cleanup(self.committee.network_diff(&new_committee)); + self.committee = new_committee; + tracing::debug!("Dropping batch: committee updated to {}", self.committee); + break; // Don't wait for acknowledgements. }, ReconfigureNotification::Shutdown => return } diff --git a/worker/src/synchronizer.rs b/worker/src/synchronizer.rs index 49ae298df..81fd19a29 100644 --- a/worker/src/synchronizer.rs +++ b/worker/src/synchronizer.rs @@ -204,6 +204,7 @@ impl Synchronizer { // Reconfigure this task and update the shared committee. let shutdown = match &message { ReconfigureNotification::NewEpoch(new_committee) => { + self.network.cleanup(self.committee.load().network_diff(new_committee)); self.committee.swap(Arc::new(new_committee.clone())); self.pending.clear(); @@ -214,6 +215,7 @@ impl Synchronizer { false } ReconfigureNotification::UpdateCommittee(new_committee) => { + self.network.cleanup(self.committee.load().network_diff(new_committee)); self.committee.swap(Arc::new(new_committee.clone())); tracing::debug!("Committee updated to {}", self.committee);