Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Oct 18, 2023
1 parent 6b82523 commit ef2f46a
Show file tree
Hide file tree
Showing 19 changed files with 425 additions and 279 deletions.
34 changes: 19 additions & 15 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Default for MempoolConfig {
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
peer_update_interval_ms: 1_000,
broadcast_peers_selector: BroadcastPeersSelectorConfig::AllPeers,
broadcast_peers_selector: BroadcastPeersSelectorConfig::PrioritizedPeers(1),
}
}
}
Expand Down Expand Up @@ -118,10 +118,11 @@ impl ConfigOptimizer for MempoolConfig {
}
if node_type.is_validator_fullnode() {
// Set broadcast peers to prioritized, with a max of 1
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;
if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;
}

// Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
Expand All @@ -135,13 +136,20 @@ impl ConfigOptimizer for MempoolConfig {
modified_config = true;
}
} else if node_type.is_validator() {
// None for now
// TODO: With quorum store, this isn't used. Used for testing, but should be removed.
if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;
}
} else {
// The node is a PFN
// Set broadcast peers to fresh, with a max of 2
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(2);
modified_config = true;
if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::FreshPeers(2, 1000);
modified_config = true;
}
}

Ok(modified_config)
Expand All @@ -151,8 +159,8 @@ impl ConfigOptimizer for MempoolConfig {
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum BroadcastPeersSelectorConfig {
AllPeers,
FreshPeers(usize),
/// num_peers_to_select, version_threshold
FreshPeers(usize, u64),
PrioritizedPeers(usize),
}

Expand Down Expand Up @@ -268,10 +276,6 @@ mod tests {
mempool_config.max_broadcasts_per_peer,
local_max_broadcasts_per_peer
);
assert_ne!(
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_ne!(
mempool_config.shared_mempool_tick_interval_ms,
default_mempool_config.shared_mempool_tick_interval_ms
Expand Down
88 changes: 45 additions & 43 deletions mempool/src/core_mempool/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl TimelineIndex {
&self,
timeline_id: u64,
count: usize,
peer: Option<PeerNetworkId>,
peer: PeerNetworkId,
) -> (Vec<(AccountAddress, u64)>, u64) {
let mut batch = vec![];
let mut updated_timeline_id = timeline_id;
Expand All @@ -226,18 +226,15 @@ impl TimelineIndex {
.range((Bound::Excluded(timeline_id), Bound::Unbounded))
{
updated_timeline_id = id;
match (peer, timeline_peer) {
(Some(peer), Some(timeline_peer)) => {
match timeline_peer {
Some(timeline_peer) => {
if peer == *timeline_peer {
batch.push((*address, *sequence_number));
}
},
(None, None) => {
None => {
batch.push((*address, *sequence_number));
},
_ => {
panic!("mismatch: {:?}, {:?}", peer, timeline_peer);
},
}
if batch.len() == count {
break;
Expand Down Expand Up @@ -273,56 +270,65 @@ impl TimelineIndex {
.collect()
}

pub(crate) fn insert(
&mut self,
txn: &mut MempoolTransaction,
peers: Option<Vec<PeerNetworkId>>,
) {
if let Some(peers) = peers {
let mut timeline_ids = vec![];
for peer in peers {
self.timeline.insert(
self.timeline_id,
(
txn.get_sender(),
txn.sequence_info.transaction_sequence_number,
Some(peer),
),
);
timeline_ids.push(self.timeline_id);
self.timeline_id += 1;
}
txn.timeline_state = TimelineState::Ready(timeline_ids);
} else {
pub(crate) fn insert(&mut self, txn: &mut MempoolTransaction, peers: Vec<PeerNetworkId>) {
let mut timeline_ids = vec![];
for peer in peers {
self.timeline.insert(
self.timeline_id,
(
txn.get_sender(),
txn.sequence_info.transaction_sequence_number,
None,
Some(peer),
),
);
txn.timeline_state = TimelineState::Ready(vec![self.timeline_id]);
timeline_ids.push((peer, self.timeline_id));
self.timeline_id += 1;
}
txn.timeline_state = TimelineState::Ready(timeline_ids);
}

pub(crate) fn update(&mut self, txn: &mut MempoolTransaction, peers: Vec<PeerNetworkId>) {
let sender = txn.get_sender();
let sequence_number = txn.sequence_info.transaction_sequence_number;
if let TimelineState::Ready(ref mut timeline_ids) = txn.timeline_state {
for peer in peers {
self.timeline
.insert(self.timeline_id, (sender, sequence_number, Some(peer)));
timeline_ids.push(self.timeline_id);
self.timeline_id += 1;
if let TimelineState::Ready(previous_timeline_entries) = &mut txn.timeline_state {
// TODO: this seems pretty inefficient, but a more efficient way might be harder to understand

// (1) partition previous_timeline_entries into those that are still in peers and those
// that are not
let (to_remain, to_remove): (Vec<_>, Vec<_>) = previous_timeline_entries
.clone()
.into_iter()
.partition(|(peer, _)| peers.contains(peer));

// (2) remove the ones that are not in peers
for (_peer, timeline_id) in &to_remove {
self.timeline.remove(timeline_id);
}

// (3) add the new peers that are not already in the timeline
let new_peers = peers
.iter()
.filter(|&peer| !to_remain.iter().any(|(peer2, _)| peer == peer2))
.map(|peer| {
let timeline_id = self.timeline_id;
self.timeline
.insert(timeline_id, (sender, sequence_number, Some(*peer)));
self.timeline_id += 1;
(*peer, timeline_id)
});

// (4) combine the remaining with the new
previous_timeline_entries.extend(new_peers);
} else {
// TODO: possibly this should just be one method?
// self.insert(txn, Some(peers));
panic!("unexpected");
};
}

pub(crate) fn remove(&mut self, txn: &MempoolTransaction) {
if let TimelineState::Ready(timeline_ids) = &txn.timeline_state {
for timeline_id in timeline_ids {
for (_peer, timeline_id) in timeline_ids {
self.timeline.remove(timeline_id);
}
}
Expand Down Expand Up @@ -372,7 +378,7 @@ impl MultiBucketTimelineIndex {
&self,
timeline_ids: &MultiBucketTimelineIndexIds,
count: usize,
peer: Option<PeerNetworkId>,
peer: PeerNetworkId,
) -> (Vec<Vec<(AccountAddress, u64)>>, MultiBucketTimelineIndexIds) {
assert_eq!(timeline_ids.id_per_bucket.len(), self.bucket_mins.len());

Expand Down Expand Up @@ -434,11 +440,7 @@ impl MultiBucketTimelineIndex {
self.timelines.get_mut(index).unwrap()
}

pub(crate) fn insert(
&mut self,
txn: &mut MempoolTransaction,
peers: Option<Vec<PeerNetworkId>>,
) {
pub(crate) fn insert(&mut self, txn: &mut MempoolTransaction, peers: Vec<PeerNetworkId>) {
self.get_timeline(txn.ranking_score).insert(txn, peers);
}

Expand Down
3 changes: 2 additions & 1 deletion mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl Mempool {
&self,
timeline_id: &MultiBucketTimelineIndexIds,
count: usize,
peer: Option<PeerNetworkId>,
peer: PeerNetworkId,
) -> (Vec<SignedTransaction>, MultiBucketTimelineIndexIds) {
self.transactions.read_timeline(timeline_id, count, peer)
}
Expand All @@ -417,6 +417,7 @@ impl Mempool {
pub(crate) fn timeline_range(
&self,
start_end_pairs: &Vec<(u64, u64)>,
// TODO: do we need option here?
peer: Option<PeerNetworkId>,
) -> Vec<SignedTransaction> {
self.transactions.timeline_range(start_end_pairs, peer)
Expand Down
3 changes: 2 additions & 1 deletion mempool/src/core_mempool/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{core_mempool::TXN_INDEX_ESTIMATED_BYTES, counters};
use aptos_config::network_id::PeerNetworkId;
use aptos_crypto::HashValue;
use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -73,7 +74,7 @@ pub enum TimelineState {
// The transaction is ready for broadcast.
// The vector shows the position in the log -- the transaction can be present in multiple
// positions in the log due to retries to other peers.
Ready(Vec<u64>),
Ready(Vec<(PeerNetworkId, u64)>),
// Transaction is not yet ready for broadcast, but it might change in the future.
NotReady,
// Transaction will never be qualified for broadcasting.
Expand Down
Loading

0 comments on commit ef2f46a

Please sign in to comment.