feat: allow DHT to be configured to repropagate messages for a number of rounds #3211

Merged (2 commits, Aug 30, 2021). Changes shown below are from 1 commit.
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/inbound_handlers.rs
@@ -122,7 +122,7 @@ impl MempoolInboundHandlers {
if tx_storage.is_stored() {
debug!(
target: LOG_TARGET,
"Mempool already has transaction: {}", kernel_excess_sig
"Mempool already has transaction: {}.", kernel_excess_sig
);
return Ok(tx_storage);
}
153 changes: 94 additions & 59 deletions comms/dht/src/actor.rs
@@ -51,6 +51,7 @@ use tari_comms::{
peer_manager::{NodeId, NodeIdentity, PeerFeatures, PeerManager, PeerManagerError, PeerQuery, PeerQuerySortBy},
types::CommsPublicKey,
};
use tari_crypto::tari_utilities::hex::Hex;
use tari_shutdown::ShutdownSignal;
use tari_utilities::message_format::{MessageFormat, MessageFormatError};
use thiserror::Error;
@@ -101,9 +102,14 @@ impl From<SendError> for DhtActorError {
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, CommsPublicKey, oneshot::Sender<bool>),
/// Inserts a message hash into the dedup cache. This operation replies with the number of times this
/// message has been seen so far, including this insert (the hit count)
MsgHashCacheInsert {
message_hash: Vec<u8>,
received_from: CommsPublicKey,
reply_tx: oneshot::Sender<u32>,
},
GetMsgHashHitCount(Vec<u8>, oneshot::Sender<u32>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<NodeId>>),
GetMetadata(DhtMetadataKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
@@ -114,12 +120,22 @@ impl Display for DhtRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use DhtRequest::*;
match self {
SendJoin => f.write_str("SendJoin"),
MsgHashCacheInsert(_, _, _) => f.write_str("MsgHashCacheInsert"),
SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
GetMetadata(key, _) => f.write_str(&format!("GetMetadata (key={})", key)),
SendJoin => write!(f, "SendJoin"),
MsgHashCacheInsert {
message_hash,
received_from,
..
} => write!(
f,
"MsgHashCacheInsert(message hash: {}, received from: {})",
message_hash.to_hex(),
received_from.to_hex(),
),
GetMsgHashHitCount(hash, _) => write!(f, "GetMsgHashHitCount({})", hash.to_hex()),
SelectPeers(s, _) => write!(f, "SelectPeers (Strategy={})", s),
GetMetadata(key, _) => write!(f, "GetMetadata (key={})", key),
SetMetadata(key, value, _) => {
f.write_str(&format!("SetMetadata (key={}, value={} bytes)", key, value.len()))
write!(f, "SetMetadata (key={}, value={} bytes)", key, value.len())
},
}
}
@@ -147,14 +163,27 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

pub async fn insert_message_hash(
pub async fn add_message_to_dedup_cache(
&mut self,
message_hash: Vec<u8>,
public_key: CommsPublicKey,
) -> Result<bool, DhtActorError> {
received_from: CommsPublicKey,
) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert {
message_hash,
received_from,
reply_tx,
})
.await?;

reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

pub async fn get_message_cache_hit_count(&mut self, message_hash: Vec<u8>) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert(message_hash, public_key, reply_tx))
.send(DhtRequest::GetMsgHashHitCount(message_hash, reply_tx))
.await?;

reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
@@ -268,7 +297,7 @@ impl DhtActor {
},

_ = dedup_cache_trim_ticker.select_next_some() => {
if let Err(err) = self.msg_hash_dedup_cache.truncate().await {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries().await {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},
@@ -300,24 +329,36 @@ impl DhtActor {
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::broadcast_join(node_identity, outbound_requester))
},
MsgHashCacheInsert(hash, public_key, reply_tx) => {
MsgHashCacheInsert {
message_hash,
received_from,
reply_tx,
} => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.insert_body_hash_if_unique(hash, public_key).await {
Ok(already_exists) => {
let _ = reply_tx.send(already_exists).map_err(|_| DhtActorError::ReplyCanceled);
match msg_hash_cache.add_body_hash(message_hash, received_from).await {
Ok(hit_count) => {
let _ = reply_tx.send(hit_count);
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Unable to update message dedup cache because {:?}", err
);
let _ = reply_tx.send(false).map_err(|_| DhtActorError::ReplyCanceled);
let _ = reply_tx.send(0);
},
}
Ok(())
})
},
GetMsgHashHitCount(hash, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
let hit_count = msg_hash_cache.get_hit_count(hash).await?;
let _ = reply_tx.send(hit_count);
Ok(())
})
},
SelectPeers(broadcast_strategy, reply_tx) => {
let peer_manager = Arc::clone(&self.peer_manager);
let node_identity = Arc::clone(&self.node_identity);
@@ -690,11 +731,9 @@ mod test {
test_utils::{build_peer_manager, make_client_identity, make_node_identity},
};
use chrono::{DateTime, Utc};
use std::time::Duration;
use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
use tari_shutdown::Shutdown;
use tari_test_utils::random;
use tokio::time::delay_for;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
@@ -756,21 +795,21 @@ mod test {
actor.spawn();

let signature = vec![1u8, 2, 3];
let is_dup = requester
.insert_message_hash(signature.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(signature.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
let is_dup = requester
.insert_message_hash(signature, CommsPublicKey::default())
assert_eq!(num_hits, 1);
let num_hits = requester
.add_message_to_dedup_cache(signature, CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
let is_dup = requester
.insert_message_hash(Vec::new(), CommsPublicKey::default())
assert_eq!(num_hits, 2);
let num_hits = requester
.add_message_to_dedup_cache(Vec::new(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

#[tokio_macros::test_basic]
@@ -783,14 +822,12 @@ mod test {
let (actor_tx, actor_rx) = mpsc::channel(1);
let mut requester = DhtRequester::new(actor_tx);
let outbound_requester = OutboundMessageRequester::new(out_tx);
let mut shutdown = Shutdown::new();
let trim_interval_ms = 500;
let shutdown = Shutdown::new();
// Note: This must be equal to or larger than the minimum dedup cache capacity for DedupCacheDatabase
let capacity = 120;
let capacity = 10;
let actor = DhtActor::new(
DhtConfig {
dedup_cache_capacity: capacity,
dedup_cache_trim_interval: Duration::from_millis(trim_interval_ms),
..Default::default()
},
db_connection().await,
@@ -803,63 +840,61 @@
);

// Create signatures for double the dedup cache capacity
let mut signatures: Vec<Vec<u8>> = Vec::new();
for i in 0..(capacity * 2) {
signatures.push(vec![1u8, 2, i as u8])
}
let signatures = (0..(capacity * 2)).map(|i| vec![1u8, 2, i as u8]).collect::<Vec<_>>();

// Pre-populate the dedup cache; everything should be accepted due to cleanup ticker not active yet
// Pre-populate the dedup cache; everything should be accepted because the cleanup ticker has not run yet
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}
// Try to re-insert all; everything should be marked as duplicates due to cleanup ticker not active yet
// Try to re-insert all; all hashes should have incremented their hit count
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 2);
}

// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire immediately
let dedup_cache_db = actor.msg_hash_dedup_cache.clone();
// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire fairly soon after the
// task is running on a thread. To remove this race condition, we trim the cache in the test.
dedup_cache_db.trim_entries().await.unwrap();
actor.spawn();

// Verify that the last half of the signatures are still present in the cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 3);
}
// Verify that the first half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

// Let the trim period expire; this will trim the dedup cache to capacity
delay_for(Duration::from_millis(trim_interval_ms * 2)).await;
// Trim the database of excess entries
dedup_cache_db.trim_entries().await.unwrap();

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

shutdown.trigger().unwrap();
}

#[tokio_macros::test_basic]
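Taken together, the actor.rs changes replace the old yes/no "already seen" reply with a hit count, so callers can allow a message through the pipeline a configurable number of times. The following caller-side sketch is not part of this PR's diff; it assumes a DhtRequester and a DhtConfig are already in hand and that the listed crate paths/re-exports exist, and it shows how the new reply could be combined with the dedup_allowed_message_occurrences setting to decide whether to discard a message.

// Hypothetical caller-side sketch; the crate paths/re-exports below are assumptions.
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{DhtConfig, DhtRequester, actor::DhtActorError};

async fn should_discard(
    dht_requester: &mut DhtRequester, // as changed in comms/dht/src/actor.rs
    config: &DhtConfig,               // as extended in comms/dht/src/config.rs
    message_hash: Vec<u8>,
    received_from: CommsPublicKey,
) -> Result<bool, DhtActorError> {
    // Record the body hash in the dedup cache; the reply is the total number of times
    // this hash has now been seen (1 on the first insert, per the tests above).
    let hit_count = dht_requester
        .add_message_to_dedup_cache(message_hash, received_from)
        .await?;
    // With the default of 1, any repeat is discarded (classic dedup behaviour). A larger
    // value lets the same message be repropagated for additional rounds before it is dropped.
    Ok(hit_count as usize > config.dedup_allowed_message_occurrences)
}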
5 changes: 5 additions & 0 deletions comms/dht/src/builder.rs
@@ -99,6 +99,11 @@ impl DhtBuilder {
self
}

pub fn with_dedup_discard_hit_count(mut self, max_hit_count: usize) -> Self {
self.config.dedup_allowed_message_occurrences = max_hit_count;
self
}

pub fn with_num_random_nodes(mut self, n: usize) -> Self {
self.config.num_random_nodes = n;
self
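The new builder method simply writes through to the config field added in config.rs. A minimal usage sketch, assuming a DhtBuilder value (here dht_builder) is already in hand, since its construction is outside this diff:

// Hypothetical usage of the new builder option.
let dht_builder = dht_builder
    // Let a message be seen up to 3 times before the dedup layer treats it as a duplicate.
    .with_dedup_discard_hit_count(3)
    // Existing option from the same impl block, shown for context.
    .with_num_random_nodes(4);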
6 changes: 6 additions & 0 deletions comms/dht/src/config.rs
@@ -72,6 +72,10 @@ pub struct DhtConfig {
/// The periodic trim interval for items in the message hash cache
/// Default: 300s (5 mins)
pub dedup_cache_trim_interval: Duration,
/// The number of times a message is allowed to pass through the DHT pipeline before it is
/// deduped/discarded
/// Default: 1
pub dedup_allowed_message_occurrences: usize,
/// The duration to wait for a peer discovery to complete before giving up.
/// Default: 2 minutes
pub discovery_request_timeout: Duration,
@@ -136,6 +140,7 @@ impl DhtConfig {

impl Default for DhtConfig {
fn default() -> Self {
// NB: please remember to update field comments to reflect these defaults
Self {
num_neighbouring_nodes: 8,
num_random_nodes: 4,
@@ -151,6 +156,7 @@ impl Default for DhtConfig {
saf_max_message_size: 512 * 1024,
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_allowed_message_occurrences: 1,
Inline review comment (Contributor): NIT: We may need a TODO comment here to test or further implement this feature.

database_url: DbConnectionUrl::Memory,
discovery_request_timeout: Duration::from_secs(2 * 60),
connectivity_update_interval: Duration::from_secs(2 * 60),
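For code that builds a DhtConfig directly, as the actor.rs test above does, the new field can also be set with struct-update syntax; a small sketch:

// Override only the new field and keep every other default.
let config = DhtConfig {
    // Allow a message to pass the dedup check up to 3 times, i.e. it can be
    // repropagated for a couple of extra rounds before being discarded.
    dedup_allowed_message_occurrences: 3,
    ..Default::default()
};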