Skip to content

Commit

Permalink
relayer mvp (solana-labs#4)
Browse files Browse the repository at this point in the history
Adds basic functionality 
* connect to multiple validators w/ simple auth
* forward verified transactions from tpu to leaders
   within specified window of current slot
* stubs out start of bench script
  • Loading branch information
jedleggett authored Jun 28, 2022
1 parent bbbf0c9 commit 1bab15c
Show file tree
Hide file tree
Showing 11 changed files with 857 additions and 60 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ members = [
"jito-protos",
"transaction-relayer",
"rpc",
"relayer"
"relayer",
"bench_e2e"
]
13 changes: 13 additions & 0 deletions bench_e2e/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "bench_e2e"
version = "0.1.0"
edition = "2021"

[dependencies]
bincode = "1.3.3"
env_logger = "0.9.0"
log = "0.4.17"
solana-perf = "1.10.24"
solana-client = "1.10.24"
solana-sdk = "1.10.24"
tokio = { version = "1.14.1", features = ["rt-multi-thread"] }
117 changes: 117 additions & 0 deletions bench_e2e/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::{
io,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};

use bincode::serialize;
use solana_client::rpc_client::RpcClient;
use solana_sdk::{
signature::{Keypair, Signature, Signer},
system_transaction::transfer,
};

fn main() {
env_logger::init();

let client = RpcClient::new("http://127.0.0.1:8899");
let keypair = Keypair::new();
assert!(request_and_confirm_airdrop(&client, &[keypair.pubkey()]));

let exit = Arc::new(AtomicBool::new(false));

const NUM_THREADS: usize = 1;
const NUM_PACKETS_PER_ITER: usize = 1000;
const PACKET_RATE_PER_SEC_PER_SERVER: usize = 1000;
const PACKET_RATE_PER_THREAD: usize =
((PACKET_RATE_PER_SEC_PER_SERVER as f32) / (NUM_THREADS as f32)) as usize;
const LOOP_DURATION: f32 = (NUM_PACKETS_PER_ITER as f32) / (PACKET_RATE_PER_THREAD as f32);

let tpu_ip: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
// let (tpu_port, tpu_sockets) =
// multi_bind_in_range(tpu_ip, (40_000, 41_000), 32).expect("tpu multi_bind");
let tpu_addr = SocketAddr::new(tpu_ip, 10_500);

let kp_string = keypair.to_base58_string();
let client = Arc::new(client);
let send_threads: Vec<JoinHandle<()>> = (0..NUM_THREADS)
.map(|_| {
let send_exit = exit.clone();
let client = client.clone();
let keypair = Keypair::from_base58_string(&kp_string.clone());
Builder::new()
.name(String::from("send_thread"))
.spawn(move || {
let udp_sender = UdpSocket::bind("0.0.0.0:0").unwrap();
loop {
let exec_start = Instant::now();
if send_exit.load(Ordering::Acquire) {
break;
}

let latest = client.get_latest_blockhash().unwrap();

// transfer()
// TODO (LB): need to generate this faster
let serialized_txs: Vec<Vec<u8>> = (0..NUM_PACKETS_PER_ITER)
.map(|c| {
serialize(&transfer(
&keypair,
&keypair.pubkey(),
(c * 100) as u64,
latest,
))
.unwrap()
})
.collect();

let _: Vec<io::Result<usize>> = serialized_txs
.iter()
.map(|tx| udp_sender.send_to(tx, tpu_addr))
.collect();

let sleep_duration = Duration::from_secs_f32(LOOP_DURATION)
.checked_sub(exec_start.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));

sleep(sleep_duration);
}
})
.unwrap()
})
.collect();

// Run the test for this long
sleep(Duration::from_secs(60));

exit.store(true, Ordering::Relaxed);
for s in send_threads {
let _ = s.join();
}
}

fn request_and_confirm_airdrop(client: &RpcClient, pubkeys: &[solana_sdk::pubkey::Pubkey]) -> bool {
let sigs: Vec<_> = pubkeys
.iter()
.map(|pubkey| client.request_airdrop(pubkey, 100_000_000_000))
.collect();

if sigs.iter().any(|s| s.is_err()) {
return false;
}
let sigs: Vec<Signature> = sigs.into_iter().map(|s| s.unwrap()).collect();

let now = Instant::now();
while now.elapsed() < Duration::from_secs(20) {
let r = client.get_signature_statuses(&sigs).expect("got statuses");
if r.value.iter().all(|s| s.is_some()) {
return true;
}
}
false
}
12 changes: 8 additions & 4 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ publish = false

