Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed May 1, 2018
1 parent 4bfe61e commit 62704ff
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
51 changes: 29 additions & 22 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ pub enum Response {

impl AccountantSkel {
/// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, last_id: Hash, historian: Historian) -> Self {
pub fn new(acc: Accountant, last_id: Hash) -> Self {
AccountantSkel {
acc,
last_id,
historian,
entry_info_subscribers: vec![],
}
}
Expand All @@ -105,24 +104,24 @@ impl AccountantSkel {
fn run_sync<W: Write>(
obj: Arc<RwLock<Self>>,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler
writer: W
blob_recycler: &packet::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> Result<()> {
//TODO clean this mess up
let entry = historian.recv(Duration::new(1, 0))?;
let mut b = blob_recycler.allocate();
let mut out = Cursor::new(b.data_mut());
let mut ser = bincode::Serializer::new(out);
let mut seq = ser.serialize_seq(None);
let mut ser = Serializer::new(out);
let mut seq = ser.serialize_seq(None).expect("serialize end");
seq.serialize(entry).expect("serialize failed on first entry!");
obj.write().unwrap().notify_entry_info_subscribers(&entry);

while let Ok(entry) = historian.try_recv() {
//UNLOCK skel in this scope
let mut robj = obj.write().unwrap();
let mut wobj = obj.write().unwrap();
if let Err(e) = seq.serialize(entry) {
seq.end();
seq.end().expect("serialize end");
b.set_size(out.len());
broadcast.send(b)?;

Expand All @@ -132,10 +131,10 @@ impl AccountantSkel {
seq = ser.serialize_seq(None);
seq.serialize(entry).expect("serialize failed on first entry!");
}
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
wobj.last_id = entry.id;
wobj.acc.register_entry_id(&wobj.last_id);
writeln!(writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.notify_entry_info_subscribers(&entry);
wobj.notify_entry_info_subscribers(&entry);
}
seq.end();
b.set_size(out.len());
Expand All @@ -146,14 +145,14 @@ impl AccountantSkel {
pub fn sync_service<W: Write + Send + 'static>(
obj: Arc<RwLock<Self>>,
exit: AtomicBool,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move|| loop {
let e = Self::run_sync(&obj, &broadcast, &blob_recycler, writer, &historian);
if e.is_err() && exit_.load(Ordering::Relaxed) {
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
Expand Down Expand Up @@ -367,10 +366,12 @@ impl AccountantSkel {
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
pub fn serve<W: Write + Send + 'static>(
obj: &Arc<Mutex<Self>>,
me: ReplicatedData
me: ReplicatedData,
exit: Arc<AtomicBool>,
writer: W,
historian: Historian,
) -> Result<Vec<JoinHandle<()>>> {
let gossip = UdpSocket::bind(me.gossip_addr)?;
let read = UdpSocket::bind(me.serve_addr)?;
Expand Down Expand Up @@ -412,27 +413,33 @@ impl AccountantSkel {
blob_recycler.clone(),
broadcast_receiver,
);

let t_sync = Self::sync_service(
obj.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
writer,
historian
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&skel,
&obj.clone(),
&verified_receiver,
&broadcast_sender,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
// Assume this was a timeout, so sync any empty entries.
skel.lock().unwrap().sync();

if exit.load(Ordering::Relaxed) {
break;
}
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
Ok(vec![t_receiver, t_responder, t_server, t_verifier, t_sync])
}

/// This service receives messages from a leader in the network and processes the transactions
Expand Down Expand Up @@ -465,7 +472,7 @@ impl AccountantSkel {
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().unwrap().insert(&leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());

// make sure we are on the same interface
let mut local = read.local_addr()?;
Expand Down
4 changes: 2 additions & 2 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ pub struct ReplicatedData {
/// should always be increasing
version: u64,
/// address to connect to for gossip
gossip_addr: SocketAddr,
pub gossip_addr: SocketAddr,
/// address to connect to for replication
replicate_addr: SocketAddr,
pub replicate_addr: SocketAddr,
/// address to connect to when this node is leader
serve_addr: SocketAddr,
/// current leader identity
Expand Down

0 comments on commit 62704ff

Please sign in to comment.