Skip to content

Commit

Permalink
Fix network memory leaks (MystenLabs#687)
Browse files Browse the repository at this point in the history
Facility to cleanup network
  • Loading branch information
asonnino authored and huitseeker committed Aug 8, 2022
1 parent bdc46a0 commit 0351f42
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 11 deletions.
40 changes: 39 additions & 1 deletion config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crypto::{traits::EncodeDecodeBase64, PublicKey};
use multiaddr::Multiaddr;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
fs::{self, OpenOptions},
io::{BufWriter, Write as _},
net::SocketAddr,
Expand Down Expand Up @@ -452,6 +452,44 @@ impl Committee {
.collect()
}

/// Return all the network addresses in the committee.
fn get_all_network_addresses(&self) -> HashSet<&Multiaddr> {
self.authorities
.values()
.flat_map(|authority| {
std::iter::once(&authority.primary.primary_to_primary)
.chain(std::iter::once(&authority.primary.worker_to_primary))
.chain(
authority
.workers
.values()
.map(|address| &address.transactions),
)
.chain(
authority
.workers
.values()
.map(|address| &address.worker_to_worker),
)
.chain(
authority
.workers
.values()
.map(|address| &address.primary_to_worker),
)
})
.collect()
}

/// Return the network addresses that are present in the current committee but that are absent
/// from the new committee (provided as argument).
pub fn network_diff<'a>(&'a self, other: &'a Self) -> HashSet<&Multiaddr> {
self.get_all_network_addresses()
.difference(&other.get_all_network_addresses())
.cloned()
.collect()
}

/// Update the networking information of some of the primaries. The arguments are a full vector of
/// authorities which Public key and Stake must match the one stored in the current Committee. Any discrepancy
/// will generate no update and return a vector of errors.
Expand Down
18 changes: 18 additions & 0 deletions network/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ impl PrimaryNetwork {
}
}
}

pub fn cleanup<'a, I>(&mut self, to_remove: I)
where
I: IntoIterator<Item = &'a Multiaddr>,
{
for address in to_remove {
self.clients.remove(address);
}
}
}

impl BaseNetwork for PrimaryNetwork {
Expand Down Expand Up @@ -183,6 +192,15 @@ impl PrimaryToWorkerNetwork {
m.set_network_available_tasks(self.executor.available_capacity() as i64, None);
}
}

pub fn cleanup<'a, I>(&mut self, to_remove: I)
where
I: IntoIterator<Item = &'a Multiaddr>,
{
for address in to_remove {
self.clients.remove(address);
}
}
}

impl Default for PrimaryToWorkerNetwork {
Expand Down
9 changes: 9 additions & 0 deletions network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ impl WorkerNetwork {
}
}
}

pub fn cleanup<'a, I>(&mut self, to_remove: I)
where
I: IntoIterator<Item = &'a Multiaddr>,
{
for address in to_remove {
self.clients.remove(address);
}
}
}

