
Commit 57ef437

Merge branch 'development' into ho_fix_dropped_direct_transactions
hansieodendaal authored Jun 4, 2020
2 parents 616079c + d9b6dd9 commit 57ef437
Showing 59 changed files with 1,445 additions and 1,238 deletions.
2 changes: 1 addition & 1 deletion applications/tari_base_node/Cargo.toml
@@ -19,7 +19,7 @@ tari_service_framework = { version = "^0.0", path = "../../base_layer/service_fr
tari_shutdown = { path = "../../infrastructure/shutdown", version = "^0.0" }
tari_mmr = { path = "../../base_layer/mmr", version = "^0.1" }
tari_wallet = { path = "../../base_layer/wallet", version = "^0.1" }
-tari_broadcast_channel = "^0.1"
+tari_broadcast_channel = "^0.2"
tari_crypto = { version = "^0.3" }

structopt = { version = "0.3.13", default_features = false }
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/builder.rs
@@ -492,7 +492,7 @@ where

//---------------------------------- Base Node --------------------------------------------//

-let (publisher, base_node_subscriptions) = pubsub_connector(handle.clone(), 100);
+let (publisher, base_node_subscriptions) = pubsub_connector(handle.clone(), 100, 1);
let base_node_subscriptions = Arc::new(base_node_subscriptions);
create_peer_db_folder(&config.peer_db_path)?;
let (base_node_comms, base_node_dht) = setup_base_node_comms(base_node_identity, config, publisher).await?;
@@ -515,7 +515,7 @@ where
debug!(target: LOG_TARGET, "Base node service registration complete.");

//---------------------------------- Wallet --------------------------------------------//
-let (publisher, wallet_subscriptions) = pubsub_connector(handle.clone(), 1000);
+let (publisher, wallet_subscriptions) = pubsub_connector(handle.clone(), 1000, 2);
let wallet_subscriptions = Arc::new(wallet_subscriptions);
create_peer_db_folder(&config.wallet_peer_db_path)?;
let (wallet_comms, wallet_dht) = setup_wallet_comms(
4 changes: 2 additions & 2 deletions base_layer/core/Cargo.toml
@@ -27,8 +27,8 @@ tari_common = { version= "^0.1", path = "../../common"}
tari_service_framework = { version = "^0.0", path = "../service_framework"}
tari_p2p = { version = "^0.1", path = "../../base_layer/p2p" }
tari_comms_dht = { version = "^0.1", path = "../../comms/dht"}
-tari_broadcast_channel = "^0.1"
-tari_pubsub = "^0.1"
+tari_broadcast_channel = "^0.2"
+tari_pubsub = "^0.2"
tari_shutdown = { version = "^0.0", path = "../../infrastructure/shutdown" }
tari_mmr = { version = "^0.1", path = "../../base_layer/mmr", optional = true }
randomx-rs = { version = "0.2.1", optional = true }
@@ -46,7 +46,7 @@ impl ServiceInitializer for ChainMetadataServiceInitializer {
shutdown: ShutdownSignal,
) -> Self::Future
{
-let (publisher, subscriber) = broadcast_channel::bounded(BROADCAST_EVENT_BUFFER_SIZE);
+let (publisher, subscriber) = broadcast_channel::bounded(BROADCAST_EVENT_BUFFER_SIZE, 5);
let handle = ChainMetadataHandle::new(subscriber);
handles_fut.register(handle);

@@ -303,7 +303,7 @@ mod test {

let (base_node, mut base_node_receiver) = create_base_node_nci();

-let (publisher, _subscriber) = broadcast_channel::bounded(1);
+let (publisher, _subscriber) = broadcast_channel::bounded(1, 106);
let mut service = ChainMetadataService::new(liveness_handle, base_node, publisher);

let mut proto_chain_metadata = create_sample_proto_chain_metadata();
@@ -347,7 +347,7 @@

let (base_node, _) = create_base_node_nci();

-let (publisher, _subscriber) = broadcast_channel::bounded(1);
+let (publisher, _subscriber) = broadcast_channel::bounded(1, 107);
let mut service = ChainMetadataService::new(liveness_handle, base_node, publisher);

// To prevent the chain metadata buffer being flushed after receiving a single pong event,
@@ -377,7 +377,7 @@
};

let (base_node, _) = create_base_node_nci();
-let (publisher, _subscriber) = broadcast_channel::bounded(1);
+let (publisher, _subscriber) = broadcast_channel::bounded(1, 108);
let mut service = ChainMetadataService::new(liveness_handle, base_node, publisher);

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
@@ -400,7 +400,7 @@
};

let (base_node, _) = create_base_node_nci();
-let (publisher, _subscriber) = broadcast_channel::bounded(1);
+let (publisher, _subscriber) = broadcast_channel::bounded(1, 109);
let mut service = ChainMetadataService::new(liveness_handle, base_node, publisher);

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/state_machine.rs
@@ -87,8 +87,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
shutdown_signal: ShutdownSignal,
) -> Self
{
-let (event_sender, event_receiver): (Publisher<_>, Subscriber<_>) = bounded(10);
-let (status_event_publisher, status_event_subscriber): (Publisher<_>, Subscriber<_>) = bounded(1); // only latest update is important
+let (event_sender, event_receiver): (Publisher<_>, Subscriber<_>) = bounded(10, 3);
+let (status_event_publisher, status_event_subscriber): (Publisher<_>, Subscriber<_>) = bounded(1, 4); // only latest update is important
Self {
db: db.clone(),
local_node_interface: local_node_interface.clone(),
3 changes: 1 addition & 2 deletions base_layer/core/src/base_node/states/block_sync.rs
@@ -719,7 +719,7 @@ async fn exclude_sync_peer(sync_peers: &mut Vec<NodeId>, sync_peer: NodeId) -> R
Ok(())
}

-// Ban and disconnect the provided sync peer.
+// Ban and disconnect the provided sync peer if this node is online
async fn ban_sync_peer_if_online<B: BlockchainBackend + 'static>(
shared: &mut BaseNodeStateMachine<B>,
sync_peers: &mut Vec<NodeId>,
@@ -746,7 +746,6 @@ async fn ban_sync_peer<B: BlockchainBackend + 'static>(
) -> Result<(), BlockSyncError>
{
info!(target: LOG_TARGET, "Banning peer {} from local node.", sync_peer);
-sync_peers.retain(|p| *p != sync_peer);
shared.connectivity.ban_peer(sync_peer.clone(), ban_duration).await?;
exclude_sync_peer(sync_peers, sync_peer).await
}
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/initializer.rs
@@ -160,7 +160,7 @@ where T: BlockchainBackend + 'static
let (outbound_tx_sender_service, outbound_tx_stream) = futures_mpsc_channel_unbounded();
let (outbound_request_sender_service, outbound_request_stream) = reply_channel::unbounded();
let (local_request_sender_service, local_request_stream) = reply_channel::unbounded();
-let (mempool_state_event_publisher, mempool_state_event_subscriber) = bounded(100);
+let (mempool_state_event_publisher, mempool_state_event_subscriber) = bounded(100, 6);
let outbound_mp_interface =
OutboundMempoolServiceInterface::new(outbound_request_sender_service, outbound_tx_sender_service);
let local_mp_interface = LocalMempoolService::new(local_request_sender_service, mempool_state_event_subscriber);
4 changes: 2 additions & 2 deletions base_layer/core/src/mempool/service/local_service.rs
@@ -129,7 +129,7 @@ mod test {

#[tokio_macros::test]
async fn mempool_stats() {
-let (_, event_subscriber) = bounded(100);
+let (_, event_subscriber) = bounded(100, 110);
let (tx, rx) = unbounded();
let mut service = LocalMempoolService::new(tx, event_subscriber);
task::spawn(mock_handler(rx));
@@ -140,7 +140,7 @@

#[tokio_macros::test]
async fn mempool_stats_from_multiple() {
-let (_, event_subscriber) = bounded(100);
+let (_, event_subscriber) = bounded(100, 111);
let (tx, rx) = unbounded();
let mut service = LocalMempoolService::new(tx, event_subscriber);
let mut service2 = service.clone();
2 changes: 1 addition & 1 deletion base_layer/core/tests/helpers/chain_metadata.rs
@@ -42,7 +42,7 @@ pub struct MockChainMetadata {

impl MockChainMetadata {
pub fn new() -> Self {
-let (publisher, subscriber) = bounded(10);
+let (publisher, subscriber) = bounded(10, 114);
Self { publisher, subscriber }
}

2 changes: 1 addition & 1 deletion base_layer/core/tests/helpers/nodes.rs
@@ -448,7 +448,7 @@ fn setup_base_node_services(
CommsNode,
)
{
-let (publisher, subscription_factory) = pubsub_connector(runtime.handle().clone(), 100);
+let (publisher, subscription_factory) = pubsub_connector(runtime.handle().clone(), 100, 104);
let subscription_factory = Arc::new(subscription_factory);
let (comms, dht) = runtime.block_on(setup_comms_services(node_identity, peers, publisher, data_path));

2 changes: 1 addition & 1 deletion base_layer/core/tests/mining.rs
@@ -102,7 +102,7 @@ fn mining() {
let shutdown = Shutdown::new();
let mut miner = Miner::new(shutdown.to_signal(), consensus_manager, &alice_node.local_nci, 1);
miner.enable_mining_flag().store(true, Ordering::Relaxed);
-let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1);
+let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1, 112);
miner.subscribe_to_node_state_events(state_event_receiver);
miner.subscribe_to_mempool_state_events(alice_node.local_mp_interface.get_mempool_state_event_stream());
let miner_utxo_stream = miner.get_utxo_receiver_channel().fuse();
2 changes: 1 addition & 1 deletion base_layer/core/tests/wallet.rs
@@ -301,7 +301,7 @@ fn wallet_base_node_integration_test() {
let mut shutdown = Shutdown::new();
let mut miner = Miner::new(shutdown.to_signal(), consensus_manager, &base_node.local_nci, 1);
miner.enable_mining_flag().store(true, Ordering::Relaxed);
-let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1);
+let (mut state_event_sender, state_event_receiver): (Publisher<_>, Subscriber<_>) = bounded(1, 113);
miner.subscribe_to_node_state_events(state_event_receiver);
miner.subscribe_to_mempool_state_events(base_node.local_mp_interface.get_mempool_state_event_stream());
let miner_utxo_stream = miner.get_utxo_receiver_channel().fuse();
6 changes: 6 additions & 0 deletions base_layer/mmr/src/merkle_checkpoint.rs
@@ -105,6 +105,12 @@ impl MerkleCheckPoint {
self.prev_accumulated_nodes_added_count + self.nodes_added.len() as u32
}

+/// Merge the provided Merkle checkpoint into the current checkpoint.
+pub fn append(&mut self, mut cp: MerkleCheckPoint) {
+self.nodes_added.append(&mut cp.nodes_added);
+self.nodes_deleted.or_inplace(&cp.nodes_deleted);
+}

/// Break a checkpoint up into its constituent parts
pub fn into_parts(self) -> (Vec<Hash>, Bitmap) {
(self.nodes_added, self.nodes_deleted)
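
The new `MerkleCheckPoint::append` merges a later checkpoint into the current one by concatenating its added node hashes and OR-ing its deletion bitmap in place. A minimal sketch of collapsing the two oldest checkpoints in a checkpoint store, modelled on the `checkpoint_merging` test added further down in this commit (the `MemBackendVec` store, `int_to_hash` helper and imports are the ones that test already uses; this fragment is illustrative, not part of the diff):

// Two single-hash checkpoints in an in-memory checkpoint store.
let mut checkpoint_db = MemBackendVec::<MerkleCheckPoint>::new();
checkpoint_db.push(MerkleCheckPoint::new(vec![int_to_hash(1)], Bitmap::create(), 0)).unwrap();
checkpoint_db.push(MerkleCheckPoint::new(vec![int_to_hash(2)], Bitmap::create(), 0)).unwrap();

// Merge checkpoint 1 into checkpoint 0, then replace both with the merged checkpoint.
let mut merged_cp = checkpoint_db.get(0).unwrap().unwrap();
merged_cp.append(checkpoint_db.get(1).unwrap().unwrap());
checkpoint_db.shift(2).unwrap();              // drop the two original checkpoints
checkpoint_db.push_front(merged_cp).unwrap(); // store the single merged checkpoint in their place
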
10 changes: 10 additions & 0 deletions base_layer/mmr/src/mmr_cache.rs
@@ -166,6 +166,16 @@ where
Ok(())
}

+/// Inform the MmrCache that the first N checkpoints have been merged to allow the base and current indices to be
+/// updated.
+pub fn checkpoints_merged(&mut self, num_merged: usize) -> Result<(), MerkleMountainRangeError> {
+if let Some(num_reverse) = num_merged.checked_sub(1) {
+self.base_cp_index = self.base_cp_index.saturating_sub(num_reverse);
+self.curr_cp_index = self.curr_cp_index.saturating_sub(num_reverse);
+}
+self.update()
+}

/// This function updates the state of the MMR cache based on the current state of the shared checkpoints.
pub fn update(&mut self) -> Result<(), MerkleMountainRangeError> {
let cp_count = self
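
Merging checkpoints in the backend shifts every surviving checkpoint to a lower index, so `checkpoints_merged` walks the cache's base and current indices back by `num_merged - 1` before running the usual `update` pass. A hedged sketch of the intended call order, mirroring the `checkpoint_merging` test in the next file (`expected_root` is an illustrative name, not something defined in this commit):

// `checkpoint_db` held several checkpoints; its first three were just collapsed into one
// with MerkleCheckPoint::append, so tell the cache about the merge.
mmr_cache.checkpoints_merged(3).unwrap(); // indices move back by 2, then update() runs
// Merging checkpoints must not change the root that the cache reports.
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(expected_root));
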
73 changes: 73 additions & 0 deletions base_layer/mmr/tests/mmr_cache.rs
@@ -157,3 +157,76 @@ fn multiple_rewinds() {
assert!(mmr_cache.update().is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(combine_hashes(&[&h1h2]).clone()));
}

#[test]
fn checkpoint_merging() {
let config = MmrCacheConfig { rewind_hist_len: 2 };
let mut checkpoint_db = MemBackendVec::<MerkleCheckPoint>::new();
let mut mmr_cache = MmrCache::<Hasher, _, _>::new(Vec::new(), checkpoint_db.clone(), config).unwrap();

let h1 = int_to_hash(1);
let h2 = int_to_hash(2);
let h3 = int_to_hash(3);
let h4 = int_to_hash(4);
let h5 = int_to_hash(5);
let h6 = int_to_hash(6);
let ha = combine_hashes(&[&h1, &h2]);
let hb = combine_hashes(&[&h3, &h4]);
let hc = combine_hashes(&[&h5, &h6]);
let hahb = combine_hashes(&[&ha, &hb]);
let cp4_mmr_only_root = combine_hashes(&[&hahb]);
let cp6_mmr_only_root = combine_hashes(&[&hahb, &hc]);
let cp1 = MerkleCheckPoint::new(vec![h1.clone()], Bitmap::create(), 0);
let cp2 = MerkleCheckPoint::new(vec![h2.clone()], Bitmap::create(), 0);
let cp3 = MerkleCheckPoint::new(vec![h3.clone()], Bitmap::create(), 0);
let cp4 = MerkleCheckPoint::new(vec![h4.clone()], Bitmap::create(), 0);
let cp5 = MerkleCheckPoint::new(vec![h5.clone()], Bitmap::create(), 0);
let cp6 = MerkleCheckPoint::new(vec![h6.clone()], Bitmap::create(), 0);

checkpoint_db.push(cp1).unwrap();
assert!(mmr_cache.update().is_ok());
checkpoint_db.push(cp2).unwrap();
assert!(mmr_cache.update().is_ok());
checkpoint_db.push(cp3).unwrap();
assert!(mmr_cache.update().is_ok());
checkpoint_db.push(cp4).unwrap();
assert!(mmr_cache.update().is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp4_mmr_only_root.clone()));

let mut merged_cp = checkpoint_db.get(0).unwrap().unwrap();
merged_cp.append(checkpoint_db.get(1).unwrap().unwrap());
assert!(checkpoint_db.shift(2).is_ok());
assert!(checkpoint_db.push_front(merged_cp).is_ok());
assert_eq!(checkpoint_db.len(), Ok(3));
assert!(mmr_cache.checkpoints_merged(2).is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp4_mmr_only_root.clone()));

checkpoint_db.push(cp5).unwrap();
assert!(mmr_cache.update().is_ok());
checkpoint_db.push(cp6).unwrap();
assert!(mmr_cache.update().is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp6_mmr_only_root.clone()));

let mut merged_cp = checkpoint_db.get(0).unwrap().unwrap();
merged_cp.append(checkpoint_db.get(1).unwrap().unwrap());
merged_cp.append(checkpoint_db.get(2).unwrap().unwrap());
assert!(checkpoint_db.shift(3).is_ok());
assert!(checkpoint_db.push_front(merged_cp).is_ok());
assert_eq!(checkpoint_db.len(), Ok(3));
assert!(mmr_cache.checkpoints_merged(3).is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp6_mmr_only_root.clone()));

// Recreate the MmrCache from the altered checkpoints
let mut mmr_cache = MmrCache::<Hasher, _, _>::new(Vec::new(), checkpoint_db.clone(), config).unwrap();
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp6_mmr_only_root.clone()));

// Replace all existing checkpoints with a single accumulated checkpoint
let mut merged_cp = checkpoint_db.get(0).unwrap().unwrap();
merged_cp.append(checkpoint_db.get(1).unwrap().unwrap());
merged_cp.append(checkpoint_db.get(2).unwrap().unwrap());
assert!(checkpoint_db.shift(3).is_ok());
assert!(checkpoint_db.push_front(merged_cp).is_ok());
assert_eq!(checkpoint_db.len(), Ok(1));
assert!(mmr_cache.checkpoints_merged(3).is_ok());
assert_eq!(mmr_cache.get_mmr_only_root(), Ok(cp6_mmr_only_root.clone()));
}
4 changes: 2 additions & 2 deletions base_layer/p2p/Cargo.toml
@@ -13,11 +13,11 @@ edition = "2018"
test-mocks = []

[dependencies]
-tari_broadcast_channel = "^0.1"
+tari_broadcast_channel = "^0.2"
tari_comms = { version = "^0.1", path = "../../comms"}
tari_comms_dht = { version = "^0.1", path = "../../comms/dht"}
tari_crypto = { version = "^0.3" }
-tari_pubsub = "^0.1"
+tari_pubsub = "^0.2"
tari_service_framework = { version = "^0.0", path = "../service_framework"}
tari_shutdown = { version = "^0.0", path="../../infrastructure/shutdown" }
tari_storage = {version = "^0.1", path = "../../infrastructure/storage"}
2 changes: 1 addition & 1 deletion base_layer/p2p/examples/pingpong.rs
@@ -143,7 +143,7 @@ mod pingpong {

let datastore_path = TempDir::new(random_string(8).as_str()).unwrap();

-let (publisher, subscription_factory) = pubsub_connector(rt.handle().clone(), 100);
+let (publisher, subscription_factory) = pubsub_connector(rt.handle().clone(), 100, 105);
let subscription_factory = Arc::new(subscription_factory);

let transport_type = if is_tor_enabled {
11 changes: 8 additions & 3 deletions base_layer/p2p/src/comms_connector/pubsub.rs
@@ -25,7 +25,7 @@ use crate::{comms_connector::InboundDomainConnector, tari_message::TariMessageTy
use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};
use log::*;
use std::sync::Arc;
-use tari_pubsub::{pubsub_channel, TopicPayload, TopicSubscriptionFactory};
+use tari_pubsub::{pubsub_channel_with_id, TopicPayload, TopicSubscriptionFactory};
use tokio::runtime::Handle;

const LOG_TARGET: &str = "comms::middleware::pubsub";
@@ -35,8 +35,13 @@ pub type PubsubDomainConnector = InboundDomainConnector<mpsc::Sender<Arc<PeerMes
pub type SubscriptionFactory = TopicSubscriptionFactory<TariMessageType, Arc<PeerMessage>>;

/// Connects `InboundDomainConnector` to a `tari_pubsub::TopicPublisher` through a buffered channel
-pub fn pubsub_connector(executor: Handle, buf_size: usize) -> (PubsubDomainConnector, SubscriptionFactory) {
-let (publisher, subscription_factory) = pubsub_channel(buf_size);
+pub fn pubsub_connector(
+executor: Handle,
+buf_size: usize,
+buf_id: usize,
+) -> (PubsubDomainConnector, SubscriptionFactory)
+{
+let (publisher, subscription_factory) = pubsub_channel_with_id(buf_size, buf_id);
let (sender, receiver) = mpsc::channel(buf_size);

// Spawn a task which forwards messages from the pubsub service to the TopicPublisher
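
Every `pubsub_connector` call site now supplies a numeric id alongside the buffer size, and the id is handed straight to `tari_pubsub::pubsub_channel_with_id`. Judging only by the call sites updated in this commit, the id is simply a unique small integer per channel (presumably used by the ^0.2 pubsub/broadcast crates to label their channels); a sketch of the updated wiring as it appears in the base node builder above:

// Buffer of 100 messages; channel id 1 marks the base node connector,
// while the wallet connector in builder.rs uses id 2.
let (publisher, base_node_subscriptions) = pubsub_connector(handle.clone(), 100, 1);
let base_node_subscriptions = Arc::new(base_node_subscriptions);
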
1 change: 1 addition & 0 deletions base_layer/p2p/src/initialization.rs
@@ -125,6 +125,7 @@ where
.take(8)
.collect::<String>()
};
+std::fs::create_dir_all(data_path).unwrap();
let datastore = LMDBBuilder::new()
.set_path(data_path)
.set_environment_size(50)
2 changes: 1 addition & 1 deletion base_layer/p2p/tests/services/liveness.rs
@@ -48,7 +48,7 @@ pub async fn setup_liveness_service(
) -> (LivenessHandle, CommsNode, Dht)
{
let rt_handle = runtime::Handle::current();
-let (publisher, subscription_factory) = pubsub_connector(rt_handle.clone(), 100);
+let (publisher, subscription_factory) = pubsub_connector(rt_handle.clone(), 100, 101);
let subscription_factory = Arc::new(subscription_factory);
let (comms, dht) = setup_comms_services(node_identity.clone(), peers, publisher, data_path).await;

4 changes: 2 additions & 2 deletions base_layer/wallet/Cargo.toml
@@ -11,13 +11,13 @@ test_harness = ["tari_test_utils"]
c_integration = []

[dependencies]
-tari_broadcast_channel = "^0.1"
+tari_broadcast_channel = "^0.2"
tari_comms = { path = "../../comms", version = "^0.1"}
tari_comms_dht = { path = "../../comms/dht", version = "^0.1"}
tari_crypto = { version = "^0.3" }
tari_key_manager = {path = "../key_manager", version = "^0.0"}
tari_p2p = {path = "../p2p", version = "^0.1"}
-tari_pubsub = "^0.1"
+tari_pubsub = "^0.2"
tari_service_framework = { version = "^0.0", path = "../service_framework"}
tari_shutdown = { path = "../../infrastructure/shutdown", version = "^0.0"}
tari_storage = { version = "^0.1", path = "../../infrastructure/storage"}
2 changes: 1 addition & 1 deletion base_layer/wallet/src/output_manager_service/mod.rs
@@ -112,7 +112,7 @@ where T: OutputManagerBackend + 'static
let base_node_response_stream = self.base_node_response_stream();

let (sender, receiver) = reply_channel::unbounded();
-let (publisher, subscriber) = bounded(100);
+let (publisher, subscriber) = bounded(100, 7);

let oms_handle = OutputManagerHandle::new(sender, subscriber);

2 changes: 1 addition & 1 deletion base_layer/wallet/src/text_message_service/mod.rs
@@ -104,7 +104,7 @@ impl ServiceInitializer for TextMessageServiceInitializer {
.expect("text message service initializer already called");

let (sender, receiver) = reply_channel::unbounded();
-let (publisher, subscriber) = bounded(100);
+let (publisher, subscriber) = bounded(100, 117);

let text_message_stream = self.text_message_stream();
let text_message_ack_stream = self.text_message_ack_stream();