diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 00000000000000..756c7dc24eaf24 --- /dev/null +++ b/.clippy.toml @@ -0,0 +1 @@ +too-many-arguments-threshold = 9 diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 9762edfd9df7e5..1b7e5e87c995ef 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -144,12 +144,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let verified_setup_len = verified_setup.len(); verified_sender.send(verified_setup).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_setup_len, &signal_receiver, num_src_accounts); @@ -163,12 +159,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let verified_len = verified.len(); verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_len, &signal_receiver, tx); }); @@ -210,12 +202,8 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) { .collect(); let verified_len = verified.len(); verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); + BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) + .unwrap(); check_txs(verified_len, &signal_receiver, tx); }); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 4616d45809ad46..c8b7f6de357834 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -40,7 +40,7 @@ impl BankingStage { .name("solana-banking-stage".to_string()) .spawn(move || loop { if let Err(e) = Self::process_packets( - bank.clone(), + &bank.clone(), &verified_receiver, &signal_sender, &packet_recycler, @@ -72,7 +72,7 @@ impl BankingStage { /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. pub fn process_packets( - bank: Arc, + bank: &Arc, verified_receiver: &Receiver)>>, signal_sender: &Sender, packet_recycler: &PacketRecycler, diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 34e4a2c55aa421..512d03ff0d19d9 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -367,7 +367,7 @@ fn converge( let window = default_window(); let gossip_send_socket = udp_random_bind(8000, 10000, 5).unwrap(); let ncp = Ncp::new( - spy_ref.clone(), + &spy_ref.clone(), window.clone(), spy_gossip, gossip_send_socket, diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs index c02c0836ee7188..b7ef06124a70ba 100644 --- a/src/blob_fetch_stage.rs +++ b/src/blob_fetch_stage.rs @@ -18,14 +18,14 @@ impl BlobFetchStage { pub fn new( socket: UdpSocket, exit: Arc, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { Self::new_multi_socket(vec![socket], exit, blob_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - blob_recycler: BlobRecycler, + blob_recycler: &BlobRecycler, ) -> (Self, BlobReceiver) { let (blob_sender, blob_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/drone.rs b/src/drone.rs index 50fd9e36e65e2a..d8e7736c970548 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -17,7 +17,7 @@ use transaction::Transaction; pub const TIME_SLICE: u64 = 60; pub const REQUEST_CAP: u64 = 1_000_000; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum DroneRequest { GetAirdrop { airdrop_request_amount: u64, diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index bac6be8d8a385c..c1731f2c3af865 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -18,14 +18,14 @@ impl FetchStage { pub fn new( socket: UdpSocket, exit: Arc, - packet_recycler: PacketRecycler, + packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { Self::new_multi_socket(vec![socket], exit, packet_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, - packet_recycler: PacketRecycler, + packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { let (packet_sender, packet_receiver) = channel(); let thread_hdls: Vec<_> = sockets diff --git a/src/fullnode.rs b/src/fullnode.rs index d6c4b2346f21b6..ab896202ca19b3 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -112,7 +112,7 @@ impl FullNode { entry_height, Some(ledger_tail), node, - network_entry_point, + &network_entry_point, exit.clone(), ); info!( @@ -208,7 +208,7 @@ impl FullNode { let bank = Arc::new(bank); let mut thread_hdls = vec![]; let rpu = Rpu::new( - bank.clone(), + &bank.clone(), node.sockets.requests, node.sockets.respond, exit.clone(), @@ -229,7 +229,7 @@ impl FullNode { thread_hdls.extend(tpu.thread_hdls()); let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); let ncp = Ncp::new( - crdt.clone(), + &crdt.clone(), window.clone(), node.sockets.gossip, node.sockets.gossip_send, @@ -285,13 +285,13 @@ impl FullNode { entry_height: u64, ledger_tail: Option>, node: TestNode, - entry_point: NodeInfo, + entry_point: &NodeInfo, exit: Arc, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; let rpu = Rpu::new( - bank.clone(), + &bank.clone(), node.sockets.requests, node.sockets.respond, exit.clone(), @@ -308,7 +308,7 @@ impl FullNode { let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); let ncp = Ncp::new( - crdt.clone(), + &crdt.clone(), window.clone(), node.sockets.gossip, node.sockets.gossip_send, @@ -367,7 +367,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(kp, bank, 0, None, tn, entry, exit); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit); v.close().unwrap(); } } diff --git a/src/metrics.rs b/src/metrics.rs index 8fa796d906b4ba..c502722ae981aa 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -81,13 +81,13 @@ impl Default for MetricsAgent { impl MetricsAgent { fn new(writer: Arc, write_frequency: Duration) -> Self { let (sender, receiver) = channel::(); - thread::spawn(move || Self::run(receiver, writer, write_frequency)); + thread::spawn(move || Self::run(&receiver, &writer, write_frequency)); MetricsAgent { sender } } fn run( - receiver: Receiver, - writer: Arc, + receiver: &Receiver, + writer: &Arc, write_frequency: Duration, ) { trace!("run: enter"); diff --git a/src/ncp.rs b/src/ncp.rs index 7e8d05aa57aa0c..e03b7b5df1d714 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -18,7 +18,7 @@ pub struct Ncp { impl Ncp { pub fn new( - crdt: Arc>, + crdt: &Arc>, window: Arc>>>, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, @@ -93,7 +93,7 @@ mod tests { let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new( - c.clone(), + &c.clone(), w, tn.sockets.gossip, tn.sockets.gossip_send, diff --git a/src/request.rs b/src/request.rs index a1a006c38b2788..96c84b80ad67e6 100644 --- a/src/request.rs +++ b/src/request.rs @@ -4,7 +4,7 @@ use hash::Hash; use signature::{PublicKey, Signature}; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum Request { GetBalance { key: PublicKey }, GetLastId, diff --git a/src/rpu.rs b/src/rpu.rs index ca5ac7d6a2ad72..af8b52ca93a3be 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -41,7 +41,7 @@ pub struct Rpu { impl Rpu { pub fn new( - bank: Arc, + bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket, exit: Arc, diff --git a/src/sigverify.rs b/src/sigverify.rs index 0c0b4fc010039c..ffe7698e09c8e0 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -66,6 +66,7 @@ fn batch_size(batches: &[SharedPackets]) -> usize { .sum() } +#[cfg_attr(feature = "cargo-clippy", allow(ptr_arg))] #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { use rayon::prelude::*; diff --git a/src/streamer.rs b/src/streamer.rs index 3f99cc88f53a83..cb6c5192854fff 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -249,6 +249,150 @@ fn repair_window( Ok(()) } +fn retransmit_all_leader_blocks( + maybe_leader: Option, + dq: &mut SharedBlobs, + debug_id: u64, + recycler: &BlobRecycler, + consumed: &mut u64, + received: &mut u64, + retransmit: &BlobSender, +) -> Result<()> { + let mut retransmit_queue = VecDeque::new(); + if let Some(leader) = maybe_leader { + for b in dq { + let p = b.read().expect("'b' read lock in fn recv_window"); + //TODO this check isn't safe against adverserial packets + //we need to maintain a sequence window + let leader_id = leader.id; + trace!( + "idx: {} addr: {:?} id: {:?} leader: {:?}", + p.get_index().expect("get_index in fn recv_window"), + p.get_id().expect("get_id in trace! fn recv_window"), + p.meta.addr(), + leader_id + ); + if p.get_id().expect("get_id in fn recv_window") == leader_id { + //TODO + //need to copy the retransmitted blob + //otherwise we get into races with which thread + //should do the recycling + // + //a better abstraction would be to recycle when the blob + //is dropped via a weakref to the recycler + let nv = recycler.allocate(); + { + let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); + let sz = p.meta.size; + mnv.meta.size = sz; + mnv.data[..sz].copy_from_slice(&p.data[..sz]); + } + retransmit_queue.push_back(nv); + } + } + } else { + warn!("{:x}: no leader to retransmit from", debug_id); + } + if !retransmit_queue.is_empty() { + debug!( + "{:x}: RECV_WINDOW {} {}: retransmit {}", + debug_id, + *consumed, + *received, + retransmit_queue.len(), + ); + static mut COUNTER_RETRANSMIT: Counter = + create_counter!("streamer-recv_window-retransmit", LOG_RATE); + inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); + retransmit.send(retransmit_queue)?; + } + Ok(()) +} + +fn process_blob( + b: SharedBlob, + pix: u64, + w: usize, + consume_queue: &mut SharedBlobs, + locked_window: &Window, + debug_id: u64, + recycler: &BlobRecycler, + consumed: &mut u64, +) { + let mut window = locked_window.write().unwrap(); + + // Search the window for old blobs in the window + // of consumed to received and clear any old ones + for ix in *consumed..(pix + 1) { + let k = (ix % WINDOW_SIZE) as usize; + if let Some(b) = &mut window[k] { + if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { + continue; + } + } + if let Some(b) = mem::replace(&mut window[k], None) { + recycler.recycle(b); + } + } + + // Insert the new blob into the window + // spot should be free because we cleared it above + if window[w].is_none() { + window[w] = Some(b); + } else if let Some(cblob) = &window[w] { + if cblob.read().unwrap().get_index().unwrap() != pix as u64 { + warn!("{:x}: overrun blob at index {:}", debug_id, w); + } else { + debug!("{:x}: duplicate blob at index {:}", debug_id, w); + } + } + loop { + let k = (*consumed % WINDOW_SIZE) as usize; + trace!("k: {} consumed: {}", k, *consumed); + + if window[k].is_none() { + break; + } + let mut is_coding = false; + if let Some(ref cblob) = window[k] { + let cblob_r = cblob + .read() + .expect("blob read lock for flogs streamer::window"); + if cblob_r.get_index().unwrap() < *consumed { + break; + } + if cblob_r.is_coding() { + is_coding = true; + } + } + if !is_coding { + consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); + *consumed += 1; + } else { + #[cfg(feature = "erasure")] + { + let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); + let coding_end = block_start + erasure::NUM_CODED as u64; + // We've received all this block's data blobs, go and null out the window now + for j in block_start..*consumed { + if let Some(b) = mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) { + recycler.recycle(b); + } + } + for j in *consumed..coding_end { + window[(j % WINDOW_SIZE) as usize] = None; + } + + *consumed += erasure::MAX_MISSING as u64; + debug!( + "skipping processing coding blob k: {} consumed: {}", + k, *consumed + ); + } + } + } +} + fn recv_window( debug_id: u64, locked_window: &Window, @@ -278,57 +422,17 @@ fn recv_window( *received, dq.len(), ); - { - //retransmit all leader blocks - let mut retransmit_queue = VecDeque::new(); - if let Some(leader) = maybe_leader { - for b in &dq { - let p = b.read().expect("'b' read lock in fn recv_window"); - //TODO this check isn't safe against adverserial packets - //we need to maintain a sequence window - let leader_id = leader.id; - trace!( - "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index().expect("get_index in fn recv_window"), - p.get_id().expect("get_id in trace! fn recv_window"), - p.meta.addr(), - leader_id - ); - if p.get_id().expect("get_id in fn recv_window") == leader_id { - //TODO - //need to copy the retransmitted blob - //otherwise we get into races with which thread - //should do the recycling - // - //a better abstraction would be to recycle when the blob - //is dropped via a weakref to the recycler - let nv = recycler.allocate(); - { - let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); - let sz = p.meta.size; - mnv.meta.size = sz; - mnv.data[..sz].copy_from_slice(&p.data[..sz]); - } - retransmit_queue.push_back(nv); - } - } - } else { - warn!("{:x}: no leader to retransmit from", debug_id); - } - if !retransmit_queue.is_empty() { - debug!( - "{:x}: RECV_WINDOW {} {}: retransmit {}", - debug_id, - *consumed, - *received, - retransmit_queue.len(), - ); - static mut COUNTER_RETRANSMIT: Counter = - create_counter!("streamer-recv_window-retransmit", LOG_RATE); - inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); - retransmit.send(retransmit_queue)?; - } - } + + retransmit_all_leader_blocks( + maybe_leader, + &mut dq, + debug_id, + recycler, + consumed, + received, + retransmit, + )?; + //send a contiguous set of blocks let mut consume_queue = VecDeque::new(); while let Some(b) = dq.pop_front() { @@ -353,82 +457,17 @@ fn recv_window( //if we get different blocks at the same index //that is a network failure/attack trace!("window w: {} size: {}", w, meta_size); - { - let mut window = locked_window.write().unwrap(); - - // Search the window for old blobs in the window - // of consumed to received and clear any old ones - for ix in *consumed..(pix + 1) { - let k = (ix % WINDOW_SIZE) as usize; - if let Some(b) = &mut window[k] { - if b.read().unwrap().get_index().unwrap() >= *consumed as u64 { - continue; - } - } - if let Some(b) = mem::replace(&mut window[k], None) { - recycler.recycle(b); - } - } - // Insert the new blob into the window - // spot should be free because we cleared it above - if window[w].is_none() { - window[w] = Some(b); - } else if let Some(cblob) = &window[w] { - if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("{:x}: overrun blob at index {:}", debug_id, w); - } else { - debug!("{:x}: duplicate blob at index {:}", debug_id, w); - } - } - loop { - let k = (*consumed % WINDOW_SIZE) as usize; - trace!("k: {} consumed: {}", k, *consumed); - - if window[k].is_none() { - break; - } - let mut is_coding = false; - if let Some(ref cblob) = window[k] { - let cblob_r = cblob - .read() - .expect("blob read lock for flogs streamer::window"); - if cblob_r.get_index().unwrap() < *consumed { - break; - } - if cblob_r.is_coding() { - is_coding = true; - } - } - if !is_coding { - consume_queue.push_back(window[k].clone().expect("clone in fn recv_window")); - *consumed += 1; - } else { - #[cfg(feature = "erasure")] - { - let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); - let coding_end = block_start + erasure::NUM_CODED as u64; - // We've received all this block's data blobs, go and null out the window now - for j in block_start..*consumed { - if let Some(b) = - mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) - { - recycler.recycle(b); - } - } - for j in *consumed..coding_end { - window[(j % WINDOW_SIZE) as usize] = None; - } - - *consumed += erasure::MAX_MISSING as u64; - debug!( - "skipping processing coding blob k: {} consumed: {}", - k, *consumed - ); - } - } - } - } + process_blob( + b, + pix, + w, + &mut consume_queue, + locked_window, + debug_id, + recycler, + consumed, + ); } print_window(debug_id, locked_window, *consumed); trace!("sending consume_queue.len: {}", consume_queue.len()); diff --git a/src/tpu.rs b/src/tpu.rs index 3119d26179193f..10a21c0b22db9e 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -63,7 +63,7 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_socket, exit, packet_recycler.clone()); + FetchStage::new(transactions_socket, exit, &packet_recycler.clone()); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); diff --git a/src/tvu.rs b/src/tvu.rs index 7d51168e2e1ea3..cf1d79ce011da8 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -83,7 +83,7 @@ impl Tvu { let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], exit, - blob_recycler.clone(), + &blob_recycler.clone(), ); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified @@ -161,7 +161,7 @@ pub mod tests { ) -> Result<(Ncp, Window)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?; + let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; Ok((ncp, window)) } /// Test that message sent from leader to target1 and replicated to target2