Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
brodcast
Browse files Browse the repository at this point in the history
wip, combining subscribers and crdt

wip

progress

progress

update

building

progress

progress

progress

update

wip
  • Loading branch information
aeyakovenko committed May 1, 2018
1 parent d522397 commit 0a20f71
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 32 deletions.
51 changes: 29 additions & 22 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ pub enum Response {

impl AccountantSkel {
/// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, last_id: Hash, historian: Historian) -> Self {
pub fn new(acc: Accountant, last_id: Hash) -> Self {
AccountantSkel {
acc,
last_id,
historian,
entry_info_subscribers: vec![],
}
}
Expand All @@ -105,24 +104,24 @@ impl AccountantSkel {
fn run_sync<W: Write>(
obj: Arc<RwLock<Self>>,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler
writer: W
blob_recycler: &packet::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> Result<()> {
//TODO clean this mess up
let entry = historian.recv(Duration::new(1, 0))?;
let mut b = blob_recycler.allocate();
let mut out = Cursor::new(b.data_mut());
let mut ser = bincode::Serializer::new(out);
let mut seq = ser.serialize_seq(None);
let mut ser = Serializer::new(out);
let mut seq = ser.serialize_seq(None).expect("serialize end");
seq.serialize(entry).expect("serialize failed on first entry!");
obj.write().unwrap().notify_entry_info_subscribers(&entry);

while let Ok(entry) = historian.try_recv() {
//UNLOCK skel in this scope
let mut robj = obj.write().unwrap();
let mut wobj = obj.write().unwrap();
if let Err(e) = seq.serialize(entry) {
seq.end();
seq.end().expect("serialize end");
b.set_size(out.len());
broadcast.send(b)?;

Expand All @@ -132,10 +131,10 @@ impl AccountantSkel {
seq = ser.serialize_seq(None);
seq.serialize(entry).expect("serialize failed on first entry!");
}
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
wobj.last_id = entry.id;
wobj.acc.register_entry_id(&wobj.last_id);
writeln!(writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.notify_entry_info_subscribers(&entry);
wobj.notify_entry_info_subscribers(&entry);
}
seq.end();
b.set_size(out.len());
Expand All @@ -146,14 +145,14 @@ impl AccountantSkel {
pub fn sync_service<W: Write + Send + 'static>(
obj: Arc<RwLock<Self>>,
exit: AtomicBool,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move|| loop {
let e = Self::run_sync(&obj, &broadcast, &blob_recycler, writer, &historian);
if e.is_err() && exit_.load(Ordering::Relaxed) {
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
Expand Down Expand Up @@ -367,10 +366,12 @@ impl AccountantSkel {
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
pub fn serve<W: Write + Send + 'static>(
obj: &Arc<Mutex<Self>>,
me: ReplicatedData
me: ReplicatedData,
exit: Arc<AtomicBool>,
writer: W,
historian: Historian,
) -> Result<Vec<JoinHandle<()>>> {
let gossip = UdpSocket::bind(me.gossip_addr)?;
let read = UdpSocket::bind(me.serve_addr)?;
Expand Down Expand Up @@ -412,27 +413,33 @@ impl AccountantSkel {
blob_recycler.clone(),
broadcast_receiver,
);

let t_sync = Self::sync_service(
obj.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
writer,
historian
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&skel,
&obj.clone(),
&verified_receiver,
&broadcast_sender,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
// Assume this was a timeout, so sync any empty entries.
skel.lock().unwrap().sync();

if exit.load(Ordering::Relaxed) {
break;
}
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
Ok(vec![t_receiver, t_responder, t_server, t_verifier, t_sync])
}

/// This service receives messages from a leader in the network and processes the transactions
Expand Down Expand Up @@ -465,7 +472,7 @@ impl AccountantSkel {
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().unwrap().insert(&leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());

// make sure we are on the same interface
let mut local = read.local_addr()?;
Expand Down
4 changes: 2 additions & 2 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ pub struct ReplicatedData {
/// should always be increasing
version: u64,
/// address to connect to for gossip
gossip_addr: SocketAddr,
pub gossip_addr: SocketAddr,
/// address to connect to for replication
replicate_addr: SocketAddr,
pub replicate_addr: SocketAddr,
/// address to connect to when this node is leader
serve_addr: SocketAddr,
/// current leader identity
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant;
//pub mod accountant_skel;
//pub mod accountant_stub;
pub mod accountant_stub;
pub mod crdt;
//pub mod ecdsa;
pub mod entry;
Expand Down
2 changes: 1 addition & 1 deletion src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Blob {
Ok(e)
}
pub fn set_id(&mut self, id: PublicKey) -> Result<()> {
let wtr = serialize(&id)?;
let wrt = serialize(&id)?;
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
Ok(())
}
Expand Down
6 changes: 0 additions & 6 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ mod test {
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};

use crdt::{Crdt, ReplicatedData};
use subscribers::Node;
use signature::KeyPair;
Expand Down Expand Up @@ -514,16 +513,11 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let rep_data = ReplicatedData::new(pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap());
let subs = Arc::new(RwLock::new(Crdt::new(rep_data)));
/*
Node::default(),
Node::new([0; 8], 0, send.local_addr().unwrap()),
&[],*/

let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
Expand Down

0 comments on commit 0a20f71

Please sign in to comment.