This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Add InstalledScheduler for blockstore_processor #33875

Merged: 5 commits merged on Oct 27, 2023
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -243,6 +243,7 @@ memmap2 = "0.5.10"
memoffset = "0.9"
merlin = "3"
min-max-heap = "1.3.0"
mockall = "0.11.4"
modular-bitfield = "0.11.2"
nix = "0.26.4"
num-bigint = "0.4.4"
180 changes: 146 additions & 34 deletions ledger/src/blockstore_processor.rs
@@ -294,6 +294,70 @@ fn execute_batches_internal(
})
}

// This fn diverts the code-path into two variants. Both must provide exactly the same set of
// validations. For this reason, this fn is deliberately inserted into the code path to be called
Comment on lines +297 to +298

@ryoqun (Contributor Author), Oct 27, 2023:

> Both must provide exactly the same set of validations

I'm aware that apply_cost_tracker_during_replay needs to be handled for unified scheduler ;)

// inside process_entries(), so that Bank::prepare_sanitized_batch() has been called on all of the
// batches already, while minimizing code duplication (and thus divergent-behavior risk) at the cost
// of acceptable overhead: meaningless buffering of batches for the scheduler variant.
//
// Also note that the scheduler variant can't implement the batch-level sanitization naively, due
// to the nature of individual tx processing. That's another reason for this particular placement
// of the divergence point in the code-path (i.e. not one layer up with its own
// prepare_sanitized_batch() invocation).
fn process_batches(
Contributor:

so this fn is a drop-in replacement for the previous execute_batches.
In process_entries we still call this drop-in at the same times we called execute_batches, i.e. tick boundaries, entry conflicts, and the end of the loop.
Scheduling doesn't block on execution and also cannot fail with an error - how does the replay stage become aware of invalid block state without this fn returning it?

Contributor:

with that in mind, i'm curious why the switch was introduced here instead of one layer up. Is there a benefit in collecting until conflict/tick before scheduling?

Contributor Author:

oh, thanks for well-deserved good questions! hope these in-source comments answer both of your questions: 2897280

Contributor:

I think so? I think the main reason is we want to share most of the same code, since there are checks on non-self-conflicting entries. And since scheduling doesn't block on execution, there shouldn't be too much overhead from just collecting the multiple entries instead of scheduling them as they come in?

Contributor Author:

yeah, that's correct.

Contributor Author:

fyi, i improved the in-source comment a bit: 6e63d87

bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
batch_execution_timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
if bank.has_installed_scheduler() {
debug!(
"process_batches()/schedule_batches_for_execution({} batches)",
batches.len()
);
// scheduling always succeeds here without being blocked on actual transaction executions.
// The transaction execution errors will be collected via the blocking fn called
// BankWithScheduler::wait_for_completed_scheduler(), if any.
@ryoqun (Contributor Author), Oct 27, 2023:

fyi, wait_for_completed_scheduler() doesn't exist in the master branch yet; it's written here as if it exists to avoid a needless future rewrite. That fn will be added in the next pr.

schedule_batches_for_execution(bank, batches);
Ok(())
} else {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
batches.len()
);
rebatch_and_execute_batches(
bank,
batches,
transaction_status_sender,
replay_vote_sender,
batch_execution_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)
}
}

fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
} in batches
{
bank.schedule_transaction_executions(
batch
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
);
}
}
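
The review thread above asks how the replay stage learns about an invalid block when scheduling neither blocks nor returns execution errors. The answer in this design is that errors are deferred: process_batches() returns Ok(()) for the scheduler variant, and the first execution failure only surfaces when the caller later blocks on the scheduler (the wait_for_completed_scheduler() API mentioned in the comments, which lands in a follow-up PR). The self-contained sketch below illustrates that pattern with a toy worker thread; none of these names are part of the actual solana-runtime API.

```rust
use std::{
    sync::mpsc::{channel, Sender},
    thread::{self, JoinHandle},
};

// Toy stand-in for an installed scheduler: scheduling is fire-and-forget,
// and the first execution error is only reported at the final wait.
struct ToyScheduler {
    sender: Sender<u64>,
    worker: JoinHandle<Result<(), String>>,
}

impl ToyScheduler {
    fn new() -> Self {
        let (sender, receiver) = channel::<u64>();
        let worker = thread::spawn(move || {
            // Drain every scheduled transaction, remembering the first failure.
            let mut first_error = Ok(());
            for tx_id in receiver {
                if first_error.is_ok() && tx_id == 13 {
                    first_error = Err(format!("transaction {tx_id} failed"));
                }
            }
            first_error
        });
        Self { sender, worker }
    }

    // Analogous to schedule_batches_for_execution(): never blocks, never fails.
    fn schedule(&self, tx_id: u64) {
        self.sender.send(tx_id).unwrap();
    }

    // Analogous to the wait_for_completed_scheduler() call described above:
    // this is where an invalid block would finally be detected.
    fn wait_for_completion(self) -> Result<(), String> {
        drop(self.sender); // close the channel so the worker drains and exits
        self.worker.join().unwrap()
    }
}

fn main() {
    let scheduler = ToyScheduler::new();
    for tx_id in [1, 2, 13, 4] {
        scheduler.schedule(tx_id); // always succeeds at scheduling time
    }
    // The failure of transaction 13 only becomes visible here.
    assert!(scheduler.wait_for_completion().is_err());
}
```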

fn rebatch_transactions<'a>(
lock_results: &'a [Result<()>],
bank: &'a Arc<Bank>,
@@ -314,7 +378,7 @@
}
}

fn execute_batches(
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
@@ -488,7 +552,7 @@ fn process_entries(
if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
// If it's a tick that will cause a new blockhash to be created,
// execute the group and register the tick
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@@ -541,7 +605,7 @@
} else {
// else we have an entry that conflicts with a prior entry
// execute the current queue and try to process this entry again
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@@ -556,7 +620,7 @@
}
}
}
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@@ -1856,8 +1920,11 @@ pub mod tests {
rand::{thread_rng, Rng},
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
solana_program_runtime::declare_process_instruction,
solana_runtime::genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
solana_runtime::{
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
@@ -4245,6 +4312,38 @@ pub mod tests {
)
}

fn create_test_transactions(
mint_keypair: &Keypair,
genesis_hash: &Hash,
) -> Vec<SanitizedTransaction> {
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();

vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
mint_keypair,
&pubkey,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
*genesis_hash,
)),
]
}

#[test]
fn test_confirm_slot_entries_progress_num_txs_indexes() {
let GenesisConfigInfo {
@@ -4368,34 +4467,7 @@
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));

let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();

let txs = vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
genesis_config.hash(),
)),
];

let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let batch = bank.prepare_sanitized_batch(&txs);
assert!(batch.needs_unlock());
let transaction_indexes = vec![42, 43, 44];
@@ -4424,6 +4496,46 @@
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}

#[test]
fn test_schedule_batches_for_execution() {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));

let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());

let mut mocked_scheduler = MockInstalledScheduler::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));

let batch = bank.prepare_sanitized_batch(&txs);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
transaction_indexes: (0..txs.len()).collect(),
};

let mut batch_execution_timing = BatchExecutionTiming::default();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
assert!(process_batches(
&bank,
&[batch_with_indexes],
None,
None,
&mut batch_execution_timing,
None,
&ignored_prioritization_fee_cache
)
.is_ok());
}
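
For context on MockInstalledScheduler in the test above: the mockall crate added to Cargo.toml generates a Mock* companion type for a trait marked #[automock], and expectations such as expect_schedule_execution().times(n) cause the test to fail if the call count doesn't match. The sketch below uses a simplified, hypothetical trait (not the real InstalledScheduler signature) just to show the mechanics.

```rust
use mockall::automock;

// Simplified stand-in for the real InstalledScheduler trait; the actual trait
// in solana-runtime has a richer signature.
#[automock]
trait Scheduler {
    fn schedule_execution(&self, transaction_index: usize);
}

#[test]
fn scheduler_receives_every_transaction() {
    let mut mock = MockScheduler::new();
    // Expect exactly three calls; mockall panics on drop if the count is off.
    mock.expect_schedule_execution()
        .times(3)
        .returning(|_| ());

    for index in 0..3 {
        mock.schedule_execution(index);
    }
}
```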

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;