Skip to content

Commit

Permalink
Increase base node wallet inbound pubsub channel size (#1932)
Browse files Browse the repository at this point in the history
Merge pull request #1932

`pubsub` channel increased to facilitate multiple transactions being sent between two base node
wallets without dropping messages, as fast as the network can handle it

* pull/1932/head:
  `pubsub` channel increased to facilitate multiple transactions being sent between two base node wallets without dropping messages, as fast as the network can handle it
  • Loading branch information
sdbondi committed Jun 4, 2020
2 parents ee8d0ac + 57ef437 commit fc1779e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ where
debug!(target: LOG_TARGET, "Base node service registration complete.");

//---------------------------------- Wallet --------------------------------------------//
let (publisher, wallet_subscriptions) = pubsub_connector(handle.clone(), 100, 2);
let (publisher, wallet_subscriptions) = pubsub_connector(handle.clone(), 1000, 2);
let wallet_subscriptions = Arc::new(wallet_subscriptions);
create_peer_db_folder(&config.wallet_peer_db_path)?;
let (wallet_comms, wallet_dht) = setup_wallet_comms(
Expand Down
9 changes: 8 additions & 1 deletion base_layer/p2p/src/comms_connector/inbound_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

use super::peer_message::PeerMessage;
use futures::{task::Context, Future, Sink, SinkExt};
use log::*;
use std::{error::Error, pin::Pin, sync::Arc, task::Poll};
use tari_comms::pipeline::PipelineError;
use tari_comms_dht::{domain_message::MessageHeader, inbound::DecryptedDhtMessage};
use tower::Service;

const LOG_TARGET: &str = "comms::middleware::inbound_connector";
/// This service receives DecryptedDhtMessage, deserializes the MessageHeader and
/// sends a `PeerMessage` on the given sink.
#[derive(Clone)]
Expand Down Expand Up @@ -99,7 +101,12 @@ impl<TSink> InboundDomainConnector<TSink> {
dht_header,
body: msg_bytes,
};

trace!(
target: LOG_TARGET,
"Forwarding message {:?} to pubsub, Trace: {}",
inbound_message.tag,
&peer_message.dht_header.message_tag
);
Ok(peer_message)
}
}
Expand Down
1 change: 1 addition & 0 deletions base_layer/p2p/src/comms_connector/peer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tari_comms_dht::{domain_message::MessageHeader, envelope::DhtMessageHeader};
const LOG_TARGET: &str = "comms::dht::requests::inbound";

/// A domain-level message
#[derive(Debug)]
pub struct PeerMessage {
/// The message envelope header
pub dht_header: DhtMessageHeader,
Expand Down
11 changes: 10 additions & 1 deletion base_layer/p2p/src/comms_connector/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ pub fn pubsub_connector(
// Map DomainMessage into a TopicPayload
.map(|msg: Arc<PeerMessage>| {
TariMessageType::from_i32(msg.message_header.message_type)
.map(|msg_type| TopicPayload::new(msg_type, msg))
.map(|msg_type| {
let message_tag_trace = msg.dht_header.message_tag;
let payload = TopicPayload::new(msg_type, msg);
trace!(
target: LOG_TARGET,
"Created topic payload message {:?}, Trace: {}",
&payload.topic(), message_tag_trace
);
payload
})
.ok_or_else(|| "Invalid or unrecognised Tari message type".to_string())
})
// Forward TopicPayloads to the publisher
Expand Down

0 comments on commit fc1779e

Please sign in to comment.