Skip to content

Commit

Permalink
Use cluster info functions for tpu (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu committed Sep 5, 2023
1 parent cb146cd commit 1586cf9
Show file tree
Hide file tree
Showing 14 changed files with 28 additions and 22 deletions.
2 changes: 1 addition & 1 deletion bundle/src/bundle_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ pub fn load_and_execute_bundle<'a>(

fn get_account_transactions(
bank: &Bank,
account_overrides: &mut AccountOverrides,
account_overrides: &AccountOverrides,
accounts: &[Option<Vec<Pubkey>>],
batch: &TransactionBatch,
) -> Vec<Option<Vec<(Pubkey, AccountSharedData)>>> {
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ solana-core = { path = ".", features = ["dev-context-only-utils"] }
solana-logger = { workspace = true }
solana-program-runtime = { workspace = true }
solana-program-test = { workspace = true }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
static_assertions = { workspace = true }
systemstat = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ impl BundleStorage {

sanitized_bundles
.into_iter()
.zip(bundle_execution_results.into_iter())
.zip(bundle_execution_results)
.for_each(
|((deserialized_bundle, sanitized_bundle), result)| match result {
Ok(_) => {
Expand Down
4 changes: 2 additions & 2 deletions core/src/bundle_stage/bundle_packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ mod tests {
// in the same order they were originally
let bank = &bank_forks.read().unwrap().working_bank();
let new_bank = Arc::new(Bank::new_from_parent(
bank,
bank.clone(),
bank.collector_id(),
bank.slot() + 1,
));
Expand Down Expand Up @@ -835,7 +835,7 @@ mod tests {
// create new bank then call process_bundles again, expect to see [bundles1,bundles2]
let bank = &bank_forks.read().unwrap().working_bank();
let new_bank = Arc::new(Bank::new_from_parent(
bank,
bank.clone(),
bank.collector_id(),
bank.slot() + 1,
));
Expand Down
2 changes: 1 addition & 1 deletion core/src/consensus_cache_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl ConsensusCacheUpdater {
.flat_map(|v| v.vote_accounts.clone());

// vote_account
consensus_accounts.extend(vote_accounts.into_iter());
consensus_accounts.extend(vote_accounts);
// authorized_voter_pubkey
consensus_accounts.extend(epoch_stakes.epoch_authorized_voters().keys());
// node_keypair
Expand Down
2 changes: 1 addition & 1 deletion core/src/immutable_deserialized_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ mod tests {
} = create_genesis_config(10_000);
let parent = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let vote_only_bank = Arc::new(Bank::new_from_parent_with_options(
&parent,
parent,
&Pubkey::new_unique(),
1,
NewBankOptions {
Expand Down
7 changes: 2 additions & 5 deletions core/src/proxy/fetch_stage_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,8 @@ impl FetchStageManager {
tpu_address: SocketAddr,
tpu_forward_address: SocketAddr,
) -> Result<(), contact_info::Error> {
let mut new_contact_info = cluster_info.my_contact_info();
// TODO (LB): double check protocol!!!!!!
new_contact_info.set_tpu(tpu_address)?;
new_contact_info.set_tpu_forwards(tpu_forward_address)?;
cluster_info.set_my_contact_info(new_contact_info);
cluster_info.set_tpu(tpu_address)?;
cluster_info.set_tpu_forwards(tpu_forward_address)?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion jito-protos/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() -> Result<(), std::io::Error> {
let mut protos = Vec::new();
for proto_file in &proto_files {
let proto = proto_base_path.join(proto_file);
println!("cargo::rerun-if-changed={}", proto.display());
println!("cargo:rerun-if-changed={}", proto.display());
protos.push(proto);
}

Expand Down
2 changes: 1 addition & 1 deletion ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub fn load_bank_forks(
let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(
&snapshot_config.full_snapshot_archives_dir,
halt_at_slot
halt_at_slot,
)
else {
warn!(
Expand Down
2 changes: 1 addition & 1 deletion poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl TransactionRecorder {
.collect::<Vec<_>>());
record_transactions_timings.hash_us = hash_us;

let hashes_transactions: Vec<_> = hashes.into_iter().zip(batches.into_iter()).collect();
let hashes_transactions: Vec<_> = hashes.into_iter().zip(batches).collect();

let (res, poh_record_us) = measure_us!(self.record(bank_slot, hashes_transactions));
record_transactions_timings.poh_record_us = poh_record_us;
Expand Down
3 changes: 3 additions & 0 deletions tip-distributor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ tip-distribution = { workspace = true }
tip-payment = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "full"] }

[dev-dependencies]
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }

[[bin]]
name = "solana-stake-meta-generator"
path = "src/bin/stake-meta-generator.rs"
Expand Down
2 changes: 1 addition & 1 deletion tip-distributor/src/stake_meta_generator_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ mod tests {
bank.squash();

Arc::new(Bank::new_from_parent(
bank,
bank.clone(),
&Pubkey::default(),
bank.get_slots_in_epoch(bank.epoch()) + bank.slot(),
))
Expand Down
18 changes: 12 additions & 6 deletions turbine/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
// Drain channel
while last_tick_height != bank.max_tick_height() {
let Ok(WorkingBankEntry {
bank: try_bank,
entries_ticks: new_entries_ticks,
}) = receiver.try_recv() else { break };
bank: try_bank,
entries_ticks: new_entries_ticks,
}) = receiver.try_recv()
else {
break;
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
Expand All @@ -72,9 +75,12 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let Ok(WorkingBankEntry {
bank: try_bank,
entries_ticks: new_entries_ticks,
}) = receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) else { break };
bank: try_bank,
entries_ticks: new_entries_ticks,
}) = receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION)
else {
break;
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
Expand Down
1 change: 0 additions & 1 deletion validator/src/admin_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,6 @@ mod tests {
},
std::{
collections::HashSet,
str::FromStr,
sync::{atomic::AtomicBool, Mutex},
},
};
Expand Down

0 comments on commit 1586cf9

Please sign in to comment.