Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broadcast impl #169

Merged
merged 1 commit into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
475 changes: 338 additions & 137 deletions src/accountant_skel.rs

Large diffs are not rendered by default.

39 changes: 23 additions & 16 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use hash::Hash;
use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::UdpSocket;
use std::net::{SocketAddr, UdpSocket};
use transaction::Transaction;

pub struct AccountantStub {
pub addr: String,
pub addr: SocketAddr,
pub socket: UdpSocket,
last_id: Option<Hash>,
num_events: u64,
Expand All @@ -25,9 +25,9 @@ impl AccountantStub {
/// Create a new AccountantStub that will interface with AccountantSkel
/// over `socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking AccountantStub methods.
pub fn new(addr: &str, socket: UdpSocket) -> Self {
pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self {
let stub = AccountantStub {
addr: addr.to_string(),
addr: addr,
socket,
last_id: None,
num_events: 0,
Expand Down Expand Up @@ -160,39 +160,43 @@ mod tests {
use super::*;
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use crdt::ReplicatedData;
use futures::Future;
use historian::Historian;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test]
fn test_accountant_stub() {
let addr = "127.0.0.1:9000";
let send_addr = "127.0.0.1:9001";
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let addr = serve.local_addr().unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
"0.0.0.0:0".parse().unwrap(),
serve.local_addr().unwrap(),
);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
input,
historian,
)));
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian));
let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300));

let socket = UdpSocket::bind(send_addr).unwrap();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

let mut acc = AccountantStub::new(addr, socket);
Expand All @@ -201,5 +205,8 @@ mod tests {
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();
}
}
}
4 changes: 2 additions & 2 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn main() {

println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap();
let mut acc = AccountantStub::new(&addr, socket);
let mut acc = AccountantStub::new(addr.parse().unwrap(), socket);

println!("Get last ID...");
let last_id = acc.get_last_id().wait().unwrap();
Expand Down Expand Up @@ -125,7 +125,7 @@ fn main() {
let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap();
let acc = AccountantStub::new(&addr, socket);
let acc = AccountantStub::new(addr.parse().unwrap(), socket);
for tr in trs {
acc.transfer_signed(tr.clone()).unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/bin/historian-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() {
let hist = Historian::new(event_receiver, &seed, Some(10));
create_ledger(&input, &seed).expect("send error");
drop(input);
let entries: Vec<Entry> = hist.output.iter().collect();
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
Expand Down
32 changes: 21 additions & 11 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::accountant_skel::AccountantSkel;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::historian::Historian;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
Expand Down Expand Up @@ -49,7 +52,9 @@ fn main() {
if matches.opt_present("p") {
port = matches.opt_str("p").unwrap().parse().expect("port");
}
let addr = format!("0.0.0.0:{}", port);
let serve_addr = format!("0.0.0.0:{}", port);
let gossip_addr = format!("0.0.0.0:{}", port + 1);
let replicate_addr = format!("0.0.0.0:{}", port + 2);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a log file");
Expand Down Expand Up @@ -99,15 +104,20 @@ fn main() {
let (input, event_receiver) = sync_channel(10_000);
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(
acc,
last_id,
stdout(),
input,
historian,
)));
let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap();
eprintln!("Ready. Listening on {}", addr);
let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
);
let threads =
AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap();
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
t.join().expect("join");
}
Expand Down
Loading