Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tvu cleanup2 #241

Merged
merged 8 commits into from
May 23, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
aeyakovenko committed May 23, 2018
commit 6f4e2c184b59e3de10f747c68d63115867d50e43
130 changes: 35 additions & 95 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
@@ -192,47 +192,28 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
use streamer::default_window;
use tvu::tests::TestNode;

#[test]
fn test_thin_client() {
logger::setup();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_addr = events_socket.local_addr().unwrap();
let addr = requests_socket.local_addr().unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
"0.0.0.0:0".parse().unwrap(),
requests_socket.local_addr().unwrap(),
events_addr,
);
let leader = TestNode::new();

let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));

let mut local = requests_socket.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();

let server = Server::leader(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
d,
requests_socket,
events_socket,
broadcast_socket,
respond_socket,
gossip,
leader.data,
leader.requests,
leader.event,
leader.broadcast,
leader.respond,
leader.gossip,
exit.clone(),
sink(),
);
@@ -241,7 +222,7 @@ mod tests {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket);
let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket);
let last_id = client.get_last_id().wait().unwrap();
let _sig = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
@@ -262,24 +243,18 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let serve_addr = leader.data.requests_addr;
let mut local = serve_addr.local_addr().unwrap();
local.set_port(0);
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = events_socket.local_addr().unwrap();
let events_addr = leader.data.events_addr;

let server = Server::leader(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
leader_data,
leader_serve,
events_socket,
broadcast_socket,
respond_socket,
leader_gossip,
leader.data,
leader.requests,
leader.event,
leader.broadcast,
leader.respond,
leader.gossip,
exit.clone(),
sink(),
);
@@ -290,7 +265,7 @@ mod tests {
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket);
let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, events_addr, events_socket);
let last_id = client.get_last_id().wait().unwrap();

let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
@@ -311,35 +286,6 @@ mod tests {
t.join().unwrap();
}
}
struct TestNode {
data: ReplicatedData,
gossip: UdpSocket,
requests: UdpSocket,
replicate: UdpSocket,
event: UdpSocket,
respond: UdpSocket,
broadcast: UdpSocket,
}
impl TestNode {
fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let event = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
event.local_addr().unwrap(),
);
TestNode {data, gossip, requests, replicate, event, respond, broadcast }
}
}

fn replicant(
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
@@ -368,18 +314,18 @@ mod tests {
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<SocketAddr> {
//lets spy on the network
let mut spy = test_node();
let mut spy = TestNode::new();
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.id.clone();
spy.replicate_addr = daddr;
spy.requests_addr = daddr;
let mut spy_crdt = Crdt::new(spy);
let me = spy.data.id.clone();
spy.data.replicate_addr = daddr;
spy.data.requests_addr = daddr;
let mut spy_crdt = Crdt::new(spy.data);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);

let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge
let mut converged = false;
@@ -411,38 +357,32 @@ mod tests {
logger::setup();
const N: usize = 5;
trace!("test_multi_accountant_stub");
let leader = test_node();
let leader = TestNode::new();
let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));

let leader_bank = Bank::new(&alice);

let mut local = leader.2.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let events_addr = leader.4.local_addr().unwrap();

let events_addr = leader.data.events_addr;
let server = Server::leader(
leader_bank,
alice.last_id(),
None,
leader.0.clone(),
leader.2,
leader.4,
broadcast_socket,
respond_socket,
leader.1,
leader.data,
leader.requests,
leader.event,
leader.broadcast,
leader.respond,
leader.gossip,
exit.clone(),
sink(),
);

let mut threads = server.thread_hdls;
for _ in 0..N {
replicant(&leader.0, exit.clone(), &alice, &mut threads);
replicant(&leader.data, exit.clone(), &alice, &mut threads);
}
let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads);
let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads);
//contains the leader addr as well
assert_eq!(addrs.len(), N + 1);
//verify leader can do transfer
@@ -454,9 +394,9 @@ mod tests {
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let mut client = ThinClient::new(
leader.0.requests_addr,
leader.data.requests_addr,
requests_socket,
events_addr,
leader.data.events_addr,
events_socket,
);
trace!("getting leader last_id");
71 changes: 50 additions & 21 deletions src/tvu.rs
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
}

#[cfg(test)]
mod tests {
pub mod tests {
use bank::Bank;
use bincode::serialize;
use chrono::prelude::*;
@@ -166,36 +166,38 @@ mod tests {
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::net::UdpSocket;
use streamer;
use tvu::{test_node, Tvu};
use crdt::ReplicatedData;
use tvu::Tvu;

/// Test that mesasge sent from leader to target1 and repliated to target2
#[test]
fn test_replicate() {
logger::setup();
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node();
let (target2_data, target2_gossip, target2_replicate, _, _) = test_node();
let leader = TestNode::new();
let target1 = TestNode::new();
let target2 = TestNode::new();
let exit = Arc::new(AtomicBool::new(false));

//start crdt_leader
let mut crdt_l = Crdt::new(leader_data.clone());
crdt_l.set_leader(leader_data.id);
let mut crdt_l = Crdt::new(leader.data.clone());
crdt_l.set_leader(leader.data.id);

let cref_l = Arc::new(RwLock::new(crdt_l));
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
let window1 = streamer::default_window();
let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone());
let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone());

//start crdt2
let mut crdt2 = Crdt::new(target2_data.clone());
crdt2.insert(&leader_data);
crdt2.set_leader(leader_data.id);
let leader_id = leader_data.id;
let mut crdt2 = Crdt::new(target2.data.clone());
crdt2.insert(&leader.data);
crdt2.set_leader(leader.data.id);
let leader_id = leader.data.id;
let cref2 = Arc::new(RwLock::new(crdt2));
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
let window2 = streamer::default_window();
let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone());
let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone());

// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
@@ -206,30 +208,29 @@ mod tests {
let t_receiver = streamer::blob_receiver(
exit.clone(),
recv_recycler.clone(),
target2_replicate,
target2.replicate,
s_reader,
).unwrap();

// simulate leader sending messages
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(
leader_serve,
leader.requests,
exit.clone(),
resp_recycler.clone(),
r_responder,
);

let starting_balance = 10_000;
let mint = Mint::new(starting_balance);
let replicate_addr = target1_data.replicate_addr;
let replicate_addr = target1.data.replicate_addr;
let bank = Arc::new(Bank::new(&mint));
let tvu = Tvu::new(
bank,
target1_data,
target1_gossip,
target1_events,
target1_replicate,
leader_data,
target1.data,
target1.gossip,
target1.replicate,
leader.data,
exit.clone(),
);

@@ -302,4 +303,32 @@ mod tests {
t_l_gossip.join().expect("join");
t_l_listen.join().expect("join");
}
pub struct TestNode {
pub data: ReplicatedData,
pub gossip: UdpSocket,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you either add the suffix _socket or put all the sockets into an inner struct and name the field sockets?

pub requests: UdpSocket,
pub replicate: UdpSocket,
pub event: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let event = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
event.local_addr().unwrap(),
);
TestNode {data, gossip, requests, replicate, event, respond, broadcast }
}
}
}