From 3761d94fdcb62113618ab829a21ace325aa542c8 Mon Sep 17 00:00:00 2001
From: Greg Fitzgerald
Date: Wed, 11 Jul 2018 14:40:46 -0600
Subject: [PATCH] Apply most of clippy's feedback

---
 benches/banking_stage.rs           |  6 +--
 src/bank.rs                        |  8 +---
 src/bin/client-demo.rs             |  2 +-
 src/choose_gossip_peer_strategy.rs |  4 +-
 src/crdt.rs                        | 77 +++++++++++++++---------------
 src/drone.rs                       |  4 +-
 src/entry.rs                       |  2 +-
 src/fullnode.rs                    | 10 ++--
 src/logger.rs                      |  2 +-
 src/metrics.rs                     | 21 ++++----
 src/nat.rs                         |  2 +-
 src/packet.rs                      | 14 +++---
 src/record_stage.rs                |  9 ++--
 src/replicate_stage.rs             |  2 +-
 src/sigverify.rs                   |  2 +-
 src/sigverify_stage.rs             | 14 +++---
 src/streamer.rs                    | 22 ++++-----
 src/thin_client.rs                 | 46 ++++++++----------
 src/timing.rs                      |  8 ++--
 src/tpu.rs                         |  6 +--
 src/tvu.rs                         |  4 +-
 src/voting.rs                      |  8 ++--
 src/window_stage.rs                |  4 +-
 src/write_stage.rs                 |  2 +-
 24 files changed, 133 insertions(+), 146 deletions(-)

diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs
index 89b5122903a9f7..9762edfd9df7e5 100644
--- a/benches/banking_stage.rs
+++ b/benches/banking_stage.rs
@@ -134,7 +134,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     let bank = Arc::new(Bank::new(&mint));
     let verified_setup: Vec<_> =
