Skip to content

Commit

Permalink
Reduce number of messages sent when propagating
Browse files Browse the repository at this point in the history
This is a stop gap fix - the code will be simplified by
ConnectivityManager.

- Added `propagation_factor` (default: 0.5) to indicate what percentage
  of neighbours should be propagated to
- If the destination is given, send to at most `num_neighbours *
  propagation_factor` nodes unless nodes are further away from the
  destination
- If destination is Unknown, send to a random set of peers (either
  connect to a random set of peers or if we have enough peers,
  select randomly from peers that are already connected)
- Join messages are propagated with a reduced number of messages
- Updated memorynet to test network wide message propagation
  • Loading branch information
sdbondi committed May 13, 2020
1 parent d8edb8e commit 0f80272
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 127 deletions.
202 changes: 150 additions & 52 deletions comms/dht/examples/memorynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
//! `RUST_BACKTRACE=1 RUST_LOG=trace cargo run --example memorynet 2> /tmp/debug.log`
// Size of network
const NUM_NODES: usize = 39;
const NUM_NODES: usize = 40;
// Must be at least 2
const NUM_WALLETS: usize = 8;
const NUM_WALLETS: usize = 6;
const QUIET_MODE: bool = true;

mod memory_net;
Expand Down Expand Up @@ -71,14 +71,15 @@ use tari_comms::{
};
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
envelope::NodeDestination,
inbound::DecryptedDhtMessage,
outbound::OutboundEncryption,
Dht,
DhtBuilder,
};
use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper};
use tari_test_utils::{paths::create_temporary_data_path, random};
use tokio::{runtime, time};
use tokio::{runtime, task, time};
use tower::ServiceBuilder;

type MessagingEventRx = mpsc::UnboundedReceiver<(NodeId, NodeId)>;
Expand Down Expand Up @@ -242,6 +243,10 @@ async fn main() {
// Put the wallet back
wallets.push(random_wallet);

do_network_wide_propagation(&mut nodes).await;

total_messages += drain_messaging_events(&mut messaging_events_rx, false).await;

println!("{} messages sent in total across the network", total_messages);

network_peer_list_stats(&nodes, &wallets).await;
Expand Down Expand Up @@ -419,16 +424,101 @@ async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]) {
);
}

async fn do_network_wide_propagation(nodes: &mut [TestNode]) {
let random_node = &nodes[OsRng.gen_range(0, nodes.len() - 1)];
let random_node_id = random_node.comms.node_identity().node_id().clone();
const PUBLIC_MESSAGE: &str = "This is something you're all interested in!";

banner!("🌎 {} is going to broadcast a message to the network", random_node);
random_node
.dht
.outbound_requester()
.broadcast(
NodeDestination::Unknown,
OutboundEncryption::None,
vec![],
OutboundDomainMessage::new(0i32, PUBLIC_MESSAGE.to_string()),
)
.await
.unwrap();

// Spawn task for each peer that will read the message and propagate it on
let tasks = nodes
.into_iter()
.filter(|n| n.comms.node_identity().node_id() != &random_node_id)
.enumerate()
.map(|(idx, node)| {
let mut outbound_req = node.dht.outbound_requester();
let mut ims_rx = node.ims_rx.take().unwrap();
let start = Instant::now();
let node_name = node.name.clone();

task::spawn(async move {
let result = time::timeout(Duration::from_secs(5), ims_rx.next()).await;
let mut is_success = false;
match result {
Ok(Some(msg)) => {
let public_msg = msg
.decryption_result
.unwrap()
.decode_part::<String>(1)
.unwrap()
.unwrap();
println!("📬 {} got public message '{}'", node_name, public_msg);
is_success = true;
let sent_state = outbound_req
.propagate(
NodeDestination::Unknown,
OutboundEncryption::None,
vec![msg.source_peer.node_id.clone()],
OutboundDomainMessage::new(0i32, public_msg),
)
.await
.unwrap();
let states = sent_state.resolve_ok().await.unwrap();
println!("🦠 {} propagated to {} peer(s)", node_name, states.len());
},
Err(_) | Ok(None) => {
banner!(
"💩 {} failed to receive network message after {}ms",
node_name,
start.elapsed().as_millis(),
);
},
}

(idx, ims_rx, is_success)
})
});

// Put the ims_rxs back
let ims_rxs = future::join_all(tasks).await;
let mut num_successes = 0;
for ims in ims_rxs {
let (idx, ims_rx, is_success) = ims.unwrap();
nodes[idx].ims_rx = Some(ims_rx);
if is_success {
num_successes += 1;
}
}

banner!(
"🙌 Finished propagation test. {} out of {} nodes received the message",
num_successes,
nodes.len() - 1
);
}

