Skip to content

Commit

Permalink
Work on test_replicate to test replicate service
Browse files Browse the repository at this point in the history
generate some messages to send to replicator service
  • Loading branch information
sakridge committed Apr 24, 2018
1 parent 92b8bb6 commit 58d1ddd
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ mod tests {
use accountant_skel::{to_packets, Request};
use bincode::serialize;
use ecdsa;
use packet::{PacketRecycler, NUM_PACKETS};
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
use transaction::{memfind, test_tx};

use accountant::Accountant;
Expand All @@ -437,6 +437,10 @@ mod tests {
use transaction::Transaction;

use subscribers::{Node, Subscribers};
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use packet::{PACKET_DATA_SIZE};

#[test]
fn test_layout() {
Expand Down Expand Up @@ -544,26 +548,50 @@ mod tests {

#[test]
fn test_replicate() {
let serve_port = 9004;
let send_port = 9005;
let addr = format!("127.0.0.1:{}", serve_port);
let send_addr = format!("127.0.0.1:{}", send_port);
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let node_me = Node::default();
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
let subs = Subscribers::new(node_me, node_leader, &[]);

let recv_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let historian = Historian::new(&alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
historian,
)));
let node_me = Node::default();
let node_leader = Node::default();
let subs = Subscribers::new(node_me, node_leader, &[]);

let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();

let mut msgs = VecDeque::new();
for i in 0..10 {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");

exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}

}
Expand Down

0 comments on commit 58d1ddd

Please sign in to comment.