From 4f1042b8de1111b229127ee740f289a4c5f36122 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 7 Jun 2018 16:06:32 -0600 Subject: [PATCH] data_replicator -> ncp Fixes #327 --- src/bin/client-demo.rs | 6 +++--- src/lib.rs | 2 +- src/{data_replicator.rs => ncp.rs} | 16 ++++++++-------- src/server.rs | 8 ++++---- src/tvu.rs | 14 +++++++------- tests/data_replicator.rs | 8 ++++---- tests/multinode.rs | 4 ++-- 7 files changed, 29 insertions(+), 29 deletions(-) rename src/{data_replicator.rs => ncp.rs} (87%) diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 6e694da920cda6..9f629cb8faaf4a 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -11,8 +11,8 @@ use isatty::stdin_isatty; use pnet::datalink; use rayon::prelude::*; use solana::crdt::{Crdt, ReplicatedData}; -use solana::data_replicator::DataReplicator; use solana::mint::MintDemo; +use solana::ncp::Ncp; use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::streamer::default_window; use solana::thin_client::ThinClient; @@ -290,7 +290,7 @@ fn converge( let spy_ref = Arc::new(RwLock::new(spy_crdt)); let window = default_window(); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let data_replicator = DataReplicator::new( + let ncp = Ncp::new( spy_ref.clone(), window.clone(), spy_gossip, @@ -316,7 +316,7 @@ fn converge( } sleep(Duration::new(1, 0)); } - threads.extend(data_replicator.thread_hdls.into_iter()); + threads.extend(ncp.thread_hdls.into_iter()); rv } diff --git a/src/lib.rs b/src/lib.rs index b0a9d5afea8ddc..e1c89bdbc98957 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,6 @@ pub mod bank; pub mod banking_stage; pub mod budget; pub mod crdt; -pub mod data_replicator; pub mod entry; pub mod entry_writer; #[cfg(feature = "erasure")] @@ -23,6 +22,7 @@ pub mod hash; pub mod ledger; pub mod logger; pub mod mint; +pub mod ncp; pub mod packet; pub mod payment_plan; pub mod record_stage; diff --git a/src/data_replicator.rs b/src/ncp.rs similarity index 87% rename from src/data_replicator.rs rename to src/ncp.rs index 18ce3c3dd31119..b4f8b759fb850d 100644 --- a/src/data_replicator.rs +++ b/src/ncp.rs @@ -1,4 +1,4 @@ -//! The `data_replicator` module implements the replication threads. +//! The `ncp` module implements the network control plane. use crdt; use packet; @@ -10,22 +10,22 @@ use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; use streamer; -pub struct DataReplicator { +pub struct Ncp { pub thread_hdls: Vec>, } -impl DataReplicator { +impl Ncp { pub fn new( crdt: Arc>, window: Arc>>>, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, exit: Arc, - ) -> Result { + ) -> Result { let blob_recycler = packet::BlobRecycler::default(); let (request_sender, request_receiver) = channel(); trace!( - "DataReplicator: id: {:?}, listening on: {:?}", + "Ncp: id: {:?}, listening on: {:?}", &crdt.read().unwrap().me[..4], gossip_listen_socket.local_addr().unwrap() ); @@ -52,14 +52,14 @@ impl DataReplicator { ); let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; - Ok(DataReplicator { thread_hdls }) + Ok(Ncp { thread_hdls }) } } #[cfg(test)] mod tests { use crdt::{Crdt, TestNode}; - use data_replicator::DataReplicator; + use ncp::Ncp; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -71,7 +71,7 @@ mod tests { let crdt = Crdt::new(tn.data.clone()); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = DataReplicator::new( + let d = Ncp::new( c.clone(), w, tn.sockets.gossip, diff --git a/src/server.rs b/src/server.rs index 2e3a64164855ae..a769c322b0adee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,7 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use data_replicator::DataReplicator; +use ncp::Ncp; use packet; use rpu::Rpu; use std::io::Write; @@ -75,14 +75,14 @@ impl Server { let crdt = Arc::new(RwLock::new(Crdt::new(me))); let window = streamer::default_window(); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let data_replicator = DataReplicator::new( + let ncp = Ncp::new( crdt.clone(), window.clone(), gossip_socket, gossip_send_socket, exit.clone(), - ).expect("DataReplicator::new"); - thread_hdls.extend(data_replicator.thread_hdls); + ).expect("Ncp::new"); + thread_hdls.extend(ncp.thread_hdls); let t_broadcast = streamer::broadcaster( broadcast_socket, diff --git a/src/tvu.rs b/src/tvu.rs index a47b3658a1f638..fe8e9fb484bd5b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -22,7 +22,7 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; -use data_replicator::DataReplicator; +use ncp::Ncp; use packet; use replicate_stage::ReplicateStage; use std::net::UdpSocket; @@ -65,13 +65,13 @@ impl Tvu { .insert(&leader); let window = streamer::default_window(); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let data_replicator = DataReplicator::new( + let ncp = Ncp::new( crdt.clone(), window.clone(), gossip_listen_socket, gossip_send_socket, exit.clone(), - ).expect("DataReplicator::new"); + ).expect("Ncp::new"); // TODO pull this socket out through the public interface // make sure we are on the same interface @@ -132,7 +132,7 @@ impl Tvu { t_repair_receiver, replicate_stage.thread_hdl, ]; - threads.extend(data_replicator.thread_hdls.into_iter()); + threads.extend(ncp.thread_hdls.into_iter()); Tvu { thread_hdls: threads, } @@ -144,11 +144,11 @@ pub mod tests { use bank::Bank; use bincode::serialize; use crdt::{Crdt, TestNode}; - use data_replicator::DataReplicator; use entry::Entry; use hash::{hash, Hash}; use logger; use mint::Mint; + use ncp::Ncp; use packet::BlobRecycler; use result::Result; use signature::{KeyPair, KeyPairUtil}; @@ -166,10 +166,10 @@ pub mod tests { crdt: Arc>, listen: UdpSocket, exit: Arc, - ) -> Result { + ) -> Result { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - DataReplicator::new(crdt, window, listen, send_sock, exit) + Ncp::new(crdt, window, listen, send_sock, exit) } /// Test that message sent from leader to target1 and replicated to target2 #[test] diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index e6f37f5563aaff..1e9531a572a4f7 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -5,8 +5,8 @@ extern crate solana; use rayon::iter::*; use solana::crdt::{Crdt, TestNode}; -use solana::data_replicator::DataReplicator; use solana::logger; +use solana::ncp::Ncp; use solana::packet::Blob; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -14,12 +14,12 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; -fn test_node(exit: Arc) -> (Arc>, DataReplicator, UdpSocket) { +fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let tn = TestNode::new(); let crdt = Crdt::new(tn.data.clone()); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = DataReplicator::new( + let d = Ncp::new( c.clone(), w, tn.sockets.gossip, @@ -35,7 +35,7 @@ fn test_node(exit: Arc) -> (Arc>, DataReplicator, UdpSo /// tests that actually use this function are below fn run_gossip_topo(topo: F) where - F: Fn(&Vec<(Arc>, DataReplicator, UdpSocket)>) -> (), + F: Fn(&Vec<(Arc>, Ncp, UdpSocket)>) -> (), { let num: usize = 5; let exit = Arc::new(AtomicBool::new(false)); diff --git a/tests/multinode.rs b/tests/multinode.rs index b333a2888bb029..365ea6c6fdca0b 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,9 +6,9 @@ extern crate solana; use solana::bank::Bank; use solana::crdt::TestNode; use solana::crdt::{Crdt, ReplicatedData}; -use solana::data_replicator::DataReplicator; use solana::logger; use solana::mint::Mint; +use solana::ncp::Ncp; use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; @@ -61,7 +61,7 @@ fn converge( spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let dr = DataReplicator::new( + let dr = Ncp::new( spy_ref.clone(), spy_window, spy.sockets.gossip,