async fn do_store_and_forward_message_propagation(
wallet: TestNode,
wallets: &[TestNode],
messaging_tx: MessagingEventTx,
messaging_rx: &mut MessagingEventRx,
) -> (usize, TestNode)
{
println!(
"{} chosen at random to be receive a message from {} using store and forward",
wallet, wallets[0]
banner!(
"{} chosen at random to be receive messages from other nodes using store and forward",
wallet,
);
let wallets_peers = wallet.comms.peer_manager().all().await.unwrap();
let node_identity = wallet.comms.node_identity().clone();
Expand All @@ -437,28 +527,28 @@ async fn do_store_and_forward_message_propagation(
wallet.comms.shutdown().await;

banner!(
"🌎 {} ({}) is going to attempt to discover {} ({})",
wallets[0],
wallets[0].comms.node_identity().public_key(),
"🎤 All other wallets are going to attempt to broadcast messages to {} ({})",
get_name(node_identity.node_id()),
node_identity.public_key(),
);
let secret_message = format!("My name is wiki wiki {}", wallets[0]);

let start = Instant::now();
wallets[0]
.dht
.outbound_requester()
.broadcast(
node_identity.node_id().clone().into(),
OutboundEncryption::EncryptFor(Box::new(node_identity.public_key().clone())),
vec![],
OutboundDomainMessage::new(123i32, secret_message.clone()),
)
.await
.unwrap();
for wallet in wallets {
let secret_message = format!("My name is wiki wiki {}", wallet);
wallet
.dht
.outbound_requester()
.broadcast(
node_identity.node_id().clone().into(),
OutboundEncryption::EncryptFor(Box::new(node_identity.public_key().clone())),
vec![],
OutboundDomainMessage::new(123i32, secret_message.clone()),
)
.await
.unwrap();
}

println!("Waiting a few seconds for message to propagate around the network...");
banner!("Waiting a few seconds for messages to propagate around the network...");
time::delay_for(Duration::from_secs(5)).await;

let mut total_messages = drain_messaging_events(messaging_rx, false).await;
Expand All @@ -475,33 +565,41 @@ async fn do_store_and_forward_message_propagation(
.await
.unwrap();

let result = time::timeout(Duration::from_secs(20), wallet.ims_rx.next()).await;
total_messages += match result {
Ok(msg) => {
let msg = msg.unwrap();
let secret_msg = msg
.decryption_result
.unwrap()
.decode_part::<String>(1)
.unwrap()
.unwrap();
banner!(
"🎉 Wallet {} received propagated message '{}' from store and forward in {}ms",
wallet,
secret_msg,
start.elapsed().as_millis()
);
drain_messaging_events(messaging_rx, false).await
},
Err(err) => {
banner!(
"💩 Failed to receive message after {}ms using store and forward '{:?}'",
start.elapsed().as_millis(),
err
);
drain_messaging_events(messaging_rx, true).await
},
};
let mut num_msgs = 0;
loop {
let result = time::timeout(Duration::from_secs(20), wallet.ims_rx.as_mut().unwrap().next()).await;
num_msgs += 1;
match result {
Ok(msg) => {
let msg = msg.unwrap();
let secret_msg = msg
.decryption_result
.unwrap()
.decode_part::<String>(1)
.unwrap()
.unwrap();
banner!(
"🎉 Wallet {} received propagated message '{}' from store and forward in {}ms",
wallet,
secret_msg,
start.elapsed().as_millis()
);
},
Err(err) => {
banner!(
"💩 Failed to receive message after {}ms using store and forward '{:?}'",
start.elapsed().as_millis(),
err
);
},
};

if num_msgs == wallets.len() {
break;
}
}

total_messages += drain_messaging_events(messaging_rx, false).await;

(total_messages, wallet)
}
Expand Down Expand Up @@ -609,7 +707,7 @@ struct TestNode {
seed_peer: Option<Peer>,
dht: Dht,
conn_man_events_rx: mpsc::Receiver<Arc<ConnectionManagerEvent>>,
ims_rx: mpsc::Receiver<DecryptedDhtMessage>,
ims_rx: Option<mpsc::Receiver<DecryptedDhtMessage>>,
}

impl TestNode {
Expand All @@ -632,7 +730,7 @@ impl TestNode {
seed_peer,
comms,
dht,
ims_rx,
ims_rx: Some(ims_rx),
conn_man_events_rx: events_rx,
}
}
Expand Down Expand Up @@ -817,5 +915,5 @@ async fn setup_comms_dht(

async fn take_a_break() {
banner!("Taking a break for a few seconds to let things settle...");
time::delay_for(Duration::from_millis(NUM_NODES as u64 * 300)).await;
time::delay_for(Duration::from_millis(NUM_NODES as u64 * 100)).await;
}
Loading

0 comments on commit 0f80272

Please sign in to comment.