Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

141 add futures #148

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ matches = "^0.1.6"
byteorder = "^1.2.1"
libc = "^0.2.1"
getopts = "^0.2"
futures = "0.1"
isatty = "0.1"
6 changes: 6 additions & 0 deletions src/accountant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ impl Accountant {
true
}

// fn forget_signature_with_last_id(&self, last_id: &Hash) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need this. Even if not commented out, it's not right. It should remove last_id, not the last item.

// let mut last_ids = self.last_ids.write().unwrap();
// last_ids.pop_back();
// true
// }

fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
if let Some(entry) = self.last_ids
.read()
Expand Down
8 changes: 5 additions & 3 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::time::Duration;
use streamer;
use transaction::Transaction;


pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant,
last_id: Hash,
Expand Down Expand Up @@ -326,6 +327,7 @@ mod tests {
use accountant_skel::AccountantSkel;
use accountant_stub::AccountantStub;
use entry::Entry;
use futures::Future;
use historian::Historian;
use mint::Mint;
use plan::Plan;
Expand Down Expand Up @@ -426,20 +428,20 @@ mod tests {
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

let acc = AccountantStub::new(&addr, socket);
let last_id = acc.get_last_id().unwrap();
let last_id = acc.get_last_id().wait().unwrap();

let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);

let _sig = acc.transfer_signed(tr).unwrap();

let last_id = acc.get_last_id().unwrap();
let last_id = acc.get_last_id().wait().unwrap();

let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
tr2.data.tokens = 502;
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = acc.transfer_signed(tr2).unwrap();

assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500);
exit.store(true, Ordering::Relaxed);
}

Expand Down
29 changes: 17 additions & 12 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use accountant_skel::{Request, Response};
use bincode::{deserialize, serialize};
use futures::future::{err, ok, FutureResult};
use hash::Hash;
use signature::{KeyPair, PublicKey, Signature};
use std::io;
Expand Down Expand Up @@ -51,35 +52,38 @@ impl AccountantStub {
/// Request the balance of the user holding `pubkey`. This method blocks
/// until the server sends a response. If the response packet is dropped
/// by the network, this method will hang indefinitely.
pub fn get_balance(&self, pubkey: &PublicKey) -> io::Result<Option<i64>> {
pub fn get_balance(&self, pubkey: &PublicKey) -> FutureResult<i64, i64> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something likeFutureResult<Option<i64>, io::Error> ? Or perhaps the library offers a futures::io::FutureResult<Option<i64>>?

let req = Request::GetBalance { key: *pubkey };
let data = serialize(&req).expect("serialize GetBalance");
self.socket.send_to(&data, &self.addr)?;
self.socket.send_to(&data, &self.addr).expect("buffer error");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't panic on this one. It's too likely to happen.

let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
self.socket.recv_from(&mut buf).expect("buffer error");
let resp = deserialize(&buf).expect("deserialize balance");
if let Response::Balance { key, val } = resp {
assert_eq!(key, *pubkey);
return Ok(val);
return match val {
Some(x) => ok(x),
_ => err(0),
};
}
Ok(None)
err(0)
}

/// Request the last Entry ID from the server. This method blocks
/// until the server sends a response. At the time of this writing,
/// it also has the side-effect of causing the server to log any
/// entries that have been published by the Historian.
pub fn get_last_id(&self) -> io::Result<Hash> {
pub fn get_last_id(&self) -> FutureResult<Hash, ()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, io::Errror instead of ()?

let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetId");
self.socket.send_to(&data, &self.addr)?;
self.socket.send_to(&data, &self.addr).expect("buffer error");
let mut buf = vec![0u8; 1024];
self.socket.recv_from(&mut buf)?;
self.socket.recv_from(&mut buf).expect("buffer error");
let resp = deserialize(&buf).expect("deserialize Id");
if let Response::LastId { id } = resp {
return Ok(id);
return ok(id);
}
Ok(Default::default())
ok(Default::default())
}
}

Expand All @@ -88,6 +92,7 @@ mod tests {
use super::*;
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use futures::Future;
use historian::Historian;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
Expand Down Expand Up @@ -120,10 +125,10 @@ mod tests {
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();

let acc = AccountantStub::new(addr, socket);
let last_id = acc.get_last_id().unwrap();
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500);
exit.store(true, Ordering::Relaxed);
}
}
8 changes: 5 additions & 3 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate isatty;
extern crate rayon;
extern crate serde_json;
extern crate solana;
extern crate futures;

use getopts::Options;
use isatty::stdin_isatty;
Expand All @@ -17,6 +18,7 @@ use std::net::UdpSocket;
use std::process::exit;
use std::thread::sleep;
use std::time::{Duration, Instant};
use futures::Future;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <mint.json> | {} [options]\n\n", program);
Expand Down Expand Up @@ -84,10 +86,10 @@ fn main() {
println!("Stub new");
let acc = AccountantStub::new(&addr, socket);
println!("Get last id");
let last_id = acc.get_last_id().unwrap();
let last_id = acc.get_last_id().wait().unwrap();

println!("Get Balance");
let mint_balance = acc.get_balance(&mint_pubkey).unwrap().unwrap();
let mint_balance = acc.get_balance(&mint_pubkey).wait().unwrap();
println!("Mint's Initial Balance {}", mint_balance);

println!("Signing transactions...");
Expand Down Expand Up @@ -133,7 +135,7 @@ fn main() {
while val != prev {
sleep(Duration::from_millis(20));
prev = val;
val = acc.get_balance(&mint_pubkey).unwrap().unwrap();
val = acc.get_balance(&mint_pubkey).wait().unwrap();
}
println!("Mint's Final Balance {}", val);
let txs = mint_balance - val;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ extern crate serde_json;
extern crate sha2;
extern crate untrusted;

extern crate futures;

#[cfg(test)]
#[macro_use]
extern crate matches;