From 95551dbe57ee78066713b77eae3bb45ad8645bdc Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 7 Oct 2019 10:16:27 -0700 Subject: [PATCH] BlobFetchStage cleanup post shred work --- core/src/lib.rs | 2 +- core/src/replicator.rs | 4 +-- ...ob_fetch_stage.rs => shred_fetch_stage.rs} | 28 ++++--------------- core/src/tvu.rs | 6 ++-- 4 files changed, 12 insertions(+), 28 deletions(-) rename core/src/{blob_fetch_stage.rs => shred_fetch_stage.rs} (78%) diff --git a/core/src/lib.rs b/core/src/lib.rs index 3d74112e5684ca..1846ce72f2f4f5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -7,7 +7,6 @@ pub mod bank_forks; pub mod banking_stage; -pub mod blob_fetch_stage; pub mod broadcast_stage; pub mod chacha; pub mod chacha_cuda; @@ -15,6 +14,7 @@ pub mod cluster_info_vote_listener; pub mod confidence; pub mod perf_libs; pub mod recycler; +pub mod shred_fetch_stage; #[macro_use] pub mod contact_info; pub mod crds; diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 7f22f068fba9aa..0514046961560f 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -1,4 +1,3 @@ -use crate::blob_fetch_stage::BlobFetchStage; use crate::blocktree::Blocktree; use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}; use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; @@ -12,6 +11,7 @@ use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; use crate::shred::Shred; +use crate::shred_fetch_stage::ShredFetchStage; use crate::storage_stage::NUM_STORAGE_SAMPLES; use crate::streamer::{receiver, responder, PacketReceiver}; use crate::window_service::WindowService; @@ -263,7 +263,7 @@ impl Replicator { .map(Arc::new) .collect(); let (blob_fetch_sender, blob_fetch_receiver) = channel(); - let fetch_stage = BlobFetchStage::new_multi_socket_packet( + let fetch_stage = ShredFetchStage::new_multi_socket( blob_sockets, blob_forward_sockets, &blob_fetch_sender, diff --git a/core/src/blob_fetch_stage.rs b/core/src/shred_fetch_stage.rs similarity index 78% rename from core/src/blob_fetch_stage.rs rename to core/src/shred_fetch_stage.rs index 74334742726843..778b9220265047 100644 --- a/core/src/blob_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,37 +1,21 @@ -//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. +//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. use crate::recycler::Recycler; use crate::result; use crate::result::Error; use crate::service::Service; -use crate::streamer::{self, BlobSender, PacketReceiver, PacketSender}; +use crate::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; -pub struct BlobFetchStage { +pub struct ShredFetchStage { thread_hdls: Vec>, } -impl BlobFetchStage { - pub fn new(socket: Arc, sender: &BlobSender, exit: &Arc) -> Self { - Self::new_multi_socket(vec![socket], sender, exit) - } - pub fn new_multi_socket( - sockets: Vec>, - sender: &BlobSender, - exit: &Arc, - ) -> Self { - let thread_hdls: Vec<_> = sockets - .into_iter() - .map(|socket| streamer::blob_receiver(socket, &exit, sender.clone())) - .collect(); - - Self { thread_hdls } - } - +impl ShredFetchStage { fn handle_forwarded_packets( recvr: &PacketReceiver, sendr: &PacketSender, @@ -55,7 +39,7 @@ impl BlobFetchStage { Ok(()) } - pub fn new_multi_socket_packet( + pub fn new_multi_socket( sockets: Vec>, forward_sockets: Vec>, sender: &PacketSender, @@ -106,7 +90,7 @@ impl BlobFetchStage { } } -impl Service for BlobFetchStage { +impl Service for ShredFetchStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 394cf873c1528c..dbe3e530d02613 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -13,7 +13,6 @@ //! - Generating the keys used to encrypt the ledger and sample it for storage mining. use crate::bank_forks::BankForks; -use crate::blob_fetch_stage::BlobFetchStage; use crate::blockstream_service::BlockstreamService; use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::cluster_info::ClusterInfo; @@ -25,6 +24,7 @@ use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; +use crate::shred_fetch_stage::ShredFetchStage; use crate::snapshot_package::SnapshotPackagerService; use crate::storage_stage::{StorageStage, StorageState}; use solana_sdk::pubkey::Pubkey; @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex, RwLock}; use std::thread; pub struct Tvu { - fetch_stage: BlobFetchStage, + fetch_stage: ShredFetchStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, blockstream_service: Option, @@ -104,7 +104,7 @@ impl Tvu { blob_sockets.push(repair_socket.clone()); let blob_forward_sockets: Vec> = tvu_forward_sockets.into_iter().map(Arc::new).collect(); - let fetch_stage = BlobFetchStage::new_multi_socket_packet( + let fetch_stage = ShredFetchStage::new_multi_socket( blob_sockets, blob_forward_sockets, &fetch_sender,