diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 18ce6cb6b95e85..58dad07712d200 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -212,6 +212,24 @@ impl AccountantSkel { }) } + fn process_thin_client_requests(_obj: SharedSkel, _socket: &UdpSocket) -> Result<()> { + Ok(()) + } + + fn thin_client_service( + obj: SharedSkel, + exit: Arc, + socket: UdpSocket, + ) -> JoinHandle<()> { + spawn(move || loop { + let _ = Self::process_thin_client_requests(obj.clone(), &socket); + if exit.load(Ordering::Relaxed) { + info!("sync_service exiting"); + break; + } + }) + } + /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> { @@ -459,6 +477,7 @@ impl AccountantSkel { obj: &SharedSkel, me: ReplicatedData, serve: UdpSocket, + skinny: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, @@ -513,6 +532,8 @@ impl AccountantSkel { Arc::new(Mutex::new(writer)), ); + let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny); + let skel = obj.clone(); let t_server = spawn(move || loop { let e = Self::process( @@ -534,6 +555,7 @@ impl AccountantSkel { t_server, t_verifier, t_sync, + t_skinny, t_gossip, t_listen, t_broadcast, @@ -815,7 +837,7 @@ mod tests { #[test] fn test_accountant_bad_sig() { - let (leader_data, leader_gossip, _, leader_serve) = test_node(); + let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node(); let alice = Mint::new(10_000); let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -828,6 +850,7 @@ mod tests { &acc_skel, leader_data, leader_serve, + leader_skinny, leader_gossip, exit.clone(), sink(), @@ -861,7 +884,8 @@ mod tests { } } - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { + fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { + let skinny = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -872,16 +896,16 @@ mod tests { replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (d, gossip, replicate, serve) + (d, gossip, replicate, serve, skinny) } /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] fn test_replicate() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve) = test_node(); - let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node(); - let (target2_data, target2_gossip, target2_replicate, _) = test_node(); + let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); + let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); + let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index d5356083be5536..0527f6b6b5f3ee 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -168,6 +168,7 @@ mod tests { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( @@ -184,7 +185,8 @@ mod tests { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Arc::new(AccountantSkel::new(acc, input, historian)); - let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap(); + let threads = + AccountantSkel::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -200,9 +202,10 @@ mod tests { } } - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { + fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let leader = ReplicatedData::new( @@ -211,7 +214,7 @@ mod tests { replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (leader, gossip, serve, replicate) + (leader, gossip, serve, replicate, skinny) } #[test] @@ -242,6 +245,7 @@ mod tests { &leader_acc, leader.0.clone(), leader.2, + leader.4, leader.1, exit.clone(), sink(), @@ -257,7 +261,7 @@ mod tests { ).unwrap(); //lets spy on the network - let (mut spy, spy_gossip, _, _) = test_node(); + let (mut spy, spy_gossip, _, _, _) = test_node(); let daddr = "0.0.0.0:0".parse().unwrap(); spy.replicate_addr = daddr; spy.serve_addr = daddr; diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 8d1a4e1eab7a53..5aede0c3f174fa 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -55,6 +55,7 @@ fn main() { let serve_addr = format!("0.0.0.0:{}", port); let gossip_addr = format!("0.0.0.0:{}", port + 1); let replicate_addr = format!("0.0.0.0:{}", port + 2); + let skinny_addr = format!("0.0.0.0:{}", port + 3); if stdin_isatty() { eprintln!("nothing found on stdin, expected a log file"); @@ -122,6 +123,7 @@ fn main() { let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); + let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -130,8 +132,15 @@ fn main() { serve_sock.local_addr().unwrap(), ); eprintln!("starting server..."); - let threads = - AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap(); + let threads = AccountantSkel::serve( + &skel, + d, + serve_sock, + skinny_sock, + gossip_sock, + exit.clone(), + stdout(), + ).unwrap(); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join");