
ReplayStage: No More Clone SanitizedTransaction #3058

Merged · 21 commits · Oct 4, 2024

Conversation


@apfitzge apfitzge commented Oct 2, 2024

Problem

  • We clone the transactions in our batch in replay stage (in both unified scheduler and old scheduler)
  • In transition to RuntimeTransaction, we do not want to implement Clone for RuntimeTransaction
  • We don't really need to clone in ReplayStage and it's probably slowing us down quite a bit

Summary of Changes

  • Take ownership of entries in process_entries
  • Perform manual locking of entry transactions without creating batches
    • Simplification of lock-retry to detect self-conflicting transactions (no loop!)
  • Pass locked and owned entries to process_batches
  • UnifiedScheduler path:
    • manually unlock the transactions
    • schedule_execution takes ownership rather than cloning
  • Old path:
  • Flatten all locked entries into a single vec; always batch/rebatch.
  • These batches borrow from the flat vec, but take responsibility for unlocking.

Fixes #
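The "no loop" locking idea from the summary can be sketched as follows. This is a hypothetical illustration, not the actual Agave API: account names stand in for pubkeys, every lock is treated as a write lock, and `lock_entry` is an invented helper. The point is that within one entry, a failed lock can only mean the entry conflicts with itself, so it is a hard error rather than something to retry.

```rust
use std::collections::HashSet;

// Hypothetical sketch (invented helper, not the Agave API): lock an
// entry's transactions in one pass, treating every account as a write
// lock. Within a single entry, a lock failure can only be a
// self-conflict, so we return an error instead of retrying in a loop.
fn lock_entry(accounts_per_tx: &[Vec<&str>]) -> Result<(), String> {
    let mut write_locks: HashSet<&str> = HashSet::new();
    for accounts in accounts_per_tx {
        for &account in accounts {
            // insert() returns false if the account was already locked
            // by an earlier transaction in this same entry.
            if !write_locks.insert(account) {
                return Err(format!("self-conflicting entry: {account}"));
            }
        }
    }
    Ok(())
}
```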


@bw-solana bw-solana left a comment


I don't spot anything wrong, but I do have a couple of questions to help me understand what's happening

{
    // unlock before sending to scheduler.
    bank.unlock_accounts(transactions.iter().zip(lock_results.iter()));


Why is it okay to unlock accounts here? Do they not need to complete execution?

Author

Unified scheduler does its own lock management.
@ryoqun correct me if I'm wrong, but AFAIK the batches in the original code are dropped soon after (and unlocked), since scheduling here is non-blocking anyway.

Member

yeah, that's correct.


Probably outside the scope of this PR, but I could use an education on how this locking mechanism works. Like how do we make sure we execute things in the right order on replay? Do we guarantee the batches get executed in order once scheduled?

Member

@ryoqun ryoqun Oct 4, 2024

Like how do we make sure we execute things in the right order on replay?

Unified scheduler realizes this with two things: the event loop and SchedulingStateMachine.

The event loop is the high-level driver of the state machine, starting from receiving tasks (= transactions) via new_task_receiver, which are sent from blockstore:

select_biased! {
    recv(finished_blocked_task_receiver) -> executed_task => {
        let Some(executed_task) = Self::accumulate_result_with_timings(
            &mut result_with_timings,
            executed_task.expect("alive handler")
        ) else {
            break 'nonaborted_main_loop;
        };
        state_machine.deschedule_task(&executed_task.task);
    },
    recv(dummy_unblocked_task_receiver) -> dummy => {
        assert_matches!(dummy, Err(RecvError));
        let task = state_machine
            .schedule_next_unblocked_task()
            .expect("unblocked task");
        runnable_task_sender.send_payload(task).unwrap();
    },
    recv(new_task_receiver) -> message => {
        assert!(!session_ending);
        match message {
            Ok(NewTaskPayload::Payload(task)) => {
                sleepless_testing::at(CheckPoint::NewTask(task.task_index()));
                if let Some(task) = state_machine.schedule_task(task) {
                    runnable_task_sender.send_aux_payload(task).unwrap();
                }
            }
            Ok(NewTaskPayload::CloseSubchannel) => {
                session_ending = true;
            }
            Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) =>
                unreachable!(),
            Err(RecvError) => {
                // Mostly likely is that this scheduler is dropped for pruned blocks of
                // abandoned forks...
                // This short-circuiting is tested with test_scheduler_drop_short_circuiting.
                break 'nonaborted_main_loop;
            }
        }
    },
    recv(finished_idle_task_receiver) -> executed_task => {
        let Some(executed_task) = Self::accumulate_result_with_timings(
            &mut result_with_timings,
            executed_task.expect("alive handler")
        ) else {
            break 'nonaborted_main_loop;
        };
        state_machine.deschedule_task(&executed_task.task);
    },
};

and this is the relevant impl code of SchedulingStateMachine:

/// Schedules given `task`, returning it if successful.
///
/// Returns `Some(task)` if it's immediately scheduled. Otherwise, returns `None`,
/// indicating the scheduled task is blocked currently.
///
/// Note that this function takes ownership of the task to allow for future optimizations.
#[must_use]
pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
    self.total_task_count.increment_self();
    self.active_task_count.increment_self();
    self.try_lock_usage_queues(task)
}

#[must_use]
pub fn schedule_next_unblocked_task(&mut self) -> Option<Task> {
    self.unblocked_task_queue.pop_front().inspect(|_| {
        self.unblocked_task_count.increment_self();
    })
}

/// Deschedules given scheduled `task`.
///
/// This must be called exactly once for all scheduled tasks to uphold both
/// `SchedulingStateMachine` and `UsageQueue` internal state consistency at any given moment of
/// time. It's serious logic error to call this twice with the same task or none at all after
/// scheduling. Similarly, calling this with not scheduled task is also forbidden.
///
/// Note that this function intentionally doesn't take ownership of the task to avoid dropping
/// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization
/// opportunity for callers.
pub fn deschedule_task(&mut self, task: &Task) {
    self.active_task_count.decrement_self();
    self.handled_task_count.increment_self();
    self.unlock_usage_queues(task);
}

#[must_use]
fn try_lock_usage_queues(&mut self, task: Task) -> Option<Task> {
    let mut blocked_usage_count = ShortCounter::zero();
    for context in task.lock_contexts() {
        context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
            let lock_result = if usage_queue.has_no_blocked_usage() {
                usage_queue.try_lock(context.requested_usage)
            } else {
                LockResult::Err(())
            };
            if let Err(()) = lock_result {
                blocked_usage_count.increment_self();
                let usage_from_task = (context.requested_usage, task.clone());
                usage_queue.push_blocked_usage_from_task(usage_from_task);
            }
        });
    }
    // no blocked usage count means success
    if blocked_usage_count.is_zero() {
        Some(task)
    } else {
        task.set_blocked_usage_count(&mut self.count_token, blocked_usage_count);
        None
    }
}

fn unlock_usage_queues(&mut self, task: &Task) {
    for context in task.lock_contexts() {
        context.with_usage_queue_mut(&mut self.usage_queue_token, |usage_queue| {
            let mut unblocked_task_from_queue = usage_queue.unlock(context.requested_usage);
            while let Some((requested_usage, task_with_unblocked_queue)) =
                unblocked_task_from_queue
            {
                // When `try_unblock()` returns `None` as a failure of unblocking this time,
                // this means the task is still blocked by other active task's usages. So,
                // don't push task into unblocked_task_queue yet. It can be assumed that every
                // task will eventually succeed to be unblocked, and enter in this condition
                // clause as long as `SchedulingStateMachine` is used correctly.
                if let Some(task) = task_with_unblocked_queue.try_unblock(&mut self.count_token)
                {
                    self.unblocked_task_queue.push_back(task);
                }
                match usage_queue.try_lock(requested_usage) {
                    LockResult::Ok(()) => {
                        // Try to further schedule blocked task for parallelism in the case of
                        // readonly usages
                        unblocked_task_from_queue =
                            if matches!(requested_usage, RequestedUsage::Readonly) {
                                usage_queue.pop_unblocked_readonly_usage_from_task()
                            } else {
                                None
                            };
                    }
                    LockResult::Err(()) => panic!("should never fail in this context"),
                }
            }
        });
    }
}

Do we guarantee the batches get executed in order once scheduled?

Yes, while maximally parallelizing task execution. The order here is the order in which transactions are sent over the crossbeam channel.
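As a minimal, self-contained illustration of that ordering property (std's mpsc channel stands in here for the crossbeam channel the scheduler actually uses; both are FIFO per sender):

```rust
use std::sync::mpsc;
use std::thread;

// Illustration only: std mpsc stands in for the crossbeam channel of
// the unified scheduler. A channel preserves per-sender send order, so
// tasks submitted in entry order reach the event loop in that order.
fn send_in_order(count: u64) -> Vec<u64> {
    let (sender, receiver) = mpsc::channel();
    let producer = thread::spawn(move || {
        for task_index in 0..count {
            sender.send(task_index).unwrap();
        }
        // `sender` is dropped here, which terminates the receiver's iterator.
    });
    producer.join().unwrap();
    receiver.iter().collect()
}
```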

Author

it uses crossbeam order, but it also verifies the index is never out of order for conflicts, iirc.

index order is guaranteed with the way we send process entries in order (until conflict)

Member

well, at the last minute, we removed the index verification: solana-labs#35286 (comment)

index order is guaranteed with the way we send process entries in order (until conflict)

this is true.

@@ -496,7 +507,7 @@ fn rebatch_and_execute_batches(
    let target_batch_count = get_thread_count() as u64;

    let mut tx_batches: Vec<TransactionBatchWithIndexes<SanitizedTransaction>> = vec![];
    let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {


What's this change about?

Author

What this was doing before was checking some minimum cost; if below that, it used the original batches passed in instead of rebatching.

But in the new case, ownership was passed, so we don't have any original batches to fall back to. I opted to simplify and just always run the same logic.

Possibly I could reorder the above flattening logic with the total_cost check.
Then we could more easily keep the original "if {} else {}" rebatching; no need to flatten when it's below the threshold cost either.
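For reference, the threshold in the snippet above reduces to a simple predicate. The function below is only a sketch with names mirroring the snippet, not the real rebatching code:

```rust
// Sketch of the rebatch threshold from the snippet above: rebatching is
// only worthwhile when the total cost is large enough to spread across
// all worker threads (target_batch_count = thread count).
fn should_rebatch(total_cost: u64, target_batch_count: u64, minimal_tx_cost: u64) -> bool {
    total_cost > target_batch_count.saturating_mul(minimal_tx_cost)
}
```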

Author

I think this messed up some tests because it rebatches unexpectedly. I'll take a shot at preserving the previous behavior.

Author

restored: 8eae1cc

ledger/src/blockstore_processor.rs (outdated)
@@ -445,18 +446,18 @@ impl BankWithScheduler {
    /// Calling this will panic if the installed scheduler is Unavailable (the bank is
    /// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
    // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
Member

could you remove the last line of this comment, because 'a is gone: // 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...


Comment on lines 170 to 171
    transaction: SanitizedTransaction,
    index: usize,
Member

@ryoqun ryoqun Oct 3, 2024

hehe, this is a remnant of a never-shipped attempt at sneaking GATs into our code base... thanks for cleaning up.

Comment on lines 643 to 646
for hash in &tick_hashes {
    bank.register_tick(hash);
}
tick_hashes.clear();
Member

nit: `for hash in tick_hashes.drain(..)` for API-use pattern consistency with the batches vec?
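The suggested pattern can be sketched as follows. This is a hypothetical stand-in (u64 values in place of real tick hashes, `process_ticks` instead of the bank call): drain(..) yields each element by value and empties the vec in one step, replacing the iterate-by-reference plus clear() pair.

```rust
// Sketch of the drain(..) suggestion: `process_ticks` is a hypothetical
// stand-in, with u64 hashes instead of real tick hashes. drain(..)
// both iterates and empties the vec, so no separate clear() is needed.
fn process_ticks(tick_hashes: &mut Vec<u64>) -> Vec<u64> {
    let mut registered = Vec::new();
    for hash in tick_hashes.drain(..) {
        registered.push(hash); // stands in for bank.register_tick(&hash)
    }
    registered
}
```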


@apfitzge apfitzge self-assigned this Oct 3, 2024
@apfitzge apfitzge marked this pull request as ready for review October 3, 2024 14:27

apfitzge commented Oct 3, 2024

Did a force push since @yihau disabled the Windows CI that was breaking; wasn't sure how else to get around it.

) -> ScheduleResult {
    let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| {
Member

byebye: .clone()

@@ -2672,7 +2673,9 @@ mod tests {
         .take(10000)
     {
         let scheduler = pool.take_scheduler(context.clone());
-        scheduler.schedule_execution(&(dummy_tx, index)).unwrap();
+        scheduler
+            .schedule_execution(dummy_tx.clone(), index)
Member

nit: seems this .clone() isn't needed anymore.

Member

i miss clippy's redundant_clone... ;)

Author

This one is necessary because it's in a loop.
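A minimal sketch of why that clone stays (hypothetical names; `consume` stands in for an API that takes ownership, like the new schedule_execution): a value moved into a consuming call inside a loop must be cloned on each iteration, or later iterations would use a moved value.

```rust
// Hypothetical sketch: `consume` stands in for an API that takes
// ownership of its argument. Inside a loop, the owned value must be
// cloned each iteration; without .clone(), the first iteration would
// move `tx` and the code would not compile.
fn consume(tx: String) -> usize {
    tx.len()
}

fn schedule_repeatedly(tx: String, iterations: usize) -> usize {
    let mut total = 0;
    for _ in 0..iterations {
        total += consume(tx.clone());
    }
    total
}
```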

Member

@ryoqun ryoqun left a comment

lgtm with a nit.

thanks for largely simplifying the api even around unified scheduler.

@apfitzge apfitzge merged commit 9496f28 into anza-xyz:master Oct 4, 2024
38 checks passed
@apfitzge apfitzge deleted the fix_this_shit branch October 4, 2024 18:14
ray-kast pushed a commit to abklabs/agave that referenced this pull request Nov 27, 2024