Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log notification capacity info for wss #2

Open
wants to merge 6 commits into
base: v1.18.22-helius
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rpc/src/rpc_pubsub_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use {
},
stream_cancel::{Trigger, Tripwire},
thiserror::Error,
tokio::time::Instant,
tokio::{net::TcpStream, pin, select, sync::broadcast},
tokio_util::compat::TokioAsyncReadCompatExt,
};
Expand Down Expand Up @@ -399,11 +400,13 @@ async fn handle_connection(
Err(err) => return Err(err.into()),
},
result = broadcast_receiver.recv() => {

let time = Instant::now();
// In both possible error cases (closed or lagged) we disconnect the client.
if let Some(json) = broadcast_handler.handle(result?)? {
sender.send_text(&*json).await?;
}
let send_time = time.elapsed().as_micros();
datapoint_info!("rpc-pubsub-broadcast-receive-send-us", ("send_time", send_time, i64));
},
_ = &mut tripwire => {
warn!("disconnecting websocket client: shutting down");
Expand Down
131 changes: 73 additions & 58 deletions rpc/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,62 +208,62 @@ struct RpcNotificationContext {

const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000);

struct RecentItems {
queue: VecDeque<Arc<String>>,
total_bytes: usize,
max_len: usize,
max_total_bytes: usize,
last_metrics_submission: Instant,
}

impl RecentItems {
fn new(max_len: usize, max_total_bytes: usize) -> Self {
Self {
queue: VecDeque::new(),
total_bytes: 0,
max_len,
max_total_bytes,
last_metrics_submission: Instant::now(),
}
}

fn push(&mut self, item: Arc<String>) {
self.total_bytes = self
.total_bytes
.checked_add(item.len())
.expect("total bytes overflow");
self.queue.push_back(item);

while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
let item = self.queue.pop_front().expect("can't be empty");
self.total_bytes = self
.total_bytes
.checked_sub(item.len())
.expect("total bytes underflow");
}

let now = Instant::now();
let last_metrics_ago = now.duration_since(self.last_metrics_submission);
if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
datapoint_info!(
"rpc_subscriptions_recent_items",
("num", self.queue.len(), i64),
("total_bytes", self.total_bytes, i64),
);
self.last_metrics_submission = now;
} else {
trace!(
"rpc_subscriptions_recent_items num={} total_bytes={}",
self.queue.len(),
self.total_bytes,
);
}
}
}
// struct RecentItems {
// queue: VecDeque<Arc<String>>,
// total_bytes: usize,
// max_len: usize,
// max_total_bytes: usize,
// last_metrics_submission: Instant,
// }

// impl RecentItems {
// fn new(max_len: usize, max_total_bytes: usize) -> Self {
// Self {
// queue: VecDeque::new(),
// total_bytes: 0,
// max_len,
// max_total_bytes,
// last_metrics_submission: Instant::now(),
// }
// }

// fn push(&mut self, item: Arc<String>) {
// self.total_bytes = self
// .total_bytes
// .checked_add(item.len())
// .expect("total bytes overflow");
// self.queue.push_back(item);

// while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
// let item = self.queue.pop_front().expect("can't be empty");
// self.total_bytes = self
// .total_bytes
// .checked_sub(item.len())
// .expect("total bytes underflow");
// }

// let now = Instant::now();
// let last_metrics_ago = now.duration_since(self.last_metrics_submission);
// if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
// datapoint_info!(
// "rpc_subscriptions_recent_items",
// ("num", self.queue.len(), i64),
// ("total_bytes", self.total_bytes, i64),
// );
// self.last_metrics_submission = now;
// } else {
// trace!(
// "rpc_subscriptions_recent_items num={} total_bytes={}",
// self.queue.len(),
// self.total_bytes,
// );
// }
// }
// }

struct RpcNotifier {
sender: broadcast::Sender<RpcNotification>,
recent_items: Mutex<RecentItems>,
// recent_items: Mutex<RecentItems>,
}

thread_local! {
Expand Down Expand Up @@ -313,12 +313,18 @@ impl RpcNotifier {
};
// There is an unlikely case where this can fail: if the last subscription is closed
// just as the notifier generates a notification for it.
let time = Instant::now();
let _ = self.sender.send(notification);
let send_time = time.elapsed().as_micros();
datapoint_info!(
"rpc-pubsub-broadcast-send-time-us",
("send_time", send_time, i64)
);

inc_new_counter_info!("rpc-pubsub-messages", 1);
inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());

self.recent_items.lock().unwrap().push(buf_arc);
// self.recent_items.lock().unwrap().push(buf_arc);
}
}

Expand Down Expand Up @@ -626,10 +632,10 @@ impl RpcSubscriptions {

let notifier = RpcNotifier {
sender: broadcast_sender.clone(),
recent_items: Mutex::new(RecentItems::new(
config.queue_capacity_items,
config.queue_capacity_bytes,
)),
// recent_items: Mutex::new(RecentItems::new(
// config.queue_capacity_items,
// config.queue_capacity_bytes,
// )),
};
let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count);
let t_cleanup = if notification_threads == 0 {
Expand Down Expand Up @@ -757,6 +763,15 @@ impl RpcSubscriptions {

fn enqueue_notification(&self, notification_entry: NotificationEntry) {
if let Some(ref notification_sender) = self.notification_sender {
let queue_size = notification_sender.len() as i64;
let max_capacity = notification_sender.capacity().unwrap_or(1_123_456) as i64;
datapoint_info!(
"rpc_pubsub_queue",
("size", queue_size, i64),
("remaining_capacity", max_capacity - queue_size, i64),
("capacity", max_capacity, i64)
);

match notification_sender.send(notification_entry.into()) {
Ok(()) => (),
Err(SendError(notification)) => {
Expand Down
4 changes: 2 additions & 2 deletions sdk/program/src/serde_varint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ macro_rules! impl_var_int {
let mut shift = 0u32;
while shift < <$type>::BITS {
let Some(byte) = seq.next_element::<u8>()? else {
return Err(A::Error::custom("Invalid Sequence"));
};
return Err(A::Error::custom("Invalid Sequence"));
};
out |= ((byte & 0x7F) as Self) << shift;
if byte & 0x80 == 0 {
// Last byte should not have been truncated when it was
Expand Down