Skip to content

Commit

Permalink
BlobFetchStage cleanup post shred work (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 authored Oct 7, 2019
1 parent 6662986 commit 17f169f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
pub mod bank_forks;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod broadcast_stage;
pub mod chacha;
pub mod chacha_cuda;
pub mod cluster_info_vote_listener;
pub mod confidence;
pub mod perf_libs;
pub mod recycler;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;
pub mod crds;
Expand Down
4 changes: 2 additions & 2 deletions core/src/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree;
use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
Expand All @@ -12,6 +11,7 @@ use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shred;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use crate::streamer::{receiver, responder, PacketReceiver};
use crate::window_service::WindowService;
Expand Down Expand Up @@ -263,7 +263,7 @@ impl Replicator {
.map(Arc::new)
.collect();
let (blob_fetch_sender, blob_fetch_receiver) = channel();
let fetch_stage = BlobFetchStage::new_multi_socket_packet(
let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets,
blob_forward_sockets,
&blob_fetch_sender,
Expand Down
28 changes: 6 additions & 22 deletions core/src/blob_fetch_stage.rs → core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use crate::recycler::Recycler;
use crate::result;
use crate::result::Error;
use crate::service::Service;
use crate::streamer::{self, BlobSender, PacketReceiver, PacketSender};
use crate::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};

pub struct BlobFetchStage {
pub struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}

impl BlobFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit)
}
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender,
exit: &Arc<AtomicBool>,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect();

Self { thread_hdls }
}

impl ShredFetchStage {
fn handle_forwarded_packets(
recvr: &PacketReceiver,
sendr: &PacketSender,
Expand All @@ -55,7 +39,7 @@ impl BlobFetchStage {
Ok(())
}

pub fn new_multi_socket_packet(
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>,
sender: &PacketSender,
Expand Down Expand Up @@ -106,7 +90,7 @@ impl BlobFetchStage {
}
}

impl Service for BlobFetchStage {
impl Service for ShredFetchStage {
type JoinReturnType = ();

fn join(self) -> thread::Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.
use crate::bank_forks::BankForks;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::ClusterInfo;
Expand All @@ -25,6 +24,7 @@ use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::snapshot_package::SnapshotPackagerService;
use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::pubkey::Pubkey;
Expand All @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread;

pub struct Tvu {
fetch_stage: BlobFetchStage,
fetch_stage: ShredFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
blockstream_service: Option<BlockstreamService>,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Tvu {
blob_sockets.push(repair_socket.clone());
let blob_forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = BlobFetchStage::new_multi_socket_packet(
let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets,
blob_forward_sockets,
&fetch_sender,
Expand Down

0 comments on commit 17f169f

Please sign in to comment.