From 0e9609b2dc8137fc989edbf09caa0fc0960cc668 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Tue, 12 Jul 2022 12:38:24 -0500 Subject: [PATCH] Update protobuf for relayer (#14) --- Cargo.lock | 1 + core/src/fetch_stage.rs | 26 ------- core/src/staked_nodes_updater_service.rs | 1 + core/src/tpu.rs | 3 - jito-protos/build.rs | 1 - jito-protos/protos | 2 +- jito-protos/src/lib.rs | 4 -- relayer/Cargo.toml | 1 + relayer/src/relayer.rs | 27 ++++--- relayer/src/router.rs | 90 +++++++++++++----------- 10 files changed, 69 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4e8cd69b7ad87..e4d422896c3fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1834,6 +1834,7 @@ dependencies = [ "jito-protos", "jito-rpc", "log", + "prost-types 0.10.1", "solana-client", "solana-core", "solana-metrics", diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index c87e6a72da74e7..eac21d62d93ba8 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -38,10 +38,8 @@ impl FetchStage { pub fn new_with_sender( sockets: Vec, tpu_forwards_sockets: Vec, - tpu_vote_sockets: Vec, exit: &Arc, sender: &PacketBatchSender, - vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, coalesce_ms: u64, @@ -49,14 +47,11 @@ impl FetchStage { ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect(); - let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect(); Self::new_multi_socket( tx_sockets, tpu_forwards_sockets, - tpu_vote_sockets, exit, sender, - vote_sender, forward_sender, forward_receiver, coalesce_ms, @@ -100,10 +95,8 @@ impl FetchStage { fn new_multi_socket( tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, - tpu_vote_sockets: Vec>, exit: &Arc, sender: &PacketBatchSender, - vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, coalesce_ms: u64, @@ -145,23 +138,6 @@ impl FetchStage { }) .collect(); - let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver")); - let tpu_vote_threads: Vec<_> = tpu_vote_sockets - .into_iter() - .map(|socket| { - streamer::receiver( - socket, - exit.clone(), - vote_sender.clone(), - recycler.clone(), - tpu_vote_stats.clone(), - coalesce_ms, - true, - None, - ) - }) - .collect(); - let sender = sender.clone(); let fwd_thread_hdl = Builder::new() @@ -185,7 +161,6 @@ impl FetchStage { sleep(Duration::from_secs(1)); tpu_stats.report(); - tpu_vote_stats.report(); tpu_forward_stats.report(); if exit.load(Ordering::Relaxed) { @@ -198,7 +173,6 @@ impl FetchStage { thread_hdls: [ tpu_threads, tpu_forwards_threads, - tpu_vote_threads, vec![fwd_thread_hdl, metrics_thread_hdl], ] .into_iter() diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 8d807ea8afee26..9e265c6fa1c93e 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -46,6 +46,7 @@ impl StakedNodesUpdaterService { error!("error updating ip to stake mapping: {}", e); } } + sleep(Duration::from_millis(100)); } }) .unwrap(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index c74f865b5999ca..33aeb738e4ae3a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -61,16 +61,13 @@ impl Tpu { } = sockets; let (packet_sender, packet_receiver) = unbounded(); - let (vote_sender, _) = unbounded(); let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, transactions_forward_sockets, - vec![], exit, &packet_sender, - &vote_sender, &forwarded_packet_sender, forwarded_packet_receiver, tpu_coalesce_ms, diff --git a/jito-protos/build.rs b/jito-protos/build.rs index 9cc453bc80d166..1d20ecacc83c57 100644 --- a/jito-protos/build.rs +++ b/jito-protos/build.rs @@ -7,7 +7,6 @@ fn main() { "protos/block.proto", "protos/bundle.proto", "protos/packet.proto", - "protos/relayer.proto", "protos/searcher.proto", "protos/shared.proto", "protos/validator_interface_service.proto", diff --git a/jito-protos/protos b/jito-protos/protos index 3890dcd0e0dc64..f774dd1adb7ccc 160000 --- a/jito-protos/protos +++ b/jito-protos/protos @@ -1 +1 @@ -Subproject commit 3890dcd0e0dc643bfd4e51eb66995c15fbcdac25 +Subproject commit f774dd1adb7ccc2ec695701ace2d252520221a70 diff --git a/jito-protos/src/lib.rs b/jito-protos/src/lib.rs index fff6f9ce09bb7f..36c9a035c44ceb 100644 --- a/jito-protos/src/lib.rs +++ b/jito-protos/src/lib.rs @@ -10,10 +10,6 @@ pub mod packet { tonic::include_proto!("packet"); } -pub mod relayer { - tonic::include_proto!("relayer"); -} - pub mod searcher { tonic::include_proto!("searcher"); } diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index cbc3b5472de5b2..e5f53c0c4988be 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -13,6 +13,7 @@ ed25519-dalek = "1.0.1" jito-protos = { path = "../jito-protos" } jito-rpc = { path = "../rpc" } log = "0.4.17" +prost-types = "0.10.1" solana-client = "=1.10.25" solana-core = "=1.10.25" solana-metrics = "=1.10.25" diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index cda5394c9ab509..38e1b00e778001 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -14,9 +14,9 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use jito_protos::{ shared::Socket, validator_interface_service::{ - validator_interface_server::ValidatorInterface, GetTpuConfigsRequest, - GetTpuConfigsResponse, SubscribeBundlesRequest, SubscribeBundlesResponse, - SubscribePacketsRequest, SubscribePacketsResponse, + validator_interface_server::ValidatorInterface, AoiSubRequest, AoiSubResponse, + GetTpuConfigsRequest, GetTpuConfigsResponse, PacketStreamMsg, SubscribeBundlesRequest, + SubscribeBundlesResponse, }, }; use log::*; @@ -30,7 +30,7 @@ use tokio::{ task::spawn_blocking, }; use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tonic::{Request, Response, Status}; +use tonic::{Request, Response, Status, Streaming}; use crate::{auth::extract_pubkey, router::Router, schedule_cache::LeaderScheduleCache}; @@ -234,13 +234,13 @@ impl ValidatorInterface for Relayer { )) } - type SubscribePacketsStream = ValidatorSubscriberStream; + type StartBiDirectionalPacketStreamStream = ValidatorSubscriberStream; - async fn subscribe_packets( + async fn start_bi_directional_packet_stream( &self, - req: Request, - ) -> Result, Status> { - let pubkey = extract_pubkey(req.metadata())?; + request: Request>, + ) -> Result, Status> { + let pubkey = extract_pubkey(request.metadata())?; info!("Validator Connected - {}", pubkey); let (subscription_sender, mut subscription_receiver) = unbounded_channel(); @@ -283,4 +283,13 @@ impl ValidatorInterface for Relayer { client_pubkey: pubkey, })) } + + type SubscribeAOIStream = ValidatorSubscriberStream; + + async fn subscribe_aoi( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("subscribe aoi unimplemented")) + } } diff --git a/relayer/src/router.rs b/relayer/src/router.rs index 8b9b612fb2bb28..c7e2bf650b75b5 100644 --- a/relayer/src/router.rs +++ b/relayer/src/router.rs @@ -1,17 +1,20 @@ use std::{ collections::{HashMap, HashSet}, sync::{Arc, RwLock}, + time::SystemTime, }; use crossbeam_channel::Receiver; use jito_protos::{ + self, packet::{ - Meta as PbMeta, Packet as PbPacket, PacketBatch as PbPacketBatch, - PacketBatchWrapper as PbPacketBatchWrapper, PacketFlags as PbPacketFlags, + Meta as PbMeta, Packet as PbPacket, PacketBatch as PbPacketBatch, PacketBatchList, + PacketFlags as PbPacketFlags, }, + shared::Header, validator_interface_service::{ - subscribe_packets_response::Msg::{BatchList, Heartbeat}, - SubscribePacketsResponse, + packet_stream_msg::Msg::{BatchList, Heartbeat}, + PacketStreamMsg, }, }; use log::{debug, info, warn}; @@ -27,7 +30,7 @@ use tonic::Status; use crate::schedule_cache::LeaderScheduleCache; -type PacketsResultSender = UnboundedSender>; +type PacketsResultSender = UnboundedSender>; #[derive(Clone)] pub struct PacketSubscription { @@ -62,9 +65,12 @@ impl Router { pub fn send_heartbeat(&self) -> Vec { let active_subscriptions = self.packet_subs.read().unwrap().clone(); let mut failed_subscriptions = Vec::new(); + + let ts = prost_types::Timestamp::from(SystemTime::now()); + let header = Header { ts: Some(ts) }; for (pk, subscription) in active_subscriptions.iter() { - if let Err(e) = subscription.tx.send(Ok(SubscribePacketsResponse { - msg: Some(Heartbeat(true)), + if let Err(e) = subscription.tx.send(Ok(PacketStreamMsg { + msg: Some(Heartbeat(header.clone())), })) { warn!("error sending heartbeat to subscriber [{}]", e); datapoint_info!( @@ -89,7 +95,7 @@ impl Router { pub(crate) fn add_packet_subscription( &self, pk: &Pubkey, - tx: UnboundedSender>, + tx: UnboundedSender>, ) -> bool { let mut active_subs = self.packet_subs.write().unwrap(); @@ -149,7 +155,7 @@ impl Router { /// tuple.1 = a set of slots that were streamed for pub fn stream_batch_list( &self, - batch_list: &PbPacketBatchWrapper, + batch_list: &PacketBatchList, start_slot: Slot, end_slot: Slot, ) -> (Vec, HashSet) { @@ -169,7 +175,7 @@ impl Router { let slot_to_send = validators_to_send.get(pk); debug!("Slot to Send: {:?}", slot_to_send); if let Some(slot) = slot_to_send { - if let Err(e) = subscription.tx.send(Ok(SubscribePacketsResponse { + if let Err(e) = subscription.tx.send(Ok(PacketStreamMsg { msg: Some(BatchList(batch_list.clone())), })) { datapoint_warn!( @@ -180,7 +186,6 @@ impl Router { ); failed_stream_pks.push(*pk); } else { - debug!("slot sent {}", slot); datapoint_info!( "validator_interface_stream_batch_list", ("subscriber", pk.to_string(), String), @@ -206,7 +211,7 @@ impl Router { validators_to_send.insert(pk, slot); } } - info!( + debug!( "validators_in_slot_range: {} - {}, val: {:?}", start_slot, end_slot, validators_to_send ); @@ -214,38 +219,37 @@ impl Router { validators_to_send } - pub fn batchlist_to_proto(batches: Vec) -> PbPacketBatchWrapper { - // ToDo: Turn this back into a map - let mut proto_batch_vec: Vec = Vec::new(); - for batch in batches.into_iter() { - let mut proto_pkt_vec: Vec = Vec::new(); - for p in batch.iter() { - if !p.meta.discard() { - proto_pkt_vec.push(PbPacket { - data: p.data[0..p.meta.size].to_vec(), - meta: Some(PbMeta { - size: p.meta.size as u64, - addr: p.meta.addr.to_string(), - port: p.meta.port as u32, - flags: Some(PbPacketFlags { - discard: p.meta.discard(), - forwarded: p.meta.forwarded(), - repair: p.meta.repair(), - simple_vote_tx: p.meta.is_simple_vote_tx(), - // tracer_tx: p.meta.is_tracer_tx(), // Couldn't get this to work? - tracer_tx: false, + pub fn batchlist_to_proto(batches: Vec) -> PacketBatchList { + PacketBatchList { + header: Some(Header { + ts: Some(prost_types::Timestamp::from(SystemTime::now())), + }), + batch_list: batches + .into_iter() + .map(|batch| PbPacketBatch { + packets: batch + .iter() + .filter(|p| !p.meta.discard()) + .map(|p| PbPacket { + data: p.data[0..p.meta.size].to_vec(), + meta: Some(PbMeta { + size: p.meta.size as u64, + addr: p.meta.addr.to_string(), + port: p.meta.port as u32, + flags: Some(PbPacketFlags { + discard: p.meta.discard(), + forwarded: p.meta.forwarded(), + repair: p.meta.repair(), + simple_vote_tx: p.meta.is_simple_vote_tx(), + tracer_packet: p.meta.is_tracer_packet(), + }), + sender_stake: p.meta.sender_stake, }), - }), - }) - } - } - proto_batch_vec.push(PbPacketBatch { - packets: proto_pkt_vec, - }) - } - - PbPacketBatchWrapper { - batch_list: proto_batch_vec, + }) + .collect(), + }) + .collect(), + expiry: 0, // TODO add the expiry from CLI args } } }