Skip to content

Commit

Permalink
Enable QUIC client by default. Add arg to disable QUIC client. (Forwa…
Browse files Browse the repository at this point in the history
…rd port solana-labs#26927) (solana-labs#27194)

Enable QUIC client by default. Add arg to disable QUIC client.

* Enable QUIC client by default. Add arg to disable QUIC client.
* Deprecate --disable-quic-servers arg
* Add #[ignore] annotation to failing tests
  • Loading branch information
willhickey authored and haoran committed Aug 21, 2022
1 parent 6436a68 commit 58cd2d3
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 63 deletions.
6 changes: 3 additions & 3 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ fn main() {
.help("Number of threads to use in the banking stage"),
)
.arg(
Arg::new("tpu_use_quic")
.long("tpu-use-quic")
Arg::new("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Forward messages to TPU using QUIC"),
.help("Disable forwarding messages to TPU using QUIC"),
)
.get_matches();

Expand Down
10 changes: 5 additions & 5 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,10 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.help("Submit transactions with a TpuClient")
)
.arg(
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Submit transactions via QUIC; only affects ThinClient (default) \
.help("Do not submit transactions via QUIC; only affects ThinClient (default) \
or TpuClient sends"),
)
.arg(
Expand Down Expand Up @@ -358,8 +358,8 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
args.external_client_type = ExternalClientType::RpcClient;
}

if matches.is_present("tpu_use_quic") {
args.use_quic = true;
if matches.is_present("tpu_disable_quic") {
args.use_quic = false;
}

if let Some(v) = matches.value_of("tpu_connection_pool_size") {
Expand Down
1 change: 1 addition & 0 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ fn test_bench_tps_test_validator(config: Config) {

#[test]
#[serial]
#[ignore]
fn test_bench_tps_local_cluster_solana() {
test_bench_tps_local_cluster(Config {
tx_count: 100,
Expand Down
30 changes: 23 additions & 7 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static MAX_CONNECTIONS: usize = 1024;

/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;

/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
Expand Down Expand Up @@ -683,6 +683,11 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_cache = ConnectionCache::default();
let port_offset = if connection_cache.use_quic() {
QUIC_PORT_OFFSET
} else {
0
};
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
Expand All @@ -695,18 +700,29 @@ mod tests {
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let conn = &map.get(a).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*a, connection_cache.stats.clone());
assert!(a.ip() == conn.tpu_addr().ip());
let port = a
.port()
.checked_add(port_offset)
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);

let conn = &map.get(addr).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == conn.tpu_addr().ip());
});
}

let addr = get_addr(&mut rng);
connection_cache.get_connection(&addr);
let addr = &get_addr(&mut rng);
connection_cache.get_connection(addr);

let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
let _conn = map.get(&addr).expect("Address not found");
let _conn = map.get(&addr_with_quic_port).expect("Address not found");
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4122,6 +4122,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
Expand Down Expand Up @@ -4209,6 +4210,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_handle_forwarding() {
solana_logger::setup();
// packets are deserialized upon receiving, failed packets will not be
Expand Down
69 changes: 30 additions & 39 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ pub struct Tpu {
banking_stage: BankingStage,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: Option<thread::JoinHandle<()>>,
tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
staked_nodes_updater_service: StakedNodesUpdaterService,
Expand Down Expand Up @@ -97,7 +97,6 @@ impl Tpu {
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
log_messages_bytes_limit: Option<usize>,
enable_quic_servers: bool,
staked_nodes: &Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
) -> Self {
Expand Down Expand Up @@ -157,37 +156,33 @@ impl Tpu {
let (verified_sender, verified_receiver) = unbounded();

let stats = Arc::new(StreamStats::default());
let tpu_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap()
});
let tpu_quic_t = spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
)
.unwrap();

let tpu_forwards_quic_t = enable_quic_servers.then(|| {
spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap()
});
let tpu_forwards_quic_t = spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),
forwarded_packet_sender,
exit.clone(),
MAX_QUIC_CONNECTIONS_PER_PEER,
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
)
.unwrap();

