Skip to content

Commit

Permalink
plug in LedgerWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Aug 6, 2018
1 parent 91741e2 commit 64ae794
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/bin/bench-tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ fn converge(
let ncp = Ncp::new(
&spy_ref,
window.clone(),
None,
spy_gossip,
gossip_send_socket,
exit_signal.clone(),
Expand Down
120 changes: 91 additions & 29 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
use hash::Hash;
use ledger::LedgerWindow;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
use rand::{thread_rng, RngCore};
Expand Down Expand Up @@ -954,6 +955,7 @@ impl Crdt {
}
fn run_window_request(
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
me: &NodeInfo,
from: &NodeInfo,
ix: u64,
Expand Down Expand Up @@ -1000,31 +1002,44 @@ impl Crdt {
"requested ix {} != blob_ix {}, outside window!",
ix, blob_ix
);
// falls through to checking window_ledger
}
} else {
inc_new_counter!("crdt-window-request-fail", 1);
assert!(window.read().unwrap()[pos].data.is_none());
info!(
"{:x}: failed RequestWindowIndex {:x} {} {}",
me.debug_id(),
from.debug_id(),
ix,
pos,
);
}

if let Some(ledger_window) = ledger_window {
if let Ok(entry) = ledger_window.get_entry(ix) {
inc_new_counter!("crdt-window-request-ledger", 1);

let out = entry.to_blob(blob_recycler, Some(ix), Some(from.id));

return Some(out);
}
}

inc_new_counter!("crdt-window-request-fail", 1);
info!(
"{:x}: failed RequestWindowIndex {:x} {} {}",
me.debug_id(),
from.debug_id(),
ix,
pos,
);

None
}

//TODO we should first coalesce all the requests
fn handle_blob(
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
Ok(request) => {
Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
Expand All @@ -1036,6 +1051,7 @@ impl Crdt {
request: Protocol,
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
match request {
Expand Down Expand Up @@ -1123,7 +1139,7 @@ impl Crdt {
inc_new_counter!("crdt-window-request-address-eq", 1);
return None;
}
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler)
}
}
}
Expand All @@ -1132,6 +1148,7 @@ impl Crdt {
fn run_listen(
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
Expand All @@ -1142,31 +1159,42 @@ impl Crdt {
while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more);
}
let resp: VecDeque<_> = reqs
.iter()
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
.collect();
response_sender.send(resp)?;
while let Some(r) = reqs.pop_front() {
blob_recycler.recycle(r);
let mut resps = VecDeque::new();
while let Some(req) = reqs.pop_front() {
if let Some(resp) = Self::handle_blob(
obj,
window,
ledger_window,
blob_recycler,
&req.read().unwrap(),
) {
resps.push_back(resp);
}
blob_recycler.recycle(req);
}
response_sender.send(resps)?;
Ok(())
}
pub fn listen(
obj: Arc<RwLock<Self>>,
window: SharedWindow,
ledger_path: Option<&str>,
blob_recycler: BlobRecycler,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let debug_id = obj.read().unwrap().debug_id();

let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap());

Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
let e = Self::run_listen(
&obj,
&window,
&mut ledger_window.as_mut(),
&blob_recycler,
&requests_receiver,
&response_sender,
Expand Down Expand Up @@ -1299,11 +1327,14 @@ mod tests {
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
};
use hash::Hash;
use entry::Entry;
use hash::{hash, Hash};
use ledger::{LedgerWindow, LedgerWriter};
use logger;
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil, PublicKey};
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -1812,6 +1843,7 @@ mod tests {
/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request() {
logger::setup();
let window = default_window();
let me = NodeInfo::new(
KeyPair::new().pubkey(),
Expand All @@ -1822,19 +1854,48 @@ mod tests {
"127.0.0.1:1238".parse().unwrap(),
);
let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler);
assert!(rv.is_none());
let out = recycler.allocate();
out.write().unwrap().meta.size = 200;
window.write().unwrap()[0].data = Some(out);
let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler);
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler);
assert!(rv.is_some());
let v = rv.unwrap();
//test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler);
let rv = Crdt::run_window_request(&window, &mut None, &me, &me, len, &recycler);
assert!(rv.is_none());

fn tmp_ledger(name: &str) -> String {
let keypair = KeyPair::new();

let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());

let mut writer = LedgerWriter::new(&path, true).unwrap();
let zero = Hash::default();
let one = hash(&zero.as_ref());
writer
.write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec())
.unwrap();
path
}

let ledger_path = tmp_ledger("run_window_request");
let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap();

let rv = Crdt::run_window_request(
&window,
&mut Some(&mut ledger_window),
&me,
&me,
1,
&recycler,
);
assert!(rv.is_some());

