Skip to content

Commit

Permalink
Add a thread to support thin clients
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 8, 2018
1 parent a59f64c commit 9ff1a6f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
36 changes: 30 additions & 6 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,24 @@ impl AccountantSkel {
})
}

fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> {
Ok(())
}

fn thin_client_service(
obj: SharedSkel,
exit: Arc<AtomicBool>,
socket: UdpSocket,
) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::process_thin_client_requests(obj.clone(), &socket);
if exit.load(Ordering::Relaxed) {
info!("sync_service exiting");
break;
}
})
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
Expand Down Expand Up @@ -459,6 +477,7 @@ impl AccountantSkel {
obj: &SharedSkel,
me: ReplicatedData,
serve: UdpSocket,
skinny: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
Expand Down Expand Up @@ -513,6 +532,8 @@ impl AccountantSkel {
Arc::new(Mutex::new(writer)),
);

let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::process(
Expand All @@ -534,6 +555,7 @@ impl AccountantSkel {
t_server,
t_verifier,
t_sync,
t_skinny,
t_gossip,
t_listen,
t_broadcast,
Expand Down Expand Up @@ -815,7 +837,7 @@ mod tests {

#[test]
fn test_accountant_bad_sig() {
let (leader_data, leader_gossip, _, leader_serve) = test_node();
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
Expand All @@ -828,6 +850,7 @@ mod tests {
&acc_skel,
leader_data,
leader_serve,
leader_skinny,
leader_gossip,
exit.clone(),
sink(),
Expand Down Expand Up @@ -861,7 +884,8 @@ mod tests {
}
}

fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
let skinny = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
Expand All @@ -872,16 +896,16 @@ mod tests {
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(d, gossip, replicate, serve)
(d, gossip, replicate, serve, skinny)
}

/// Test that mesasge sent from leader to target1 and repliated to target2
#[test]
fn test_replicate() {
logger::setup();
let (leader_data, leader_gossip, _, leader_serve) = test_node();
let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node();
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node();
let (target2_data, target2_gossip, target2_replicate, _, _) = test_node();
let exit = Arc::new(AtomicBool::new(false));

//start crdt_leader
Expand Down
12 changes: 8 additions & 4 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mod tests {
logger::setup();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
let addr = serve.local_addr().unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
Expand All @@ -184,7 +185,8 @@ mod tests {
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(AccountantSkel::new(acc, input, historian));
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
let threads =
AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand All @@ -200,9 +202,10 @@ mod tests {
}
}

fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let skinny = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let leader = ReplicatedData::new(
Expand All @@ -211,7 +214,7 @@ mod tests {
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(leader, gossip, serve, replicate)
(leader, gossip, serve, replicate, skinny)
}

#[test]
Expand Down Expand Up @@ -242,6 +245,7 @@ mod tests {
&leader_acc,
leader.0.clone(),
leader.2,
leader.4,
leader.1,
exit.clone(),
sink(),
Expand All @@ -257,7 +261,7 @@ mod tests {
).unwrap();

//lets spy on the network
let (mut spy, spy_gossip, _, _) = test_node();
let (mut spy, spy_gossip, _, _, _) = test_node();
let daddr = "0.0.0.0:0".parse().unwrap();
spy.replicate_addr = daddr;
spy.serve_addr = daddr;
Expand Down
13 changes: 11 additions & 2 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn main() {
let serve_addr = format!("0.0.0.0:{}", port);
let gossip_addr = format!("0.0.0.0:{}", port + 1);
let replicate_addr = format!("0.0.0.0:{}", port + 2);
let skinny_addr = format!("0.0.0.0:{}", port + 3);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a log file");
Expand Down Expand Up @@ -122,6 +123,7 @@ fn main() {
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
Expand All @@ -130,8 +132,15 @@ fn main() {
serve_sock.local_addr().unwrap(),
);
eprintln!("starting server...");
let threads =
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
let threads = AccountantSkel::serve(
&skel,
d,
serve_sock,
skinny_sock,
gossip_sock,
exit.clone(),
stdout(),
).unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");
Expand Down

0 comments on commit 9ff1a6f

Please sign in to comment.