Skip to content

Commit

Permalink
Uses the `Duration` type instead of untyped `*_ms: u64` millisecond fields (#30971)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Mar 31, 2023
1 parent 3f6c33d commit ff9a42a
Show file tree
Hide file tree
Showing 20 changed files with 165 additions and 180 deletions.
2 changes: 1 addition & 1 deletion bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn main() -> Result<()> {
s_reader,
recycler.clone(),
stats.clone(),
1,
Duration::from_millis(1), // coalesce
true,
None,
));
Expand Down
8 changes: 4 additions & 4 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE_MS, quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_sdk::{net::DEFAULT_TPU_COALESCE, quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats,
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -262,8 +262,8 @@ mod tests {
10,
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
DEFAULT_TPU_COALESCE_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl AncestorHashesService {
Arc::new(StreamerReceiveStats::new(
"ancestor_hashes_response_receiver",
)),
1,
Duration::from_millis(1), // coalesce
false,
None,
);
Expand Down Expand Up @@ -1008,7 +1008,7 @@ mod test {
Arc::new(StreamerReceiveStats::new(
"ancestor_hashes_response_receiver",
)),
1,
Duration::from_millis(1), // coalesce
false,
None,
);
Expand Down
16 changes: 8 additions & 8 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl FetchStage {
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
coalesce: Duration,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
let (sender, receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
Expand All @@ -53,7 +53,7 @@ impl FetchStage {
&forward_sender,
forward_receiver,
poh_recorder,
coalesce_ms,
coalesce,
None,
DEFAULT_TPU_ENABLE_UDP,
),
Expand All @@ -73,7 +73,7 @@ impl FetchStage {
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
coalesce: Duration,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_enable_udp: bool,
) -> Self {
Expand All @@ -90,7 +90,7 @@ impl FetchStage {
forward_sender,
forward_receiver,
poh_recorder,
coalesce_ms,
coalesce,
in_vote_only_mode,
tpu_enable_udp,
)
Expand Down Expand Up @@ -149,7 +149,7 @@ impl FetchStage {
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
coalesce: Duration,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_enable_udp: bool,
) -> Self {
Expand All @@ -167,7 +167,7 @@ impl FetchStage {
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
coalesce,
true,
in_vote_only_mode.clone(),
)
Expand All @@ -188,7 +188,7 @@ impl FetchStage {
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
coalesce,
true,
in_vote_only_mode.clone(),
)
Expand All @@ -208,7 +208,7 @@ impl FetchStage {
vote_sender.clone(),
recycler.clone(),
tpu_vote_stats.clone(),
coalesce_ms,
coalesce,
true,
None,
)
Expand Down
3 changes: 2 additions & 1 deletion core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
net::UdpSocket,
sync::{atomic::AtomicBool, Arc},
thread::{self, JoinHandle},
time::Duration,
},
};

Expand Down Expand Up @@ -40,7 +41,7 @@ impl ServeRepairService {
request_sender,
Recycler::default(),
Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
1,
Duration::from_millis(1), // coalesce
false,
None,
);
Expand Down
2 changes: 1 addition & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl ShredFetchStage {
packet_sender.clone(),
recycler.clone(),
Arc::new(StreamerReceiveStats::new("packet_modifier")),
1,
Duration::from_millis(1), // coalesce
true,
None,
)
Expand Down
17 changes: 9 additions & 8 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! multi-stage transaction processing pipeline in software.
pub use solana_sdk::net::DEFAULT_TPU_COALESCE_MS;
pub use solana_sdk::net::DEFAULT_TPU_COALESCE;
use {
crate::{
banking_stage::BankingStage,
Expand Down Expand Up @@ -33,7 +33,7 @@ use {
},
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
Expand All @@ -42,6 +42,7 @@ use {
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
time::Duration,
},
};

Expand Down Expand Up @@ -93,7 +94,7 @@ impl Tpu {
replay_vote_receiver: ReplayVoteReceiver,
replay_vote_sender: ReplayVoteSender,
bank_notification_sender: Option<BankNotificationSender>,
tpu_coalesce_ms: u64,
tpu_coalesce: Duration,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
Expand Down Expand Up @@ -127,7 +128,7 @@ impl Tpu {
&forwarded_packet_sender,
forwarded_packet_receiver,
poh_recorder,
tpu_coalesce_ms,
tpu_coalesce,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
tpu_enable_udp,
);
Expand Down Expand Up @@ -177,8 +178,8 @@ impl Tpu {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
tpu_coalesce_ms,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
.unwrap();

Expand All @@ -197,8 +198,8 @@ impl Tpu {
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
tpu_coalesce_ms,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use {
verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig,
},
tower_storage::TowerStorage,
tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE_MS},
tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE},
tvu::{Tvu, TvuConfig, TvuSockets},
},
crossbeam_channel::{bounded, unbounded, Receiver},
Expand Down Expand Up @@ -225,7 +225,7 @@ pub struct ValidatorConfig {
pub warp_slot: Option<Slot>,
pub accounts_db_test_hash_calculation: bool,
pub accounts_db_skip_shrink: bool,
pub tpu_coalesce_ms: u64,
pub tpu_coalesce: Duration,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
Expand Down Expand Up @@ -290,7 +290,7 @@ impl Default for ValidatorConfig {
warp_slot: None,
accounts_db_test_hash_calculation: false,
accounts_db_skip_shrink: false,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
tpu_coalesce: DEFAULT_TPU_COALESCE,
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
validator_exit: Arc::new(RwLock::new(Exit::default())),
no_wait_for_vote_to_start_leader: true,
Expand Down Expand Up @@ -1146,7 +1146,7 @@ impl Validator {
replay_vote_receiver,
replay_vote_sender,
bank_notification_sender,
config.tpu_coalesce_ms,
config.tpu_coalesce,
cluster_confirmed_slot_sender,
&connection_cache,
&identity_keypair,
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl GossipService {
request_sender,
Recycler::default(),
Arc::new(StreamerReceiveStats::new("gossip_receiver")),
1,
Duration::from_millis(1), // coalesce
false,
None,
);
Expand Down
2 changes: 1 addition & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
warp_slot: config.warp_slot,
accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
accounts_db_skip_shrink: config.accounts_db_skip_shrink,
tpu_coalesce_ms: config.tpu_coalesce_ms,
tpu_coalesce: config.tpu_coalesce,
staked_nodes_overrides: config.staked_nodes_overrides.clone(),
validator_exit: Arc::new(RwLock::new(Exit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
Expand Down
29 changes: 8 additions & 21 deletions quic-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
log::*,
quinn::{
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig,
IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError,
IdleTimeout, TokioRuntime, TransportConfig, WriteError,
},
solana_connection_cache::{
client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
Expand All @@ -20,7 +20,7 @@ use {
solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
solana_sdk::{
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS,
QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
Expand All @@ -33,7 +33,6 @@ use {
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc},
thread,
time::Duration,
},
thiserror::Error,
tokio::{sync::RwLock, time::timeout},
Expand Down Expand Up @@ -129,9 +128,9 @@ impl QuicLazyInitializedEndpoint {
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();

let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
transport_config.keep_alive_interval(Some(QUIC_KEEP_ALIVE));
config.transport_config(Arc::new(transport_config));

endpoint.set_default_client_config(config);
Expand Down Expand Up @@ -198,11 +197,7 @@ impl QuicNewConnection {

let connecting = endpoint.connect(addr, "connect")?;
stats.total_connections.fetch_add(1, Ordering::Relaxed);
if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
connecting,
)
.await
if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
if connecting_result.is_err() {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -239,12 +234,7 @@ impl QuicNewConnection {
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(zero_rtt) = timeout(
Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
zero_rtt,
)
.await
{
if let Ok(zero_rtt) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, zero_rtt).await {
if zero_rtt {
stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
} else {
Expand All @@ -258,11 +248,8 @@ impl QuicNewConnection {
Err(connecting) => {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);

if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
connecting,
)
.await
if let Ok(connecting_result) =
timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await
{
connecting_result?
} else {
Expand Down
14 changes: 5 additions & 9 deletions quic-client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
};

pub const MAX_OUTSTANDING_TASK: u64 = 2000;
pub const SEND_DATA_TIMEOUT_MS: u64 = 10000;
const SEND_DATA_TIMEOUT: Duration = Duration::from_secs(10);

/// A semaphore used for limiting the number of asynchronous tasks spawn to the
/// runtime. Before spawnning a task, use acquire. After the task is done (be it
Expand Down Expand Up @@ -79,11 +79,7 @@ async fn send_data_async(
connection: Arc<NonblockingQuicConnection>,
buffer: Vec<u8>,
) -> TransportResult<()> {
let result = timeout(
Duration::from_millis(SEND_DATA_TIMEOUT_MS),
connection.send_data(&buffer),
)
.await;
let result = timeout(SEND_DATA_TIMEOUT, connection.send_data(&buffer)).await;
ASYNC_TASK_SEMAPHORE.release();
handle_send_result(result, connection)
}
Expand All @@ -92,10 +88,10 @@ async fn send_data_batch_async(
connection: Arc<NonblockingQuicConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let time_out = SEND_DATA_TIMEOUT_MS * buffers.len() as u64;

let result = timeout(
Duration::from_millis(time_out),
u32::try_from(buffers.len())
.map(|size| SEND_DATA_TIMEOUT.saturating_mul(size))
.unwrap_or(Duration::MAX),
connection.send_data_batch(&buffers),
)
.await;
Expand Down
Loading

0 comments on commit ff9a42a

Please sign in to comment.