remove_dir_all(ledger_path).unwrap();
}

/// test window requests respond with the right blob, and do not overrun
Expand All @@ -1850,7 +1911,7 @@ mod tests {
let recycler = BlobRecycler::default();

// Simulate handling a repair request from mock_peer
let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler);
let rv = Crdt::run_window_request(&window, &mut None, &me, &mock_peer, 0, &recycler);
assert!(rv.is_none());
let blob = recycler.allocate();
let blob_size = 200;
Expand All @@ -1859,8 +1920,9 @@ mod tests {

let num_requests: u32 = 64;
for i in 0..num_requests {
let shared_blob =
Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler).unwrap();
let shared_blob = Crdt::run_window_request(
&window, &mut None, &me, &mock_peer, 0, &recycler,
).unwrap();
let blob = shared_blob.read().unwrap();
// Test we copied the blob
assert_eq!(blob.meta.size, blob_size);
Expand Down Expand Up @@ -1920,13 +1982,13 @@ mod tests {
let obj = Arc::new(RwLock::new(crdt));

let request = Protocol::RequestUpdates(1, node.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());

let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none());

let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(request, &obj, &window, &recycler);
Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler);

let me = obj.write().unwrap();

Expand Down
32 changes: 30 additions & 2 deletions src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
//! unique ID that is the hash of the Entry before it, plus the hash of the
//! transactions within it. Entries cannot be reordered, and its field `num_hashes`
//! represents an approximate amount of time since the last Entry was created.
use bincode::serialized_size;
use bincode::{serialize_into, serialized_size};
use hash::{extend_and_hash, hash, Hash};
use packet::BLOB_DATA_SIZE;
use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use signature::PublicKey;
use std::io::Cursor;
use transaction::Transaction;

/// Each Entry contains three pieces of data. The `num_hashes` field is the number
Expand Down Expand Up @@ -76,6 +78,32 @@ impl Entry {
entry
}

pub fn to_blob(
&self,
blob_recycler: &BlobRecycler,
idx: Option<u64>,
id: Option<PublicKey>,
) -> SharedBlob {
let blob = blob_recycler.allocate();
{
let mut blob_w = blob.write().unwrap();
let pos = {
let mut out = Cursor::new(blob_w.data_mut());
serialize_into(&mut out, &self).expect("failed to serialize output");
out.position() as usize
};
blob_w.set_size(pos);

if let Some(idx) = idx {
blob_w.set_index(idx).expect("set_index()");
}
if let Some(id) = id {
blob_w.set_id(id).expect("set_id()");
}
}
blob
}

pub fn will_fit(transactions: Vec<Transaction>) -> bool {
serialized_size(&Entry {
num_hashes: 0,
Expand Down
4 changes: 2 additions & 2 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ impl FullNode {

let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));

// let mut ledger_writer = LedgerWriter::new(ledger_path);

let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
Expand All @@ -244,6 +242,7 @@ impl FullNode {
let ncp = Ncp::new(
&crdt,
window.clone(),
Some(ledger_path),
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
Expand Down Expand Up @@ -324,6 +323,7 @@ impl FullNode {
let ncp = Ncp::new(
&crdt,
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
Expand Down
14 changes: 3 additions & 11 deletions src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_si
use entry::Entry;
use hash::Hash;
//use log::Level::Trace;
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use packet::{self, SharedBlob};
use rayon::prelude::*;
use result::{Error, Result};
use std::collections::VecDeque;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, Cursor, Seek, SeekFrom};
use std::io::{self, Seek, SeekFrom};
use std::mem::size_of;
use std::path::Path;
use transaction::Transaction;
Expand Down Expand Up @@ -396,15 +396,7 @@ impl Block for [Entry] {

fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
for entry in self {
let blob = blob_recycler.allocate();
let pos = {
let mut bd = blob.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &entry).expect("failed to serialize output");
out.position() as usize
};
assert!(pos <= BLOB_DATA_SIZE, "pos: {}", pos);
blob.write().unwrap().set_size(pos);
let blob = entry.to_blob(blob_recycler, None, None);
q.push_back(blob);
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/ncp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl Ncp {
pub fn new(
crdt: &Arc<RwLock<Crdt>>,
window: streamer::SharedWindow,
ledger_path: Option<&str>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -47,6 +48,7 @@ impl Ncp {
let t_listen = Crdt::listen(
crdt.clone(),
window,
ledger_path,
blob_recycler.clone(),
request_receiver,
response_sender.clone(),
Expand Down Expand Up @@ -95,6 +97,7 @@ mod tests {
let d = Ncp::new(
&c,
w,
None,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit.clone(),
Expand Down
Loading

0 comments on commit 64ae794

Please sign in to comment.