Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobFetchStage cleanup post shred work #6254

Merged
merged 1 commit into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

pub mod bank_forks;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod broadcast_stage;
pub mod chacha;
pub mod chacha_cuda;
pub mod cluster_info_vote_listener;
pub mod confidence;
pub mod perf_libs;
pub mod recycler;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;
pub mod crds;
Expand Down
4 changes: 2 additions & 2 deletions core/src/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree;
use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
Expand All @@ -12,6 +11,7 @@ use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shred;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use crate::streamer::{receiver, responder, PacketReceiver};
use crate::window_service::WindowService;
Expand Down Expand Up @@ -263,7 +263,7 @@ impl Replicator {
.map(Arc::new)
.collect();
let (blob_fetch_sender, blob_fetch_receiver) = channel();
let fetch_stage = BlobFetchStage::new_multi_socket_packet(
let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets,
blob_forward_sockets,
&blob_fetch_sender,
Expand Down
28 changes: 6 additions & 22 deletions core/src/blob_fetch_stage.rs → core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.

use crate::recycler::Recycler;
use crate::result;
use crate::result::Error;
use crate::service::Service;
use crate::streamer::{self, BlobSender, PacketReceiver, PacketSender};
use crate::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};

pub struct BlobFetchStage {
pub struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}

impl BlobFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit)
}
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender,
exit: &Arc<AtomicBool>,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect();

Self { thread_hdls }
}

impl ShredFetchStage {
fn handle_forwarded_packets(
recvr: &PacketReceiver,
sendr: &PacketSender,
Expand All @@ -55,7 +39,7 @@ impl BlobFetchStage {
Ok(())
}

pub fn new_multi_socket_packet(
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>,
sender: &PacketSender,
Expand Down Expand Up @@ -106,7 +90,7 @@ impl BlobFetchStage {
}
}

impl Service for BlobFetchStage {
impl Service for ShredFetchStage {
type JoinReturnType = ();

fn join(self) -> thread::Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//! - Generating the keys used to encrypt the ledger and sample it for storage mining.

use crate::bank_forks::BankForks;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::ClusterInfo;
Expand All @@ -25,6 +24,7 @@ use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::snapshot_package::SnapshotPackagerService;
use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::pubkey::Pubkey;
Expand All @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread;

pub struct Tvu {
fetch_stage: BlobFetchStage,
fetch_stage: ShredFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
blockstream_service: Option<BlockstreamService>,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Tvu {
blob_sockets.push(repair_socket.clone());
let blob_forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = BlobFetchStage::new_multi_socket_packet(
let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets,
blob_forward_sockets,
&fetch_sender,
Expand Down