Skip to content

Commit

Permalink
data_replicator -> ncp
Browse files Browse the repository at this point in the history
Fixes #327
  • Loading branch information
garious committed Jun 7, 2018
1 parent cdfbbe5 commit 7aa0561
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 29 deletions.
6 changes: 3 additions & 3 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use isatty::stdin_isatty;
use pnet::datalink;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::mint::MintDemo;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
Expand Down Expand Up @@ -290,7 +290,7 @@ fn converge(
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let window = default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
spy_ref.clone(),
window.clone(),
spy_gossip,
Expand All @@ -316,7 +316,7 @@ fn converge(
}
sleep(Duration::new(1, 0));
}
threads.extend(data_replicator.thread_hdls.into_iter());
threads.extend(ncp.thread_hdls.into_iter());
rv
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub mod bank;
pub mod banking_stage;
pub mod budget;
pub mod crdt;
pub mod data_replicator;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
Expand All @@ -23,6 +22,7 @@ pub mod hash;
pub mod ledger;
pub mod logger;
pub mod mint;
pub mod ncp;
pub mod packet;
pub mod payment_plan;
pub mod record_stage;
Expand Down
16 changes: 8 additions & 8 deletions src/data_replicator.rs → src/ncp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The `data_replicator` module implements the replication threads.
//! The `ncp` module implements the network control plane.

use crdt;
use packet;
Expand All @@ -10,22 +10,22 @@ use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use streamer;

pub struct DataReplicator {
pub struct Ncp {
pub thread_hdls: Vec<JoinHandle<()>>,
}

impl DataReplicator {
impl Ncp {
pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
) -> Result<Ncp> {
let blob_recycler = packet::BlobRecycler::default();
let (request_sender, request_receiver) = channel();
trace!(
"DataReplicator: id: {:?}, listening on: {:?}",
"Ncp: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me[..4],
gossip_listen_socket.local_addr().unwrap()
);
Expand All @@ -52,14 +52,14 @@ impl DataReplicator {
);
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(DataReplicator { thread_hdls })
Ok(Ncp { thread_hdls })
}
}

#[cfg(test)]
mod tests {
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use ncp::Ncp;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

Expand All @@ -71,7 +71,7 @@ mod tests {
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
let d = Ncp::new(
c.clone(),
w,
tn.sockets.gossip,
Expand Down
8 changes: 4 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use ncp::Ncp;
use packet;
use rpu::Rpu;
use std::io::Write;
Expand Down Expand Up @@ -75,14 +75,14 @@ impl Server {
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
gossip_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
thread_hdls.extend(data_replicator.thread_hdls);
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls);

let t_broadcast = streamer::broadcaster(
broadcast_socket,
Expand Down
14 changes: 7 additions & 7 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use ncp::Ncp;
use packet;
use replicate_stage::ReplicateStage;
use std::net::UdpSocket;
Expand Down Expand Up @@ -65,13 +65,13 @@ impl Tvu {
.insert(&leader);
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
gossip_listen_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
).expect("Ncp::new");

// TODO pull this socket out through the public interface
// make sure we are on the same interface
Expand Down Expand Up @@ -132,7 +132,7 @@ impl Tvu {
t_repair_receiver,
replicate_stage.thread_hdl,
];
threads.extend(data_replicator.thread_hdls.into_iter());
threads.extend(ncp.thread_hdls.into_iter());
Tvu {
thread_hdls: threads,
}
Expand All @@ -144,11 +144,11 @@ pub mod tests {
use bank::Bank;
use bincode::serialize;
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use entry::Entry;
use hash::{hash, Hash};
use logger;
use mint::Mint;
use ncp::Ncp;
use packet::BlobRecycler;
use result::Result;
use signature::{KeyPair, KeyPairUtil};
Expand All @@ -166,10 +166,10 @@ pub mod tests {
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
) -> Result<Ncp> {
let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
DataReplicator::new(crdt, window, listen, send_sock, exit)
Ncp::new(crdt, window, listen, send_sock, exit)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
Expand Down
8 changes: 4 additions & 4 deletions tests/data_replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ extern crate solana;

use rayon::iter::*;
use solana::crdt::{Crdt, TestNode};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::ncp::Ncp;
use solana::packet::Blob;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;

fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) {
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
let d = Ncp::new(
c.clone(),
w,
tn.sockets.gossip,
Expand All @@ -35,7 +35,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSo
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (),
F: Fn(&Vec<(Arc<RwLock<Crdt>>, Ncp, UdpSocket)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
Expand Down
4 changes: 2 additions & 2 deletions tests/multinode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ extern crate solana;
use solana::bank::Bank;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
Expand Down Expand Up @@ -61,7 +61,7 @@ fn converge(
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let dr = DataReplicator::new(
let dr = Ncp::new(
spy_ref.clone(),
spy_window,
spy.sockets.gossip,
Expand Down

0 comments on commit 7aa0561

Please sign in to comment.