Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Cleanup tempoary pub modules (#30268)
Browse files Browse the repository at this point in the history
Clean up temporary_pub_modules in tpu_client and thin_client
  • Loading branch information
lijunwangs authored Feb 11, 2023
1 parent 8770b15 commit d49b481
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 106 deletions.
2 changes: 1 addition & 1 deletion client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
transaction::{Transaction, TransactionError},
transport::Result as TransportResult,
},
solana_tpu_client::nonblocking::tpu_client::{temporary_pub::*, TpuClient as BackendTpuClient},
solana_tpu_client::nonblocking::tpu_client::{Result, TpuClient as BackendTpuClient},
std::sync::Arc,
};

Expand Down
2 changes: 1 addition & 1 deletion client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
transaction::{Transaction, TransactionError},
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{temporary_pub::Result, TpuClient as BackendTpuClient},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
std::sync::Arc,
};
pub use {
Expand Down
105 changes: 50 additions & 55 deletions thin-client/src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,77 +43,72 @@ use {
},
};

pub mod temporary_pub {
use super::*;
struct ClientOptimizer {
cur_index: AtomicUsize,
experiment_index: AtomicUsize,
experiment_done: AtomicBool,
times: RwLock<Vec<u64>>,
num_clients: usize,
}

pub struct ClientOptimizer {
cur_index: AtomicUsize,
experiment_index: AtomicUsize,
experiment_done: AtomicBool,
times: RwLock<Vec<u64>>,
num_clients: usize,
}

impl ClientOptimizer {
pub fn new(num_clients: usize) -> Self {
Self {
cur_index: AtomicUsize::new(0),
experiment_index: AtomicUsize::new(0),
experiment_done: AtomicBool::new(false),
times: RwLock::new(vec![std::u64::MAX; num_clients]),
num_clients,
}
impl ClientOptimizer {
fn new(num_clients: usize) -> Self {
Self {
cur_index: AtomicUsize::new(0),
experiment_index: AtomicUsize::new(0),
experiment_done: AtomicBool::new(false),
times: RwLock::new(vec![std::u64::MAX; num_clients]),
num_clients,
}
}

pub fn experiment(&self) -> usize {
if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
if old < self.num_clients {
old
} else {
self.best()
}
fn experiment(&self) -> usize {
if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
if old < self.num_clients {
old
} else {
self.best()
}
} else {
self.best()
}
}

pub fn report(&self, index: usize, time_ms: u64) {
if self.num_clients > 1
&& (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
{
fn report(&self, index: usize, time_ms: u64) {
if self.num_clients > 1
&& (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
{
trace!(
"report {} with {} exp: {}",
index,
time_ms,
self.experiment_index.load(Ordering::Relaxed)
);

self.times.write().unwrap()[index] = time_ms;

if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
let times = self.times.read().unwrap();
let (min_time, min_index) = min_index(&times);
trace!(
"report {} with {} exp: {}",
index,
time_ms,
self.experiment_index.load(Ordering::Relaxed)
"done experimenting min: {} time: {} times: {:?}",
min_index,
min_time,
times
);

self.times.write().unwrap()[index] = time_ms;

if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
let times = self.times.read().unwrap();
let (min_time, min_index) = min_index(&times);
trace!(
"done experimenting min: {} time: {} times: {:?}",
min_index,
min_time,
times
);

// Only 1 thread should grab the num_clients-1 index, so this should be ok.
self.cur_index.store(min_index, Ordering::Relaxed);
self.experiment_done.store(true, Ordering::Relaxed);
}
// Only 1 thread should grab the num_clients-1 index, so this should be ok.
self.cur_index.store(min_index, Ordering::Relaxed);
self.experiment_done.store(true, Ordering::Relaxed);
}
}
}

pub fn best(&self) -> usize {
self.cur_index.load(Ordering::Relaxed)
}
fn best(&self) -> usize {
self.cur_index.load(Ordering::Relaxed)
}
}
use temporary_pub::*;

/// An object for querying and sending transactions to the network.
pub struct ThinClient<
Expand Down
68 changes: 31 additions & 37 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
#[cfg(feature = "spinner")]
use {
crate::tpu_client::temporary_pub::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner,
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
};
pub use crate::tpu_client::Result;
use {
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
Expand Down Expand Up @@ -48,37 +41,38 @@ use {
time::{sleep, timeout, Duration, Instant},
},
};
#[cfg(feature = "spinner")]
use {
crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner,
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
};

pub mod temporary_pub {
use super::*;

pub type Result<T> = std::result::Result<T, TpuSenderError>;

#[cfg(feature = "spinner")]
pub fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str,
) {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / total_transactions as f64,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
}
#[cfg(feature = "spinner")]
fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str,
) {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / total_transactions as f64,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
}
use temporary_pub::*;

#[derive(Error, Debug)]
pub enum TpuSenderError {
Expand Down
19 changes: 7 additions & 12 deletions tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,14 @@ pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;

pub mod temporary_pub {
use super::*;

pub type Result<T> = std::result::Result<T, TpuSenderError>;
pub type Result<T> = std::result::Result<T, TpuSenderError>;

/// Send at ~100 TPS
#[cfg(feature = "spinner")]
pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
#[cfg(feature = "spinner")]
pub const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
}
use temporary_pub::*;
/// Send at ~100 TPS
#[cfg(feature = "spinner")]
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
#[cfg(feature = "spinner")]
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);

/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
Expand Down

0 comments on commit d49b481

Please sign in to comment.