Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Blockstore should drop signals before validator exit #24025

Merged
merged 28 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ pub struct Validator {
ip_echo_server: Option<solana_net_utils::IpEchoServer>,
pub cluster_info: Arc<ClusterInfo>,
pub bank_forks: Arc<RwLock<BankForks>>,
pub blockstore: Arc<Blockstore>,
accountsdb_repl_service: Option<AccountsDbReplService>,
geyser_plugin_service: Option<GeyserPluginService>,
}
Expand Down Expand Up @@ -656,7 +657,7 @@ impl Validator {
bank.ticks_per_slot(),
&id,
&blockstore,
blockstore.new_shreds_signals.first().cloned(),
blockstore.get_new_shred_signal(0),
&leader_schedule_cache,
&poh_config,
Some(poh_timing_point_sender),
Expand Down Expand Up @@ -856,7 +857,7 @@ impl Validator {
record_receiver,
);
assert_eq!(
blockstore.new_shreds_signals.len(),
blockstore.get_new_shred_signals_len(),
1,
"New shred signal for the TVU should be the same as the clear bank signal."
);
Expand Down Expand Up @@ -994,6 +995,7 @@ impl Validator {
validator_exit: config.validator_exit.clone(),
cluster_info,
bank_forks,
blockstore: blockstore.clone(),
accountsdb_repl_service,
geyser_plugin_service,
}
Expand All @@ -1002,6 +1004,9 @@ impl Validator {
// Used for notifying many nodes in parallel to exit
pub fn exit(&mut self) {
self.validator_exit.write().unwrap().exit();

// drop all signals in blockstore
self.blockstore.drop_signal();
}

pub fn close(mut self) {
Expand Down
41 changes: 31 additions & 10 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ pub struct Blockstore {
bank_hash_cf: LedgerColumn<cf::BankHash>,
last_root: RwLock<Slot>,
insert_shreds_lock: Mutex<()>,
pub new_shreds_signals: Vec<Sender<bool>>,
pub completed_slots_senders: Vec<CompletedSlotsSender>,
new_shreds_signals: Mutex<Vec<Sender<bool>>>,
completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>,
no_compaction: bool,
Expand Down Expand Up @@ -444,8 +444,8 @@ impl Blockstore {
block_height_cf,
program_costs_cf,
bank_hash_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
new_shreds_signals: Mutex::default(),
completed_slots_senders: Mutex::default(),
shred_timing_point_sender: None,
insert_shreds_lock: Mutex::<()>::default(),
last_root,
Expand All @@ -463,13 +463,13 @@ impl Blockstore {
ledger_path: &Path,
options: BlockstoreOptions,
) -> Result<BlockstoreSignals> {
let mut blockstore = Self::open_with_options(ledger_path, options)?;
let blockstore = Self::open_with_options(ledger_path, options)?;
let (ledger_signal_sender, ledger_signal_receiver) = bounded(1);
let (completed_slots_sender, completed_slots_receiver) =
bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);

blockstore.new_shreds_signals = vec![ledger_signal_sender];
blockstore.completed_slots_senders = vec![completed_slots_sender];
blockstore.add_new_shred_signal(ledger_signal_sender);
blockstore.add_completed_slots_signal(completed_slots_sender);

Ok(BlockstoreSignals {
blockstore,
Expand Down Expand Up @@ -1027,7 +1027,7 @@ impl Blockstore {
let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders,
&self.completed_slots_senders.lock().unwrap(),
&mut write_batch,
)?;

Expand All @@ -1049,8 +1049,8 @@ impl Blockstore {
metrics.write_batch_elapsed += start.as_us();

send_signals(
&self.new_shreds_signals,
&self.completed_slots_senders,
&self.new_shreds_signals.lock().unwrap(),
&self.completed_slots_senders.lock().unwrap(),
should_signal,
newly_completed_slots,
);
Expand All @@ -1063,6 +1063,27 @@ impl Blockstore {
Ok((newly_completed_data_sets, inserted_indices))
}

pub fn add_new_shred_signal(&self, s: Sender<bool>) {
self.new_shreds_signals.lock().unwrap().push(s);
}

pub fn add_completed_slots_signal(&self, s: CompletedSlotsSender) {
self.completed_slots_senders.lock().unwrap().push(s);
}

pub fn get_new_shred_signals_len(&self) -> usize {
self.new_shreds_signals.lock().unwrap().len()
}

pub fn get_new_shred_signal(&self, index: usize) -> Option<Sender<bool>> {
self.new_shreds_signals.lock().unwrap().get(index).cloned()
}

pub fn drop_signal(&self) {
self.new_shreds_signals.lock().unwrap().clear();
self.completed_slots_senders.lock().unwrap().clear();
}

/// Range-delete all entries which prefix matches the specified `slot` and
/// clear all the related `SlotMeta` except its next_slots.
///
Expand Down