Commit 0e9609b

Update protobuf for relayer (solana-labs#14)

buffalu authored Jul 12, 2022
1 parent d7d97dc, commit 0e9609b

Showing 10 changed files with 69 additions and 87 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

26 changes: 0 additions & 26 deletions core/src/fetch_stage.rs
@@ -38,25 +38,20 @@ impl FetchStage
pub fn new_with_sender(
sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
- tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
- vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
- let tpu_vote_sockets = tpu_vote_sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket(
tx_sockets,
tpu_forwards_sockets,
- tpu_vote_sockets,
exit,
sender,
- vote_sender,
forward_sender,
forward_receiver,
coalesce_ms,
@@ -100,10 +95,8 @@ impl FetchStage
fn new_multi_socket(
tpu_sockets: Vec<Arc<UdpSocket>>,
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
- tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
- vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
coalesce_ms: u64,
@@ -145,23 +138,6 @@ impl FetchStage
})
.collect();

- let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
- let tpu_vote_threads: Vec<_> = tpu_vote_sockets
-     .into_iter()
-     .map(|socket| {
-         streamer::receiver(
-             socket,
-             exit.clone(),
-             vote_sender.clone(),
-             recycler.clone(),
-             tpu_vote_stats.clone(),
-             coalesce_ms,
-             true,
-             None,
-         )
-     })
-     .collect();

let sender = sender.clone();

let fwd_thread_hdl = Builder::new()
@@ -185,7 +161,6 @@
sleep(Duration::from_secs(1));

tpu_stats.report();
- tpu_vote_stats.report();
tpu_forward_stats.report();

if exit.load(Ordering::Relaxed) {
@@ -198,7 +173,6 @@
thread_hdls: [
tpu_threads,
tpu_forwards_threads,
- tpu_vote_threads,
vec![fwd_thread_hdl, metrics_thread_hdl],
]
.into_iter()
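The surviving tpu and tpu_forwards paths keep the fan-in shape the deleted vote threads shared: one receiver thread per socket, all feeding a single channel. A minimal sketch of that pattern, assuming plain std sockets and a placeholder Batch type; spawn_receivers, the buffer size, and the 100ms read timeout are illustrative, not the solana_streamer API:

```rust
use std::{
    net::UdpSocket,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, JoinHandle},
    time::Duration,
};

use crossbeam_channel::Sender;

// Placeholder for solana_perf::packet::PacketBatch.
type Batch = Vec<Vec<u8>>;

// One receiver thread per socket, all fanning into one channel: the shape
// FetchStage keeps for its tpu and tpu_forwards sockets once the tpu_vote
// threads are removed.
fn spawn_receivers(
    sockets: Vec<Arc<UdpSocket>>,
    sender: Sender<Batch>,
    exit: Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> {
    sockets
        .into_iter()
        .map(|socket| {
            let sender = sender.clone();
            let exit = exit.clone();
            thread::spawn(move || {
                // Time out reads so the exit flag gets checked periodically.
                socket
                    .set_read_timeout(Some(Duration::from_millis(100)))
                    .expect("set_read_timeout");
                let mut buf = [0u8; 1232];
                while !exit.load(Ordering::Relaxed) {
                    if let Ok((n, _addr)) = socket.recv_from(&mut buf) {
                        // The real receiver coalesces packets for
                        // coalesce_ms before forwarding a batch.
                        let _ = sender.send(vec![buf[..n].to_vec()]);
                    }
                }
            })
        })
        .collect()
}
```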
1 change: 1 addition & 0 deletions core/src/staked_nodes_updater_service.rs
@@ -46,6 +46,7 @@ impl StakedNodesUpdaterService
error!("error updating ip to stake mapping: {}", e);
}
}
+ sleep(Duration::from_millis(100));
}
})
.unwrap();
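The single added line above is the whole fix: without a sleep, the updater loop busy-spins between refreshes. A self-contained sketch of the same throttled-loop shape; try_refresh_ip_to_stake and spawn_updater are hypothetical stand-ins for the real service internals:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{sleep, Builder, JoinHandle},
    time::Duration,
};

// Stand-in for the real IP-to-stake refresh, which queries vote accounts.
fn try_refresh_ip_to_stake() -> Result<(), String> {
    Ok(())
}

fn spawn_updater(exit: Arc<AtomicBool>) -> JoinHandle<()> {
    Builder::new()
        .name("staked-nodes-updater".to_string())
        .spawn(move || {
            while !exit.load(Ordering::Relaxed) {
                if let Err(e) = try_refresh_ip_to_stake() {
                    eprintln!("error updating ip to stake mapping: {}", e);
                }
                // The added sleep bounds the loop to ~10 refreshes per
                // second instead of spinning between calls.
                sleep(Duration::from_millis(100));
            }
        })
        .unwrap()
}
```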
3 changes: 0 additions & 3 deletions core/src/tpu.rs
@@ -61,16 +61,13 @@ impl Tpu
} = sockets;

let (packet_sender, packet_receiver) = unbounded();
- let (vote_sender, _) = unbounded();
let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();

let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
transactions_forward_sockets,
- vec![],
exit,
&packet_sender,
- &vote_sender,
&forwarded_packet_sender,
forwarded_packet_receiver,
tpu_coalesce_ms,
1 change: 0 additions & 1 deletion jito-protos/build.rs
@@ -7,7 +7,6 @@ fn main() {
"protos/block.proto",
"protos/bundle.proto",
"protos/packet.proto",
"protos/relayer.proto",
"protos/searcher.proto",
"protos/shared.proto",
"protos/validator_interface_service.proto",
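Dropping relayer.proto from this list stops tonic-build from generating the relayer module (removed from lib.rs below). For reference, a sketch of the standard tonic-build flow these lines sit in, assuming stock options; the actual build.rs may configure things differently:

```rust
// build.rs: a sketch assuming the stock tonic-build setup.
fn main() {
    tonic_build::configure()
        .compile(
            &[
                "protos/block.proto",
                "protos/bundle.proto",
                "protos/packet.proto",
                "protos/searcher.proto",
                "protos/shared.proto",
                "protos/validator_interface_service.proto",
            ],
            &["protos"], // include path for proto imports
        )
        .expect("failed to compile protos");
}
```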
2 changes: 1 addition & 1 deletion jito-protos/protos
Submodule protos updated from 3890dc to f774dd
4 changes: 0 additions & 4 deletions jito-protos/src/lib.rs
@@ -10,10 +10,6 @@ pub mod packet {
tonic::include_proto!("packet");
}

- pub mod relayer {
-     tonic::include_proto!("relayer");
- }

pub mod searcher {
tonic::include_proto!("searcher");
}
1 change: 1 addition & 0 deletions relayer/Cargo.toml
@@ -13,6 +13,7 @@ ed25519-dalek = "1.0.1"
jito-protos = { path = "../jito-protos" }
jito-rpc = { path = "../rpc" }
log = "0.4.17"
+ prost-types = "0.10.1"
solana-client = "=1.10.25"
solana-core = "=1.10.25"
solana-metrics = "=1.10.25"
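prost-types is pulled in for the well-known Timestamp type that the new Header messages carry; it converts directly from SystemTime. A minimal demonstration of the conversion used in router.rs below:

```rust
use std::time::SystemTime;

use prost_types::Timestamp;

fn main() {
    // prost-types implements From<SystemTime> for Timestamp, which is how
    // the relayer stamps heartbeats and packet batch headers.
    let ts = Timestamp::from(SystemTime::now());
    println!("seconds={} nanos={}", ts.seconds, ts.nanos);
}
```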
27 changes: 18 additions & 9 deletions relayer/src/relayer.rs
@@ -14,9 +14,9 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use jito_protos::{
shared::Socket,
validator_interface_service::{
- validator_interface_server::ValidatorInterface, GetTpuConfigsRequest,
- GetTpuConfigsResponse, SubscribeBundlesRequest, SubscribeBundlesResponse,
- SubscribePacketsRequest, SubscribePacketsResponse,
+ validator_interface_server::ValidatorInterface, AoiSubRequest, AoiSubResponse,
+ GetTpuConfigsRequest, GetTpuConfigsResponse, PacketStreamMsg, SubscribeBundlesRequest,
+ SubscribeBundlesResponse,
},
};
use log::*;
@@ -30,7 +30,7 @@ use tokio::{
task::spawn_blocking,
};
use tokio_stream::{wrappers::ReceiverStream, Stream};
- use tonic::{Request, Response, Status};
+ use tonic::{Request, Response, Status, Streaming};

use crate::{auth::extract_pubkey, router::Router, schedule_cache::LeaderScheduleCache};

@@ -234,13 +234,13 @@ impl ValidatorInterface for Relayer {
))
}

- type SubscribePacketsStream = ValidatorSubscriberStream<SubscribePacketsResponse>;
+ type StartBiDirectionalPacketStreamStream = ValidatorSubscriberStream<PacketStreamMsg>;

- async fn subscribe_packets(
+ async fn start_bi_directional_packet_stream(
&self,
- req: Request<SubscribePacketsRequest>,
- ) -> Result<Response<Self::SubscribePacketsStream>, Status> {
- let pubkey = extract_pubkey(req.metadata())?;
+ request: Request<Streaming<PacketStreamMsg>>,
+ ) -> Result<Response<Self::StartBiDirectionalPacketStreamStream>, Status> {
+ let pubkey = extract_pubkey(request.metadata())?;
info!("Validator Connected - {}", pubkey);

let (subscription_sender, mut subscription_receiver) = unbounded_channel();
@@ -283,4 +283,13 @@ impl ValidatorInterface for Relayer {
client_pubkey: pubkey,
}))
}

+ type SubscribeAOIStream = ValidatorSubscriberStream<AoiSubResponse>;

+ async fn subscribe_aoi(
+ &self,
+ _request: Request<AoiSubRequest>,
+ ) -> Result<Response<Self::SubscribeAOIStream>, Status> {
+ Err(Status::unimplemented("subscribe aoi unimplemented"))
+ }
}
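Replacing the server-streaming subscribe_packets with start_bi_directional_packet_stream follows tonic's usual bidirectional pattern: the request wraps a Streaming of inbound messages and the response carries the outbound stream. A generic sketch of that shape with a placeholder Msg type; this is not the generated jito-protos service trait:

```rust
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};

// Placeholder for PacketStreamMsg.
#[derive(Clone, Debug)]
pub struct Msg;

pub async fn bi_directional(
    request: Request<Streaming<Msg>>,
) -> Result<Response<ReceiverStream<Result<Msg, Status>>>, Status> {
    let mut inbound = request.into_inner();
    let (tx, rx) = tokio::sync::mpsc::channel(128);

    // Drain the client's messages while the outbound half stays open; the
    // relayer instead registers the validator and routes packets/heartbeats.
    tokio::spawn(async move {
        while let Ok(Some(_msg)) = inbound.message().await {
            if tx.send(Ok(Msg)).await.is_err() {
                break; // receiver dropped: client disconnected
            }
        }
    });

    Ok(Response::new(ReceiverStream::new(rx)))
}
```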
90 changes: 47 additions & 43 deletions relayer/src/router.rs
@@ -1,17 +1,20 @@
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
+ time::SystemTime,
};

use crossbeam_channel::Receiver;
use jito_protos::{
self,
packet::{
- Meta as PbMeta, Packet as PbPacket, PacketBatch as PbPacketBatch,
- PacketBatchWrapper as PbPacketBatchWrapper, PacketFlags as PbPacketFlags,
+ Meta as PbMeta, Packet as PbPacket, PacketBatch as PbPacketBatch, PacketBatchList,
+ PacketFlags as PbPacketFlags,
},
+ shared::Header,
validator_interface_service::{
- subscribe_packets_response::Msg::{BatchList, Heartbeat},
- SubscribePacketsResponse,
+ packet_stream_msg::Msg::{BatchList, Heartbeat},
+ PacketStreamMsg,
},
};
use log::{debug, info, warn};
@@ -27,7 +30,7 @@ use tonic::Status;

use crate::schedule_cache::LeaderScheduleCache;

- type PacketsResultSender = UnboundedSender<Result<SubscribePacketsResponse, Status>>;
+ type PacketsResultSender = UnboundedSender<Result<PacketStreamMsg, Status>>;

#[derive(Clone)]
pub struct PacketSubscription {
@@ -62,9 +65,12 @@ impl Router {
pub fn send_heartbeat(&self) -> Vec<Pubkey> {
let active_subscriptions = self.packet_subs.read().unwrap().clone();
let mut failed_subscriptions = Vec::new();

+ let ts = prost_types::Timestamp::from(SystemTime::now());
+ let header = Header { ts: Some(ts) };
for (pk, subscription) in active_subscriptions.iter() {
- if let Err(e) = subscription.tx.send(Ok(SubscribePacketsResponse {
- msg: Some(Heartbeat(true)),
+ if let Err(e) = subscription.tx.send(Ok(PacketStreamMsg {
+ msg: Some(Heartbeat(header.clone())),
})) {
warn!("error sending heartbeat to subscriber [{}]", e);
datapoint_info!(
@@ -89,7 +95,7 @@ impl Router {
pub(crate) fn add_packet_subscription(
&self,
pk: &Pubkey,
- tx: UnboundedSender<Result<SubscribePacketsResponse, Status>>,
+ tx: UnboundedSender<Result<PacketStreamMsg, Status>>,
) -> bool {
let mut active_subs = self.packet_subs.write().unwrap();

@@ -149,7 +155,7 @@ impl Router {
/// tuple.1 = a set of slots that were streamed for
pub fn stream_batch_list(
&self,
- batch_list: &PbPacketBatchWrapper,
+ batch_list: &PacketBatchList,
start_slot: Slot,
end_slot: Slot,
) -> (Vec<Pubkey>, HashSet<Slot>) {
@@ -169,7 +175,7 @@
let slot_to_send = validators_to_send.get(pk);
debug!("Slot to Send: {:?}", slot_to_send);
if let Some(slot) = slot_to_send {
- if let Err(e) = subscription.tx.send(Ok(SubscribePacketsResponse {
+ if let Err(e) = subscription.tx.send(Ok(PacketStreamMsg {
msg: Some(BatchList(batch_list.clone())),
})) {
datapoint_warn!(
@@ -180,7 +186,6 @@
);
failed_stream_pks.push(*pk);
} else {
debug!("slot sent {}", slot);
datapoint_info!(
"validator_interface_stream_batch_list",
("subscriber", pk.to_string(), String),
@@ -206,46 +211,45 @@
validators_to_send.insert(pk, slot);
}
}
- info!(
+ debug!(
"validators_in_slot_range: {} - {}, val: {:?}",
start_slot, end_slot, validators_to_send
);

validators_to_send
}

- pub fn batchlist_to_proto(batches: Vec<PacketBatch>) -> PbPacketBatchWrapper {
-     // ToDo: Turn this back into a map
-     let mut proto_batch_vec: Vec<PbPacketBatch> = Vec::new();
-     for batch in batches.into_iter() {
-         let mut proto_pkt_vec: Vec<PbPacket> = Vec::new();
-         for p in batch.iter() {
-             if !p.meta.discard() {
-                 proto_pkt_vec.push(PbPacket {
-                     data: p.data[0..p.meta.size].to_vec(),
-                     meta: Some(PbMeta {
-                         size: p.meta.size as u64,
-                         addr: p.meta.addr.to_string(),
-                         port: p.meta.port as u32,
-                         flags: Some(PbPacketFlags {
-                             discard: p.meta.discard(),
-                             forwarded: p.meta.forwarded(),
-                             repair: p.meta.repair(),
-                             simple_vote_tx: p.meta.is_simple_vote_tx(),
-                             // tracer_tx: p.meta.is_tracer_tx(), // Couldn't get this to work?
-                             tracer_tx: false,
-                         }),
-                         sender_stake: p.meta.sender_stake,
-                     }),
-                 })
-             }
-         }
-         proto_batch_vec.push(PbPacketBatch {
-             packets: proto_pkt_vec,
-         })
-     }
-
-     PbPacketBatchWrapper {
-         batch_list: proto_batch_vec,
-     }
- }
+ pub fn batchlist_to_proto(batches: Vec<PacketBatch>) -> PacketBatchList {
+     PacketBatchList {
+         header: Some(Header {
+             ts: Some(prost_types::Timestamp::from(SystemTime::now())),
+         }),
+         batch_list: batches
+             .into_iter()
+             .map(|batch| PbPacketBatch {
+                 packets: batch
+                     .iter()
+                     .filter(|p| !p.meta.discard())
+                     .map(|p| PbPacket {
+                         data: p.data[0..p.meta.size].to_vec(),
+                         meta: Some(PbMeta {
+                             size: p.meta.size as u64,
+                             addr: p.meta.addr.to_string(),
+                             port: p.meta.port as u32,
+                             flags: Some(PbPacketFlags {
+                                 discard: p.meta.discard(),
+                                 forwarded: p.meta.forwarded(),
+                                 repair: p.meta.repair(),
+                                 simple_vote_tx: p.meta.is_simple_vote_tx(),
+                                 tracer_packet: p.meta.is_tracer_packet(),
+                             }),
+                             sender_stake: p.meta.sender_stake,
+                         }),
+                     })
+                     .collect(),
+             })
+             .collect(),
+         expiry: 0, // TODO add the expiry from CLI args
+     }
+ }
}

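Both send_heartbeat and stream_batch_list above reduce to the same fan-out: clone one message per subscriber, push it down each unbounded channel, and collect the pubkeys whose channels have closed so the caller can prune them. A self-contained sketch of that shape; Pubkey and Msg are placeholders for the real solana_sdk and protobuf types:

```rust
use std::collections::HashMap;

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

// Placeholders for solana_sdk::pubkey::Pubkey and PacketStreamMsg.
type Pubkey = [u8; 32];
#[derive(Clone, Debug)]
struct Msg;

// Send one clone of `msg` to every subscriber; return the keys whose
// channels have closed so the caller can drop those subscriptions.
fn fan_out(subs: &HashMap<Pubkey, UnboundedSender<Msg>>, msg: &Msg) -> Vec<Pubkey> {
    subs.iter()
        .filter(|(_, tx)| tx.send(msg.clone()).is_err())
        .map(|(pk, _)| *pk)
        .collect()
}

fn main() {
    let mut subs = HashMap::new();
    let (tx, rx) = unbounded_channel::<Msg>();
    subs.insert([0u8; 32], tx);

    assert!(fan_out(&subs, &Msg).is_empty());
    drop(rx); // simulate a disconnected validator
    assert_eq!(fan_out(&subs, &Msg), vec![[0u8; 32]]);
}
```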