Skip to content

Commit

Permalink
using last checked bank to calculate packet total_fee/request_cu for …
Browse files Browse the repository at this point in the history
…sorting;

add test
  • Loading branch information
tao-stones committed Feb 23, 2022
1 parent 3c8b9c8 commit 5af8e7c
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 47 deletions.
59 changes: 38 additions & 21 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,18 @@ impl BankingStage {
qos_service: &QosService,
// TODO - add back metrics_tracker with new dta points
_slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
last_working_bank: Option<Arc<Bank>>,
) {
// TODO - add cli option to enable/disable PacketSender info logging
let mut packet_sender_info = Some(PacketSenderInfo::default());

// we must have/had working_bank to start consume buffer. Using that bank to calculate
// transactino fees
let prioritized_packet_locators = prioritize_by_fee_then_stakes(
buffered_packet_batches,
&mut packet_sender_info);
last_working_bank,
&mut packet_sender_info,
);

let mut report_sender_info_start = Measure::start("packet_sender_info_report");
if let Some(packet_sender_info) = packet_sender_info {
Expand All @@ -540,31 +545,36 @@ impl BankingStage {
// if we are the next leader, skip the filtering logic now as buffer will be filter
// again.
if is_next_leader(my_pubkey, end_of_slot.next_slot_leader) {
retryable_packets_locators.extend_from_slice(&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()]);
retryable_packets_locators.extend_from_slice(
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
);
break;
}
// filter too_old unprocessed packets if have a bank, otherwise keep buffer as is
if let Some(bank) = &end_of_slot.working_bank {
let (transactions, sanitized_locators) = sanitize_transactions(
&buffered_packet_batches,
&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()],
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
);
);
let retryable_transaction_indexes = (0..transactions.len()).collect_vec();
let filtered_retryable_transaction_locators =
filter_retryable_transactions(
bank,
&transactions,
&sanitized_locators,
&retryable_transaction_indexes,
);
let filtered_retryable_transaction_locators = filter_retryable_transactions(
bank,
&transactions,
&sanitized_locators,
&retryable_transaction_indexes,
);
retryable_packets_locators.extend(filtered_retryable_transaction_locators);
}
else {
} else {
// not filtering, just add remaining packet locators to retryable list
retryable_packets_locators.extend_from_slice(&prioritized_packet_locators[chunk_start..prioritized_packet_locators.len()]);
retryable_packets_locators.extend_from_slice(
&prioritized_packet_locators
[chunk_start..prioritized_packet_locators.len()],
);
}
break;
} else {
Expand Down Expand Up @@ -688,7 +698,7 @@ impl BankingStage {
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
let (decision, make_decision_time) = Measure::this(
let ((decision, working_bank), make_decision_time) = Measure::this(
|_| {
let bank_start;
let (
Expand All @@ -710,12 +720,15 @@ impl BankingStage {
)
};

Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
(
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
),
bank_still_processing_txs.map_or(None, |bank| Some(bank.clone())),
)
},
(),
Expand All @@ -739,6 +752,7 @@ impl BankingStage {
recorder,
qos_service,
slot_metrics_tracker,
working_bank,
)
},
(),
Expand Down Expand Up @@ -3525,6 +3539,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert_eq!(
buffered_packet_batches[0].unprocessed_packets.len(),
Expand All @@ -3546,6 +3561,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packet_batches.is_empty())
Expand Down Expand Up @@ -3616,6 +3632,7 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);

// Check everything is correct. All indexes after `interrupted_iteration`
Expand Down
170 changes: 144 additions & 26 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
packet::{limited_deserialize, Packet, PacketBatch},
perf_libs,
},
solana_program_runtime::compute_budget::ComputeBudget,
solana_runtime::{accounts_db::ErrorCounters, bank::Bank},
solana_sdk::{
clock::{
Expand All @@ -16,8 +17,12 @@ use {
},
feature_set,
hash::Hash,
message::Message,
message::{
v0::{self, LoadedAddresses},
Message, SanitizedMessage, VersionedMessage,
},
pubkey::Pubkey,
sanitize::Sanitize,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{
Expand Down Expand Up @@ -146,6 +151,7 @@ pub type UnprocessedPacketBatches = VecDeque<DeserializedPacketBatch>;
// stakes
pub fn prioritize_by_fee_then_stakes(
unprocessed_packet_batches: &UnprocessedPacketBatches,
working_bank: Option<Arc<Bank>>,
packet_sender_info: &mut Option<PacketSenderInfo>,
) -> Vec<PacketLocator> {
let (stakes, locators) =
Expand All @@ -154,8 +160,12 @@ pub fn prioritize_by_fee_then_stakes(
// 2. weight shuffle -> shuffled locators
let shuffled_packet_locators = weighted_shuffle(&stakes, &locators, packet_sender_info);

// 3. TODO TAO - sort by fee function -> sorted and shuffled locators
prioritize_by_fee(unprocessed_packet_batches, &shuffled_packet_locators)
// 3. sort by fee function -> sorted and shuffled locators
prioritize_by_fee(
unprocessed_packet_batches,
&shuffled_packet_locators,
working_bank,
)
}

// Iterates packets in buffered batches, returns all unprocessed packet's stake,
Expand Down Expand Up @@ -220,23 +230,27 @@ fn weighted_shuffle(
fn prioritize_by_fee(
unprocessed_packet_batches: &UnprocessedPacketBatches,
locators: &Vec<PacketLocator>,
bank: Option<Arc<Bank>>,
) -> Vec<PacketLocator> {
let mut fee_buckets = BTreeMap::<u64, Vec<PacketLocator>>::new();
for locator in locators {
let fee_per_cu =
if let Some(fee_per_cu) = compute_fee_per_cu(unprocessed_packet_batches, locator) {
fee_per_cu
} else {
// if unable to compute fee-per-cu for the packet, put it to the `0` bucket
0u64
};
let fee_per_cu = if let Some(fee_per_cu) =
compute_fee_per_cu(unprocessed_packet_batches, locator, &bank)
{
fee_per_cu
} else {
// if unable to compute fee-per-cu for the packet, put it to the `0` bucket
0u64
};

let bucket = fee_buckets
.entry(fee_per_cu)
.or_insert(Vec::<PacketLocator>::new());
bucket.push(locator.clone());
}
fee_buckets
.iter()
.rev()
.flat_map(|(_key, bucket)| bucket.iter().map(|x| x.clone()))
.collect()
}
Expand All @@ -245,9 +259,29 @@ fn prioritize_by_fee(
fn compute_fee_per_cu(
unprocessed_packet_batches: &UnprocessedPacketBatches,
locator: &PacketLocator,
bank: &Option<Arc<Bank>>,
) -> Option<u64> {
// TODO - how to get base_fee
None
if let Some(bank) = bank {
let deserialized_packet_batch = unprocessed_packet_batches.get(locator.batch_index)?;
let deserialized_packet = deserialized_packet_batch
.unprocessed_packets
.get(&locator.packet_index)?;
let sanitized_message = sanitize_message(
&deserialized_packet.versioned_transaction.message,
bank.as_ref(),
)?;
let total_fee = bank.get_fee_for_message(&sanitized_message)?;

// TODO update bank to get_fee_and_cu_for_message() to avoid calling compute_budget again
let mut compute_budget = ComputeBudget::new(false);
let _ = compute_budget
.process_message(&sanitized_message, false)
.ok()?;

Some(total_fee / compute_budget.max_units)
} else {
None
}
}

// This function creates SanitizedTransactions from deseralized VersionedTransactions.i
Expand Down Expand Up @@ -285,6 +319,23 @@ pub fn sanitize_transactions(
.unzip()
}

fn sanitize_message(
versioned_message: &VersionedMessage,
address_loader: &impl AddressLoader,
) -> Option<SanitizedMessage> {
versioned_message.sanitize().ok()?;

match versioned_message {
VersionedMessage::Legacy(message) => Some(SanitizedMessage::Legacy(message.clone())),
VersionedMessage::V0(message) => Some(SanitizedMessage::V0(v0::LoadedMessage {
loaded_addresses: address_loader
.load_addresses(&message.address_table_lookups)
.ok()?,
message: message.clone(),
})),
}
}

// insert new packet batch into buffer,
// if buffer is at limit, using eviction strategy to evict lower priority packets
// until an empty batch is located, swap that with new batch
Expand Down Expand Up @@ -498,10 +549,16 @@ mod tests {
use {
super::*,
solana_perf::packet::PacketFlags,
solana_runtime::{
bank::goto_end_of_slot,
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
},
solana_sdk::{
compute_budget::{self, ComputeBudgetInstruction},
signature::{Keypair, Signer},
system_instruction::{self},
system_transaction,
transaction::DisabledAddressLoader,
transaction::{DisabledAddressLoader, Transaction},
},
solana_vote_program::vote_transaction,
};
Expand Down Expand Up @@ -979,23 +1036,83 @@ mod tests {

#[test]
fn test_prioritize_by_fee() {
solana_logger::setup();

let leader = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(1_000_000, &leader, 3);
genesis_config
.fee_rate_governor
.target_lamports_per_signature = 1;
genesis_config.fee_rate_governor.target_signatures_per_slot = 1;

let mut bank = Bank::new_for_tests(&genesis_config);
goto_end_of_slot(&mut bank);
let mut bank = Bank::new_from_parent(&Arc::new(bank), &leader, 1);
goto_end_of_slot(&mut bank);
let mut unprocessed_packets = build_unprocessed_packets_buffer();
// add a packet with 2 signatures to buffer that has doubled fee, and additional fee
let key0 = Keypair::new();
let key1 = Keypair::new();
let ix0 = system_instruction::transfer(&key0.pubkey(), &key1.pubkey(), 1);
let ix1 = system_instruction::transfer(&key1.pubkey(), &key0.pubkey(), 1);
let ix_cb = ComputeBudgetInstruction::request_units(1000, 20000);
let mut message = Message::new(&[ix0, ix1, ix_cb], Some(&key0.pubkey()));
message.recent_blockhash = bank.last_blockhash();
let tx = Transaction::new(&[&key0, &key1], message, bank.last_blockhash());

let packet = Packet::from_data(None, &tx).unwrap();
unprocessed_packets.push_back(DeserializedPacketBatch::new(
PacketBatch::new(vec![packet]),
vec![0],
false,
));

let locators = vec![
PacketLocator {
batch_index: 2,
packet_index: 2,
},
PacketLocator {
batch_index: 1,
packet_index: 2,
},
PacketLocator {
batch_index: 3,
packet_index: 0,
},
PacketLocator {
batch_index: 3,
packet_index: 2,
},
PacketLocator {
batch_index: 4,
packet_index: 0,
},
];
{
let unprocessed_packets = build_unprocessed_packets_buffer();
let locators = vec![
PacketLocator {
batch_index: 1,
packet_index: 1,
},
// If no bank is given, fee-per-cu won't calculate, should expect output is same as input
let prioritized_locators = prioritize_by_fee(&unprocessed_packets, &locators, None);
assert_eq!(locators, prioritized_locators);
}

{
// If bank is given, fee-per-cu is calculated, should expect higher fee-per-cu come
// out first
let expected_locators = vec![
PacketLocator {
batch_index: 1,
packet_index: 2,
batch_index: 4,
packet_index: 0,
},
PacketLocator {
batch_index: 2,
packet_index: 0,
packet_index: 2,
},
PacketLocator {
batch_index: 2,
batch_index: 1,
packet_index: 2,
},
PacketLocator {
Expand All @@ -1007,9 +1124,10 @@ mod tests {
packet_index: 2,
},
];
let prioritized_locators = prioritize_by_fee(&unprocessed_packets, &locators);
// TODO rn fee-per-cu is not calculated, should expect output is same as input
assert_eq!(locators, prioritized_locators);

let prioritized_locators =
prioritize_by_fee(&unprocessed_packets, &locators, Some(Arc::new(bank)));
assert_eq!(expected_locators, prioritized_locators);
}
}
}

0 comments on commit 5af8e7c

Please sign in to comment.