Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Extricate RpcCompletedSlotsService from RetransmitStage (backport #18017
Browse files Browse the repository at this point in the history
) (#20294)

* Extricate RpcCompletedSlotsService from RetransmitStage

(cherry picked from commit fa04531)

# Conflicts:
#	core/src/replay_stage.rs
#	core/src/retransmit_stage.rs
#	core/src/tvu.rs
#	core/src/validator.rs

* removes backport merge conflicts

Co-authored-by: Michael Vines <[email protected]>
Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
3 people authored Sep 28, 2021
1 parent 55ccff7 commit 2f2948f
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 43 deletions.
22 changes: 4 additions & 18 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@ use {
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_ledger::{
shred::Shred,
{
blockstore::{Blockstore, CompletedSlotsReceiver},
leader_schedule_cache::LeaderScheduleCache,
},
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::Packets,
solana_rpc::{
max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_subscriptions::RpcSubscriptions,
},
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -457,7 +451,6 @@ impl RetransmitStage {
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: Arc<AtomicBool>,
rpc_completed_slots_receiver: CompletedSlotsReceiver,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
Expand All @@ -476,18 +469,16 @@ impl RetransmitStage {
let _retransmit_sender = retransmit_sender.clone();

let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
let t_retransmit = retransmitter(
let thread_hdls = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache.clone(),
cluster_info.clone(),
retransmit_receiver,
max_slots,
rpc_subscriptions.clone(),
rpc_subscriptions,
);

let rpc_completed_slots_hdl =
RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
Expand Down Expand Up @@ -534,11 +525,6 @@ impl RetransmitStage {
duplicate_slots_sender,
);

let mut thread_hdls = t_retransmit;
if let Some(thread_hdl) = rpc_completed_slots_hdl {
thread_hdls.push(thread_hdl);
}

Self {
thread_hdls,
window_service,
Expand Down
7 changes: 1 addition & 6 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use crate::{
use crossbeam_channel::unbounded;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver},
blockstore_processor::TransactionStatusSender,
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_poh::poh_recorder::PohRecorder;
Expand Down Expand Up @@ -115,7 +114,6 @@ impl Tvu {
tower: Tower,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
Expand Down Expand Up @@ -179,7 +177,6 @@ impl Tvu {
repair_socket,
verified_receiver,
exit.clone(),
completed_slots_receiver,
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
Expand Down Expand Up @@ -393,7 +390,6 @@ pub mod tests {
let BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
..
} = Blockstore::open_with_signal(&blockstore_path, None, true)
.expect("Expected to successfully open ledger");
Expand Down Expand Up @@ -437,7 +433,6 @@ pub mod tests {
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
block_commitment_cache,
None,
None,
Expand Down
14 changes: 12 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use {
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -84,7 +85,7 @@ use {
mpsc::Receiver,
Arc, Mutex, RwLock,
},
thread::{sleep, Builder},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
};
Expand Down Expand Up @@ -248,6 +249,7 @@ pub struct Validator {
validator_exit: Arc<RwLock<Exit>>,
json_rpc_service: Option<JsonRpcService>,
pubsub_service: Option<PubSubService>,
rpc_completed_slots_service: JoinHandle<()>,
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
Expand Down Expand Up @@ -681,6 +683,10 @@ impl Validator {
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();

let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());

let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
Expand Down Expand Up @@ -718,7 +724,6 @@ impl Validator {
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
block_commitment_cache,
config.enable_partition.clone(),
transaction_status_sender.clone(),
Expand Down Expand Up @@ -784,6 +789,7 @@ impl Validator {
serve_repair_service,
json_rpc_service,
pubsub_service,
rpc_completed_slots_service,
optimistically_confirmed_bank_tracker,
transaction_status_service,
rewards_recorder_service,
Expand Down Expand Up @@ -847,6 +853,10 @@ impl Validator {
pubsub_service.join().expect("pubsub_service");
}

self.rpc_completed_slots_service
.join()
.expect("rpc_completed_slots_service");

if let Some(optimistically_confirmed_bank_tracker) =
self.optimistically_confirmed_bank_tracker
{
Expand Down
31 changes: 14 additions & 17 deletions rpc/src/rpc_completed_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,20 @@ pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Option<JoinHandle<()>> {
let rpc_subscriptions = rpc_subscriptions?;
Some(
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
rpc_subscriptions: Arc<RpcSubscriptions>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
})
.unwrap(),
)
}
})
.unwrap()
}
}

0 comments on commit 2f2948f

Please sign in to comment.