Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JIT-1699] Add Exponential Backoff, upgrade to 1.16 #18

Merged
merged 5 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,530 changes: 1,589 additions & 941 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
[package]
name = "jito-shredstream-proxy"
version = "0.1.4"
version = "0.1.5"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://jito-labs.gitbook.io/mev/searcher-services/shredstream for details."
authors = ["Jito Team <[email protected]>"]
homepage = "https://jito.wtf/"
edition = "2021"

[dependencies]
arc-swap = "1.6"
backon = "0.4"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.8"
env_logger = "0.10.0"
Expand All @@ -22,12 +23,12 @@ rand_07 = { package = "rand", version = "~0.7" }
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1"
signal-hook = "0.3"
solana-client = "~1.14"
solana-metrics = "~1.14"
solana-net-utils = "~1.14"
solana-perf = "~1.14"
solana-sdk = "~1.14"
solana-streamer = "~1.14"
solana-client = "~1.16"
solana-metrics = "~1.16"
solana-net-utils = "~1.16"
solana-perf = "~1.16"
solana-sdk = "~1.16"
solana-streamer = "~1.16"
thiserror = "1.0.40"
tokio = "1"
tonic = { version = "0.9.2", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] }
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.4.0
FROM --platform=linux/amd64 rust:1.64-slim-bullseye as builder
FROM --platform=linux/amd64 rust:1.72-slim-bullseye as builder

RUN apt-get -qq update && apt-get install -qq -y ca-certificates libssl-dev protobuf-compiler pkg-config
RUN rustup component add rustfmt && update-ca-certificates
Expand Down
2 changes: 1 addition & 1 deletion p
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -eux

# Some container vars
TAG=${USER}-dev
TAG=${TAG:-${USER}-dev} # read TAG from the environment, defaulting to ${USER}-dev
ORG="jitolabs"

DOCKER_BUILDKIT=1 docker build -t "$ORG/jito-shredstream-proxy:${TAG}" .
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.64.0"
channel = "1.72"
components = [ "rustfmt", "rustc-dev", "clippy", "cargo", "cargo-sort" ]
57 changes: 28 additions & 29 deletions src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use log::{debug, error, info, warn};
use prost::Message;
use solana_metrics::{datapoint_info, datapoint_warn};
use solana_perf::{
deduper::Deduper,
packet::{PacketBatch, PacketBatchRecycler},
recycler::Recycler,
sigverify::Deduper,
};
use solana_streamer::{
sendmmsg::{batch_send, SendPktsError},
Expand Down Expand Up @@ -69,7 +69,7 @@ pub fn start_forwarder_threads(
packet_sender,
recycler.clone(),
Arc::new(StreamerReceiveStats::new("shredstream_proxy-listen_thread")),
0, // do not coalesce since batching consumes more cpu cycles and adds latency.
Duration::default(), // do not coalesce since batching consumes more cpu cycles and adds latency.
true,
None,
);
Expand All @@ -81,7 +81,7 @@ pub fn start_forwarder_threads(
let exit = exit.clone();

let send_thread = Builder::new()
.name(format!("shredstream_proxy-send_thread_{thread_id}"))
.name(format!("ssPxyTx_{thread_id}"))
.spawn(move || {
let send_socket =
UdpSocket::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0))
Expand Down Expand Up @@ -151,11 +151,11 @@ fn recv_from_channel_and_send_multiple_dest(
debug!(
"Got batch of {} packets, total size in bytes: {}",
packet_batch.len(),
packet_batch.iter().map(|x| x.meta.size).sum::<usize>()
packet_batch.iter().map(|x| x.meta().size).sum::<usize>()
);
let mut packet_batch_vec = vec![packet_batch];

let num_deduped = solana_perf::sigverify::dedup_packets_and_count_discards(
let num_deduped = solana_perf::deduper::dedup_packets_and_count_discards(
&deduper.read().unwrap(),
&mut packet_batch_vec,
|_received_packet, _is_already_marked_as_discard, _is_dup| {},
Expand Down Expand Up @@ -185,18 +185,18 @@ fn recv_from_channel_and_send_multiple_dest(
packet_batch_vec[0]
.iter()
.filter_map(|p| TraceShred::decode(p.data(..)?).ok())
.filter_map(|t| {
let created_at = SystemTime::try_from(t.created_at?);
Some((t.region, t.seq_num, created_at.ok()?))
})
.for_each(|(region, seq_num, created_at)| {
if let Ok(elapsed) = trace_shred_received_time.duration_since(created_at) {
datapoint_info!("shredstream_proxy-trace_shred_latency",
"trace_region" => region,
("trace_seq_num", seq_num, i64),
("elapsed_micros", elapsed.as_micros(), i64),
);
}
.filter(|t| t.created_at.is_some())
.for_each(|trace_shred| {
let elapsed = trace_shred_received_time
.duration_since(SystemTime::try_from(trace_shred.created_at.unwrap()).unwrap())
.unwrap_or_default();

datapoint_info!(
"shredstream_proxy-trace_shred_latency",
"trace_region" => trace_shred.region,
("trace_seq_num", trace_shred.seq_num as i64, i64),
("elapsed_micros", elapsed.as_micros(), i64),
);
});
}
Ok(())
Expand All @@ -211,7 +211,7 @@ pub fn start_destination_refresh_thread(
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new().name("shredstream_proxy-destination_refresh_thread".to_string()).spawn(move || {
Builder::new().name("ssPxyDstRefresh".to_string()).spawn(move || {
let fetch_socket_tick = crossbeam_channel::tick(Duration::from_secs(30));
let metrics_tick = crossbeam_channel::tick(Duration::from_secs(30));
let mut socket_count = static_dest_sockets.len();
Expand Down Expand Up @@ -300,7 +300,7 @@ pub fn start_forwarder_accessory_thread(
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("shredstream_proxy-accessory_thread".to_string())
.name("ssPxyAccessory".to_string())
.spawn(move || {
let metrics_tick =
crossbeam_channel::tick(Duration::from_millis(metrics_update_interval_ms));
Expand All @@ -326,7 +326,7 @@ pub fn start_forwarder_accessory_thread(

// handle scenario when grpc connection is open, but backend doesn't receive heartbeat
// possibly due to envoy losing track of the pod when backend restarts.
// we restart our grpc connection work around the stale connection
// we restart our grpc connection to work around the stale connection
recv(stale_connection_tick) -> _ => {
// if no shreds received, then restart
let new_received_count = metrics.agg_received_cumulative.load(Ordering::Relaxed);
Expand Down Expand Up @@ -431,7 +431,10 @@ mod tests {
time::Duration,
};

use solana_perf::packet::{Meta, Packet, PacketBatch};
use solana_perf::{
deduper::Deduper,
packet::{Meta, Packet, PacketBatch},
};
use solana_sdk::packet::{PacketFlags, PACKET_DATA_SIZE};

use crate::forwarder::{recv_from_channel_and_send_multiple_dest, ShredMetrics};
Expand All @@ -454,7 +457,6 @@ mod tests {
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
port: 48289, // received on random port
flags: PacketFlags::empty(),
sender_stake: 0,
},
),
Packet::new(
Expand All @@ -464,7 +466,6 @@ mod tests {
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
port: 9999,
flags: PacketFlags::empty(),
sender_stake: 0,
},
),
]);
Expand Down Expand Up @@ -503,12 +504,10 @@ mod tests {
// send packets
recv_from_channel_and_send_multiple_dest(
packet_receiver.recv(),
&Arc::new(RwLock::new(
solana_perf::sigverify::Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
crate::forwarder::DEDUPER_NUM_BITS,
),
)),
&Arc::new(RwLock::new(Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
crate::forwarder::DEDUPER_NUM_BITS,
))),
&udp_sender,
&Arc::new(dest_socketaddrs),
false,
Expand Down
60 changes: 38 additions & 22 deletions src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{sleep, Builder, JoinHandle},
thread::{Builder, JoinHandle},
time::Duration,
};

use backon::{ExponentialBuilder, Retryable};
use crossbeam_channel::Receiver;
use jito_protos::{
auth::{auth_service_client::AuthServiceClient, Role},
Expand Down Expand Up @@ -37,8 +38,7 @@ pub fn heartbeat_loop_thread(
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let auth_keypair = auth_keypair;
Builder::new().name("shredstream_proxy-heartbeat_loop_thread".to_string()).spawn(move || {
Builder::new().name("ssPxyHbeatLoop".to_string()).spawn(move || {
let heartbeat_socket = jito_protos::shared::Socket {
ip: recv_socket.ip().to_string(),
port: recv_socket.port() as i64,
Expand All @@ -56,9 +56,24 @@ pub fn heartbeat_loop_thread(

while !exit.load(Ordering::Relaxed) {
info!("Starting heartbeat client");
let shredstream_client = runtime.block_on(get_grpc_client(block_engine_url.clone(), auth_url.clone(), auth_keypair.clone(), service_name.clone(),exit.clone()));

let (mut shredstream_client , refresh_thread_hdl) = match shredstream_client {
let shredstream_client_res = runtime.block_on(
(|| {
get_grpc_client(
block_engine_url.clone(),
auth_url.clone(),
auth_keypair.clone(),
service_name.clone(),
exit.clone()
)
})
.retry(
&ExponentialBuilder::default()
.with_max_times(100)
.with_min_delay(failed_heartbeat_interval)
.with_max_delay(Duration::from_secs(5 * 60))
)
);
let (mut shredstream_client , refresh_thread_hdl) = match shredstream_client_res {
Ok(c) => c,
Err(e) => {
warn!("Failed to connect to block engine, retrying. Error: {e}");
Expand All @@ -69,9 +84,7 @@ pub fn heartbeat_loop_thread(
("errors", 1, i64),
("error_str", e.to_string(), String),
);
if !exit.load(Ordering::Relaxed) {
sleep(failed_heartbeat_interval);
}

continue; // avoid sending heartbeat, try acquiring grpc client again
}
};
Expand Down Expand Up @@ -103,22 +116,24 @@ pub fn heartbeat_loop_thread(
panic!("Invalid arguments: {err}.");
};
warn!("Error sending heartbeat: {err}");
datapoint_warn!("shredstream_proxy-heartbeat_send_error",
"block_engine_url" => block_engine_url,
("errors", 1, i64),
("error_str", err.to_string(), String),
datapoint_warn!(
"shredstream_proxy-heartbeat_send_error",
"block_engine_url" => block_engine_url,
("errors", 1, i64),
("error_str", err.to_string(), String),
);
failed_heartbeat_count += 1;
}
}
}
// send metrics
recv(metrics_tick) -> _ => {
datapoint_info!("shredstream_proxy-heartbeat_stats",
"block_engine_url" => block_engine_url,
("successful_heartbeat_count", successful_heartbeat_count, i64),
("failed_heartbeat_count", failed_heartbeat_count, i64),
("client_restart_count", client_restart_count, i64),
datapoint_info!(
"shredstream_proxy-heartbeat_stats",
"block_engine_url" => block_engine_url,
("successful_heartbeat_count", successful_heartbeat_count, i64),
("failed_heartbeat_count", failed_heartbeat_count, i64),
("client_restart_count", client_restart_count, i64),
);
successful_heartbeat_count_cumulative += successful_heartbeat_count;
failed_heartbeat_count_cumulative += failed_heartbeat_count;
Expand All @@ -131,10 +146,11 @@ pub fn heartbeat_loop_thread(
recv(grpc_restart_signal) -> _ => {
refresh_thread_hdl.abort();
warn!("No shreds received recently, restarting heartbeat client.");
datapoint_warn!("shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
("client_restart_count", client_restart_count, i64),
datapoint_warn!(
"shredstream_proxy-heartbeat_restart_signal",
"block_engine_url" => block_engine_url,
("desired_regions", format!("{desired_regions:?}"), String),
("client_restart_count", client_restart_count, i64),
);
// exit should be false
break;
Expand Down
11 changes: 5 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use log::*;
use signal_hook::consts::{SIGINT, SIGTERM};
use solana_client::client_error::{reqwest, ClientError};
use solana_metrics::set_host_id;
use solana_perf::deduper::Deduper;
use solana_sdk::signature::read_keypair_file;
use thiserror::Error;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -249,12 +250,10 @@ fn main() -> Result<(), ShredstreamProxyError> {

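// share deduper + metrics between forwarder <-> accessory thread
// use mutex since metrics are write heavy. cheaper than rwlock
// NOTE(review): no Mutex is visible in this hunk; the deduper below is wrapped in an RwLock and the metrics in a plain Arc. Confirm whether the line above is stale.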
let deduper = Arc::new(RwLock::new(
solana_perf::sigverify::Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
forwarder::DEDUPER_NUM_BITS,
),
));
let deduper = Arc::new(RwLock::new(Deduper::<2, [u8]>::new(
&mut rand_07::thread_rng(),
forwarder::DEDUPER_NUM_BITS,
)));

let metrics = Arc::new(ShredMetrics::new());
let use_discovery_service =
Expand Down