Commit

wip
esemeniuc committed Dec 18, 2024
1 parent 4c84f31 commit 922e5f0
Showing 3 changed files with 82 additions and 62 deletions.
37 changes: 5 additions & 32 deletions proxy/src/forwarder.rs
@@ -5,12 +5,12 @@ use std::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::{sleep, spawn, Builder, JoinHandle},
thread::{Builder, JoinHandle},
time::{Duration, SystemTime},
};

use arc_swap::ArcSwap;
use crossbeam_channel::{Receiver, RecvError, Sender, TrySendError};
use crossbeam_channel::{Receiver, RecvError};
use dashmap::DashMap;
use itertools::Itertools;
use jito_protos::trace_shred::TraceShred;
@@ -44,6 +44,7 @@ pub fn start_forwarder_threads(
num_threads: Option<usize>,
deduper: Arc<RwLock<Deduper<2, [u8]>>>,
metrics: Arc<ShredMetrics>,
forward_stats: Arc<StreamerReceiveStats>,
use_discovery_service: bool,
debug_trace_shred: bool,
shutdown_receiver: Receiver<()>,
@@ -64,30 +65,19 @@
.enumerate()
.flat_map(|(thread_id, incoming_shred_socket)| {
let (packet_sender, packet_receiver) = crossbeam_channel::unbounded();
let stats = Arc::new(StreamerReceiveStats::new("shredstream_proxy-listen_thread"));
let listen_thread = streamer::receiver(
format!("ssListen{thread_id}"),
Arc::new(incoming_shred_socket),
exit.clone(),
packet_sender,
recycler.clone(),
stats.clone(),
forward_stats.clone(),
Duration::default(), // do not coalesce since batching consumes more cpu cycles and adds latency.
false,
None,
false,
);

let report_metrics_thread = {
let exit = exit.clone();
spawn(move || {
while !exit.load(Ordering::Relaxed) {
sleep(Duration::from_secs(1));
stats.report();
}
})
};

let deduper = deduper.clone();
let unioned_dest_sockets = unioned_dest_sockets.clone();
let metrics = metrics.clone();
@@ -139,7 +129,7 @@
})
.unwrap();

[listen_thread, send_thread, report_metrics_thread]
[listen_thread, send_thread]
})
.collect::<Vec<JoinHandle<()>>>()
}
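
As a standalone illustration of the fan-out above (one listen/send thread pair per incoming socket, flattened into a single Vec of join handles), here is a minimal sketch; the u16 "ports" and the thread bodies are stand-ins, not the proxy's real sockets or workers.

```rust
use std::thread::{spawn, JoinHandle};

// Minimal sketch of the per-socket fan-out used by start_forwarder_threads:
// each entry yields a pair of threads, and flat_map collects all handles.
fn start_worker_threads(sockets: Vec<u16>) -> Vec<JoinHandle<()>> {
    sockets
        .into_iter()
        .enumerate()
        .flat_map(|(thread_id, port)| {
            let listen_thread = spawn(move || {
                println!("thread {thread_id}: listening on stand-in port {port}");
            });
            let send_thread = spawn(move || {
                println!("thread {thread_id}: forwarding for stand-in port {port}");
            });
            [listen_thread, send_thread]
        })
        .collect()
}

fn main() {
    for handle in start_worker_threads(vec![20_000, 20_001]) {
        handle.join().unwrap();
    }
}
```
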
@@ -325,7 +315,6 @@ pub fn start_forwarder_accessory_thread(
deduper: Arc<RwLock<Deduper<2, [u8]>>>,
metrics: Arc<ShredMetrics>,
metrics_update_interval_ms: u64,
grpc_restart_signal: Sender<()>,
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
@@ -335,9 +324,7 @@
let metrics_tick =
crossbeam_channel::tick(Duration::from_millis(metrics_update_interval_ms));
let deduper_reset_tick = crossbeam_channel::tick(Duration::from_secs(2));
let stale_connection_tick = crossbeam_channel::tick(Duration::from_secs(30));
let mut rng = rand::thread_rng();
let mut last_cumulative_received_shred_count = 0;
while !exit.load(Ordering::Relaxed) {
crossbeam_channel::select! {
// reset deduper to avoid false positives
@@ -354,20 +341,6 @@
metrics.reset();
}

// handle scenario when grpc connection is open, but backend doesn't receive heartbeat
// possibly due to envoy losing track of the pod when backend restarts.
// we restart our grpc connection to work around the stale connection
recv(stale_connection_tick) -> _ => {
// if no shreds received, then restart
let new_received_count = metrics.agg_received_cumulative.load(Ordering::Relaxed);
if new_received_count == last_cumulative_received_shred_count {
if let Err(TrySendError::Disconnected(())) = grpc_restart_signal.try_send(()) {
panic!("Failed to send grpc restart signal, channel disconnected");
}
}
last_cumulative_received_shred_count = new_received_count;
}

// handle SIGINT shutdown
recv(shutdown_receiver) -> _ => {
break;
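
For context on the branches that remain in the accessory thread's loop after this change, here is a self-contained sketch of the tick-driven crossbeam select! pattern; the print statements stand in for the real deduper reset and metrics reporting.

```rust
use std::{thread, time::Duration};

use crossbeam_channel::{bounded, select, tick};

fn main() {
    // Stand-ins for the real channels: the deduper-reset and metrics ticks,
    // plus the SIGINT shutdown channel.
    let deduper_reset_tick = tick(Duration::from_secs(2));
    let metrics_tick = tick(Duration::from_millis(500));
    let (shutdown_sender, shutdown_receiver) = bounded::<()>(1);

    // Simulate a shutdown signal arriving after a few seconds.
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(3));
        let _ = shutdown_sender.send(());
    });

    loop {
        select! {
            // reset deduper to avoid false positives (placeholder action here)
            recv(deduper_reset_tick) -> _ => println!("reset deduper"),
            // report and reset metrics (placeholder action here)
            recv(metrics_tick) -> _ => println!("report metrics"),
            // handle SIGINT shutdown
            recv(shutdown_receiver) -> _ => break,
        }
    }
}
```
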
73 changes: 56 additions & 17 deletions proxy/src/heartbeat.rs
@@ -20,9 +20,37 @@ use tokio::runtime::Runtime;
use tonic::{codegen::InterceptedService, transport::Channel, Code};

use crate::{
forwarder::ShredMetrics,
token_authenticator::{create_grpc_channel, ClientInterceptor},
ShredstreamProxyError,
};
/*
This is a wrapper around AtomicBool that allows us to scope the lifetime of the AtomicBool to the heartbeat loop.
This is useful because we want to ensure that the AtomicBool is set to true when the heartbeat loop exits.
*/
struct ScopedAtomicBool {
inner: Arc<AtomicBool>,
}

impl ScopedAtomicBool {
fn get_inner_clone(&self) -> Arc<AtomicBool> {
self.inner.clone()
}
}

impl Default for ScopedAtomicBool {
fn default() -> Self {
Self {
inner: Arc::new(AtomicBool::new(false)),
}
}
}

impl Drop for ScopedAtomicBool {
fn drop(&mut self) {
self.inner.store(true, Ordering::Relaxed);
}
}
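
A self-contained sketch of the drop-flag pattern this type introduces: the spawned loop below stands in for the per-connection token-refresh task, and the struct is a condensed copy of the definition above so the example compiles on its own.

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

// Condensed copy of ScopedAtomicBool so this sketch stands alone.
#[derive(Default)]
struct ScopedAtomicBool {
    inner: Arc<AtomicBool>,
}

impl Drop for ScopedAtomicBool {
    fn drop(&mut self) {
        self.inner.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let worker = {
        // One "connection scope": when `scoped` is dropped at the end of this
        // block, the flag flips and the per-connection task below stops.
        let scoped = ScopedAtomicBool::default();
        let flag = scoped.inner.clone();
        let worker = thread::spawn(move || {
            while !flag.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(10)); // stand-in for real work
            }
            println!("per-connection task exited");
        });
        thread::sleep(Duration::from_millis(50));
        worker
        // `scoped` dropped here, even on early return or panic
    };
    worker.join().unwrap();
}
```
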

#[allow(clippy::too_many_arguments)]
pub fn heartbeat_loop_thread(
@@ -33,7 +61,7 @@ pub fn heartbeat_loop_thread(
recv_socket: SocketAddr,
runtime: Runtime,
service_name: String,
grpc_restart_signal: Receiver<()>,
metrics: Arc<ShredMetrics>,
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
@@ -43,9 +71,10 @@
port: recv_socket.port() as i64,
};
let mut heartbeat_interval = Duration::from_secs(1); //start with 1s, change based on server suggestion
// use tick() since we want to avoid thread::sleep(), as it's not interruptable. want to be interruptable for exiting quickly
// use tick() since we want to avoid thread::sleep(), as it's not interruptible. want to be interruptible for exiting quickly
let mut heartbeat_tick = crossbeam_channel::tick(heartbeat_interval);
let metrics_tick = crossbeam_channel::tick(Duration::from_secs(30));
let mut last_cumulative_received_shred_count = 0;
let mut client_restart_count = 0u64;
let mut successful_heartbeat_count = 0u64;
let mut failed_heartbeat_count = 0u64;
@@ -54,14 +83,16 @@
let mut failed_heartbeat_count_cumulative = 0u64;

while !exit.load(Ordering::Relaxed) {
// We want to scope the grpc shredstream client to the heartbeat loop. This way shredstream client exits when the heartbeat loop exits
let per_con_exit = ScopedAtomicBool::default();
info!("Starting heartbeat client");
let shredstream_client_res = runtime.block_on(
get_grpc_client(
block_engine_url.clone(),
auth_url.clone(),
auth_keypair.clone(),
service_name.clone(),
exit.clone()
per_con_exit.get_inner_clone(),
)
);
let (mut shredstream_client , refresh_thread_hdl) = match shredstream_client_res {
@@ -116,7 +147,8 @@
}
}
}
// send metrics

// send metrics and handle grpc connection failing
recv(metrics_tick) -> _ => {
datapoint_info!(
"shredstream_proxy-heartbeat_stats",
@@ -125,26 +157,33 @@
("failed_heartbeat_count", failed_heartbeat_count, i64),
("client_restart_count", client_restart_count, i64),
);

// handle scenario when grpc connection is open, but backend doesn't receive heartbeat
// possibly due to envoy losing track of the pod when backend restarts.
// we restart our grpc connection to work around the stale connection
// if no shreds received, then restart
let new_received_count = metrics.agg_received_cumulative.load(Ordering::Relaxed);
if new_received_count == last_cumulative_received_shred_count {
warn!("No shreds received recently, restarting heartbeat client.");
datapoint_warn!(
"shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
);
refresh_thread_hdl.abort();
break;
}
last_cumulative_received_shred_count = new_received_count;


successful_heartbeat_count_cumulative += successful_heartbeat_count;
failed_heartbeat_count_cumulative += failed_heartbeat_count;
client_restart_count_cumulative += client_restart_count;
successful_heartbeat_count = 0;
failed_heartbeat_count = 0;
client_restart_count = 0;
}
// restart grpc client if no shreds received
recv(grpc_restart_signal) -> _ => {
refresh_thread_hdl.abort();
warn!("No shreds received recently, restarting heartbeat client.");
datapoint_warn!(
"shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
("client_restart_count", client_restart_count, i64),
);
// exit should be false
break;
}

// handle SIGINT shutdown
recv(shutdown_receiver) -> _ => {
// exit should be true
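
The restart-on-stale-connection logic, now folded into the 30-second metrics tick, boils down to comparing a cumulative counter against the value seen at the previous tick. A minimal sketch, where agg_received_cumulative is a bare AtomicU64 standing in for the field on ShredMetrics:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns true when the cumulative counter has not advanced since the last
/// tick, i.e. no shreds arrived and the gRPC connection is presumed stale.
fn should_restart(agg_received_cumulative: &AtomicU64, last_seen: &mut u64) -> bool {
    let new_count = agg_received_cumulative.load(Ordering::Relaxed);
    let stale = new_count == *last_seen;
    *last_seen = new_count;
    stale
}

fn main() {
    let counter = AtomicU64::new(0);
    let mut last_seen = 0;

    counter.fetch_add(5, Ordering::Relaxed); // shreds arrived during this interval
    assert!(!should_restart(&counter, &mut last_seen));

    // Nothing arrived before the next tick, so the caller would restart the
    // heartbeat client and break out of its inner loop.
    assert!(should_restart(&counter, &mut last_seen));
    println!("stale connection detected");
}
```
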
34 changes: 21 additions & 13 deletions proxy/src/main.rs
@@ -10,7 +10,7 @@ use std::{
Arc, RwLock,
},
thread,
thread::{sleep, JoinHandle},
thread::{sleep, spawn, JoinHandle},
time::Duration,
};

@@ -23,6 +23,7 @@ use solana_client::client_error::{reqwest, ClientError};
use solana_metrics::set_host_id;
use solana_perf::deduper::Deduper;
use solana_sdk::signature::read_keypair_file;
use solana_streamer::streamer::StreamerReceiveStats;
use thiserror::Error;
use tokio::runtime::Runtime;
use tonic::Status;
@@ -226,17 +227,13 @@ fn main() -> Result<(), ShredstreamProxyError> {
}));
}

let metrics = Arc::new(ShredMetrics::new());

let runtime = Runtime::new()?;
let (grpc_restart_signal_s, grpc_restart_signal_r) = crossbeam_channel::bounded(1);
let mut thread_handles = vec![];
if let ProxySubcommands::Shredstream(args) = shredstream_args {
let heartbeat_hdl = start_heartbeat(
args,
&exit,
&shutdown_receiver,
runtime,
grpc_restart_signal_r,
);
let heartbeat_hdl =
start_heartbeat(args, &exit, &shutdown_receiver, runtime, metrics.clone());
thread_handles.push(heartbeat_hdl);
}

@@ -255,7 +252,7 @@
forwarder::DEDUPER_NUM_BITS,
)));

let metrics = Arc::new(ShredMetrics::new());
let forward_stats = Arc::new(StreamerReceiveStats::new("shredstream_proxy-listen_thread"));
let use_discovery_service =
args.endpoint_discovery_url.is_some() && args.discovered_endpoints_port.is_some();
let forwarder_hdls = forwarder::start_forwarder_threads(
@@ -265,18 +262,29 @@
args.num_threads,
deduper.clone(),
metrics.clone(),
forward_stats.clone(),
use_discovery_service,
args.debug_trace_shred,
shutdown_receiver.clone(),
exit.clone(),
);
thread_handles.extend(forwarder_hdls);

let report_metrics_thread = {
let exit = exit.clone();
spawn(move || {
while !exit.load(Ordering::Relaxed) {
sleep(Duration::from_secs(1));
forward_stats.report();
}
})
};
thread_handles.push(report_metrics_thread);

let metrics_hdl = forwarder::start_forwarder_accessory_thread(
deduper,
metrics.clone(),
args.metrics_report_interval_ms,
grpc_restart_signal_s,
shutdown_receiver.clone(),
exit.clone(),
);
@@ -319,7 +327,7 @@ fn start_heartbeat(
exit: &Arc<AtomicBool>,
shutdown_receiver: &Receiver<()>,
runtime: Runtime,
grpc_restart_signal_r: Receiver<()>,
metrics: Arc<ShredMetrics>,
) -> JoinHandle<()> {
let auth_keypair = Arc::new(
read_keypair_file(Path::new(&args.auth_keypair)).unwrap_or_else(|e| {
@@ -343,7 +351,7 @@
),
runtime,
"shredstream_proxy".to_string(),
grpc_restart_signal_r,
metrics,
shutdown_receiver.clone(),
exit.clone(),
)
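
The report_metrics_thread added to main.rs follows a simple exit-flag-gated loop: sleep one second, report, and re-check the flag. A standalone sketch, with a plain counter standing in for StreamerReceiveStats:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    thread::{sleep, spawn},
    time::Duration,
};

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let forward_stats = Arc::new(AtomicU64::new(0)); // stand-in for StreamerReceiveStats

    let report_metrics_thread = {
        let exit = exit.clone();
        let forward_stats = forward_stats.clone();
        spawn(move || {
            while !exit.load(Ordering::Relaxed) {
                sleep(Duration::from_secs(1));
                // The proxy calls forward_stats.report() here; we just print and reset.
                println!("packets seen: {}", forward_stats.swap(0, Ordering::Relaxed));
            }
        })
    };

    forward_stats.fetch_add(42, Ordering::Relaxed);
    sleep(Duration::from_millis(1500));
    exit.store(true, Ordering::Relaxed);
    report_metrics_thread.join().unwrap();
}
```
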
