[JIT-1699] Add Exponential Backoff, upgrade to 1.16 (#18)
esemeniuc authored Oct 23, 2023
1 parent 5eccd79 commit d57b3e2
Showing 8 changed files with 1,672 additions and 1,009 deletions.
2,530 changes: 1,589 additions & 941 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
@@ -1,13 +1,14 @@
[package]
name = "jito-shredstream-proxy"
version = "0.1.4"
version = "0.1.5"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://jito-labs.gitbook.io/mev/searcher-services/shredstream for details."
authors = ["Jito Team <[email protected]>"]
homepage = "https://jito.wtf/"
edition = "2021"

[dependencies]
arc-swap = "1.6"
+ backon = "0.4"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.8"
env_logger = "0.10.0"
@@ -22,12 +23,12 @@ rand_07 = { package = "rand", version = "~0.7" }
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1"
signal-hook = "0.3"
solana-client = "~1.14"
solana-metrics = "~1.14"
solana-net-utils = "~1.14"
solana-perf = "~1.14"
solana-sdk = "~1.14"
solana-streamer = "~1.14"
solana-client = "~1.16"
solana-metrics = "~1.16"
solana-net-utils = "~1.16"
solana-perf = "~1.16"
solana-sdk = "~1.16"
solana-streamer = "~1.16"
thiserror = "1.0.40"
tokio = "1"
tonic = { version = "0.9.2", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] }
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.4.0
- FROM --platform=linux/amd64 rust:1.64-slim-bullseye as builder
+ FROM --platform=linux/amd64 rust:1.72-slim-bullseye as builder

RUN apt-get -qq update && apt-get install -qq -y ca-certificates libssl-dev protobuf-compiler pkg-config
RUN rustup component add rustfmt && update-ca-certificates
2 changes: 1 addition & 1 deletion p
@@ -2,7 +2,7 @@
set -eux

# Some container vars
- TAG=${USER}-dev
+ TAG=${TAG:-${USER}-dev} # READ tag in from env var, defaulting to ${USER}-dev
ORG="jitolabs"

DOCKER_BUILDKIT=1 docker build -t "$ORG/jito-shredstream-proxy:${TAG}" .
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.64.0"
channel = "1.72"
components = [ "rustfmt", "rustc-dev", "clippy", "cargo", "cargo-sort" ]
57 changes: 28 additions & 29 deletions src/forwarder.rs
@@ -17,9 +17,9 @@ use log::{debug, error, info, warn};
use prost::Message;
use solana_metrics::{datapoint_info, datapoint_warn};
use solana_perf::{
+ deduper::Deduper,
packet::{PacketBatch, PacketBatchRecycler},
recycler::Recycler,
- sigverify::Deduper,
};
use solana_streamer::{
sendmmsg::{batch_send, SendPktsError},
@@ -69,7 +69,7 @@ pub fn start_forwarder_threads(
packet_sender,
recycler.clone(),
Arc::new(StreamerReceiveStats::new("shredstream_proxy-listen_thread")),
- 0, // do not coalesce since batching consumes more cpu cycles and adds latency.
+ Duration::default(), // do not coalesce since batching consumes more cpu cycles and adds latency.
true,
None,
);
@@ -81,7 +81,7 @@
let exit = exit.clone();

let send_thread = Builder::new()
.name(format!("shredstream_proxy-send_thread_{thread_id}"))
.name(format!("ssPxyTx_{thread_id}"))
.spawn(move || {
let send_socket =
UdpSocket::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0))
@@ -151,11 +151,11 @@ fn recv_from_channel_and_send_multiple_dest(
debug!(
"Got batch of {} packets, total size in bytes: {}",
packet_batch.len(),
- packet_batch.iter().map(|x| x.meta.size).sum::<usize>()
+ packet_batch.iter().map(|x| x.meta().size).sum::<usize>()
);
let mut packet_batch_vec = vec![packet_batch];

- let num_deduped = solana_perf::sigverify::dedup_packets_and_count_discards(
+ let num_deduped = solana_perf::deduper::dedup_packets_and_count_discards(
&deduper.read().unwrap(),
&mut packet_batch_vec,
|_received_packet, _is_already_marked_as_discard, _is_dup| {},
@@ -185,18 +185,18 @@ fn recv_from_channel_and_send_multiple_dest(
packet_batch_vec[0]
.iter()
.filter_map(|p| TraceShred::decode(p.data(..)?).ok())
- .filter_map(|t| {
- let created_at = SystemTime::try_from(t.created_at?);
- Some((t.region, t.seq_num, created_at.ok()?))
- })
- .for_each(|(region, seq_num, created_at)| {
- if let Ok(elapsed) = trace_shred_received_time.duration_since(created_at) {
- datapoint_info!("shredstream_proxy-trace_shred_latency",
- "trace_region" => region,
- ("trace_seq_num", seq_num, i64),
- ("elapsed_micros", elapsed.as_micros(), i64),
- );
- }
+ .filter(|t| t.created_at.is_some())
+ .for_each(|trace_shred| {
+ let elapsed = trace_shred_received_time
+ .duration_since(SystemTime::try_from(trace_shred.created_at.unwrap()).unwrap())
+ .unwrap_or_default();
+
+ datapoint_info!(
+ "shredstream_proxy-trace_shred_latency",
+ "trace_region" => trace_shred.region,
+ ("trace_seq_num", trace_shred.seq_num as i64, i64),
+ ("elapsed_micros", elapsed.as_micros(), i64),
+ );
});
}
Ok(())
@@ -211,7 +211,7 @@ pub fn start_destination_refresh_thread(
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new().name("shredstream_proxy-destination_refresh_thread".to_string()).spawn(move || {
Builder::new().name("ssPxyDstRefresh".to_string()).spawn(move || {
let fetch_socket_tick = crossbeam_channel::tick(Duration::from_secs(30));
let metrics_tick = crossbeam_channel::tick(Duration::from_secs(30));
let mut socket_count = static_dest_sockets.len();
@@ -300,7 +300,7 @@ pub fn start_forwarder_accessory_thread(
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("shredstream_proxy-accessory_thread".to_string())
.name("ssPxyAccessory".to_string())
.spawn(move || {
let metrics_tick =
crossbeam_channel::tick(Duration::from_millis(metrics_update_interval_ms));
@@ -326,7 +326,7 @@ pub fn start_forwarder_accessory_thread(

// handle scenario when grpc connection is open, but backend doesn't receive heartbeat
// possibly due to envoy losing track of the pod when backend restarts.
- // we restart our grpc connection work around the stale connection
+ // we restart our grpc connection to work around the stale connection
recv(stale_connection_tick) -> _ => {
// if no shreds received, then restart
let new_received_count = metrics.agg_received_cumulative.load(Ordering::Relaxed);
@@ -431,7 +431,10 @@ mod tests {
time::Duration,
};

- use solana_perf::packet::{Meta, Packet, PacketBatch};
+ use solana_perf::{
+ deduper::Deduper,
+ packet::{Meta, Packet, PacketBatch},
+ };
use solana_sdk::packet::{PacketFlags, PACKET_DATA_SIZE};

use crate::forwarder::{recv_from_channel_and_send_multiple_dest, ShredMetrics};
@@ -454,7 +457,6 @@ mod tests {
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
port: 48289, // received on random port
flags: PacketFlags::empty(),
- sender_stake: 0,
},
),
Packet::new(
@@ -464,7 +466,6 @@
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
port: 9999,
flags: PacketFlags::empty(),
- sender_stake: 0,
},
),
]);
@@ -503,12 +504,10 @@
// send packets
recv_from_channel_and_send_multiple_dest(
packet_receiver.recv(),
&Arc::new(RwLock::new(
solana_perf::sigverify::Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
crate::forwarder::DEDUPER_NUM_BITS,
),
)),
&Arc::new(RwLock::new(Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
crate::forwarder::DEDUPER_NUM_BITS,
))),
&udp_sender,
&Arc::new(dest_socketaddrs),
false,
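
A note on the forwarder changes above: they are mostly a mechanical migration to the 1.16 crates, where the generic Deduper (and dedup_packets_and_count_discards) moved from solana_perf::sigverify to solana_perf::deduper, and packet metadata is reached through meta() rather than the meta field. A minimal sketch of the relocated deduper API, assuming solana-perf ~1.16, the rand_07 alias from the Cargo.toml above, and the Deduper::dedup helper; the filter size and payload are illustrative only, not values from this repo:

use solana_perf::deduper::Deduper;

fn main() {
    // Two hash seeds over an arbitrarily sized bit filter; the proxy itself passes
    // its own forwarder::DEDUPER_NUM_BITS constant here.
    let deduper = Deduper::<2, [u8]>::new(&mut rand_07::thread_rng(), 637_534_199);

    let payload: &[u8] = &[0xde, 0xad, 0xbe, 0xef];
    // dedup() records the payload and reports whether it was already seen, so the
    // first call is (almost certainly) false and the repeat call is true.
    println!("first sighting flagged as duplicate: {}", deduper.dedup(payload));
    println!("second sighting flagged as duplicate: {}", deduper.dedup(payload));
}
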
60 changes: 38 additions & 22 deletions src/heartbeat.rs
@@ -4,10 +4,11 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
- thread::{sleep, Builder, JoinHandle},
+ thread::{Builder, JoinHandle},
time::Duration,
};

+ use backon::{ExponentialBuilder, Retryable};
use crossbeam_channel::Receiver;
use jito_protos::{
auth::{auth_service_client::AuthServiceClient, Role},
@@ -37,8 +38,7 @@ pub fn heartbeat_loop_thread(
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
- let auth_keypair = auth_keypair;
- Builder::new().name("shredstream_proxy-heartbeat_loop_thread".to_string()).spawn(move || {
+ Builder::new().name("ssPxyHbeatLoop".to_string()).spawn(move || {
let heartbeat_socket = jito_protos::shared::Socket {
ip: recv_socket.ip().to_string(),
port: recv_socket.port() as i64,
@@ -56,9 +56,24 @@

while !exit.load(Ordering::Relaxed) {
info!("Starting heartbeat client");
- let shredstream_client = runtime.block_on(get_grpc_client(block_engine_url.clone(), auth_url.clone(), auth_keypair.clone(), service_name.clone(),exit.clone()));
-
- let (mut shredstream_client , refresh_thread_hdl) = match shredstream_client {
+ let shredstream_client_res = runtime.block_on(
+ (|| {
+ get_grpc_client(
+ block_engine_url.clone(),
+ auth_url.clone(),
+ auth_keypair.clone(),
+ service_name.clone(),
+ exit.clone()
+ )
+ })
+ .retry(
+ &ExponentialBuilder::default()
+ .with_max_times(100)
+ .with_min_delay(failed_heartbeat_interval)
+ .with_max_delay(Duration::from_secs(5 * 60))
+ )
+ );
+ let (mut shredstream_client , refresh_thread_hdl) = match shredstream_client_res {
Ok(c) => c,
Err(e) => {
warn!("Failed to connect to block engine, retrying. Error: {e}");
@@ -69,9 +84,7 @@
("errors", 1, i64),
("error_str", e.to_string(), String),
);
- if !exit.load(Ordering::Relaxed) {
- sleep(failed_heartbeat_interval);
- }
+
continue; // avoid sending heartbeat, try acquiring grpc client again
}
};
@@ -103,22 +116,24 @@ pub fn heartbeat_loop_thread(
panic!("Invalid arguments: {err}.");
};
warn!("Error sending heartbeat: {err}");
datapoint_warn!("shredstream_proxy-heartbeat_send_error",
"block_engine_url" => block_engine_url,
("errors", 1, i64),
("error_str", err.to_string(), String),
datapoint_warn!(
"shredstream_proxy-heartbeat_send_error",
"block_engine_url" => block_engine_url,
("errors", 1, i64),
("error_str", err.to_string(), String),
);
failed_heartbeat_count += 1;
}
}
}
// send metrics
recv(metrics_tick) -> _ => {
datapoint_info!("shredstream_proxy-heartbeat_stats",
"block_engine_url" => block_engine_url,
("successful_heartbeat_count", successful_heartbeat_count, i64),
("failed_heartbeat_count", failed_heartbeat_count, i64),
("client_restart_count", client_restart_count, i64),
datapoint_info!(
"shredstream_proxy-heartbeat_stats",
"block_engine_url" => block_engine_url,
("successful_heartbeat_count", successful_heartbeat_count, i64),
("failed_heartbeat_count", failed_heartbeat_count, i64),
("client_restart_count", client_restart_count, i64),
);
successful_heartbeat_count_cumulative += successful_heartbeat_count;
failed_heartbeat_count_cumulative += failed_heartbeat_count;
@@ -131,10 +146,11 @@
recv(grpc_restart_signal) -> _ => {
refresh_thread_hdl.abort();
warn!("No shreds received recently, restarting heartbeat client.");
datapoint_warn!("shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
("client_restart_count", client_restart_count, i64),
datapoint_warn!(
"shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
("client_restart_count", client_restart_count, i64),
);
// exit should be false
break;
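
The retry wiring above is where the commit's exponential backoff lives: get_grpc_client is wrapped in a closure and driven through backon's ExponentialBuilder before being handed to block_on, replacing the old fixed sleep(failed_heartbeat_interval). A stripped-down sketch of the same pattern, with a hypothetical connect() standing in for the real client setup; it assumes backon 0.4 and a tokio runtime, matching the Cargo.toml above:

use std::{
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};

use backon::{ExponentialBuilder, Retryable};

// Hypothetical stand-in for get_grpc_client: fails twice, then succeeds,
// so the retries are actually exercised.
async fn connect(attempts: &AtomicU32) -> Result<&'static str, String> {
    if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
        Err("transient connection error".to_string())
    } else {
        Ok("connected")
    }
}

fn main() {
    let attempts = AtomicU32::new(0);
    let runtime = tokio::runtime::Runtime::new().unwrap();

    // Same shape as heartbeat.rs: block_on a retried async closure, with the delay
    // between attempts growing exponentially and capped in both count and duration.
    let result = runtime.block_on(
        (|| connect(&attempts)).retry(
            &ExponentialBuilder::default()
                .with_min_delay(Duration::from_millis(100))
                .with_max_delay(Duration::from_secs(5 * 60))
                .with_max_times(100),
        ),
    );
    println!("{result:?}"); // Ok("connected") after two failed attempts
}

Because the retried future already backs off between attempts, the explicit sleep in the old Err(e) arm is no longer needed, which is why it is deleted above.
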
11 changes: 5 additions & 6 deletions src/main.rs
@@ -21,6 +21,7 @@ use log::*;
use signal_hook::consts::{SIGINT, SIGTERM};
use solana_client::client_error::{reqwest, ClientError};
use solana_metrics::set_host_id;
+ use solana_perf::deduper::Deduper;
use solana_sdk::signature::read_keypair_file;
use thiserror::Error;
use tokio::runtime::Runtime;
@@ -249,12 +250,10 @@ fn main() -> Result<(), ShredstreamProxyError> {

// share deduper + metrics between forwarder <-> accessory thread
// use mutex since metrics are write heavy. cheaper than rwlock
- let deduper = Arc::new(RwLock::new(
- solana_perf::sigverify::Deduper::<2, [u8]>::new(
- &mut rand_07::thread_rng(),
- forwarder::DEDUPER_NUM_BITS,
- ),
- ));
+ let deduper = Arc::new(RwLock::new(Deduper::<2, [u8]>::new(
+ &mut rand_07::thread_rng(),
+ forwarder::DEDUPER_NUM_BITS,
+ )));

let metrics = Arc::new(ShredMetrics::new());
let use_discovery_service =