impl BaseNetwork for WorkerNetwork {
Expand Down
3 changes: 3 additions & 0 deletions node/src/restarter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl NodeRestarter {
join_all(worker_cancel_handles).await;
tracing::debug!("Committee reconfiguration message successfully sent");

// Cleanup the network.
worker_network.cleanup(committee.network_diff(&new_committee));

// Wait for the components to shut down.
join_all(handles.drain(..)).await;
tracing::debug!("All tasks successfully exited");
Expand Down
2 changes: 2 additions & 0 deletions primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,11 @@ impl BlockRemover {
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee)=> {
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::UpdateCommittee(new_committee)=> {
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::Shutdown => return
Expand Down
4 changes: 4 additions & 0 deletions primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,13 @@ impl BlockSynchronizer {
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee)=> {
self.primary_network.cleanup(self.committee.network_diff(&new_committee));
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::UpdateCommittee(new_committee)=> {
self.primary_network.cleanup(self.committee.network_diff(&new_committee));
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::Shutdown => return
Expand Down
2 changes: 2 additions & 0 deletions primary/src/block_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,11 @@ impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<Synchroni
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee)=> {
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::UpdateCommittee(new_committee)=> {
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
}
ReconfigureNotification::Shutdown => return
Expand Down
13 changes: 12 additions & 1 deletion primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,20 @@ impl Core {

/// Update the committee and cleanup internal state.
fn change_epoch(&mut self, committee: Committee) {
self.committee = committee;
// Cleanup the network.
self.network
.cleanup(self.committee.network_diff(&committee));

// Cleanup internal state.
self.last_voted.clear();
self.processing.clear();
self.certificates_aggregators.clear();
self.cancel_handlers.clear();

// Update the committee
self.committee = committee;

// Cleanup the synchronizer
self.synchronizer.update_genesis(&self.committee);
}

Expand Down Expand Up @@ -594,6 +601,10 @@ impl Core {
self.change_epoch(new_committee);
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
// Cleanup the network.
self.network.cleanup(self.committee.network_diff(&new_committee));

// Update the committee.
self.committee = new_committee;
},
ReconfigureNotification::Shutdown => return
Expand Down
7 changes: 7 additions & 0 deletions primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ impl HeaderWaiter {

/// Update the committee and cleanup internal state.
fn change_epoch(&mut self, committee: Committee) {
self.primary_network
.cleanup(self.committee.network_diff(&committee));
self.worker_network
.cleanup(self.committee.network_diff(&committee));

self.committee = committee;

self.pending.clear();
Expand Down Expand Up @@ -331,6 +336,8 @@ impl HeaderWaiter {
self.change_epoch(new_committee);
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.primary_network.cleanup(self.committee.network_diff(&new_committee));
self.worker_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
},
ReconfigureNotification::Shutdown => return
Expand Down
2 changes: 2 additions & 0 deletions primary/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ impl Helper {
let message = self.rx_committee.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.primary_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.primary_network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
},
ReconfigureNotification::Shutdown => return
Expand Down
7 changes: 7 additions & 0 deletions primary/src/state_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ impl StateHandler {
Some(message) = self.rx_reconfigure.recv() => {
let shutdown = match &message {
ReconfigureNotification::NewEpoch(committee) => {
// Cleanup the network.
self.worker_network.cleanup(self.committee.load().network_diff(committee));

// Update the committee.
self.committee.swap(Arc::new(committee.clone()));

Expand All @@ -109,6 +112,10 @@ impl StateHandler {
false
},
ReconfigureNotification::UpdateCommittee(committee) => {
// Cleanup the network.
self.worker_network.cleanup(self.committee.load().network_diff(committee));

// Update the committee.
self.committee.swap(Arc::new(committee.clone()));

tracing::debug!("Committee updated to {}", self.committee);
Expand Down
2 changes: 2 additions & 0 deletions worker/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ impl Helper {
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;

},
Expand Down
15 changes: 6 additions & 9 deletions worker/src/quorum_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,12 @@ impl QuorumWaiter {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
match message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.committee = new_committee;
tracing::debug!("Dropping batch: committee updated to {}", self.committee);
break; // Don't wait for acknowledgements.
},
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.committee = new_committee;
tracing::debug!("Dropping batch: committee updated to {}", self.committee);
break; // Don't wait for acknowledgements.
ReconfigureNotification::NewEpoch(new_committee)
| ReconfigureNotification::UpdateCommittee(new_committee) => {
self.network.cleanup(self.committee.network_diff(&new_committee));
self.committee = new_committee;
tracing::debug!("Dropping batch: committee updated to {}", self.committee);
break; // Don't wait for acknowledgements.
},
ReconfigureNotification::Shutdown => return
}
Expand Down
2 changes: 2 additions & 0 deletions worker/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl Synchronizer {
// Reconfigure this task and update the shared committee.
let shutdown = match &message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.network.cleanup(self.committee.load().network_diff(new_committee));
self.committee.swap(Arc::new(new_committee.clone()));

self.pending.clear();
Expand All @@ -214,6 +215,7 @@ impl Synchronizer {
false
}
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.network.cleanup(self.committee.load().network_diff(new_committee));
self.committee.swap(Arc::new(new_committee.clone()));

tracing::debug!("Committee updated to {}", self.committee);
Expand Down

0 comments on commit 0351f42

Please sign in to comment.