Skip to content

Commit

Permalink
feat(dht): allow messages to be reporopagated for a number of rounds …
Browse files Browse the repository at this point in the history
…(gossip)

Use the dedup cache hit count to allow certain duplicate messages through a
configurable number of times. This improves mempool synchronization.
  • Loading branch information
sdbondi committed Aug 19, 2021
1 parent 211dcfd commit 3cc2399
Show file tree
Hide file tree
Showing 23 changed files with 372 additions and 201 deletions.
82 changes: 38 additions & 44 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ impl From<SendError> for DhtActorError {
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, CommsPublicKey, oneshot::Sender<bool>),
/// Inserts a message signature to the msg hash cache. This operation replies with the number of times this message
/// has previously been seen (hit count)
MsgHashCacheInsert(Vec<u8>, CommsPublicKey, oneshot::Sender<u32>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<NodeId>>),
GetMetadata(DhtMetadataKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
Expand Down Expand Up @@ -151,7 +151,7 @@ impl DhtRequester {
&mut self,
message_hash: Vec<u8>,
public_key: CommsPublicKey,
) -> Result<bool, DhtActorError> {
) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert(message_hash, public_key, reply_tx))
Expand Down Expand Up @@ -268,7 +268,7 @@ impl DhtActor {
},

_ = dedup_cache_trim_ticker.select_next_some() => {
if let Err(err) = self.msg_hash_dedup_cache.truncate().await {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries().await {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},
Expand Down Expand Up @@ -303,16 +303,16 @@ impl DhtActor {
MsgHashCacheInsert(hash, public_key, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.insert_body_hash_if_unique(hash, public_key).await {
Ok(already_exists) => {
let _ = reply_tx.send(already_exists).map_err(|_| DhtActorError::ReplyCanceled);
match msg_hash_cache.add_body_hash(hash, public_key).await {
Ok(hit_count) => {
let _ = reply_tx.send(hit_count);
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Unable to update message dedup cache because {:?}", err
);
let _ = reply_tx.send(false).map_err(|_| DhtActorError::ReplyCanceled);
let _ = reply_tx.send(0);
},
}
Ok(())
Expand Down Expand Up @@ -690,11 +690,9 @@ mod test {
test_utils::{build_peer_manager, make_client_identity, make_node_identity},
};
use chrono::{DateTime, Utc};
use std::time::Duration;
use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
use tari_shutdown::Shutdown;
use tari_test_utils::random;
use tokio::time::delay_for;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
Expand Down Expand Up @@ -756,21 +754,21 @@ mod test {
actor.spawn();

let signature = vec![1u8, 2, 3];
let is_dup = requester
let num_hits = requester
.insert_message_hash(signature.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
let is_dup = requester
assert_eq!(num_hits, 1);
let num_hits = requester
.insert_message_hash(signature, CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
let is_dup = requester
assert_eq!(num_hits, 2);
let num_hits = requester
.insert_message_hash(Vec::new(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

#[tokio_macros::test_basic]
Expand All @@ -783,14 +781,12 @@ mod test {
let (actor_tx, actor_rx) = mpsc::channel(1);
let mut requester = DhtRequester::new(actor_tx);
let outbound_requester = OutboundMessageRequester::new(out_tx);
let mut shutdown = Shutdown::new();
let trim_interval_ms = 500;
let shutdown = Shutdown::new();
// Note: This must be equal or larger than the minimum dedup cache capacity for DedupCacheDatabase
let capacity = 120;
let capacity = 10;
let actor = DhtActor::new(
DhtConfig {
dedup_cache_capacity: capacity,
dedup_cache_trim_interval: Duration::from_millis(trim_interval_ms),
..Default::default()
},
db_connection().await,
Expand All @@ -803,63 +799,61 @@ mod test {
);

// Create signatures for double the dedup cache capacity
let mut signatures: Vec<Vec<u8>> = Vec::new();
for i in 0..(capacity * 2) {
signatures.push(vec![1u8, 2, i as u8])
}
let signatures = (0..(capacity * 2)).map(|i| vec![1u8, 2, i as u8]).collect::<Vec<_>>();

// Pre-populate the dedup cache; everything should be accepted due to cleanup ticker not active yet
// Pre-populate the dedup cache; everything should be accepted because the cleanup ticker has not run yet
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}
// Try to re-insert all; everything should be marked as duplicates due to cleanup ticker not active yet
// Try to re-insert all; all hashes should have incremented their hit count
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 2);
}

// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire immediately
let dedup_cache_db = actor.msg_hash_dedup_cache.clone();
// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire fairly soon after the
// task is running on a thread. To remove this race condition, we trim the cache in the test.
dedup_cache_db.trim_entries().await.unwrap();
actor.spawn();

// Verify that the last half of the signatures are still present in the cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
let num_hits = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 3);
}
// Verify that the first half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity) {
let is_dup = requester
let num_hits = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

// Let the trim period expire; this will trim the dedup cache to capacity
delay_for(Duration::from_millis(trim_interval_ms * 2)).await;
// Trim the database of excess entries
dedup_cache_db.trim_entries().await.unwrap();

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
let num_hits = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

shutdown.trigger().unwrap();
}

#[tokio_macros::test_basic]
Expand Down
5 changes: 5 additions & 0 deletions comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl DhtBuilder {
self
}

pub fn with_dedup_discard_hit_count(mut self, max_hit_count: usize) -> Self {
self.config.dedup_discard_hit_count = max_hit_count;
self
}

pub fn with_num_random_nodes(mut self, n: usize) -> Self {
self.config.num_random_nodes = n;
self
Expand Down
5 changes: 5 additions & 0 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub struct DhtConfig {
/// The periodic trim interval for items in the message hash cache
/// Default: 300s (5 mins)
pub dedup_cache_trim_interval: Duration,
/// The number of hits a message is allowed before being discarded
/// Default: 3
pub dedup_discard_hit_count: usize,
/// The duration to wait for a peer discovery to complete before giving up.
/// Default: 2 minutes
pub discovery_request_timeout: Duration,
Expand Down Expand Up @@ -136,6 +139,7 @@ impl DhtConfig {

impl Default for DhtConfig {
fn default() -> Self {
// NB: please remember to update field comments to reflect these defaults
Self {
num_neighbouring_nodes: 8,
num_random_nodes: 4,
Expand All @@ -151,6 +155,7 @@ impl Default for DhtConfig {
saf_max_message_size: 512 * 1024,
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_discard_hit_count: 3,
database_url: DbConnectionUrl::Memory,
discovery_request_timeout: Duration::from_secs(2 * 60),
connectivity_update_interval: Duration::from_secs(2 * 60),
Expand Down
96 changes: 49 additions & 47 deletions comms/dht/src/dedup/dedup_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ use crate::{
schema::dedup_cache,
storage::{DbConnection, StorageError},
};
use chrono::Utc;
use chrono::{NaiveDateTime, Utc};
use diesel::{dsl, result::DatabaseErrorKind, ExpressionMethods, QueryDsl, RunQueryDsl};
use log::*;
use tari_comms::types::CommsPublicKey;
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
use tari_utilities::hex;
use tari_crypto::tari_utilities::hex::Hex;

const LOG_TARGET: &str = "comms::dht::dedup_cache";

#[derive(Queryable, PartialEq, Debug)]
struct DedupCacheEntry {
body_hash: String,
sender_public_ke: String,
number_of_hit: i32,
stored_at: NaiveDateTime,
last_hit_at: NaiveDateTime,
}

#[derive(Clone)]
pub struct DedupCacheDatabase {
connection: DbConnection,
Expand All @@ -48,36 +56,24 @@ impl DedupCacheDatabase {
Self { connection, capacity }
}

/// Inserts and returns Ok(true) if the item already existed and Ok(false) if it didn't, also updating hit stats
pub async fn insert_body_hash_if_unique(
&self,
body_hash: Vec<u8>,
public_key: CommsPublicKey,
) -> Result<bool, StorageError> {
let body_hash = hex::to_hex(&body_hash.as_bytes());
let public_key = public_key.to_hex();
match self
.insert_body_hash_or_update_stats(body_hash.clone(), public_key.clone())
.await
{
Ok(val) => {
if val == 0 {
warn!(
target: LOG_TARGET,
"Unable to insert new entry into message dedup cache"
);
}
Ok(false)
},
Err(e) => match e {
StorageError::UniqueViolation(_) => Ok(true),
_ => Err(e),
},
/// Adds the body hash to the cache, returning the number of hits (inclusive) that have been recorded for this body
/// hash
pub async fn add_body_hash(&self, body_hash: Vec<u8>, public_key: CommsPublicKey) -> Result<u32, StorageError> {
let hit_count = self
.insert_body_hash_or_update_stats(body_hash.to_hex(), public_key.to_hex())
.await?;

if hit_count == 0 {
warn!(
target: LOG_TARGET,
"Unable to insert new entry into message dedup cache"
);
}
Ok(hit_count)
}

/// Trims the dedup cache to the configured limit by removing the oldest entries
pub async fn truncate(&self) -> Result<usize, StorageError> {
pub async fn trim_entries(&self) -> Result<usize, StorageError> {
let capacity = self.capacity;
self.connection
.with_connection_async(move |conn| {
Expand Down Expand Up @@ -109,40 +105,46 @@ impl DedupCacheDatabase {
.await
}

// Insert new row into the table or update existing row in an atomic fashion; more than one thread can access this
// table at the same time.
/// Insert new row into the table or updates an existing row. Returns the number of hits for this body hash.
async fn insert_body_hash_or_update_stats(
&self,
body_hash: String,
public_key: String,
) -> Result<usize, StorageError> {
) -> Result<u32, StorageError> {
self.connection
.with_connection_async(move |conn| {
let insert_result = diesel::insert_into(dedup_cache::table)
.values((
dedup_cache::body_hash.eq(body_hash.clone()),
dedup_cache::sender_public_key.eq(public_key.clone()),
dedup_cache::body_hash.eq(&body_hash),
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(conn);
match insert_result {
Ok(val) => Ok(val),
Ok(1) => Ok(1),
Ok(n) => Err(StorageError::UnexpectedResult(format!(
"Expected exactly one row to be inserted. Got {}",
n
))),
Err(diesel::result::Error::DatabaseError(kind, e_info)) => match kind {
DatabaseErrorKind::UniqueViolation => {
// Update hit stats for the message
let result =
diesel::update(dedup_cache::table.filter(dedup_cache::body_hash.eq(&body_hash)))
.set((
dedup_cache::sender_public_key.eq(public_key),
dedup_cache::number_of_hits.eq(dedup_cache::number_of_hits + 1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(conn);
match result {
Ok(_) => Err(StorageError::UniqueViolation(body_hash)),
Err(e) => Err(e.into()),
}
diesel::update(dedup_cache::table.filter(dedup_cache::body_hash.eq(&body_hash)))
.set((
dedup_cache::sender_public_key.eq(&public_key),
dedup_cache::number_of_hits.eq(dedup_cache::number_of_hits + 1),
dedup_cache::last_hit_at.eq(Utc::now().naive_utc()),
))
.execute(conn)?;
// TODO: Diesel support for RETURNING statements would remove this query, but is not
// available for Diesel + SQLite yet
let hits = dedup_cache::table
.select(dedup_cache::number_of_hits)
.filter(dedup_cache::body_hash.eq(&body_hash))
.get_result::<i32>(conn)?;

Ok(hits as u32)
},
_ => Err(diesel::result::Error::DatabaseError(kind, e_info).into()),
},
Expand Down
Loading

0 comments on commit 3cc2399

Please sign in to comment.