let sigverify_stage = {
let verifier = TransactionSigVerifier::new(verified_sender);
Expand Down Expand Up @@ -274,13 +269,9 @@ impl Tpu {
self.find_packet_sender_stake_stage.join(),
self.vote_find_packet_sender_stake_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
];
if let Some(tpu_quic_t) = self.tpu_quic_t {
tpu_quic_t.join()?;
}
if let Some(tpu_forwards_quic_t) = self.tpu_forwards_quic_t {
tpu_forwards_quic_t.join()?;
}
let broadcast_result = self.broadcast_stage.join();
for result in results {
result?;
Expand Down
3 changes: 0 additions & 3 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ pub struct ValidatorConfig {
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub enable_quic_servers: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -239,7 +238,6 @@ impl Default for ValidatorConfig {
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
enable_quic_servers: true,
}
}
}
Expand Down Expand Up @@ -1025,7 +1023,6 @@ impl Validator {
&connection_cache,
&identity_keypair,
config.runtime_config.log_messages_bytes_limit,
config.enable_quic_servers,
&staked_nodes,
config.staked_nodes_overrides.clone(),
);
Expand Down
2 changes: 2 additions & 0 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,11 +1185,13 @@ pub mod test {
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ false)
}

#[test]
#[ignore]
fn test_dos_with_blockhash_and_payer_and_quic() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ true)
}
Expand Down
1 change: 0 additions & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
enable_quic_servers: config.enable_quic_servers,
}
}

Expand Down
4 changes: 4 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn test_spend_and_verify_all_nodes_3() {

#[test]
#[serial]
#[ignore]
fn test_local_cluster_signature_subscribe() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let num_nodes = 2;
Expand Down Expand Up @@ -311,6 +312,7 @@ fn test_two_unbalanced_stakes() {

#[test]
#[serial]
#[ignore]
fn test_forwarding() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// Set up a cluster where one node is never the leader, so all txs sent to this node
Expand Down Expand Up @@ -1228,6 +1230,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_snapshot_restart_tower() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 2 nodes
Expand Down Expand Up @@ -2520,6 +2523,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {

#[test]
#[serial]
#[ignore]
fn test_votes_land_in_fork_during_long_partition() {
let total_stake = 3 * DEFAULT_NODE_STAKE;
// Make `lighter_stake` insufficient for switching threshold
Expand Down
2 changes: 2 additions & 0 deletions local-cluster/tests/local_cluster_slow_1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod common;

#[test]
#[serial]
#[ignore]
// Steps in this test:
// We want to create a situation like:
/*
Expand Down Expand Up @@ -588,6 +589,7 @@ fn test_duplicate_shreds_broadcast_leader() {

#[test]
#[serial]
#[ignore]
fn test_switch_threshold_uses_gossip_votes() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let total_stake = 100 * DEFAULT_NODE_STAKE;
Expand Down
1 change: 1 addition & 0 deletions local-cluster/tests/local_cluster_slow_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ fn test_leader_failure_4() {

#[test]
#[serial]
#[ignore]
fn test_ledger_cleanup_service() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
error!("test_ledger_cleanup_service");
Expand Down
2 changes: 1 addition & 1 deletion multinode-demo/bootstrap-validator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-use-quic ]]; then
elif [[ $1 = --tpu-disable-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
Expand Down
17 changes: 13 additions & 4 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,13 +1218,21 @@ pub fn main() {
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.hidden(true)
.conflicts_with("tpu_disable_quic")
.help("Use QUIC to send transactions."),
)
.arg(
Arg::with_name("tpu_disable_quic")
.long("tpu-disable-quic")
.takes_value(false)
.help("Do not use QUIC to send transactions."),
)
.arg(
Arg::with_name("disable_quic_servers")
.long("disable-quic-servers")
.takes_value(false)
.help("Disable QUIC TPU servers"),
.hidden(true)
)
.arg(
Arg::with_name("enable_quic_servers")
Expand Down Expand Up @@ -2394,8 +2402,7 @@ pub fn main() {
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
let accounts_shrink_optimize_total_space =
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let enable_quic_servers = !matches.is_present("disable_quic_servers");
let tpu_use_quic = !matches.is_present("tpu_disable_quic");
let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);

let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
Expand Down Expand Up @@ -2565,6 +2572,9 @@ pub fn main() {
if matches.is_present("enable_quic_servers") {
warn!("--enable-quic-servers is now the default behavior. This flag is deprecated and can be removed from the launch args");
}
if matches.is_present("disable_quic_servers") {
warn!("--disable-quic-servers is deprecated. The quic server cannot be disabled.");
}

let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload")
Expand Down Expand Up @@ -2749,7 +2759,6 @@ pub fn main() {
log_messages_bytes_limit: value_of(&matches, "log_messages_bytes_limit"),
..RuntimeConfig::default()
},
enable_quic_servers,
staked_nodes_overrides: staked_nodes_overrides.clone(),
..ValidatorConfig::default()
};
Expand Down

0 comments on commit 58cd2d3

Please sign in to comment.