Add moka cache for stickiness
bchocho committed May 31, 2023
1 parent 705ffbd commit d4a1a42
Showing 4 changed files with 180 additions and 35 deletions.
142 changes: 139 additions & 3 deletions Cargo.lock

Some generated files (Cargo.lock) are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -478,6 +478,7 @@ maplit = "1.0.2"
 mime = "0.3.16"
 mirai-annotations = "1.12.0"
 mockall = "0.11.0"
+moka = "0.11.0"
 more-asserts = "0.3.0"
 native-tls = "0.2.10"
 ntest = "0.9.0"
1 change: 1 addition & 0 deletions mempool/Cargo.toml
@@ -39,6 +39,7 @@ fail = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 maplit = { workspace = true }
+moka = { workspace = true }
 once_cell = { workspace = true }
 proptest = { workspace = true, optional = true }
 rand = { workspace = true }
71 changes: 39 additions & 32 deletions mempool/src/shared_mempool/network.rs
@@ -31,6 +31,7 @@ use aptos_types::{account_address::AccountAddress, transaction::SignedTransactio
 use aptos_vm_validator::vm_validator::TransactionValidation;
 use fail::fail_point;
 use itertools::Itertools;
+use moka::sync::Cache;
 use serde::{Deserialize, Serialize};
 use std::{
     cmp::Ordering,
@@ -87,7 +88,7 @@ pub(crate) struct MempoolNetworkInterface<NetworkClient> {
     mempool_config: MempoolConfig,
     prioritized_peers_comparator: PrioritizedPeersComparator,
     aptos_data_client: Option<AptosDataClient>,
-    // TODO: add stickiness cache
+    stickiness_cache: Arc<Cache<AccountAddress, Vec<PeerNetworkId>>>,
 }
 
 impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterface<NetworkClient> {
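
A note on the new field type: moka::sync::Cache is a concurrent cache, so one instance can be read and written from many threads without an external lock, and cloned handles share the same underlying entries; the Arc simply keeps that shared handle cheap to pass around. A minimal sketch of the sharing pattern, using placeholder key and value types instead of AccountAddress and Vec<PeerNetworkId>:

use std::{sync::Arc, thread, time::Duration};

use moka::sync::Cache;

fn main() {
    // Placeholder types stand in for AccountAddress / Vec<PeerNetworkId>.
    let cache: Arc<Cache<u64, Vec<String>>> = Arc::new(
        Cache::builder()
            .max_capacity(100_000)
            .time_to_idle(Duration::from_secs(10))
            .build(),
    );

    // The cache is thread-safe: a cloned handle on another thread sees the
    // same entries as the original.
    let writer = {
        let cache = Arc::clone(&cache);
        thread::spawn(move || cache.insert(1, vec!["peer-a".to_string()]))
    };
    writer.join().unwrap();

    assert_eq!(cache.get(&1), Some(vec!["peer-a".to_string()]));
}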
@@ -105,6 +106,12 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf
             mempool_config,
             prioritized_peers_comparator: PrioritizedPeersComparator::new(),
             aptos_data_client,
+            stickiness_cache: Arc::new(
+                Cache::builder()
+                    .max_capacity(100_000)
+                    .time_to_idle(Duration::from_secs(10))
+                    .build(),
+            ),
         }
     }
 
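The builder settings above bound the cache in two ways: max_capacity(100_000) caps it at roughly 100,000 entries (each entry counts as weight 1 unless a weigher is configured), and time_to_idle(Duration::from_secs(10)) expires an entry once 10 seconds pass without it being read or written, so an account's sticky peer set only survives while that account keeps broadcasting. A minimal sketch of the idle-expiry behavior, using a 2-second window so it is quick to observe:

use std::{thread, time::Duration};

use moka::sync::Cache;

fn main() {
    // Same builder shape as the constructor above, but with a short idle window.
    let cache: Cache<String, u32> = Cache::builder()
        .max_capacity(100_000)
        .time_to_idle(Duration::from_secs(2))
        .build();

    cache.insert("account-1".to_string(), 7);
    // A read within the idle window returns the entry and resets its idle timer.
    assert_eq!(cache.get("account-1"), Some(7));

    // After more than time_to_idle with no reads or writes, the entry has
    // expired and lookups treat it as absent.
    thread::sleep(Duration::from_secs(3));
    assert_eq!(cache.get("account-1"), None);
}
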
@@ -468,43 +475,43 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf
         self.sync_states.read().get(peer).is_some()
     }
 
-    pub fn broadcast_peers(&self, _account: &AccountAddress) -> Vec<PeerNetworkId> {
-        // TODO: 1. for now, use the first prioritized peer
-        // TODO: 2. use account to check stickiness cache
-        // TODO: 3. then, expand to random other peers
+    pub fn broadcast_peers(&self, account: &AccountAddress) -> Vec<PeerNetworkId> {
+        // TODO: Need a mode to exclude already tried peers
         if self.role.is_validator() {
             return vec![];
         }
 
         if let Some(aptos_data_client) = &self.aptos_data_client {
-            let peer_states = aptos_data_client.get_peer_states();
-            let mut peer_versions: Vec<_> = peer_states
-                .get_peer_to_states()
-                .into_iter()
-                .map(|(peer, state)| {
-                    if let Some(summary) = state.storage_summary_if_not_ignored() {
-                        if let Some(ledger_info) = &summary.data_summary.synced_ledger_info {
-                            return (peer, ledger_info.ledger_info().version());
-                        }
-                    }
-                    (peer, 0)
-                })
-                .collect();
-            // TODO: random shuffle to keep from biasing
-            // peer_versions.shuffle()
-            // TODO: what if we don't actually have a mempool connection to this host?
-            // TODO: do we have to filter? or penalize but still allow selection?
-            peer_versions.sort_by_key(|(_peer, version)| *version);
-            let peers: Vec<_> = peer_versions
-                .iter()
-                .rev()
-                .take(self.mempool_config.default_failovers + 1)
-                .map(|(peer, _version)| *peer)
-                .collect();
-
-            // TODO: add a sample, completely remove
-            info!("peers (len {}): {:?}", peer_versions.len(), peers);
-            peers
+            // TODO: Check if the peers are actually still good. If not, atomically replace.
+            self.stickiness_cache.get_with(*account, || {
+                let peer_states = aptos_data_client.get_peer_states();
+                let mut peer_versions: Vec<_> = peer_states
+                    .get_peer_to_states()
+                    .into_iter()
+                    .map(|(peer, state)| {
+                        if let Some(summary) = state.storage_summary_if_not_ignored() {
+                            if let Some(ledger_info) = &summary.data_summary.synced_ledger_info {
+                                return (peer, ledger_info.ledger_info().version());
+                            }
+                        }
+                        (peer, 0)
+                    })
+                    .collect();
+                // TODO: random shuffle to keep from biasing
+                // peer_versions.shuffle()
+                // TODO: what if we don't actually have a mempool connection to this host?
+                // TODO: do we have to filter? or penalize but still allow selection?
+                peer_versions.sort_by_key(|(_peer, version)| *version);
+                let peers: Vec<_> = peer_versions
+                    .iter()
+                    .rev()
+                    .take(self.mempool_config.default_failovers + 1)
+                    .map(|(peer, _version)| *peer)
+                    .collect();
+                // TODO: add a sample, completely remove
+                info!("peers (len {}): {:?}", peer_versions.len(), peers);
+                peers
+            })
         } else {
             // TODO: Remove this legacy behavior, with a good Mock for AptosDataClientInterface
             let prioritized_peers = self.prioritized_peers.lock();

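The stickiness in broadcast_peers comes from Cache::get_with: on a miss the closure runs, the resulting peer list is stored under the account, and later calls within the idle window return that same list without recomputing it; moka also deduplicates concurrent misses on the same key, so the selection runs once even under racing callers. A hedged sketch of that behavior, where pick_peers is a hypothetical stand-in for the version-sorted selection above, not an API from this codebase:

use std::time::Duration;

use moka::sync::Cache;

// Hypothetical stand-in for the version-sorted peer selection in broadcast_peers.
fn pick_peers(account: u64) -> Vec<String> {
    println!("selecting peers for account {account}");
    vec![format!("peer-for-{account}")]
}

fn main() {
    let stickiness: Cache<u64, Vec<String>> = Cache::builder()
        .max_capacity(100_000)
        .time_to_idle(Duration::from_secs(10))
        .build();

    // Miss: the closure runs and its result is cached under the account key.
    let first = stickiness.get_with(42, || pick_peers(42));

    // Hit within the idle window: the cached peers come back and the closure is
    // skipped, so the same account keeps broadcasting to the same peers.
    let second = stickiness.get_with(42, || pick_peers(42));
    assert_eq!(first, second);
}

Because the closure only runs on a miss, a peer that later disconnects stays sticky until its entry idles out, which is what the "Check if the peers are actually still good" TODO in the hunk refers to.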