-        to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx)
+        to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx)
             .into_iter()
             .map(|x| {
                 let len = (*x).read().unwrap().packets.len();
@@ -153,7 +153,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     check_txs(verified_setup_len, &signal_receiver, num_src_accounts);
-    let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192)
+    let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
         .into_iter()
         .map(|x| {
             let len = (*x).read().unwrap().packets.len();
@@ -201,7 +201,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
     bencher.iter(move || {
         let bank = Arc::new(Bank::new(&mint));
-        let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx)
+        let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx)
             .into_iter()
             .map(|x| {
                 let len = (*x).read().unwrap().packets.len();
diff --git a/src/bank.rs b/src/bank.rs
index 3c8ca651a3d2b6..b9ecf743c7e330 100644
--- a/src/bank.rs
+++ b/src/bank.rs
@@ -120,11 +120,7 @@ impl Bank {
     /// Commit funds to the `payment.to` party.
     fn apply_payment(&self, payment: &Payment, balances: &mut HashMap) {
-        if balances.contains_key(&payment.to) {
-            *balances.get_mut(&payment.to).unwrap() += payment.tokens;
-        } else {
-            balances.insert(payment.to, payment.tokens);
-        }
+        *balances.entry(payment.to).or_insert(0) += payment.tokens;
     }

     /// Return the last entry ID registered.
@@ -511,7 +507,7 @@ impl Bank {
         let bals = self.balances
             .read()
             .expect("'balances' read lock in get_balance");
-        bals.get(pubkey).map(|x| *x).unwrap_or(0)
+        bals.get(pubkey).cloned().unwrap_or(0)
     }

     pub fn transaction_count(&self) -> usize {
diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs
index ae1f387f0e435d..b22bb1b3267c1e 100644
--- a/src/bin/client-demo.rs
+++ b/src/bin/client-demo.rs
@@ -129,7 +129,7 @@ fn generate_and_send_txs(
             leader.contact_info.tpu
         );
         for tx in txs {
-            client.transfer_signed(tx.clone()).unwrap();
+            client.transfer_signed(tx).unwrap();
         }
     });
     println!(
diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs
index fd235387978daa..ad9156163909d6 100644
--- a/src/choose_gossip_peer_strategy.rs
+++ b/src/choose_gossip_peer_strategy.rs
@@ -159,7 +159,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         // Return u32 b/c the weighted sampling API from rand::distributions
         // only takes u32 for weights
-        if weighted_vote >= std::u32::MAX as f64 {
+        if weighted_vote >= f64::from(std::u32::MAX) {
             return std::u32::MAX;
         }
@@ -173,7 +173,7 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
     fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
-        if options.len() < 1 {
+        if options.is_empty() {
             Err(CrdtError::TooSmall)?;
         }
diff --git a/src/crdt.rs b/src/crdt.rs
index d241cc59634f67..d8d42b4e8092b3 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -56,7 +56,7 @@ pub fn parse_port_or_addr(optstr: Option) -> SocketAddr {
     let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
     if let Some(addrstr) = optstr {
         if let Ok(port) = addrstr.parse() {
-            let mut addr = daddr.clone();
+            let mut addr = daddr;
            addr.set_port(port);
            addr
        } else if let Ok(addr) = addrstr.parse() {
@@ -173,12 +173,12 @@ impl NodeInfo {
         make_debug_id(&self.id)
     }
     fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
-        let mut nxt_addr = addr.clone();
+        let mut nxt_addr = *addr;
         nxt_addr.set_port(addr.port() + nxt);
         nxt_addr
     }
     pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self {
-        let transactions_addr = bind_addr.clone();
+        let transactions_addr = *bind_addr;
         let gossip_addr = Self::next_port(&bind_addr, 1);
         let replicate_addr = Self::next_port(&bind_addr, 2);
         let requests_addr = Self::next_port(&bind_addr, 3);
@@ -201,9 +201,9 @@ impl NodeInfo {
         NodeInfo::new(
             PublicKey::default(),
             gossip_addr,
-            daddr.clone(),
-            daddr.clone(),
-            daddr.clone(),
+            daddr,
+            daddr,
+            daddr,
             daddr,
         )
     }
@@ -341,19 +341,19 @@ impl Crdt {
     fn update_leader_liveness(&mut self) {
         //TODO: (leaders should vote)
         //until then we pet their liveness every time we see some votes from anyone
-        let ld = self.leader_data().map(|x| x.id.clone());
+        let ld = self.leader_data().map(|x| x.id);
         trace!("leader_id {:?}", ld);
         if let Some(leader_id) = ld {
             self.update_liveness(leader_id);
         }
     }
-    pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) {
+    pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
         static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
         inc_counter!(COUNTER_VOTE, votes.len());
-        if votes.len() > 0 {
+        if !votes.is_empty() {
             info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
         }
-        for v in &votes {
+        for v in votes {
             self.insert_vote(&v.0, &v.1, v.2);
         }
     }
@@ -371,7 +371,7 @@ impl Crdt {
         );
         self.update_index += 1;
-        let _ = self.table.insert(v.id.clone(), v.clone());
+        let _ = self.table.insert(v.id, v.clone());
         let _ = self.local.insert(v.id, self.update_index);
         static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
         inc_counter!(COUNTER_UPDATE, 1);
@@ -449,7 +449,7 @@ impl Crdt {
         static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
         inc_counter!(COUNTER_PURGE, dead_ids.len());
-        for id in dead_ids.iter() {
+        for id in &dead_ids {
             self.alive.remove(id);
             self.table.remove(id);
             self.remote.remove(id);
@@ -461,11 +461,7 @@ impl Crdt {
         }
     }

-    pub fn index_blobs(
-        me: &NodeInfo,
-        blobs: &Vec<SharedBlob>,
-        receive_index: &mut u64,
-    ) -> Result<()> {
+    pub fn index_blobs(me: &NodeInfo, blobs: &[SharedBlob], receive_index: &mut u64) -> Result<()> {
         // enumerate all the blobs, those are the indices
         trace!("{:x}: INDEX_BLOBS {}", me.debug_id(), blobs.len());
         for (i, b) in blobs.iter().enumerate() {
@@ -518,13 +514,13 @@ impl Crdt {
     /// We need to avoid having obj locked while doing any io, such as the `send_to`
     pub fn broadcast(
         me: &NodeInfo,
-        broadcast_table: &Vec<NodeInfo>,
+        broadcast_table: &[NodeInfo],
         window: &Window,
         s: &UdpSocket,
         transmit_index: &mut u64,
         received_index: u64,
     ) -> Result<()> {
-        if broadcast_table.len() < 1 {
+        if broadcast_table.is_empty() {
             warn!("{:x}:not enough peers in crdt table", me.debug_id());
             Err(CrdtError::TooSmall)?;
         }
@@ -676,7 +672,7 @@ impl Crdt {
             Err(CrdtError::TooSmall)?;
         }
         let n = (Self::random() as usize) % valid.len();
-        let addr = valid[n].contact_info.ncp.clone();
+        let addr = valid[n].contact_info.ncp;
         let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
         let out = serialize(&req)?;
         Ok((addr, out))
@@ -768,7 +764,7 @@ impl Crdt {
         }
         let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect();
         let my_id = self.debug_id();
-        for x in sorted.iter() {
+        for x in &sorted {
             trace!(
                 "{:x}: sorted leaders {:x} votes: {}",
                 my_id,
@@ -784,10 +780,8 @@ impl Crdt {
     /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
     fn update_leader(&mut self) {
         if let Some(leader_id) = self.top_leader() {
-            if self.my_data().leader_id != leader_id {
-                if self.table.get(&leader_id).is_some() {
-                    self.set_leader(leader_id);
-                }
+            if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() {
+                self.set_leader(leader_id);
             }
         }
     }
@@ -822,7 +816,9 @@ impl Crdt {
                 continue;
             }
-            let liveness_entry = self.external_liveness.entry(*pk).or_insert(HashMap::new());
+            let liveness_entry = self.external_liveness
+                .entry(*pk)
+                .or_insert_with(HashMap::new);
             let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
             if *external_remote_index > peer_index {
                 liveness_entry.insert(from, *external_remote_index);
@@ -854,7 +850,7 @@ impl Crdt {
             obj.write().unwrap().purge(timestamp());
             //TODO: possibly tune this parameter
             //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
-            let _ = obj.write().unwrap().update_leader();
+            obj.write().unwrap().update_leader();
             let elapsed = timestamp() - start;
             if GOSSIP_SLEEP_MILLIS > elapsed {
                 let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
@@ -948,10 +944,7 @@ impl Crdt {
         let me = obj.read().unwrap();
         // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
         let (from, ups, data) = me.get_updates_since(v);
-        let external_liveness = me.remote
-            .iter()
-            .map(|(k, v)| (k.clone(), v.clone()))
-            .collect();
+        let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
         drop(me);
         trace!("get updates since response {} {}", v, data.len());
         let len = data.len();
@@ -1091,6 +1084,12 @@ pub struct TestNode {
     pub sockets: Sockets,
 }

+impl Default for TestNode {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl TestNode {
     pub fn new() -> Self {
         let pubkey = KeyPair::new().pubkey();
@@ -1115,7 +1114,7 @@ impl TestNode {
             repair.local_addr().unwrap(),
         );
         TestNode {
-            data: data,
+            data,
             sockets: Sockets {
                 gossip,
                 gossip_send,
@@ -1130,19 +1129,19 @@ impl TestNode {
         }
     }
     pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode {
-        let mut local_gossip_addr = bind_addr.clone();
+        let mut local_gossip_addr = bind_addr;
         local_gossip_addr.set_port(data.contact_info.ncp.port());
-        let mut local_replicate_addr = bind_addr.clone();
+        let mut local_replicate_addr = bind_addr;
         local_replicate_addr.set_port(data.contact_info.tvu.port());
-        let mut local_requests_addr = bind_addr.clone();
+        let mut local_requests_addr = bind_addr;
         local_requests_addr.set_port(data.contact_info.rpu.port());
-        let mut local_transactions_addr = bind_addr.clone();
+        let mut local_transactions_addr = bind_addr;
         local_transactions_addr.set_port(data.contact_info.tpu.port());
-        let mut local_repair_addr = bind_addr.clone();
+        let mut local_repair_addr = bind_addr;
         local_repair_addr.set_port(data.contact_info.tvu_window.port());
         let transaction = UdpSocket::bind(local_transactions_addr).unwrap();
@@ -1160,7 +1159,7 @@ impl TestNode {
         let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
         let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
         TestNode {
-            data: data,
+            data,
             sockets: Sockets {
                 gossip,
                 gossip_send,
@@ -1300,7 +1299,7 @@ mod tests {
         };
         sleep(Duration::from_millis(100));
         let votes = vec![(d.id.clone(), vote_new_version_old_addrs, Hash::default())];
-        crdt.insert_votes(votes);
+        crdt.insert_votes(&votes);
         let updated = crdt.alive[&leader.id];
         //should be accepted, since the update is for the same address field as the one we know
         assert_eq!(crdt.table[&d.id].version, 1);
diff --git a/src/drone.rs b/src/drone.rs
index 37a8bf4a5780c5..50fd9e36e65e2a 100644
--- a/src/drone.rs
+++ b/src/drone.rs
@@ -112,7 +112,7 @@ impl Drone {
                 airdrop_request_amount,
                 client_public_key,
             } => {
-                request_amount = airdrop_request_amount.clone();
+                request_amount = airdrop_request_amount;
                 tx = Transaction::new(
                     &self.mint_keypair,
                     client_public_key,
@@ -136,7 +136,7 @@ impl Drone {
                     )
                     .to_owned(),
             );
-            client.transfer_signed(tx)
+            client.transfer_signed(&tx)
         } else {
             Err(Error::new(ErrorKind::Other, "token limit reached"))
         }
diff --git a/src/entry.rs b/src/entry.rs
index 704c246943dda8..dc04b07dbe12ec 100644
--- a/src/entry.rs
+++ b/src/entry.rs
@@ -144,7 +144,7 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) ->
 /// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`.
 pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Entry {
-    assert!(num_hashes > 0 || transactions.len() == 0);
+    assert!(num_hashes > 0 || transactions.is_empty());
     Entry {
         num_hashes,
         id: next_hash(start_hash, num_hashes, &transactions),
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 3df3fd75c7201b..d6c4b2346f21b6 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -99,7 +99,7 @@ impl FullNode {
             "starting... local gossip address: {} (advertising {})",
             local_gossip_addr, node.data.contact_info.ncp
         );
-        let requests_addr = node.data.contact_info.rpu.clone();
+        let requests_addr = node.data.contact_info.rpu;
         let exit = Arc::new(AtomicBool::new(false));
         if !leader {
             let testnet_addr = network_entry_for_validator.expect("validator requires entry");
@@ -121,7 +121,7 @@
             );
             server
         } else {
-            node.data.leader_id = node.data.id.clone();
+            node.data.leader_id = node.data.id;
             let outfile_for_leader: Box = match outfile_for_leader {
                 Some(OutFile::Path(file)) => Box::new(
                     OpenOptions::new()
@@ -218,11 +218,11 @@
         let blob_recycler = BlobRecycler::default();
         let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
         let (tpu, blob_receiver) = Tpu::new(
-            bank.clone(),
-            crdt.clone(),
+            &bank.clone(),
+            &crdt.clone(),
             tick_duration,
             node.sockets.transaction,
-            blob_recycler.clone(),
+            &blob_recycler.clone(),
             exit.clone(),
             writer,
         );
diff --git a/src/logger.rs b/src/logger.rs
index 120e3ff82412ea..b6ff42a8c11002 100644
--- a/src/logger.rs
+++ b/src/logger.rs
@@ -9,6 +9,6 @@ static INIT: Once = ONCE_INIT;
 /// Setup function that is only run once, even if called multiple times.
 pub fn setup() {
     INIT.call_once(|| {
-        let _ = env_logger::init();
+        env_logger::init();
     });
 }
diff --git a/src/metrics.rs b/src/metrics.rs
index 05261137a036cf..8fa796d906b4ba 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -36,10 +36,11 @@ impl InfluxDbMetricsWriter {
     }

     fn build_client() -> Option {
-        let host = env::var("INFLUX_HOST").unwrap_or("https://metrics.solana.com:8086".to_string());
-        let db = env::var("INFLUX_DATABASE").unwrap_or("scratch".to_string());
-        let username = env::var("INFLUX_USERNAME").unwrap_or("scratch_writer".to_string());
-        let password = env::var("INFLUX_PASSWORD").unwrap_or("topsecret".to_string());
+        let host = env::var("INFLUX_HOST")
+            .unwrap_or_else(|_| "https://metrics.solana.com:8086".to_string());
+        let db = env::var("INFLUX_DATABASE").unwrap_or_else(|_| "scratch".to_string());
+        let username = env::var("INFLUX_USERNAME").unwrap_or_else(|_| "scratch_writer".to_string());
+        let password = env::var("INFLUX_PASSWORD").unwrap_or_else(|_| "topsecret".to_string());
         debug!("InfluxDB host={} db={} username={}", host, db, username);
         let mut client = influxdb::Client::new_with_option(host, db, None)
@@ -120,13 +121,11 @@ impl MetricsAgent {
             }

             let now = Instant::now();
-            if now.duration_since(last_write_time) >= write_frequency {
-                if !points.is_empty() {
-                    debug!("run: writing {} points", points.len());
-                    writer.write(points);
-                    points = Vec::new();
-                    last_write_time = now;
-                }
+            if now.duration_since(last_write_time) >= write_frequency && !points.is_empty() {
+                debug!("run: writing {} points", points.len());
+                writer.write(points);
+                points = Vec::new();
+                last_write_time = now;
             }
         }
         trace!("run: exit");
diff --git a/src/nat.rs b/src/nat.rs
index 3ca30a7efc4abe..7481dd075c49bb 100644
--- a/src/nat.rs
+++ b/src/nat.rs
@@ -92,7 +92,7 @@ pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPair {
         //
         // TODO: Remove the |sender| socket and deal with the downstream changes to
        //       the UDP signalling
-        let mut local_addr_sender = local_addr.clone();
+        let mut local_addr_sender = local_addr;
         local_addr_sender.set_port(public_addr.port());
         UdpSocket::bind(local_addr_sender).unwrap()
     };
diff --git a/src/packet.rs b/src/packet.rs
index 037ac51e33d4dd..33c6ffc3a77eb4 100644
--- a/src/packet.rs
+++ b/src/packet.rs
@@ -240,7 +240,7 @@ impl Packets {
 pub fn to_packets_chunked(
     r: &PacketRecycler,
-    xs: Vec<T>,
+    xs: &[T],
     chunks: usize,
 ) -> Vec {
     let mut out = vec![];
@@ -258,10 +258,10 @@ pub fn to_packets_chunked(
         }
         out.push(p);
     }
-    return out;
+    out
 }

-pub fn to_packets(r: &PacketRecycler, xs: Vec<T>) -> Vec {
+pub fn to_packets(r: &PacketRecycler, xs: &[T]) -> Vec {
     to_packets_chunked(r, xs, NUM_PACKETS)
 }
@@ -347,7 +347,7 @@ impl Blob {
     }

     pub fn is_coding(&self) -> bool {
-        return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0;
+        (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
     }

     pub fn set_coding(&mut self) -> Result<()> {
@@ -524,15 +524,15 @@ mod tests {
     fn test_to_packets() {
         let tx = Request::GetTransactionCount;
         let re = PacketRecycler::default();
-        let rv = to_packets(&re, vec![tx.clone(); 1]);
+        let rv = to_packets(&re, &vec![tx.clone(); 1]);
         assert_eq!(rv.len(), 1);
         assert_eq!(rv[0].read().unwrap().packets.len(), 1);

-        let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS]);
+        let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]);
         assert_eq!(rv.len(), 1);
         assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);

-        let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS + 1]);
+        let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]);
         assert_eq!(rv.len(), 2);
         assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
         assert_eq!(rv[1].read().unwrap().packets.len(), 1);
diff --git a/src/record_stage.rs b/src/record_stage.rs
index 4f8651a7b2c440..a2aaa069908c1a 100644
--- a/src/record_stage.rs
+++ b/src/record_stage.rs
@@ -32,7 +32,7 @@ impl RecordStage {
         start_hash: &Hash,
     ) -> (Self, Receiver>) {
         let (entry_sender, entry_receiver) = channel();
-        let start_hash = start_hash.clone();
+        let start_hash = *start_hash;
         let thread_hdl = Builder::new()
             .name("solana-record-stage".to_string())
@@ -52,7 +52,7 @@ impl RecordStage {
         tick_duration: Duration,
     ) -> (Self, Receiver>) {
         let (entry_sender, entry_receiver) = channel();
-        let start_hash = start_hash.clone();
+        let start_hash = *start_hash;
         let thread_hdl = Builder::new()
             .name("solana-record-stage".to_string())
@@ -60,13 +60,14 @@ impl RecordStage {
                 let mut recorder = Recorder::new(start_hash);
                 let start_time = Instant::now();
                 loop {
-                    if let Err(_) = Self::try_process_signals(
+                    if Self::try_process_signals(
                         &mut recorder,
                         start_time,
                         tick_duration,
                         &signal_receiver,
                         &entry_sender,
-                    ) {
+                    ).is_err()
+                    {
                         return;
                     }
                     recorder.hash();
diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs
index bb2cd8e231342e..8c2d8c087fe5b0 100644
--- a/src/replicate_stage.rs
+++ b/src/replicate_stage.rs
@@ -66,7 +66,7 @@ impl ReplicateStage {
             let shared_blob = blob_recycler.allocate();
             let (vote, addr) = {
                 let mut wcrdt = crdt.write().unwrap();
-                wcrdt.insert_votes(votes);
+                wcrdt.insert_votes(&votes);
                 //TODO: doesn't seem like there is a synchronous call to get height and id
                 info!("replicate_stage {} {:?}", height, &last_id[..8]);
                 wcrdt.new_vote(height, last_id)
diff --git a/src/sigverify.rs b/src/sigverify.rs
index c4bfeb6bc975f1..0c0b4fc010039c 100644
--- a/src/sigverify.rs
+++ b/src/sigverify.rs
@@ -59,7 +59,7 @@ fn verify_packet(packet: &Packet) -> u8 {
     ).is_ok() as u8
 }

-fn batch_size(batches: &Vec<SharedPackets>) -> usize {
+fn batch_size(batches: &[SharedPackets]) -> usize {
     batches
         .iter()
         .map(|p| p.read().unwrap().packets.len())
diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs
index e90ea238f6384b..57ffa020e76a9a 100644
--- a/src/sigverify_stage.rs
+++ b/src/sigverify_stage.rs
@@ -17,27 +17,27 @@ use std::time::Instant;
 use streamer::{self, PacketReceiver};
 use timing;
+pub type VerifiedPackets = Vec<(SharedPackets, Vec)>;
+
 pub struct SigVerifyStage {
     thread_hdls: Vec>,
 }

 impl SigVerifyStage {
-    pub fn new(
-        packet_receiver: Receiver,
-    ) -> (Self, Receiver)>>) {
+    pub fn new(packet_receiver: Receiver) -> (Self, Receiver) {
         let (verified_sender, verified_receiver) = channel();
         let thread_hdls = Self::verifier_services(packet_receiver, verified_sender);
         (SigVerifyStage { thread_hdls }, verified_receiver)
     }

-    fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> {
+    fn verify_batch(batch: Vec) -> VerifiedPackets {
         let r = sigverify::ed25519_verify(&batch);
         batch.into_iter().zip(r).collect()
     }

     fn verifier(
         recvr: &Arc>,
-        sendr: &Arc)>>>>,
+        sendr: &Arc>>,
     ) -> Result<()> {
         let (batch, len) =
             streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
@@ -74,7 +74,7 @@ impl SigVerifyStage {
     fn verifier_service(
         packet_receiver: Arc>,
-        verified_sender: Arc)>>>>,
+        verified_sender: Arc>>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
             if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) {
@@ -89,7 +89,7 @@ impl SigVerifyStage {
     fn verifier_services(
         packet_receiver: PacketReceiver,
-        verified_sender: Sender)>>,
+        verified_sender: Sender,
     ) -> Vec> {
         let sender = Arc::new(Mutex::new(verified_sender));
         let receiver = Arc::new(Mutex::new(packet_receiver));
diff --git a/src/streamer.rs b/src/streamer.rs
index d2cbda902b311e..3f99cc88f53a83 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -170,7 +170,7 @@ fn find_next_missing(
     let reqs: Vec<_> = (*consumed..*received)
         .filter_map(|pix| {
             let i = (pix % WINDOW_SIZE) as usize;
-            if let &None = &window[i] {
+            if window[i].is_none() {
                 let val = crdt.read().unwrap().window_index_request(pix as u64);
                 if let Ok((to, req)) = val {
                     return Some((to, req));
@@ -223,7 +223,7 @@ fn repair_window(
     let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
     trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
-    if reqs.len() > 0 {
+    if !reqs.is_empty() {
         static mut COUNTER_REPAIR: Counter =
             create_counter!("streamer-repair_window-repair", LOG_RATE);
         inc_counter!(COUNTER_REPAIR, reqs.len());
@@ -389,7 +389,7 @@ fn recv_window(
                 break;
             }
             let mut is_coding = false;
-            if let &Some(ref cblob) = &window[k] {
+            if let Some(ref cblob) = window[k] {
                 let cblob_r = cblob
                     .read()
                     .expect("blob read lock for flogs streamer::window");
@@ -461,16 +461,14 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
                 "_"
             } else if v.is_none() {
                 "0"
-            } else {
-                if let &Some(ref cblob) = &v {
-                    if cblob.read().unwrap().is_coding() {
-                        "C"
-                    } else {
-                        "1"
-                    }
+            } else if let Some(ref cblob) = v {
+                if cblob.read().unwrap().is_coding() {
+                    "C"
                 } else {
-                    "0"
+                    "1"
                 }
+            } else {
+                "0"
             }
         })
         .collect();
@@ -575,7 +573,7 @@ pub fn window(
 fn broadcast(
     me: &NodeInfo,
-    broadcast_table: &Vec<NodeInfo>,
+    broadcast_table: &[NodeInfo],
     window: &Window,
     recycler: &BlobRecycler,
     r: &BlobReceiver,
diff --git a/src/thin_client.rs b/src/thin_client.rs
index e40e4cf379b8fd..10d7b7373990af 100644
--- a/src/thin_client.rs
+++ b/src/thin_client.rs
@@ -39,7 +39,7 @@ impl ThinClient {
         transactions_addr: SocketAddr,
         transactions_socket: UdpSocket,
     ) -> Self {
-        let client = ThinClient {
+        ThinClient {
             requests_addr,
             requests_socket,
             transactions_addr,
@@ -48,8 +48,7 @@ impl ThinClient {
             transaction_count: 0,
             balances: HashMap::new(),
             signature_status: false,
-        };
-        client
+        }
     }

     pub fn recv_response(&self) -> io::Result {
@@ -60,8 +59,8 @@ impl ThinClient {
         deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
     }

-    pub fn process_response(&mut self, resp: Response) {
-        match resp {
+    pub fn process_response(&mut self, resp: &Response) {
+        match *resp {
             Response::Balance { key, val } => {
                 trace!("Response balance {:?} {:?}", key, val);
                 self.balances.insert(key, val);
@@ -76,13 +75,10 @@ impl ThinClient {
             }
             Response::SignatureStatus { signature_status } => {
                 self.signature_status = signature_status;
-                match signature_status {
-                    true => {
-                        trace!("Response found signature");
-                    }
-                    false => {
-                        trace!("Response signature not found");
-                    }
+                if signature_status {
+                    trace!("Response found signature");
+                } else {
+                    trace!("Response signature not found");
                 }
             }
         }
@@ -90,7 +86,7 @@ impl ThinClient {
     /// Send a signed Transaction to the server for processing. This method
     /// does not wait for a response.
-    pub fn transfer_signed(&self, tx: Transaction) -> io::Result {
+    pub fn transfer_signed(&self, tx: &Transaction) -> io::Result {
         let data = serialize(&tx).expect("serialize Transaction in pub fn transfer_signed");
         self.transactions_socket
             .send_to(&data, &self.transactions_addr)
@@ -107,7 +103,7 @@ impl ThinClient {
         let now = Instant::now();
         let tx = Transaction::new(keypair, to, n, *last_id);
         let sig = tx.sig;
-        let result = self.transfer_signed(tx).map(|_| sig);
+        let result = self.transfer_signed(&tx).map(|_| sig);
         metrics::submit(
             influxdb::Point::new("thinclient")
                 .add_tag("op", influxdb::Value::String("transfer".to_string()))
@@ -137,12 +133,12 @@ impl ThinClient {
             if let Response::Balance { key, .. } = &resp {
                 done = key == pubkey;
             }
-            self.process_response(resp);
+            self.process_response(&resp);
         }
         self.balances
             .get(pubkey)
-            .map(|x| *x)
-            .ok_or(io::Error::new(io::ErrorKind::Other, "nokey"))
+            .cloned()
+            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey"))
     }

     /// Request the transaction count. If the response packet is dropped by the network,
@@ -160,10 +156,10 @@ impl ThinClient {
             if let Ok(resp) = self.recv_response() {
                 info!("recv_response {:?}", resp);
-                if let &Response::TransactionCount { .. } = &resp {
+                if let Response::TransactionCount { .. } = resp {
                     done = true;
                 }
-                self.process_response(resp);
+                self.process_response(&resp);
             }
         }
         self.transaction_count
@@ -184,10 +180,10 @@ impl ThinClient {
             match self.recv_response() {
                 Ok(resp) => {
-                    if let &Response::LastId { .. } = &resp {
+                    if let Response::LastId { .. } = resp {
                         done = true;
                     }
-                    self.process_response(resp);
+                    self.process_response(&resp);
                 }
                 Err(e) => {
                     debug!("thin_client get_last_id error: {}", e);
@@ -232,10 +228,10 @@ impl ThinClient {
             .expect("buffer error in pub fn get_last_id");
             if let Ok(resp) = self.recv_response() {
-                if let &Response::SignatureStatus { .. } = resp {
                     done = true;
                 }
-                self.process_response(resp);
+                self.process_response(&resp);
             }
         }
         metrics::submit(
@@ -355,7 +351,7 @@ mod tests {
         let tx = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
-        let _sig = client.transfer_signed(tx).unwrap();
+        let _sig = client.transfer_signed(&tx).unwrap();
         let last_id = client.get_last_id();
@@ -364,7 +360,7 @@ mod tests {
             contract.tokens = 502;
             contract.plan = Plan::Budget(Budget::new_payment(502, bob_pubkey));
         }
-        let _sig = client.transfer_signed(tr2).unwrap();
+        let _sig = client.transfer_signed(&tr2).unwrap();
         let balance = client.poll_get_balance(&bob_pubkey);
         assert_eq!(balance.unwrap(), 500);
diff --git a/src/timing.rs b/src/timing.rs
index 3ae7bf8c78f1b3..4c7eeae4f89b05 100644
--- a/src/timing.rs
+++ b/src/timing.rs
@@ -3,20 +3,20 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};

 pub fn duration_as_us(d: &Duration) -> u64 {
-    return (d.as_secs() * 1000 * 1000) + (d.subsec_nanos() as u64 / 1_000);
+    (d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000)
 }

 pub fn duration_as_ms(d: &Duration) -> u64 {
-    return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
+    (d.as_secs() * 1000) + (u64::from(d.subsec_nanos()) / 1_000_000)
 }

 pub fn duration_as_s(d: &Duration) -> f32 {
-    return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
+    d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0)
 }

 pub fn timestamp() -> u64 {
     let now = SystemTime::now()
         .duration_since(UNIX_EPOCH)
         .expect("create timestamp in timing");
-    return duration_as_ms(&now);
+    duration_as_ms(&now)
 }
diff --git a/src/tpu.rs b/src/tpu.rs
index c0d14313c1ba0e..3119d26179193f 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -52,11 +52,11 @@ pub struct Tpu {
 impl Tpu {
     pub fn new(
-        bank: Arc,
-        crdt: Arc>,
+        bank: &Arc,
+        crdt: &Arc>,
         tick_duration: Option,
         transactions_socket: UdpSocket,
-        blob_recycler: BlobRecycler,
+        blob_recycler: &BlobRecycler,
         exit: Arc,
         writer: W,
     ) -> (Self, BlobReceiver) {
diff --git a/src/tvu.rs b/src/tvu.rs
index 4469bf73f93cde..7d51168e2e1ea3 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -89,11 +89,11 @@ impl Tvu {
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
         //then sent to the window, which does the erasure coding reconstruction
         let (window_stage, blob_window_receiver) = WindowStage::new(
-            crdt.clone(),
+            &crdt.clone(),
             window,
             entry_height,
             retransmit_socket,
-            blob_recycler.clone(),
+            &blob_recycler.clone(),
             blob_fetch_receiver,
         );
diff --git a/src/voting.rs b/src/voting.rs
index a32d202dc5365a..56bb10a5cd4df4 100644
--- a/src/voting.rs
+++ b/src/voting.rs
@@ -3,17 +3,15 @@ use hash::Hash;
 use signature::PublicKey;
 use transaction::{Instruction, Vote};

-pub fn entries_to_votes(entries: &Vec<Entry>) -> Vec<(PublicKey, Vote, Hash)> {
+pub fn entries_to_votes(entries: &[Entry]) -> Vec<(PublicKey, Vote, Hash)> {
     entries
         .iter()
         .flat_map(|entry| {
             let vs: Vec<(PublicKey, Vote, Hash)> = entry
                 .transactions
                 .iter()
-                .filter_map(|tx| match &tx.instruction {
-                    &Instruction::NewVote(ref vote) => {
-                        Some((tx.from.clone(), vote.clone(), tx.last_id.clone()))
-                    }
+                .filter_map(|tx| match tx.instruction {
+                    Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)),
                     _ => None,
                 })
                 .collect();
diff --git a/src/window_stage.rs b/src/window_stage.rs
index 74a0853de2a42f..244740d6ca0191 100644
--- a/src/window_stage.rs
+++ b/src/window_stage.rs
@@ -15,11 +15,11 @@ pub struct WindowStage {
 impl WindowStage {
     pub fn new(
-        crdt: Arc>,
+        crdt: &Arc>,
         window: Window,
         entry_height: u64,
         retransmit_socket: UdpSocket,
-        blob_recycler: BlobRecycler,
+        blob_recycler: &BlobRecycler,
         fetch_stage_receiver: BlobReceiver,
     ) -> (Self, BlobReceiver) {
         let (retransmit_sender, retransmit_receiver) = channel();
diff --git a/src/write_stage.rs b/src/write_stage.rs
index 9e805623a28121..e0f62d06111f88 100644
--- a/src/write_stage.rs
+++ b/src/write_stage.rs
@@ -35,7 +35,7 @@ impl WriteStage {
     ) -> Result<()> {
         let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
         let votes = entries_to_votes(&entries);
-        crdt.write().unwrap().insert_votes(votes);
+        crdt.write().unwrap().insert_votes(&votes);
         entry_writer.write_and_register_entries(&entries)?;
         trace!("New blobs? {}", entries.len());
         let mut blobs = VecDeque::new();