Skip to content

Commit

Permalink
Apply cleanups to solana-core for unified scheduler (solana-labs#4123)
Browse files Browse the repository at this point in the history
* Setup PohRecorder earlier for unified scheduler

* Use BankWithScheduler in banking-bench for US

* Move BankingPacket{Batch,Receiver} to new crate

* Extract duplicate tpu bank handling code for US

* Call wait_for_completed_scheduler in banking simulator
  • Loading branch information
ryoqun authored Jan 9, 2025
1 parent 45a27ae commit 6b6a03b
Show file tree
Hide file tree
Showing 21 changed files with 160 additions and 76 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"accounts-db/store-histogram",
"accounts-db/store-tool",
"banking-bench",
"banking-stage-ingress-types",
"banks-client",
"banks-interface",
"banks-server",
Expand Down Expand Up @@ -255,6 +256,7 @@ check-cfg = [

[workspace.dependencies]
Inflector = "0.11.4"
agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=2.2.0" }
agave-transaction-view = { path = "transaction-view", version = "=2.2.0" }
aquamarine = "0.3.3"
aes-gcm-siv = "0.11.1"
Expand Down
4 changes: 3 additions & 1 deletion banking-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
agave-banking-stage-ingress-types = { workspace = true }
assert_matches = { workspace = true }
clap = { version = "3.1.8", features = ["derive", "cargo"] }
crossbeam-channel = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-core = { workspace = true, features = ["dev-context-only-utils"] }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
Expand Down
33 changes: 18 additions & 15 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#![allow(clippy::arithmetic_side_effects)]
use {
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
crossbeam_channel::{unbounded, Receiver},
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::BankingStage,
banking_trace::{
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
validator::BlockProductionMethod,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand Down Expand Up @@ -349,7 +349,7 @@ fn main() {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let mut bank = bank_forks.read().unwrap().working_bank();
let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
Expand Down Expand Up @@ -552,21 +552,24 @@ fn main() {
poh_time.stop();

let mut new_bank_time = Measure::start("new_bank");
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(_));
}
let new_slot = bank.slot() + 1;
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);
new_bank_time.stop();

let mut insert_time = Measure::start("insert_time");
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
assert_matches!(poh_recorder.read().unwrap().bank(), None);
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&bank_forks,
&poh_recorder,
new_bank,
false,
);
bank = bank_forks.read().unwrap().working_bank_with_scheduler();
assert_matches!(poh_recorder.read().unwrap().bank(), Some(_));
insert_time.stop();

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
Expand Down
14 changes: 14 additions & 0 deletions banking-stage-ingress-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "agave-banking-stage-ingress-types"
description = "Agave banking stage ingress types"
documentation = "https://docs.rs/agave-banking-stage-ingress-types"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
crossbeam-channel = { workspace = true }
solana-perf = { workspace = true }
4 changes: 4 additions & 0 deletions banking-stage-ingress-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use {crossbeam_channel::Receiver, solana_perf::packet::PacketBatch, std::sync::Arc};

pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ edition = { workspace = true }
codecov = { repository = "solana-labs/solana", branch = "master", service = "github" }

[dependencies]
agave-banking-stage-ingress-types = { workspace = true }
ahash = { workspace = true }
anyhow = { workspace = true }
arrayvec = { workspace = true }
assert_matches = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![feature(test)]

use {
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_core::{banking_trace::Channels, validator::BlockProductionMethod},
solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction},
};
Expand All @@ -24,7 +25,7 @@ use {
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStage, BankingStageStats,
},
banking_trace::{BankingPacketBatch, BankingTracer},
banking_trace::BankingTracer,
},
solana_entry::entry::{next_hash, Entry},
solana_gossip::cluster_info::{ClusterInfo, Node},
Expand Down
5 changes: 3 additions & 2 deletions core/benches/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
extern crate test;

