[wip] Broadcast #164

Closed · wants to merge 14 commits

12 changes: 6 additions & 6 deletions Cargo.toml
@@ -16,13 +16,13 @@ license = "Apache-2.0"
name = "solana-historian-demo"
path = "src/bin/historian-demo.rs"

[[bin]]
name = "solana-client-demo"
path = "src/bin/client-demo.rs"
#[[bin]]
#name = "solana-client-demo"
#path = "src/bin/client-demo.rs"

[[bin]]
name = "solana-testnode"
path = "src/bin/testnode.rs"
#[[bin]]
#name = "solana-testnode"
#path = "src/bin/testnode.rs"

[[bin]]
name = "solana-genesis"
3 changes: 0 additions & 3 deletions libcuda_verify_ed25519.a

This file was deleted.

150 changes: 111 additions & 39 deletions src/accountant_skel.rs
@@ -4,6 +4,7 @@

use accountant::Accountant;
use bincode::{deserialize, serialize};
use serde::Serializer;
use ecdsa;
use entry::Entry;
use event::Event;
@@ -18,7 +19,7 @@ use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::Write;
use std::io::{Cursor, Write};
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
@@ -27,14 +28,11 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use crdt::{ReplicatedData, Crdt};

use subscribers;

pub struct AccountantSkel<W: Write + Send + 'static> {
pub struct AccountantSkel {
acc: Accountant,
last_id: Hash,
writer: W,
historian: Historian,
entry_info_subscribers: Vec<SocketAddr>,
}

@@ -76,13 +74,12 @@ pub enum Response {
LastId { id: Hash },
}

impl<W: Write + Send + 'static> AccountantSkel<W> {
impl AccountantSkel {
/// Create a new AccountantSkel that wraps the given Accountant.
pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self {
pub fn new(acc: Accountant, last_id: Hash, historian: Historian) -> Self {
AccountantSkel {
acc,
last_id,
writer,
historian,
entry_info_subscribers: vec![],
}
@@ -104,16 +101,64 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}

/// Process any Entry items that have been published by the Historian.
pub fn sync(&mut self) -> Hash {
while let Ok(entry) = self.historian.receiver.try_recv() {
/// continuously broadcast blobs of entries out
fn run_sync<W: Write>(
obj: Arc<RwLock<Self>>,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> Result<()> {
//TODO clean this mess up
let entry = historian.recv(Duration::new(1, 0))?;
let mut b = blob_recycler.allocate();
let mut out = Cursor::new(b.data_mut());
let mut ser = bincode::Serializer::new(out);
let mut seq = ser.serialize_seq(None);
seq.serialize(entry).expect("serialize failed on first entry!");
obj.write().unwrap().notify_entry_info_subscribers(&entry);

while let Ok(entry) = historian.try_recv() {
// the skel write lock is released at the end of this scope
let mut robj = obj.write().unwrap();
if let Err(e) = seq.serialize(entry) {
seq.end();
b.set_size(out.len());
broadcast.send(b)?;

//NEW SEQUENCE
b = blob_recycler.allocate();
out = Cursor::new(b.data_mut());
seq = ser.serialize_seq(None);
seq.serialize(entry).expect("serialize failed on first entry!");
}
self.last_id = entry.id;
self.acc.register_entry_id(&self.last_id);
writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
writeln!(writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
self.notify_entry_info_subscribers(&entry);
}
self.last_id
seq.end();
b.set_size(out.len());
broadcast.send(b)?;
Ok(())
}
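
As a reading aid, here is a minimal, std-only sketch of the packing pattern run_sync is going for: append serialized entries to a fixed-capacity buffer and flush a blob over the broadcast channel whenever the next entry would no longer fit. `BLOB_SIZE` and `pack_and_send` are illustrative stand-ins, not types from this PR.

```rust
// Sketch (not this PR's API) of the blob-packing idea used by run_sync above.
use std::sync::mpsc::{channel, Sender};

const BLOB_SIZE: usize = 1024; // illustrative stand-in for the real blob capacity

fn pack_and_send(entries: &[Vec<u8>], broadcast: &Sender<Vec<u8>>) {
    let mut blob: Vec<u8> = Vec::with_capacity(BLOB_SIZE);
    for entry in entries {
        // If this entry would overflow the current blob, flush and start a new one.
        if !blob.is_empty() && blob.len() + entry.len() > BLOB_SIZE {
            broadcast.send(blob).expect("broadcast receiver hung up");
            blob = Vec::with_capacity(BLOB_SIZE);
        }
        blob.extend_from_slice(entry);
    }
    if !blob.is_empty() {
        broadcast.send(blob).expect("broadcast receiver hung up");
    }
}

fn main() {
    let (tx, rx) = channel();
    let entries: Vec<Vec<u8>> = (0..8).map(|i| vec![i as u8; 300]).collect();
    pack_and_send(&entries, &tx);
    drop(tx);
    for blob in rx {
        println!("flushed blob of {} bytes", blob.len());
    }
}
```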

pub fn sync_service<W: Write + Send + 'static>(
obj: Arc<RwLock<Self>>,
exit: Arc<AtomicBool>,
broadcast: &streamer::BlobSender,
blob_recycler: &streamer::BlobRecycler,
writer: W,
historian: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::run_sync(&obj, &broadcast, &blob_recycler, writer, &historian);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
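
The error-then-check-exit shape above relies on run_sync's timed receive (`Duration::new(1, 0)`): when the historian is idle, the receive fails roughly once a second, which is the moment the exit flag gets polled. A std-only sketch of that shutdown pattern, with illustrative names:

```rust
// Sketch of the service-loop shutdown pattern assumed by sync_service:
// only check `exit` when an iteration fails, so a receive timeout doubles
// as the polling interval for shutdown.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread::spawn;
use std::time::Duration;

fn worker(input: Receiver<u64>, exit: Arc<AtomicBool>) {
    loop {
        // Times out with Err about once a second when idle, giving the loop
        // a chance to notice the exit flag.
        let result = input.recv_timeout(Duration::new(1, 0));
        if result.is_err() && exit.load(Ordering::Relaxed) {
            break;
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let exit = Arc::new(AtomicBool::new(false));
    let exit_ = exit.clone();
    let handle = spawn(move || worker(rx, exit_));
    tx.send(42).unwrap();
    exit.store(true, Ordering::Relaxed);
    handle.join().unwrap();
}
```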

/// Process Request items sent by clients.
pub fn process_request(
&mut self,
@@ -261,9 +306,10 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}

fn process(
obj: &Arc<Mutex<AccountantSkel<W>>>,
obj: &Arc<Mutex<Self>>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
blob_sender: &streamer::BlobSender,
broadcast_sender: &streamer::BlobSender,
responder_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
@@ -280,7 +326,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;
responder_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);

@@ -293,7 +339,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Arc<Mutex<AccountantSkel<W>>>,
obj: &Arc<Mutex<Self>>,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
@@ -322,11 +368,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
obj: &Arc<Mutex<AccountantSkel<W>>>,
addr: &str,
obj: &Arc<Mutex<Self>>,
me: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(addr)?;
let gossip = UdpSocket::bind(me.gossip_addr)?;
let read = UdpSocket::bind(me.serve_addr)?;
let crdt = Arc::new(RwLock::new(Crdt::new(me)));

// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
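
The two lines above (plus the bind that follows) are a small idiom worth noting: reuse the read socket's local address with the port zeroed so the OS assigns a free port on the same interface. A std-only sketch, independent of this PR's types:

```rust
// Bind a second UDP socket on the same interface as an existing one by
// reusing its local address with the port set to 0 (OS picks a free port).
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    let read = UdpSocket::bind("127.0.0.1:0")?;
    let mut local = read.local_addr()?;
    local.set_port(0); // ask the OS for any free port on the same interface
    let write = UdpSocket::bind(local)?;
    println!("read on {}, write on {}", read.local_addr()?, write.local_addr()?);
    Ok(())
}
```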
@@ -337,9 +386,13 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let (packet_sender, packet_receiver) = channel();
let t_receiver =
streamer::receiver(read, exit.clone(), packet_recycler.clone(), packet_sender)?;
let (blob_sender, blob_receiver) = channel();
let t_responder =
streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver);
let (responder_sender, responder_receiver) = channel();
let t_responder = streamer::responder(
write,
exit.clone(),
blob_recycler.clone(),
responder_receiver
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
@@ -350,12 +403,23 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
});

let (broadcast_sender, broadcast_receiver) = channel();

let t_broadcast = streamer::broadcaster(
write,
exit.clone(),
crdt.clone(),
blob_recycler.clone(),
broadcast_receiver,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
&broadcast_sender,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
@@ -375,7 +439,8 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// on the accountant state.
/// # Arguments
/// * `obj` - The accountant state.
/// * `rsubs` - The subscribers.
/// * `me` - This node's configuration.
/// * `leader` - The leader's configuration.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows:
@@ -389,11 +454,19 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: subscribers::Subscribers,
obj: &Arc<Mutex<Self>>,
me: ReplicatedData,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
let gossip = UdpSocket::bind(me.gossip_addr)?;
let read = UdpSocket::bind(me.replicate_addr)?;

let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().unwrap().insert(&leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), exit.clone());
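
Since the Crdt is now shared between the gossip and listen threads, it is wrapped in `Arc<RwLock<_>>` and each thread gets its own clone of the Arc. A generic sketch of that sharing pattern (the `Table` type and the peer entries are placeholders, not the crdt API):

```rust
// Generic sketch of Arc<RwLock<_>> sharing: clone the Arc per thread,
// take a write lock to mutate and a read lock to inspect.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread::spawn;

type Table = HashMap<String, String>; // placeholder for the shared state

fn main() {
    let table = Arc::new(RwLock::new(Table::new()));

    let writer = table.clone();
    let t_listen = spawn(move || {
        writer
            .write()
            .unwrap()
            .insert("leader".to_string(), "127.0.0.1:8000".to_string());
    });

    let reader = table.clone();
    let t_gossip = spawn(move || {
        let n = reader.read().unwrap().len();
        println!("known peers: {}", n);
    });

    t_listen.join().unwrap();
    t_gossip.join().unwrap();
}
```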

// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
@@ -410,20 +483,20 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();

let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = streamer::retransmitter(
write,
exit.clone(),
subs.clone(),
crdt.clone(),
blob_recycler.clone(),
retransmit_receiver,
);
//TODO

//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
subs,
crdt,
blob_recycler.clone(),
blob_receiver,
window_sender,
Expand All @@ -437,7 +510,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
break;
}
});
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server, t_gossip, t_listen])
}
}
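
Taken together, the numbered steps in replicate's doc comment describe a chain of threads joined by mpsc channels, each stage consuming blobs from the previous one. A simplified, std-only sketch of that shape (stage names are illustrative; verification, erasure coding, and retransmit are omitted):

```rust
// Staged pipeline sketch: each stage is a thread that reads blobs from an
// mpsc Receiver and optionally forwards them to the next stage.
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};

type Blob = Vec<u8>;

fn stage(name: &'static str, input: Receiver<Blob>, output: Option<Sender<Blob>>) -> JoinHandle<()> {
    spawn(move || {
        for blob in input {
            println!("{}: got {} bytes", name, blob.len());
            if let Some(ref out) = output {
                if out.send(blob).is_err() {
                    break;
                }
            }
        }
    })
}

fn main() {
    let (net_tx, net_rx) = channel(); // stands in for the blob receiver socket
    let (window_tx, window_rx) = channel(); // stands in for window_sender/window_receiver

    let t_window = stage("window", net_rx, Some(window_tx));
    let t_server = stage("server", window_rx, None);

    net_tx.send(vec![0u8; 64]).unwrap();
    drop(net_tx); // closing the source shuts the whole chain down

    t_window.join().unwrap();
    t_server.join().unwrap();
}
```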

@@ -486,8 +559,7 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
use transaction::Transaction;

use subscribers::{Node, Subscribers};
use crdt::Crdt;
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
@@ -623,10 +695,10 @@ mod tests {
let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr);
let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1];
let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr);
let subs = Subscribers::new(node_me, node_leader, &node_subs);
let node_me = ReplicatedData::new(KeyPair::new().pubkey(), me_addr);
let node_target = ReplicatedData::new(KeyPair::new().pubkey(), target_peer_addr);
let node_leader = ReplicatedData::new(KeyPair::new().pubkey(), leader_addr);
let mut crdt_me = Crdt::new(node_me);
crdt_me.insert(&node_leader);
crdt_me.insert(&node_target);

// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to