
Transaction Scheduler Prototype #25389

Closed · wants to merge 19 commits

Conversation

@buffalu (Contributor) commented May 20, 2022

About

This is a prototype of a transaction scheduler for BankingStage (and future transaction stages).

The basic functionality of this scheduler is to schedule sets of transactions that are validated and can be executed in parallel. It runs a thread for each type of transaction consumer and shares AccountLocks with all of the stages to avoid account contention. The general idea is to ensure that every transaction in a batch will be executed with an extremely high level of certainty.

A basic configuration can be passed in to set the backlog size. By default, the scheduler behaves as a more parallelizable version of the current scheduler and addresses #22096. The configuration also exposes a flag that enables state fee auctions similar to #23438: it compares transactions that are currently locked against the RW locks and fees of blocked accounts to ensure the per-account fee market is respected.
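
For illustration only, a configuration along these lines might look like the sketch below; the struct and field names (SchedulerConfig, max_backlog_size, enable_fee_auction) and the default values are hypothetical, not the actual names in this PR.

// Hypothetical sketch of the configuration described above; names and
// defaults are illustrative only.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    // Maximum number of packets kept waiting to be scheduled.
    pub max_backlog_size: usize,
    // When true, blocked accounts are compared by RW locks + fees so the
    // per-account fee market is respected (state fee auctions, see #23438).
    pub enable_fee_auction: bool,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        // Defaults mirror the "more parallelizable version of the current
        // scheduler" behavior described above.
        Self {
            max_backlog_size: 100_000,
            enable_fee_auction: false,
        }
    }
}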

Next Steps

This still needs to be tested, and the efficiency needs to be improved further. The best solution would probably require a fairly significant rewrite of banking stage, so I wanted to get eyes on this and gather thoughts.

@mergify bot added the community (Community contribution) label · May 20, 2022
@mergify bot requested a review from a team · May 20, 2022 00:41
Err(e) => {
    trace!("e: {:?}", e);
    match e {
        SchedulerError::InvalidSanitizedTransaction => {

@segfaultdoc (Contributor) commented May 20, 2022:
Suggested change:

-SchedulerError::InvalidSanitizedTransaction => {
+SchedulerError::InvalidSanitizedTransaction
+| SchedulerError::InvalidTransactionFormat(_)
+| SchedulerError::TransactionCheckFailed(_) => {
+    // non-recoverable error, drop the packet
+    continue;
+}

            // non-recoverable error, drop the packet
            continue;
        }
        SchedulerError::AccountInUse => {

@segfaultdoc (Contributor) commented May 20, 2022:
Suggested change:

-SchedulerError::AccountInUse => {
+SchedulerError::AccountInUse | SchedulerError::AccountBlocked(_) => {
+    // need to reschedule
+    rescheduled_packets.push(deserialized_packet);
+}

}
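
Taken together, the two suggestions amount to a match roughly like the sketch below. The variant names come from the diffs above; the payload types, the helper signature, and the generic packet type are simplified assumptions, not code from this PR.

// Sketch only: combines the two suggested arms into one place.
enum SchedulerError {
    InvalidSanitizedTransaction,
    InvalidTransactionFormat(String),
    TransactionCheckFailed(String),
    AccountInUse,
    AccountBlocked(String),
}

fn handle_schedule_error<P>(
    e: SchedulerError,
    deserialized_packet: P,
    rescheduled_packets: &mut Vec<P>,
) {
    match e {
        // non-recoverable errors: drop the packet
        SchedulerError::InvalidSanitizedTransaction
        | SchedulerError::InvalidTransactionFormat(_)
        | SchedulerError::TransactionCheckFailed(_) => {}
        // recoverable errors: the packet needs to be rescheduled
        SchedulerError::AccountInUse | SchedulerError::AccountBlocked(_) => {
            rescheduled_packets.push(deserialized_packet);
        }
    }
}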

fn can_lock_accounts(
    account_locks: &AccountLocks,

Contributor comment:

Curious why not wrap this in a MutexGuard, to make it explicit that the call site has acquired a lock on this object?

}

fn check_accounts_not_blocked(
    account_locks: &AccountLocks,

Contributor comment:

Ditto here re: MutexGuard.
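
A minimal sketch of what the reviewer is suggesting, assuming the AccountLocks value lives behind a Mutex at the call site; the AccountLocks body and the return types here are stand-ins, not the actual code.

use std::sync::{Mutex, MutexGuard};

#[derive(Default)]
struct AccountLocks; // stand-in for the real type

// Current shape: nothing in the signature says the caller holds the lock.
fn can_lock_accounts(_account_locks: &AccountLocks) -> bool {
    true
}

// Suggested shape: taking the guard makes the locking contract explicit.
fn can_lock_accounts_explicit(_account_locks: &MutexGuard<'_, AccountLocks>) -> bool {
    true
}

fn call_site(shared: &Mutex<AccountLocks>) {
    let guard = shared.lock().unwrap();
    // Both calls behave the same; only the second encodes in the type
    // system that the lock has already been acquired.
    let _ = can_lock_accounts(&guard);
    let _ = can_lock_accounts_explicit(&guard);
}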

match highest_rl_blocked_account_fees.entry(**acc) {
    Entry::Occupied(mut e) => {
        if priority > *e.get() {
            // NOTE: this should never be the case!

Contributor comment:

Just to make sure I understand correctly: this should never be the case because get_scheduled_batch pops max-priority txs and these maps don't outlive that function? i.e. the earlier iterations have already inserted higher-priority-to-pubkey mappings.


@buffalu (Author) replied:

Yeah!
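
Put differently: because packets reach this point in descending priority order and the map only lives for one get_scheduled_batch call, an occupied entry always holds a priority at least as high as the current packet's. A minimal sketch of that invariant, with simplified stand-in types (not the PR's actual signatures):

use std::collections::{hash_map::Entry, HashMap};

type Pubkey = [u8; 32]; // stand-in for the real Pubkey type

// Packets are visited highest-priority first, so an occupied entry was
// written by an earlier (>=) priority packet and never needs updating.
fn record_blocked_fee(
    highest_rl_blocked_account_fees: &mut HashMap<Pubkey, u64>,
    acc: Pubkey,
    priority: u64,
) {
    match highest_rl_blocked_account_fees.entry(acc) {
        Entry::Occupied(e) => {
            // NOTE: should never fire given descending priority order.
            debug_assert!(priority <= *e.get());
        }
        Entry::Vacant(e) => {
            e.insert(priority);
        }
    }
}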

Ok(())
}

fn get_scheduled_batch(

Contributor comment:

I think this function is worthy of a few comments above the declaration. I understand it as (sketched below):

// 1. pop the highest-priority packet until the requested batch size is reached
// 2. push it onto the sanitized queue if there is no higher-weighted tx scheduled or currently executing with conflicting state access
// 3. push conflicting lower-priority txs onto a reschedule queue
// 4. drop any bad packets
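
A loop-shaped sketch of those four steps, with everything reduced to bare u64 priorities and hypothetical helper functions; nothing here reflects the actual signatures in this PR.

use std::collections::BinaryHeap;

// Hypothetical helpers standing in for the real conflict and validity checks.
fn conflicts_with_scheduled_or_executing(_priority: u64) -> bool { false }
fn is_valid(_priority: u64) -> bool { true }

fn get_scheduled_batch(
    unprocessed: &mut BinaryHeap<u64>,
    batch_size: usize,
) -> (Vec<u64>, Vec<u64>) {
    let mut batch = Vec::new();
    let mut rescheduled = Vec::new();
    // 1. pop the highest-priority packet until the batch size is reached
    while batch.len() < batch_size {
        let priority = match unprocessed.pop() {
            Some(p) => p,
            None => break,
        };
        if !is_valid(priority) {
            // 4. bad packets are dropped
            continue;
        }
        if conflicts_with_scheduled_or_executing(priority) {
            // 3. conflicting lower-priority txs go onto a reschedule queue
            rescheduled.push(priority);
        } else {
            // 2. no higher-weighted conflicting tx: schedule it
            batch.push(priority);
        }
    }
    (batch, rescheduled)
}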

let priority = deserialized_packet.immutable_section().priority();

{
    let mut scheduled_accounts_l = scheduled_accounts.lock().unwrap();

Contributor comment:

Perhaps we can avoid grabbing this lock until absolutely necessary by pulling the Self::can_lock_accounts(account_locks, writable_keys, readonly_keys)?; call out of check_accounts_not_blocked and having the call site invoke it down below, just before Self::lock_accounts(..). Maybe squeeze out some juice when multiple banking threads request batches.


@buffalu (Author) replied:

Agree! This lock will probably be held too long in the current code; we'd need to see how long all the checks take.
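
A rough sketch of the reordering being discussed, assuming the pre-checks do not need the scheduled_accounts lock; the types and helper names here are placeholders, not the PR's code.

use std::sync::Mutex;

#[derive(Default)]
struct ScheduledAccounts; // placeholder for the shared state

// Stand-in for can_lock_accounts(..) and the other per-transaction checks.
fn expensive_checks() -> Result<(), ()> {
    Ok(())
}

fn schedule_one(scheduled_accounts: &Mutex<ScheduledAccounts>) -> Result<(), ()> {
    // Do as much validation as possible before touching the shared lock...
    expensive_checks()?;

    // ...and only then grab it, keeping the critical section short so other
    // banking threads requesting batches are blocked for less time.
    let _scheduled_accounts_l = scheduled_accounts.lock().unwrap();
    // lock_accounts(..) would run here
    Ok(())
}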

@segfaultdoc (Contributor) commented:

bullish

@@ -64,33 +64,33 @@ pub type PubkeyAccountSlot = (Pubkey, AccountSharedData, Slot);

#[derive(Debug, Default, AbiExample)]

@buffalu (Author) commented:

ignore this file for now

SchedulerError::AccountInUse | SchedulerError::AccountBlocked(_) => {
    // need to reschedule
    // error!("e: {}", e);
    rescheduled_packets.push(deserialized_packet);

@buffalu (Author) commented May 21, 2022:

Need to optimize this pop-then-push a little more. Something like keeping blocked transactions somewhere else and linking RW accounts easily with their fees; when the txs that were blocking them come back as executed, unlock them. I think this was mentioned in the original proposal.


@buffalu (Author) added:

Actually, I'll try what banking stage does with the mem::swap + remove, then swap back.

Might be worth looking at a skiplist too, but that can wait: https://docs.rs/skiplist/latest/skiplist/ordered_skiplist/struct.OrderedSkipList.html
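
For reference, the swap-out-then-swap-back pattern mentioned above looks roughly like the sketch below, using bare u64 priorities in place of packets; this is a simplified illustration under those assumptions, not the actual banking stage code.

use std::collections::BTreeSet;
use std::mem;

// Drain the buffer, try to schedule each entry highest-priority first, and
// put back only the ones that could not be scheduled.
fn process_buffer(buffered: &mut BTreeSet<u64>, mut try_schedule: impl FnMut(u64) -> bool) {
    // Swap the buffer out so it can be consumed by value without holding a
    // borrow on the original collection.
    let taken = mem::take(buffered);
    for priority in taken.into_iter().rev() {
        if !try_schedule(priority) {
            // blocked: swap it back into the buffer for a later pass
            buffered.insert(priority);
        }
    }
}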

qos_service,
config,
);
debug!(

@buffalu (Author) commented:

note: does not do well with lots of contention

@buffalu (Author) commented May 25, 2022:

The hard thing about an accounts-based scheduler is letting multiple read-only transactions run in parallel when the highest-priority transaction for a read-only account is blocked on something else.

struct AccountLocksHeap {
    pubkey: Pubkey,
    lock_type: LockType,
    packets: BTreeSet<Rc<ImmutableDeserializedPacket>>,
    blocked_packets: BTreeSet<Rc<ImmutableDeserializedPacket>>,
}

// main datastructures

// O(1) lookup to see if an AccountLocksHeap exists
let all_write_account_heaps: HashMap<Pubkey, Rc<RefCell<AccountLocksHeap>>>;
let all_read_account_heaps: HashMap<Pubkey, Rc<RefCell<AccountLocksHeap>>>;

// blocked accounts
let blocked_reads: HashMap<Pubkey, Rc<RefCell<AccountLocksHeap>>>;
let blocked_writes: HashMap<Pubkey, Rc<RefCell<AccountLocksHeap>>>;

// sorted by highest-priority packet (priority -> stake weight -> time of arrival, to be unique)
let priority_heap: BTreeSet<Rc<RefCell<AccountLocksHeap>>>;

// scheduler: this part is a little harder... ergh


// insertion
let mut popped_heaps = HashMap::new();

for tx in new_txs {
    let account_locks = tx.get_account_locks();
    for acc in account_locks.write_locks {
        if !all_write_account_heaps.contains_key(acc) {
            let heap = Rc::new(RefCell::new(AccountLocksHeap { ... }));
            all_write_account_heaps.insert(acc, heap.clone());

            // assume it got popped now; in case something else accesses this,
            // we can do a mass-update at the end
            popped_heaps.insert(acc, heap.clone());
        } else if blocked_writes.contains_key(acc) {
            // account is blocked: the packet can be inserted into its blocked heap
            blocked_writes.get(acc).unwrap().insert_tx(tx);
        } else {
            // pull the account's heap out of the priority set, add the tx,
            // and remember it so it gets re-sorted at the end
            let popped_heap = priority_heap.pop(acc);
            popped_heap.insert_tx(tx);
            popped_heaps.insert(acc, popped_heap);
        }
    }
    // do the same for reads
}

// push heaps back on to get re-sorted for the scheduler
for (_acc, heap) in popped_heaps {
    priority_heap.insert(heap);
}

Err(e) => {
    trace!("e: {:?}", e);
    match e {
        SchedulerError::InvalidSanitizedTransaction

Contributor comment:

should the packet be removed from the BTree on any of these errors?

@buffalu (Author) commented Jun 9, 2022:

plan to circle back in a few weeks once we get other more critical stuff done!

@buffalu closed this on Jun 9, 2022
@ryoqun (Member) commented Aug 2, 2022:

@buffalu do you have past bench results from transaction-scheduler-bench showing bad numbers with state fee auctions? I want to try to compare it against mine, #26805, so just its actual command-line args would be enough.