use {
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_core::banking_trace::{
for_test::{
drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer,
},
receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels,
TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
receiving_loop_with_minimized_sender_overhead, BankingTracer, Channels, TraceError,
TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
std::{
path::PathBuf,
Expand Down
25 changes: 16 additions & 9 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#![cfg(feature = "dev-context-only-utils")]
use {
crate::{
banking_stage::{BankingStage, LikeClusterInfo},
banking_stage::{
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo,
},
banking_trace::{
BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent,
TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
BASENAME,
BankingTracer, ChannelLabel, Channels, TimedTracedEvent, TracedEvent, TracedSender,
TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
},
validator::BlockProductionMethod,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
bincode::deserialize_from,
crossbeam_channel::{unbounded, Sender},
itertools::Itertools,
Expand Down Expand Up @@ -450,6 +453,9 @@ impl SimulatorLoop {
info!("Bank::new_from_parent()!");

logger.log_jitter(&bank);
if let Some((result, _execute_timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(()));
}
bank.freeze();
let new_slot = if bank.slot() == self.parent_slot {
info!("initial leader block!");
Expand Down Expand Up @@ -484,16 +490,17 @@ impl SimulatorLoop {
logger.log_frozen_bank_cost(&bank);
}
self.retransmit_slots_sender.send(bank.slot()).unwrap();
self.bank_forks.write().unwrap().insert(new_bank);
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&self.bank_forks,
&self.poh_recorder,
new_bank,
false,
);
bank = self
.bank_forks
.read()
.unwrap()
.working_bank_with_scheduler();
self.poh_recorder
.write()
.unwrap()
.set_bank(bank.clone_with_scheduler(), false);
} else {
logger.log_ongoing_bank_cost(&bank);
}
Expand Down
23 changes: 20 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
self::{
committer::Committer,
Expand All @@ -23,9 +25,9 @@ use {
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
},
banking_trace::BankingPacketReceiver,
validator::BlockProductionMethod,
},
agave_banking_stage_ingress_types::BankingPacketReceiver,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
Expand All @@ -35,7 +37,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -716,11 +718,26 @@ impl BankingStage {
}
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
bank_forks: &RwLock<BankForks>,
poh_recorder: &RwLock<PohRecorder>,
tpu_bank: Bank,
track_transaction_indexes: bool,
) {
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
}

#[cfg(test)]
mod tests {
use {
super::*,
crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels},
crate::banking_trace::{BankingTracer, Channels},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver},
itertools::Itertools,
solana_entry::entry::{self, Entry, EntrySlice},
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
packet_filter::PacketFilterFailure,
},
crate::banking_trace::{BankingPacketBatch, BankingPacketReceiver},
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
crossbeam_channel::RecvTimeoutError,
solana_perf::packet::PacketBatch,
solana_sdk::saturating_add_assign,
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
unprocessed_transaction_storage::UnprocessedTransactionStorage,
BankingStageStats,
},
crate::banking_trace::BankingPacketReceiver,
agave_banking_stage_ingress_types::BankingPacketReceiver,
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_sdk::{saturating_add_assign, timing::timestamp},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,19 +435,17 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
mod tests {
use {
super::*,
crate::{
banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphSchedulerConfig,
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
},
banking_trace::BankingPacketBatch,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver, Sender},
itertools::Itertools,
solana_gossip::cluster_info::ClusterInfo,
Expand Down
4 changes: 1 addition & 3 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
bincode::serialize_into,
chrono::{DateTime, Local},
crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
solana_perf::packet::PacketBatch,
solana_sdk::{hash::Hash, slot_history::Slot},
std::{
fs::{create_dir_all, remove_dir_all},
Expand All @@ -19,9 +19,7 @@ use {
thiserror::Error,
};

pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
pub type BankingPacketSender = TracedSender;
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
pub type TracerThreadResult = Result<(), TraceError>;
pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
pub type DirByteLimit = u64;
Expand Down
3 changes: 2 additions & 1 deletion core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use {
crate::{
banking_trace::{BankingPacketBatch, BankingPacketSender},
banking_trace::BankingPacketSender,
consensus::vote_stake_tracker::VoteStakeTracker,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result},
sigverify,
},
agave_banking_stage_ingress_types::BankingPacketBatch,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender},
log::*,
solana_gossip::{
Expand Down
Loading

0 comments on commit 6b6a03b

Please sign in to comment.