Skip to content

Commit

Permalink
use a single BlobRecycler per fullnode
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Sep 11, 2018
1 parent 297f859 commit a8fdb8a
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 35 deletions.
28 changes: 12 additions & 16 deletions src/bin/bench-tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use solana::hash::Hash;
use solana::logger;
use solana::metrics;
use solana::ncp::Ncp;
use solana::packet::BlobRecycler;
use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
Expand Down Expand Up @@ -611,12 +612,10 @@ fn main() {
.collect();

// generate and send transactions for the specified duration
let now = Instant::now();
let mut last_stat = Instant::now();
let stat_interval = Duration::new(90, 0);
let start = Instant::now();
let mut reclaim_tokens_back_to_source_account = false;
let mut i = keypair0_balance;
while now.elapsed() < duration {
while start.elapsed() < duration {
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(-1);
metrics_submit_token_balance(balance);

Expand Down Expand Up @@ -648,16 +647,6 @@ fn main() {
if should_switch_directions(num_tokens_per_account, i) {
reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account;
}

if last_stat.elapsed() >= stat_interval {
last_stat = Instant::now();
compute_and_report_stats(
&maxes,
sample_period,
&now.elapsed(),
total_tx_sent_count.load(Ordering::Relaxed),
);
}
}

// Stop the sampling threads so it will collect the stats
Expand All @@ -684,7 +673,7 @@ fn main() {
compute_and_report_stats(
&maxes,
sample_period,
&now.elapsed(),
&start.elapsed(),
total_tx_sent_count.load(Ordering::Relaxed),
);

Expand All @@ -707,7 +696,14 @@ fn converge(
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
let ncp = Ncp::new(
&spy_ref,
window,
BlobRecycler::default(),
None,
gossip_socket,
exit_signal.clone(),
);
let mut v: Vec<NodeInfo> = vec![];
// wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 {
Expand Down
7 changes: 5 additions & 2 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,14 @@ impl Fullnode {
}

let bank = Arc::new(bank);
let blob_recycler = BlobRecycler::default();
let mut thread_hdls = vec![];

let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
&blob_recycler,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
Expand All @@ -199,7 +201,6 @@ impl Fullnode {
);
thread_hdls.extend(rpc_service.thread_hdls());

let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
let shared_window = Arc::new(RwLock::new(window));
Expand All @@ -209,6 +210,7 @@ impl Fullnode {
let ncp = Ncp::new(
&crdt,
shared_window.clone(),
blob_recycler.clone(),
ledger_path,
node.sockets.gossip,
exit.clone(),
Expand All @@ -226,6 +228,7 @@ impl Fullnode {
entry_height,
crdt,
shared_window,
blob_recycler.clone(),
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
Expand All @@ -247,7 +250,7 @@ impl Fullnode {
&crdt,
tick_duration,
node.sockets.transaction,
&blob_recycler,
blob_recycler.clone(),
exit.clone(),
ledger_path,
sigverify_disabled,
Expand Down
12 changes: 10 additions & 2 deletions src/ncp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ impl Ncp {
pub fn new(
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
blob_recycler: BlobRecycler,
ledger_path: Option<&str>,
gossip_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Self {
let blob_recycler = BlobRecycler::default();
let (request_sender, request_receiver) = channel();
let gossip_socket = Arc::new(gossip_socket);
trace!(
Expand Down Expand Up @@ -82,6 +82,7 @@ impl Service for Ncp {
mod tests {
use crdt::{Crdt, Node};
use ncp::Ncp;
use packet::BlobRecycler;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};

Expand All @@ -94,7 +95,14 @@ mod tests {
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone());
let d = Ncp::new(
&c,
w,
BlobRecycler::default(),
None,
tn.sockets.gossip,
exit.clone(),
);
d.close().expect("thread join");
}
}
2 changes: 1 addition & 1 deletion src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl Rpu {
bank: &Arc<Bank>,
requests_socket: UdpSocket,
respond_socket: UdpSocket,
blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>,
) -> Self {
let packet_recycler = PacketRecycler::default();
Expand All @@ -55,7 +56,6 @@ impl Rpu {
packet_sender,
);

let blob_recycler = BlobRecycler::default();
let request_processor = RequestProcessor::new(bank.clone());
let (request_stage, blob_receiver) = RequestStage::new(
request_processor,
Expand Down
10 changes: 9 additions & 1 deletion src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crdt::{Crdt, CrdtError, NodeInfo};
use hash::Hash;
use log::Level;
use ncp::Ncp;
use packet::BlobRecycler;
use request::{Request, Response};
use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature};
Expand Down Expand Up @@ -377,7 +378,14 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
let my_addr = gossip_socket.local_addr().unwrap();
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone());
let ncp = Ncp::new(
&crdt.clone(),
window,
BlobRecycler::default(),
None,
gossip_socket,
exit.clone(),
);

let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);
Expand Down
2 changes: 1 addition & 1 deletion src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Tpu {
crdt: &Arc<RwLock<Crdt>>,
tick_duration: Option<Duration>,
transactions_sockets: Vec<UdpSocket>,
blob_recycler: &BlobRecycler,
blob_recycler: BlobRecycler,
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
Expand Down
19 changes: 10 additions & 9 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ impl Tvu {
entry_height: u64,
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
blob_recycler: BlobRecycler,
replicate_socket: UdpSocket,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let repair_socket = Arc::new(repair_socket);
let blob_recycler = BlobRecycler::default();
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
vec![Arc::new(replicate_socket), repair_socket.clone()],
exit.clone(),
Expand Down Expand Up @@ -168,10 +168,11 @@ pub mod tests {
crdt: Arc<RwLock<Crdt>>,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
) -> (Ncp, SharedWindow) {
) -> (Ncp, SharedWindow, BlobRecycler) {
let window = Arc::new(RwLock::new(window::default_window()));
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit);
(ncp, window)
let recycler = BlobRecycler::default();
let ncp = Ncp::new(&crdt, window.clone(), recycler.clone(), None, gossip, exit);
(ncp, window, recycler)
}

/// Test that message sent from leader to target1 and replicated to target2
Expand Down Expand Up @@ -202,13 +203,12 @@ pub mod tests {
// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
// simulate target peer
let recv_recycler = BlobRecycler::default();
let resp_recycler = BlobRecycler::default();
let recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::blob_receiver(
Arc::new(target2.sockets.replicate),
exit.clone(),
recv_recycler.clone(),
recycler.clone(),
s_reader,
);

Expand All @@ -217,7 +217,7 @@ pub mod tests {
let t_responder = streamer::responder(
"test_replicate",
Arc::new(leader.sockets.requests),
resp_recycler.clone(),
recycler.clone(),
r_responder,
);

Expand All @@ -239,6 +239,7 @@ pub mod tests {
0,
cref1,
dr_1.1,
dr_1.2,
target1.sockets.replicate,
target1.sockets.repair,
target1.sockets.retransmit,
Expand Down Expand Up @@ -273,7 +274,7 @@ pub mod tests {
alice_ref_balance -= transfer_amount;

for entry in vec![entry0, entry1] {
let b = resp_recycler.allocate();
let b = recycler.allocate();
{
let mut w = b.write().unwrap();
w.set_index(blob_id).unwrap();
Expand Down
11 changes: 9 additions & 2 deletions tests/data_replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rayon::iter::*;
use solana::crdt::{Crdt, Node};
use solana::logger;
use solana::ncp::Ncp;
use solana::packet::Blob;
use solana::packet::{Blob, BlobRecycler};
use solana::result;
use solana::service::Service;
use std::net::UdpSocket;
Expand All @@ -21,7 +21,14 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);
let d = Ncp::new(
&c.clone(),
w,
BlobRecycler::default(),
None,
tn.sockets.gossip,
exit,
);
(c, d, tn.sockets.replicate)
}

Expand Down
11 changes: 10 additions & 1 deletion tests/multinode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use solana::ledger::LedgerWriter;
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::packet::BlobRecycler;
use solana::result;
use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil, Pubkey};
Expand Down Expand Up @@ -42,7 +43,15 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone());
let recycler = BlobRecycler::default();
let ncp = Ncp::new(
&spy_ref,
spy_window,
recycler,
None,
spy.sockets.gossip,
exit.clone(),
);
//wait for the network to converge
let mut converged = false;
let mut rv = vec![];
Expand Down

0 comments on commit a8fdb8a

Please sign in to comment.