Skip to content

Commit

Permalink
Add broadcast impl
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko authored and sakridge committed May 3, 2018
1 parent a158d74 commit 5e0d22c
Show file tree
Hide file tree
Showing 13 changed files with 856 additions and 421 deletions.
486 changes: 348 additions & 138 deletions src/accountant_skel.rs

Large diffs are not rendered by default.

39 changes: 23 additions & 16 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use hash::Hash;
use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::UdpSocket;
use std::net::{SocketAddr, UdpSocket};
use transaction::Transaction;

pub struct AccountantStub {
pub addr: String,
pub addr: SocketAddr,
pub socket: UdpSocket,
last_id: Option<Hash>,
num_events: u64,
Expand All @@ -25,9 +25,9 @@ impl AccountantStub {
/// Create a new AccountantStub that will interface with AccountantSkel
/// over `socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking AccountantStub methods.
pub fn new(addr: &str, socket: UdpSocket) -> Self {
pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self {
let stub = AccountantStub {
addr: addr.to_string(),
addr: addr,
socket,
last_id: None,
num_events: 0,
Expand Down Expand Up @@ -159,39 +159,43 @@ mod tests {
use super::*;
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use crdt::ReplicatedData;
use futures::Future;
use historian::Historian;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test]
fn test_accountant_stub() {
let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001";
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let addr = serve.local_addr().unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
"0.0.0.0:0".parse().unwrap(),
serve.local_addr().unwrap(),
);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
input,
historian,
)));
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

let socket = UdpSocket::bind(send_addr).unwrap();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

let mut acc = AccountantStub::new(addr, socket);
Expand All @@ -200,5 +204,8 @@ mod tests {
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();
}
}
}
4 changes: 2 additions & 2 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn main() {
});

let socket = UdpSocket::bind(&send_addr).unwrap();
let mut acc = AccountantStub::new(&addr, socket);
let mut acc = AccountantStub::new(addr.parse().unwrap(), socket);

println!("Get last ID...");
let last_id = acc.get_last_id().wait().unwrap();
Expand Down Expand Up @@ -122,7 +122,7 @@ fn main() {
println!("Transferring 1 unit {} times...", trs.len());
let send_addr = "0.0.0.0:0";
let socket = UdpSocket::bind(send_addr).unwrap();
let acc = AccountantStub::new(&addr, socket);
let acc = AccountantStub::new(addr.parse().unwrap(), socket);
for tr in trs {
acc.transfer_signed(tr.clone()).unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/bin/historian-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() {
let hist = Historian::new(event_receiver, &seed, Some(10));
create_ledger(&input, &seed).expect("send error");
drop(input);
let entries: Vec<Entry> = hist.output.iter().collect();
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
Expand Down
32 changes: 21 additions & 11 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accountant_skel::AccountantSkel;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::historian::Historian;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
Expand Down Expand Up @@ -49,7 +52,9 @@ fn main() {
if matches.opt_present("p") {
port = matches.opt_str("p").unwrap().parse().expect("port");
}
let addr = format!("0.0.0.0:{}", port);
let serve_addr = format!("0.0.0.0:{}", port);
let gossip_addr = format!("0.0.0.0:{}", port + 1);
let replicate_addr = format!("0.0.0.0:{}", port + 2);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a log file");
Expand Down Expand Up @@ -99,15 +104,20 @@ fn main() {
let (input, event_receiver) = sync_channel(10_000);
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(
acc,
last_id,
stdout(),
input,
historian,
)));
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
eprintln!("Ready. Listening on {}", addr);
let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
);
let threads =
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");
}
Expand Down
Loading

0 comments on commit 5e0d22c

Please sign in to comment.