Skip to content

Commit

Permalink
using last checked bank to calculate packet total_fee/request_cu for …
Browse files Browse the repository at this point in the history
…sorting
  • Loading branch information
tao-stones committed Feb 23, 2022
1 parent 3c8b9c8 commit 3c2465f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 35 deletions.
59 changes: 38 additions & 21 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,18 @@ impl BankingStage {
qos_service: &QosService,
// TODO - add back metrics_tracker with new dta points
_slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
last_working_bank: Option<Arc<Bank>>,
) {
// TODO - add cli option to enable/disable PacketSender info logging
let mut packet_sender_info = Some(PacketSenderInfo::default());

// we must have/had working_bank to start consume buffer. Using that bank to calculate
// transactino fees
let prioritized_packet_locators = prioritize_by_fee_then_stakes(
buffered_packet_batches,
&mut packet_sender_info);
last_working_bank,
&mut packet_sender_info,
);

let mut report_sender_info_start = Measure::start("packet_sender_info_report");
if let Some(packet_sender_info) = packet_sender_info {
Expand All @@ -540,31 +545,36 @@ impl BankingStage {
// if we are the next leader, skip the filtering logic now as buffer will be filter
// again.
if is_next_leader(my_pubkey, end_of_slot.next_slot_leader) {
retryable_packets_locators.extend_from_slice(&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()]);
retryable_packets_locators.extend_from_slice(
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
);
break;
}
// filter too_old unprocessed packets if have a bank, otherwise keep buffer as is
if let Some(bank) = &end_of_slot.working_bank {
let (transactions, sanitized_locators) = sanitize_transactions(
&buffered_packet_batches,
&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()],
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
);
);
let retryable_transaction_indexes = (0..transactions.len()).collect_vec();
let filtered_retryable_transaction_locators =
filter_retryable_transactions(
bank,
&transactions,
&sanitized_locators,
&retryable_transaction_indexes,
);
let filtered_retryable_transaction_locators = filter_retryable_transactions(
bank,
&transactions,
&sanitized_locators,
&retryable_transaction_indexes,
);
retryable_packets_locators.extend(filtered_retryable_transaction_locators);
}
else {
} else {
// not filtering, just add remaining packet locators to retryable list
retryable_packets_locators.extend_from_slice(&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()]);
retryable_packets_locators.extend_from_slice(
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
);
}
break;
} else {
Expand Down Expand Up @@ -688,7 +698,7 @@ impl BankingStage {
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
let (decision, make_decision_time) = Measure::this(
let ((decision, working_bank), make_decision_time) = Measure::this(
|_| {
let bank_start;
let (
Expand All @@ -710,12 +720,15 @@ impl BankingStage {
)
};

Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
(
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
),
bank_still_processing_txs.map_or(None, |bank| Some(bank.clone())),
)
},
(),
Expand All @@ -739,6 +752,7 @@ impl BankingStage {
recorder,
qos_service,
slot_metrics_tracker,
working_bank,
)
},
(),
Expand Down Expand Up @@ -3525,6 +3539,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert_eq!(
buffered_packet_batches[0].unprocessed_packets.len(),
Expand All @@ -3546,6 +3561,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packet_batches.is_empty())
Expand Down Expand Up @@ -3616,6 +3632,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);

// Check everything is correct. All indexes after `interrupted_iteration`
Expand Down
89 changes: 75 additions & 14 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
packet::{limited_deserialize, Packet, PacketBatch},
perf_libs,
},
solana_program_runtime::compute_budget::ComputeBudget,
solana_runtime::{accounts_db::ErrorCounters, bank::Bank},
solana_sdk::{
clock::{
Expand All @@ -16,8 +17,12 @@ use {
},
feature_set,
hash::Hash,
message::Message,
message::{
v0::{self, LoadedAddresses},
Message, SanitizedMessage, VersionedMessage,
},
pubkey::Pubkey,
sanitize::Sanitize,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{
Expand Down Expand Up @@ -146,6 +151,7 @@ pub type UnprocessedPacketBatches = VecDeque<DeserializedPacketBatch>;
// stakes
pub fn prioritize_by_fee_then_stakes(
unprocessed_packet_batches: &UnprocessedPacketBatches,
working_bank: Option<Arc<Bank>>,
packet_sender_info: &mut Option<PacketSenderInfo>,
) -> Vec<PacketLocator> {
let (stakes, locators) =
Expand All @@ -155,7 +161,11 @@ pub fn prioritize_by_fee_then_stakes(
let shuffled_packet_locators = weighted_shuffle(&stakes, &locators, packet_sender_info);

// 3. TODO TAO - sort by fee function -> sorted and shuffled locators
prioritize_by_fee(unprocessed_packet_batches, &shuffled_packet_locators)
prioritize_by_fee(
unprocessed_packet_batches,
&shuffled_packet_locators,
working_bank,
)
}

// Iterates packets in buffered batches, returns all unprocessed packet's stake,
Expand Down Expand Up @@ -220,16 +230,18 @@ fn weighted_shuffle(
fn prioritize_by_fee(
unprocessed_packet_batches: &UnprocessedPacketBatches,
locators: &Vec<PacketLocator>,
bank: Option<Arc<Bank>>,
) -> Vec<PacketLocator> {
let mut fee_buckets = BTreeMap::<u64, Vec<PacketLocator>>::new();
for locator in locators {
let fee_per_cu =
if let Some(fee_per_cu) = compute_fee_per_cu(unprocessed_packet_batches, locator) {
fee_per_cu
} else {
// if unable to compute fee-per-cu for the packet, put it to the `0` bucket
0u64
};
let fee_per_cu = if let Some(fee_per_cu) =
compute_fee_per_cu(unprocessed_packet_batches, locator, &bank)
{
fee_per_cu
} else {
// if unable to compute fee-per-cu for the packet, put it to the `0` bucket
0u64
};
let bucket = fee_buckets
.entry(fee_per_cu)
.or_insert(Vec::<PacketLocator>::new());
Expand All @@ -245,9 +257,29 @@ fn prioritize_by_fee(
fn compute_fee_per_cu(
unprocessed_packet_batches: &UnprocessedPacketBatches,
locator: &PacketLocator,
bank: &Option<Arc<Bank>>,
) -> Option<u64> {
// TODO - how to get base_fee
None
if let Some(bank) = bank {
let deserialized_packet_batch = unprocessed_packet_batches.get(locator.batch_index)?;
let deserialized_packet = deserialized_packet_batch
.unprocessed_packets
.get(&locator.packet_index)?;
let sanitized_message = sanitize_message(
&deserialized_packet.versioned_transaction.message,
bank.as_ref(),
)?;
let total_fee = bank.get_fee_for_message(&sanitized_message)?;

// TODO update bank to get_fee_and_cu_for_message() to avoid calling compute_budget again
let mut compute_budget = ComputeBudget::default();
let _ = compute_budget
.process_message(&sanitized_message, false)
.ok()?;

Some(total_fee / compute_budget.max_units)
} else {
None
}
}

// This function creates SanitizedTransactions from deseralized VersionedTransactions.i
Expand Down Expand Up @@ -285,6 +317,23 @@ pub fn sanitize_transactions(
.unzip()
}

fn sanitize_message(
versioned_message: &VersionedMessage,
address_loader: &impl AddressLoader,
) -> Option<SanitizedMessage> {
versioned_message.sanitize().ok()?;

match versioned_message {
VersionedMessage::Legacy(message) => Some(SanitizedMessage::Legacy(message.clone())),
VersionedMessage::V0(message) => Some(SanitizedMessage::V0(v0::LoadedMessage {
loaded_addresses: address_loader
.load_addresses(&message.address_table_lookups)
.ok()?,
message: message.clone(),
})),
}
}

// insert new packet batch into buffer,
// if buffer is at limit, using eviction strategy to evict lower priority packets
// until an empty batch is located, swap that with new batch
Expand Down Expand Up @@ -1007,9 +1056,21 @@ mod tests {
packet_index: 2,
},
];
let prioritized_locators = prioritize_by_fee(&unprocessed_packets, &locators);
// TODO rn fee-per-cu is not calculated, should expect output is same as input
assert_eq!(locators, prioritized_locators);
{
// If no bank is given, fee-per-cu won't calculate, should expect output is same as input
let prioritized_locators = prioritize_by_fee(&unprocessed_packets, &locators, None);
assert_eq!(locators, prioritized_locators);
}

// TODO test request additional fee to alter locators orders
/*
{
// If bank is given, fee-per-cu is calculated, should expect different output
let bank = Arc::new(Bank::default_for_tests());
let prioritized_locators = prioritize_by_fee(&unprocessed_packets, &locators, Some(bank));
assert_eq!(locators, prioritized_locators);
}
// */
}
}
}

0 comments on commit 3c2465f

Please sign in to comment.