Separate TpuClient pubsub watcher from epoch refresh (solana-labs#2067)

cpubot authored Jul 19, 2024
1 parent 54935e7 commit 6e0e68f
Showing 1 changed file with 65 additions and 39 deletions.
104 changes: 65 additions & 39 deletions tpu-client/src/nonblocking/tpu_client.rs
@@ -21,7 +21,7 @@ use {
response::{RpcContactInfo, SlotUpdate},
},
solana_sdk::{
clock::Slot,
clock::{Slot, DEFAULT_MS_PER_SLOT},
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
pubkey::Pubkey,
@@ -822,48 +822,27 @@ impl LeaderTpuService {
pubsub_client: Option<PubsubClient>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let (mut notifications, unsubscribe) = if let Some(pubsub_client) = &pubsub_client {
let (notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
(Some(notifications), Some(unsubscribe))
} else {
(None, None)
};
tokio::try_join!(
Self::run_slot_watcher(recent_slots.clone(), pubsub_client, exit.clone()),
Self::run_cache_refresher(rpc_client, recent_slots, leader_tpu_cache, exit),
)?;

Ok(())
}

async fn run_cache_refresher(
rpc_client: Arc<RpcClient>,
recent_slots: RecentLeaderSlots,
leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut last_cluster_refresh = Instant::now();
let mut sleep_ms = 1000;
loop {
if exit.load(Ordering::Relaxed) {
if let Some(unsubscribe) = unsubscribe {
(unsubscribe)().await;
}
// `notifications` requires a valid reference to `pubsub_client`
// so `notifications` must be dropped before moving `pubsub_client`
drop(notifications);
if let Some(pubsub_client) = pubsub_client {
pubsub_client.shutdown().await.unwrap();
};
break;
}
let mut sleep_ms = DEFAULT_MS_PER_SLOT;

while !exit.load(Ordering::Relaxed) {
// Sleep a slot before checking if leader cache needs to be refreshed again
sleep(Duration::from_millis(sleep_ms)).await;
sleep_ms = 1000;

if let Some(notifications) = &mut notifications {
while let Ok(Some(update)) =
timeout(Duration::from_millis(10), notifications.next()).await
{
let current_slot = match update {
// This update indicates that a full slot was received by the connected
// node so we can stop sending transactions to the leader for that slot
SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
// This update indicates that we have just received the first shred from
// the leader for this slot and they are probably still accepting transactions.
SlotUpdate::FirstShredReceived { slot, .. } => slot,
_ => continue,
};
recent_slots.record_slot(current_slot);
}
}
sleep_ms = DEFAULT_MS_PER_SLOT;

let cache_update_info = maybe_fetch_cache_info(
&leader_tpu_cache,
@@ -885,6 +864,53 @@
}
}
}

Ok(())
}

async fn run_slot_watcher(
recent_slots: RecentLeaderSlots,
pubsub_client: Option<PubsubClient>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let Some(pubsub_client) = pubsub_client else {
return Ok(());
};

let (mut notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
// Time out slot update notification polling at 10ms.
//
// Rationale is two-fold:
// 1. Notifications are an unbounded stream -- polling them will block indefinitely if not
// interrupted, and the exit condition will never be checked. 10ms ensures negligible
// CPU overhead while keeping notification checking timely.
// 2. The timeout must be strictly less than the slot time (DEFAULT_MS_PER_SLOT: 400) to
// avoid timeout never being reached. For example, if notifications are received every
// 400ms and the timeout is >= 400ms, notifications may theoretically always be available
// before the timeout is reached, resulting in the exit condition never being checked.
const SLOT_UPDATE_TIMEOUT: Duration = Duration::from_millis(10);

while !exit.load(Ordering::Relaxed) {
while let Ok(Some(update)) = timeout(SLOT_UPDATE_TIMEOUT, notifications.next()).await {
let current_slot = match update {
// This update indicates that a full slot was received by the connected
// node so we can stop sending transactions to the leader for that slot
SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
// This update indicates that we have just received the first shred from
// the leader for this slot and they are probably still accepting transactions.
SlotUpdate::FirstShredReceived { slot, .. } => slot,
_ => continue,
};
recent_slots.record_slot(current_slot);
}
}

// `notifications` requires a valid reference to `pubsub_client`, so `notifications` must be
// dropped before moving `pubsub_client` via `shutdown()`.
drop(notifications);
unsubscribe().await;
pubsub_client.shutdown().await?;

Ok(())
}
}
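
The heart of the change is the new `run` body: the single combined loop is replaced by two independent tasks, `run_slot_watcher` and `run_cache_refresher`, driven concurrently with `tokio::try_join!` and both gated on the shared `exit` flag. Below is a minimal, self-contained sketch of that pattern; the task names and timings are illustrative stand-ins, not the crate's actual API.

// Standalone sketch: two cooperating loops joined with `tokio::try_join!`.
// `watch_slots` and `refresh_cache` are hypothetical stand-ins for
// `run_slot_watcher` and `run_cache_refresher`.
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::time::{sleep, Duration};

async fn refresh_cache(exit: Arc<AtomicBool>) -> Result<(), String> {
    while !exit.load(Ordering::Relaxed) {
        // Stand-in for the periodic leader-cache refresh; one slot is roughly 400ms.
        sleep(Duration::from_millis(400)).await;
    }
    Ok(())
}

async fn watch_slots(exit: Arc<AtomicBool>) -> Result<(), String> {
    while !exit.load(Ordering::Relaxed) {
        // Stand-in for draining slot-update notifications.
        sleep(Duration::from_millis(10)).await;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let exit = Arc::new(AtomicBool::new(false));

    // Flip the flag after two seconds so the example terminates.
    let exit_setter = exit.clone();
    tokio::spawn(async move {
        sleep(Duration::from_secs(2)).await;
        exit_setter.store(true, Ordering::Relaxed);
    });

    // Both futures are polled concurrently on this task; the first error
    // from either one is returned immediately.
    tokio::try_join!(watch_slots(exit.clone()), refresh_cache(exit))?;
    Ok(())
}

In the diff itself both helpers return `Result<()>`, so an error such as `slot_updates_subscribe()` failing inside `run_slot_watcher` propagates straight out of `run`.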

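The comment block in `run_slot_watcher` carries the other key detail: slot-update notifications are an unbounded stream, so each poll is capped at 10ms (well under the roughly 400ms slot time) to guarantee the `exit` flag is re-checked. Here is a hedged sketch of that bounded-polling idea, using a plain tokio mpsc channel as a stand-in for the real pubsub stream; only the 10ms `SLOT_UPDATE_TIMEOUT` mirrors the diff, everything else is illustrative.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::{
    sync::mpsc,
    time::{sleep, timeout, Duration},
};

// Must stay well below the slot time so the exit flag is still checked
// even while updates keep arriving back-to-back.
const SLOT_UPDATE_TIMEOUT: Duration = Duration::from_millis(10);

async fn watch(mut updates: mpsc::Receiver<u64>, exit: Arc<AtomicBool>) {
    while !exit.load(Ordering::Relaxed) {
        // Drain whatever is available, but cap each wait at 10ms so control
        // returns to the outer loop and the exit flag gets re-checked.
        // `Ok(Some(_))` = an update arrived in time, `Ok(None)` = channel
        // closed, `Err(_)` = the timeout elapsed.
        while let Ok(Some(slot)) = timeout(SLOT_UPDATE_TIMEOUT, updates.recv()).await {
            println!("observed slot {slot}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(64);
    let exit = Arc::new(AtomicBool::new(false));

    let watcher = tokio::spawn(watch(rx, exit.clone()));

    // Simulate a few slot updates, then signal shutdown.
    for slot in 0..5u64 {
        tx.send(slot).await.unwrap();
        sleep(Duration::from_millis(50)).await;
    }
    exit.store(true, Ordering::Relaxed);
    watcher.await.unwrap();
}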