[dependencies]
crossbeam-channel = "0.5.4"
ed25519-dalek = "1.0.1"
jito-protos = { path = "../jito-protos" }
jito-rpc = { path = "../rpc" }
log = "0.4.17"
solana-sdk = "1.10.24"
solana-client = "1.10.24"
solana-core = "1.10.24"
solana-sdk = "=1.10.25"
solana-client = "=1.10.25"
solana-core = "=1.10.25"
solana-perf = "=1.10.25"
solana-metrics = "=1.10.25"
tokio-stream = "0.1.9"
tokio = "1.14.1"
tokio = { version = "1.14.1", features = ["time"] }
tonic = "0.7.2"
90 changes: 90 additions & 0 deletions relayer/src/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::sync::Arc;

use ed25519_dalek::{PublicKey, Signature, Verifier};
use solana_sdk::pubkey::Pubkey;
use tonic::{metadata::MetadataMap, service::Interceptor, Request, Status};

use crate::schedule_cache::LeaderScheduleCache;

#[derive(Clone)]
pub struct AuthenticationInterceptor {
pub cache: Arc<LeaderScheduleCache>,
}

impl AuthenticationInterceptor {
pub fn auth(
req: &mut tonic::Request<()>,
cache: &Arc<LeaderScheduleCache>,
) -> Result<(), Status> {
let meta = req.metadata();
let pubkey = extract_signer_pubkey(meta)?;
let msg = Self::extract_msg(meta)?;
let signature = Self::extract_signature(meta)?;
pubkey
.verify(msg.as_slice(), &signature)
.map_err(|_| Status::invalid_argument("bad message and/or signature"))?;

let validator_pubkey = Pubkey::new(&pubkey.to_bytes());

// TODO: is this called in async runtime?
if !cache.is_validator_scheduled(validator_pubkey) {
return Err(Status::permission_denied(
"not a validator scheduled for this epoch",
));
}

req.extensions_mut().insert(validator_pubkey);

Ok(())
}

fn extract_msg(meta: &MetadataMap) -> Result<Vec<u8>, Status> {
let msg = meta
.get_bin("message-bin")
.ok_or_else(|| Status::invalid_argument("message missing"))?;

Ok(msg
.to_bytes()
.map_err(|_| Status::invalid_argument("bad message"))?
.to_vec())
}

fn extract_signature(meta: &MetadataMap) -> Result<Signature, Status> {
let sig_bytes = meta
.get_bin("signature-bin")
.ok_or_else(|| Status::invalid_argument("missing signature"))?
.to_bytes()
.map_err(|_| Status::invalid_argument("invalid signature format"))?;
let sig_bytes = sig_bytes.to_vec();
let sig_bytes = &<[u8; 64]>::try_from(sig_bytes.as_slice())
.map_err(|_| Status::invalid_argument("invalid signature format"))?;

Signature::from_bytes(sig_bytes)
.map_err(|_| Status::invalid_argument("invalid signature format"))
}
}

impl Interceptor for AuthenticationInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> std::result::Result<Request<()>, Status> {
Self::auth(&mut request, &self.cache)?;
Ok(request)
}
}

pub fn extract_signer_pubkey(meta: &MetadataMap) -> Result<PublicKey, Status> {
let pubkey_bytes = meta
.get_bin("public-key-bin")
.ok_or_else(|| Status::invalid_argument("missing public key"))?
.to_bytes()
.map_err(|_| Status::invalid_argument("incorrectly formatted public key"))?;
let pk = PublicKey::from_bytes(pubkey_bytes.as_ref())
.map_err(|_| Status::invalid_argument("incorrectly formatted public key"))?;
Ok(pk)
}

pub fn extract_pubkey(meta: &MetadataMap) -> Result<Pubkey, Status> {
Ok(Pubkey::new(&extract_signer_pubkey(meta)?.to_bytes()))
}
2 changes: 2 additions & 0 deletions relayer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod auth;
mod manager;
pub mod relayer;
mod router;
pub mod schedule_cache;
Loading

0 comments on commit 1bab15c

Please sign in to comment.