client: Use async TPU client in sync TPU client by sharing tokio runtime (solana-labs#26996)

* Make the sync tpu client use the async tpu client

* Try to fix CI errors

* Fix formatting

* Make rpc_client::get_nonblocking_client public only in the crate

* Save work

* Temporary hack to test sharing runtime between tpu_client and rpc_client

* [WIP] Copy rpc client

* Fix build

* Small refactoring

* Remove copies

* Refactor access to RPC client fields

* Change `clone_inner_client` to `get_inner_client`

Co-authored-by: Ryan Leung <[email protected]>
2 people authored and xiangzhu70 committed Aug 17, 2022
1 parent cc36af0 commit 56e8975
Showing 5 changed files with 385 additions and 774 deletions.
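The core idea described by the commit title and bullets — a blocking client that reuses a shared tokio runtime to drive the nonblocking client instead of duplicating its logic — can be sketched roughly as follows. This is a minimal illustration only; `AsyncClient` and `BlockingClient` are hypothetical stand-ins, not the actual solana-client types or API.

use std::sync::Arc;
use tokio::runtime::Runtime;

// Hypothetical stand-in for the nonblocking TPU/RPC client.
struct AsyncClient;

impl AsyncClient {
    async fn send(&self, wire_transaction: Vec<u8>) -> Result<(), String> {
        // Real code would write the bytes to the leader's TPU socket here.
        let _ = wire_transaction;
        Ok(())
    }
}

// Hypothetical blocking facade: it holds a shared runtime and blocks on the
// async client's futures instead of reimplementing the sending logic.
struct BlockingClient {
    runtime: Arc<Runtime>,
    inner: AsyncClient,
}

impl BlockingClient {
    fn new(runtime: Arc<Runtime>) -> Self {
        Self {
            runtime,
            inner: AsyncClient,
        }
    }

    fn send(&self, wire_transaction: Vec<u8>) -> Result<(), String> {
        // block_on drives the async call to completion on the shared runtime.
        self.runtime.block_on(self.inner.send(wire_transaction))
    }
}

fn main() {
    // One runtime can be shared by several blocking wrappers (for example a
    // TPU client and an RPC client), which is the point of the change.
    let runtime = Arc::new(Runtime::new().expect("failed to build tokio runtime"));
    let client = BlockingClient::new(Arc::clone(&runtime));
    client.send(vec![0u8; 32]).expect("send failed");
}
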
6 changes: 6 additions & 0 deletions client/src/nonblocking/pubsub_client.rs
@@ -64,8 +64,14 @@ pub enum PubsubClientError {
#[error("subscribe failed: {reason}")]
SubscribeFailed { reason: String, message: String },

#[error("unexpected message format: {0}")]
UnexpectedMessageError(String),

#[error("request failed: {reason}")]
RequestFailed { reason: String, message: String },

#[error("request error: {0}")]
RequestError(String),
}

type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
168 changes: 162 additions & 6 deletions client/src/nonblocking/tpu_client.rs
@@ -1,18 +1,18 @@
use {
crate::{
-client_error::ClientError,
+client_error::{ClientError, Result as ClientResult},
connection_cache::ConnectionCache,
nonblocking::{
pubsub_client::{PubsubClient, PubsubClientError},
rpc_client::RpcClient,
tpu_connection::TpuConnection,
},
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
-rpc_response::SlotUpdate,
+rpc_response::{RpcContactInfo, SlotUpdate},
spinner,
tpu_client::{
-LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
-MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
+RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL,
+TRANSACTION_RESEND_INTERVAL,
},
},
bincode::serialize,
@@ -21,15 +21,18 @@ use {
solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
message::Message,
pubkey::Pubkey,
signature::SignerError,
signers::Signers,
transaction::{Transaction, TransactionError},
transport::{Result as TransportResult, TransportError},
},
std::{
-collections::HashMap,
+collections::{HashMap, HashSet},
net::SocketAddr,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
@@ -56,6 +59,156 @@ pub enum TpuSenderError {
Custom(String),
}

struct LeaderTpuCacheUpdateInfo {
maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
maybe_epoch_info: Option<ClientResult<EpochInfo>>,
maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
}
impl LeaderTpuCacheUpdateInfo {
pub fn has_some(&self) -> bool {
self.maybe_cluster_nodes.is_some()
|| self.maybe_epoch_info.is_some()
|| self.maybe_slot_leaders.is_some()
}
}

struct LeaderTpuCache {
first_slot: Slot,
leaders: Vec<Pubkey>,
leader_tpu_map: HashMap<Pubkey, SocketAddr>,
slots_in_epoch: Slot,
last_epoch_info_slot: Slot,
}

impl LeaderTpuCache {
pub fn new(
first_slot: Slot,
slots_in_epoch: Slot,
leaders: Vec<Pubkey>,
cluster_nodes: Vec<RpcContactInfo>,
) -> Self {
let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
Self {
first_slot,
leaders,
leader_tpu_map,
slots_in_epoch,
last_epoch_info_slot: first_slot,
}
}

// Last slot that has a cached leader pubkey
pub fn last_slot(&self) -> Slot {
self.first_slot + self.leaders.len().saturating_sub(1) as u64
}

pub fn slot_info(&self) -> (Slot, Slot, Slot) {
(
self.last_slot(),
self.last_epoch_info_slot,
self.slots_in_epoch,
)
}

// Get the TPU sockets for the current leader and upcoming leaders according to fanout size
pub fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
let mut leader_set = HashSet::new();
let mut leader_sockets = Vec::new();
for leader_slot in current_slot..current_slot + fanout_slots {
if let Some(leader) = self.get_slot_leader(leader_slot) {
if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
if leader_set.insert(*leader) {
leader_sockets.push(*tpu_socket);
}
} else {
// The leader is probably delinquent
trace!("TPU not available for leader {}", leader);
}
} else {
// Overran the local leader schedule cache
warn!(
"Leader not known for slot {}; cache holds slots [{},{}]",
leader_slot,
self.first_slot,
self.last_slot()
);
}
}
leader_sockets
}

pub fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
if slot >= self.first_slot {
let index = slot - self.first_slot;
self.leaders.get(index as usize)
} else {
None
}
}

pub fn extract_cluster_tpu_sockets(
cluster_contact_info: Vec<RpcContactInfo>,
) -> HashMap<Pubkey, SocketAddr> {
cluster_contact_info
.into_iter()
.filter_map(|contact_info| {
Some((
Pubkey::from_str(&contact_info.pubkey).ok()?,
contact_info.tpu?,
))
})
.collect()
}

pub fn fanout(slots_in_epoch: Slot) -> Slot {
(2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
}

pub fn update_all(
&mut self,
estimated_current_slot: Slot,
cache_update_info: LeaderTpuCacheUpdateInfo,
) -> (bool, bool) {
let mut has_error = false;
let mut cluster_refreshed = false;
if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
match cluster_nodes {
Ok(cluster_nodes) => {
let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
self.leader_tpu_map = leader_tpu_map;
cluster_refreshed = true;
}
Err(err) => {
warn!("Failed to fetch cluster tpu sockets: {}", err);
has_error = true;
}
}
}

if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
self.slots_in_epoch = epoch_info.slots_in_epoch;
self.last_epoch_info_slot = estimated_current_slot;
}

if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
match slot_leaders {
Ok(slot_leaders) => {
self.first_slot = estimated_current_slot;
self.leaders = slot_leaders;
}
Err(err) => {
warn!(
"Failed to fetch slot leaders (current estimated slot: {}): {}",
estimated_current_slot, err
);
has_error = true;
}
}
}
(has_error, cluster_refreshed)
}
}

type Result<T> = std::result::Result<T, TpuSenderError>;

/// Client which sends transactions directly to the current leader's TPU port over UDP.
@@ -102,7 +255,10 @@ impl TpuClient {

/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail
-async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+pub async fn try_send_wire_transaction(
+&self,
+wire_transaction: Vec<u8>,
+) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
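For reference, the deduplication inside the new `LeaderTpuCache::get_leader_sockets` above can be exercised in isolation. The following standalone sketch uses hypothetical data and plain `&str` identities instead of `Pubkey`; it is not part of the commit, but shows how a leader that owns several of the next `fanout_slots` slots contributes its TPU socket only once.

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

// Mirrors the lookup in LeaderTpuCache::get_leader_sockets: walk the next
// `fanout_slots` slots, resolve each slot's leader, and keep each leader's
// TPU socket once, in order of first appearance.
fn leader_sockets(
    first_slot: u64,
    leaders: &[&str],
    tpu_map: &HashMap<&str, SocketAddr>,
    current_slot: u64,
    fanout_slots: u64,
) -> Vec<SocketAddr> {
    let mut seen = HashSet::new();
    let mut sockets = Vec::new();
    for slot in current_slot..current_slot + fanout_slots {
        if let Some(leader) = slot
            .checked_sub(first_slot)
            .and_then(|i| leaders.get(i as usize))
        {
            // Leaders without a known TPU socket are skipped, as in the cache.
            if let Some(addr) = tpu_map.get(leader) {
                if seen.insert(*leader) {
                    sockets.push(*addr);
                }
            }
        }
    }
    sockets
}

fn main() {
    let tpu_map: HashMap<&str, SocketAddr> = [
        ("alice", "10.0.0.1:8003".parse().unwrap()),
        ("bob", "10.0.0.2:8003".parse().unwrap()),
    ]
    .into_iter()
    .collect();
    // Slots 100..104 are led by alice, alice, bob, alice: only two sockets result.
    let leaders = ["alice", "alice", "bob", "alice"];
    let sockets = leader_sockets(100, &leaders, &tpu_map, 100, 4);
    assert_eq!(sockets.len(), 2);
    println!("{:?}", sockets);
}
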
22 changes: 2 additions & 20 deletions client/src/pubsub_client.rs
@@ -1,3 +1,4 @@
+pub use crate::nonblocking::pubsub_client::PubsubClientError;
use {
crate::{
rpc_config::{
@@ -31,29 +32,10 @@ use {
thread::{sleep, JoinHandle},
time::Duration,
},
-thiserror::Error,
tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
-url::{ParseError, Url},
+url::Url,
};

-#[derive(Debug, Error)]
-pub enum PubsubClientError {
-#[error("url parse error")]
-UrlParseError(#[from] ParseError),
-
-#[error("unable to connect to server")]
-ConnectionError(#[from] tungstenite::Error),
-
-#[error("json parse error")]
-JsonParseError(#[from] serde_json::error::Error),
-
-#[error("unexpected message format: {0}")]
-UnexpectedMessageError(String),
-
-#[error("request error: {0}")]
-RequestError(String),
-}

pub struct PubsubClientSubscription<T>
where
T: DeserializeOwned,
(Diffs for the remaining 2 changed files did not load and are not shown.)
