Leader qos #23257
Conversation
Force-pushed from ee61c65 to 0968872
This is still a work in progress. Regarding performance, the overhead added here (e.g. sorting packets instead of linearly traversing packet_batches, the additional fee-per-cu and stake-weight functions, etc.) is going to have a negative impact. A suggestion is to reduce the buffer size from the current 500_000 packets to 4 slots' worth of packets (~6,400 packets) and increase the transaction processing threads from the current 2 to 4, which should make a good difference. These numbers of course need to be played around with. This is a rather big change; even though it is still WIP, I'd appreciate more eyes starting to look at it.
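For concreteness, a minimal sketch of the sizing suggestion above, assuming the ~6,400-packet figure implies roughly 1,600 packets per slot; every constant here is a tuning assumption, not a value from the codebase:

```rust
// Sketch only: restates the suggested sizing above; all numbers are
// assumptions that would need to be tuned experimentally.
const PACKETS_PER_SLOT: usize = 1_600; // assumed: ~6,400 packets / 4 slots
const SLOTS_TO_BUFFER: usize = 4; // buffer roughly 4 slots' worth of packets
const SUGGESTED_BUFFER_SIZE: usize = PACKETS_PER_SLOT * SLOTS_TO_BUFFER; // ~6,400
const SUGGESTED_PROCESSING_THREADS: usize = 4; // up from the current 2

fn main() {
    println!(
        "buffer {} packets (down from 500_000), process on {} threads",
        SUGGESTED_BUFFER_SIZE, SUGGESTED_PROCESSING_THREADS
    );
}
```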
@buffalu ^^^
skimming through right now, this is my understanding of changes:
stream of consciousness/messy notes:
going to take another look in the AM, but initial thoughts above. hard problem to solve; i still feel like we wouldn't need all this if entries were packed better (can always be added later) + executed in parallel properly, but only going on intuition and don't have any data for this lol
I raised this in the issue Toly opened last week, but I'm wondering how we get a benefit from stake-weighted QoS on the TPU port, given that staked validators typically only submit transactions on TpuFwd and TpuVote rather than Tpu, where we're seeing congestion.
Yea, using the locator to avoid additional copies of packet_batch.
The main motivation for sender-stake-weighted shuffling is to allow (some) valid transactions to go through under bot-spamming conditions. I can see your concern about incentivizing high-staked validators to spam. This change can be discussed more.
Sorry, that's not my concern at all. My concern is that staked validators rarely submit transactions to the TPU port, so stake-weighting transactions there will have little to no effect. That is, nearly all TXs coming into the TPU port are from unstaked nodes.
you are right, but wasn't it suggested that people could forward to high-staked validators and they could do QoS and submit? not totally sure what problem that solves right now tho
Maybe. Kinda need that before QoS, though. There's no port to accept these transactions on today, nor logic to discern them later in the pipeline (nor even a reliable way to discern them?).
Force-pushed from 3c2465f to 5af8e7c
I feel like there is still a lot of discussion required for the scope of this change, and this is a very critical change. I think it'd be more efficient if we got consensus through a proposal first. I personally have little interest in reading thousands of lines of refactorings and changes that we're not sure about yet and that are very impactful to tx ingestion. That said, drafts like this are useful for checking perf implications and can aid the proposal discussion.
It indeed is a lot of code change. I wanted to put everything together, at least as a proof of concept, so the discussions would be more concrete. I've also tried to keep commits rather self-contained, so they can be reviewed/discussed individually, and reverted/cherry-picked if necessary. While the refactoring is a large change, the motivation for it is rather simple, as described in the PR comments. We should continue the discussion at #23207. Just as a side note wrt performance, its…
gm 🤝
What should the proposal be about? The idea of prioritizing by fee/CU, its implementation, or ideas for leader QoS in general?
Force-pushed from 8a29354 to eeee7e9
…ifo, to make room for new packet_batch
…on in chunk, instead of iterating through packet_batches;
…sorting; add test
…account limit. then forward by bucket to up to 10 block-limit CUs
@@ -0,0 +1,1205 @@
use {
+1 for refactoring this, but I think this can be a separate PR that moves the code into this file and gets checked in first. Otherwise it's hard to tell here what new functions were added and what code was just migrated.
let mut result = Vec::<&'a Packet>::new();
self.forwardable_packets
    .iter()
    .for_each(|v| result.extend(v));
result
this would probably be more efficient as an iter().collect::<Vec<&'a Packet>>() instead of multiple calls to extend()
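Since forwardable_packets is nested (a Vec<Vec<&'a Packet>>), a bare iter().collect() would not type-check; my reading of the suggestion needs a flatten() in between. A self-contained sketch, with a generic T standing in for the real Packet type:

```rust
// One flatten + collect instead of a mutable Vec and repeated extend() calls.
// T is a stand-in for the real Packet type.
fn flatten_forwardable<'a, T>(forwardable_packets: &[Vec<&'a T>]) -> Vec<&'a T> {
    forwardable_packets
        .iter()
        .flatten() // yields &&'a T across all inner vecs
        .copied() // &&'a T -> &'a T
        .collect()
}

fn main() {
    let (a, b, c) = (1, 2, 3);
    let nested = vec![vec![&a, &b], vec![&c]];
    assert_eq!(flatten_forwardable(&nested), vec![&a, &b, &c]);
}
```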
cost: &TransactionCost,
packet: &'a Packet,
) {
// try to sort the `transaction` into one of outbound (virtual) blocks
It's unclear what a "virtual" block is referring to here, as that term is never defined; a more concrete definition would be helpful.
cost_trackers: Vec<CostTracker>,
forwardable_packets: Vec<Vec<&'a Packet>>,
This seems like it could be condensed into a single Vec<ForwardableBlock>, where a ForwardableBlock looks like:
struct ForwardableBlock {
cost_tracker: CostTracker,
forwardable_packets: Vec<&'a Packet>
}
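A hedged sketch of the payoff of that shape, with stub types standing in for the real CostTracker and Packet (the block-limit check is simplified to a single total-cost field): the tracker and its packets travel together, so no index has to be kept consistent across two parallel vectors.

```rust
// Stub types for illustration only; the real CostTracker and Packet live in
// the solana codebase and have richer APIs.
struct CostTracker {
    total_cost: u64,
}
struct Packet;

struct ForwardableBlock<'a> {
    cost_tracker: CostTracker,
    forwardable_packets: Vec<&'a Packet>,
}

fn insert_packet<'a>(
    blocks: &mut Vec<ForwardableBlock<'a>>,
    packet: &'a Packet,
    cost: u64,
    block_limit: u64,
) {
    // Find the first block with room for `cost`; tracker and packet list are
    // updated together because they sit in the same struct.
    match blocks
        .iter()
        .position(|b| b.cost_tracker.total_cost + cost <= block_limit)
    {
        Some(i) => {
            blocks[i].cost_tracker.total_cost += cost;
            blocks[i].forwardable_packets.push(packet);
        }
        None => blocks.push(ForwardableBlock {
            cost_tracker: CostTracker { total_cost: cost },
            forwardable_packets: vec![packet],
        }),
    }
}

fn main() {
    let p = Packet;
    let mut blocks = Vec::new();
    insert_packet(&mut blocks, &p, 10, 100);
    insert_packet(&mut blocks, &p, 95, 100); // does not fit in the first block
    assert_eq!(blocks.len(), 2);
}
```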
prioritize_by_fee(unprocessed_packet_batches, &locators, working_bank)
}

fn sort_into_buckets(
nit: sort_into_buckets -> insert_into_forwardable_blocks
unprocessed_packet_batches: &'a UnprocessedPacketBatches,
working_bank: Option<Arc<Bank>>,
) -> Vec<PacketLocator> {
let mut locators = Vec::<PacketLocator>::with_capacity(TOTAL_BUFFERED_PACKETS);
nit: locators -> unforwarded_packets_locators
pub struct DeserializedPacketBatch {
    pub packet_batch: PacketBatch,
    pub forwarded: bool,
    // indexes of valid packets in batch, and their corrersponding deserialized_packet
nit: corrersponding -> corresponding
#[allow(dead_code)]
pub batch_index: usize,
#[allow(dead_code)]
pub packet_index: usize,
}

// hold deserialized messages, as well as computed message_hash and other things needed to create
// SanitizedTransaction
#[derive(Debug, Default)]
pub struct DeserializedPacket {
    #[allow(dead_code)]
    pub versioned_transaction: VersionedTransaction,

    #[allow(dead_code)]
    pub message_hash: Hash,

    #[allow(dead_code)]
    pub is_simple_vote: bool,
The dead_code tag doesn't seem necessary
@@ -0,0 +1,427 @@
use {
It seems like all the forwarding changes can also be broken into a separate PR after the main prioritization change lands
.collect()
}

// to comute (addition_fee + base_fee / requested_cu) for packet identified by `locator`
nit: comute -> compute
.unprocessed_packets
.keys()
.for_each(|packet_index| {
    let p = &packet_batch.packets[*packet_index];
This pattern seems to come up a lot; let's add a new function to UnprocessedPacketBatches:
pub fn packet_iter(&self) -> impl Iterator<Item = (usize, &Packet)> {
self.unprocessed_packets
.keys()
.map(|index| (*index, &self.packet_batch.packets[*index]))
}
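A self-contained sketch of that helper with minimal stand-in types, just to show the call site it would clean up (the real DeserializedPacketBatch carries much more state):

```rust
use std::collections::HashMap;

// Minimal stand-ins for the real types, for illustration only.
struct Packet(u8);
struct PacketBatch {
    packets: Vec<Packet>,
}
struct DeserializedPacketBatch {
    packet_batch: PacketBatch,
    unprocessed_packets: HashMap<usize, ()>,
}

impl DeserializedPacketBatch {
    // The suggested helper: pair each unprocessed index with its packet.
    fn packet_iter(&self) -> impl Iterator<Item = (usize, &Packet)> {
        self.unprocessed_packets
            .keys()
            .map(|index| (*index, &self.packet_batch.packets[*index]))
    }
}

fn main() {
    let batch = DeserializedPacketBatch {
        packet_batch: PacketBatch {
            packets: vec![Packet(7), Packet(9)],
        },
        unprocessed_packets: HashMap::from([(1, ())]),
    };
    // Replaces the manual `.unprocessed_packets.keys()` indexing pattern.
    for (index, packet) in batch.packet_iter() {
        println!("unprocessed packet {} holds byte {}", index, packet.0);
    }
}
```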
let mut stakes = Vec::<u64>::with_capacity(TOTAL_BUFFERED_PACKETS);
let mut locators = Vec::<PacketLocator>::with_capacity(TOTAL_BUFFERED_PACKETS);
TOTAL_BUFFERED_PACKETS seems excessively large for most cases; could just not preallocate, or figure out the size beforehand
unprocessed_packet_batches: &'a UnprocessedPacketBatches,
working_bank: Option<Arc<Bank>>,
) -> Vec<PacketLocator> {
let mut locators = Vec::<PacketLocator>::with_capacity(TOTAL_BUFFERED_PACKETS);
This TOTAL_BUFFERED_PACKETS seems unnecessarily large. We can instead collect over an iterator:
let unforwarded_packets_locators: Vec<PacketLocator> =
unprocessed_packet_batches.iter().enumerate().filter_map(
|(batch_index, deserialized_packet_batch)| {
if !deserialized_packet_batch.forwarded {
Some((batch_index, deserialized_packet_batch))
} else {
None
}
},
).flat_map(|(batch_index, deserialized_packet_batch)| {
deserialized_packet_batch
.unprocessed_packets
.keys()
.map(move |packet_index| {
PacketLocator {
batch_index,
packet_index: *packet_index,
}
})
}).collect();
)?;
let total_fee = bank.get_fee_for_message(&sanitized_message)?;

// TODO update bank to get_fee_and_cu_for_message() to avoid calling compute_budget again
Was going to comment this as well 😃; process_message() seems unnecessary here
// if unable to compute fee-per-cu for the packet, put it to the `0` bucket
0u64
};
If we're unable to compute the fee-per-cu, doesn't that mean the packet was corrupted, and we can drop it entirely?
let prioritized_forwardable_packet_locators = Self::prioritize_unforwarded_packets_by_fee(
    unprocessed_packet_batches,
    Some(working_bank.clone()),
);

// if we have a bank to work with, then sort packets by write-account buckets
let (transactions, sanitized_locators) = sanitize_transactions(
    unprocessed_packet_batches,
    &prioritized_forwardable_packet_locators,
    &working_bank.feature_set,
    working_bank.vote_only_bank(),
    working_bank.as_ref(),
);
let transactions_costs = qos_service.compute_transaction_costs(transactions.iter());
Let's add a comment describing the steps here:

1. Compute the fee per cu and sort transactions by this priority for the unprocessed_packet_batches queue in prioritize_unforwarded_packets_by_fee()
2. Lock the cost model, clone every transaction from the unprocessed queue, deserialize all of these transactions, and compute the cost model cost in compute_transaction_costs()
3. Using the cost model costs from 2, split the unprocessed_packet_batches into separate chunks, where each chunk represents a block filled with transactions with costs that add up to the block cost limit (see the chunking sketch after this list)
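As referenced above, a hedged sketch of the chunking in step 3, assuming the cost list is already in priority order; chunk_by_block_limit is a hypothetical name, not a function in this PR:

```rust
// Greedily split an already-prioritized list of transaction costs into chunks
// whose costs sum to at most the block cost limit; returns indices per chunk.
fn chunk_by_block_limit(costs: &[u64], block_cost_limit: u64) -> Vec<Vec<usize>> {
    let mut chunks: Vec<Vec<usize>> = vec![Vec::new()];
    let mut running = 0u64;
    for (index, &cost) in costs.iter().enumerate() {
        // Open a new chunk once the current one would exceed the limit; an
        // oversized single transaction still gets a chunk of its own.
        if running + cost > block_cost_limit && !chunks.last().unwrap().is_empty() {
            chunks.push(Vec::new());
            running = 0;
        }
        chunks.last_mut().unwrap().push(index);
        running += cost;
    }
    chunks
}

fn main() {
    // Costs in priority order; with a limit of 10, expect [0, 1], [2], [3].
    let chunks = chunk_by_block_limit(&[6, 4, 7, 5], 10);
    assert_eq!(chunks, vec![vec![0, 1], vec![2], vec![3]]);
}
```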
Some perf implications:

1. If the unprocessed_packet_batches was already a heap sorted by fee-per-cu priority, updated as new transactions are added, we could avoid the sort here (see the heap sketch after this list).
2. The unprocessed buffer could be huge, and calculating all this metadata might be expensive; note there's already a limit on the number of packet bytes we actually forward here (solana/core/src/banking_stage.rs, lines 483 to 500 in 86e2f72):

   data_budget.update(INTERVAL_MS, |bytes| {
       std::cmp::min(
           bytes.saturating_add(MAX_BYTES_PER_INTERVAL),
           MAX_BYTES_BUDGET,
       )
   });
   let packet_vec: Vec<_> = packets
       .iter()
       .filter_map(|p| {
           if !p.meta.forwarded() && data_budget.take(p.meta.size) {
               Some((&p.data[..p.meta.size], tpu_forwards))
           } else {
               None
           }
       })
       .collect();

3. If we do 2, I don't think we need to compute the costs of all these transactions; just forward up to X bytes from the buffer to avoid all the steps/cost needed to clone/deserialize every transaction and run the cost model.
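A minimal sketch of suggestion 1, assuming a std BinaryHeap keyed by fee-per-cu; PrioritizedPacket is a stand-in for whatever locator entry the buffer would actually hold:

```rust
use std::collections::BinaryHeap;

// fee_per_cu is the first field, so the derived Ord compares it first and the
// max-heap pops the highest-fee packet; the indices only break ties.
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct PrioritizedPacket {
    fee_per_cu: u64,
    batch_index: usize,
    packet_index: usize,
}

fn main() {
    let mut buffer = BinaryHeap::new();
    // push() maintains the heap property as transactions arrive, so no full
    // sort is needed at forwarding time.
    buffer.push(PrioritizedPacket { fee_per_cu: 5, batch_index: 0, packet_index: 1 });
    buffer.push(PrioritizedPacket { fee_per_cu: 9, batch_index: 1, packet_index: 0 });
    let top = buffer.pop().unwrap();
    assert_eq!((top.fee_per_cu, top.batch_index, top.packet_index), (9, 1, 0));
}
```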
@taozhu-chicago still in the middle of reviewing this, but before you start resolving comments, let's start breaking this up into pieces that can be checked in:
fn apply_weights(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
    batches.into_par_iter().for_each(|batch| {
        batch.packets.par_iter_mut().for_each(|packet| {
            packet.meta.weight = *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
is this weight always just the stake of the sender? If so, does it make sense to:

- rename this weight field to sender_stake, and all function names with the word weight to sender_stake
- rename this entire stage to FindPacketSenderStakeStage, since we're not really calculating a "weight" (like a fee) and we are operating on packets, not transactions
Sounds good. All commits in this PR give an idea of the "end product" and allow testing it out end-to-end, but that does make reviews much harder. I'll break it out as suggested for reviewing.
the original PR: #21953
Having second thoughts while trying to cherry-pick commits in this PR into separate PRs: other than the first two commits, all the others are built on top of each other, which means the new smaller PRs depend on (and therefore include) the previous PRs. That doesn't look like it makes review any easier than reviewing commit-by-commit. wdyt @carllin?
@taozhu-chicago I think the code is complex enough and the reviews will be long enough that it would be good to break up those dependencies between commits into individual PRs, even though it might be a PITA.
Alright, PITA it is. Let's do it
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
This stale pull request has been automatically closed. Thank you for your contributions.
Problem
Leader to prioritize transactions by fee-per-cu and/or sender stake weight
Summary of Changes
Change consume_buffered_packets to prioritize SanitizedTransactions in fee-per-cu (still WIP) then stake-shuffled order, and send them to the bank for processing in chunks.

Fixes #23207