diff --git a/Cargo.lock b/Cargo.lock index 567eb3ccf992c7..36b534f4e91a15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6224,6 +6224,23 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-hook-service-proto" +version = "1.18.0" +dependencies = [ + "bincode", + "bs58", + "enum-iterator", + "prost", + "protobuf-src", + "serde", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + [[package]] name = "solana-install" version = "1.18.0" @@ -7575,6 +7592,7 @@ dependencies = [ "solana-geyser-plugin-interface", "solana-geyser-plugin-manager", "solana-gossip", + "solana-hook-service-proto", "solana-ledger", "solana-logger", "solana-metrics", @@ -7597,6 +7615,9 @@ dependencies = [ "symlink", "thiserror", "tikv-jemallocator", + "tokio", + "tokio-stream", + "tonic", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 095f844475fe32..61411510b4b47c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "geyser-plugin-interface", "geyser-plugin-manager", "gossip", + "hook-service", "install", "keygen", "ledger", @@ -331,6 +332,7 @@ solana-core = { path = "core", version = "=1.18.0" } solana-cost-model = { path = "cost-model", version = "=1.18.0" } solana-download-utils = { path = "download-utils", version = "=1.18.0" } solana-entry = { path = "entry", version = "=1.18.0" } +solana-hook-service-proto = { path = "hook-service", version = "=1.18.0" } solana-faucet = { path = "faucet", version = "=1.18.0" } solana-frozen-abi = { path = "frozen-abi", version = "=1.18.0" } solana-frozen-abi-macro = { path = "frozen-abi/macro", version = "=1.18.0" } diff --git a/hook-service/Cargo.toml b/hook-service/Cargo.toml new file mode 100644 index 00000000000000..9d0bb6d1edd369 --- /dev/null +++ b/hook-service/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "solana-hook-service-proto" +description = "Solana Hook service" +documentation = "https://docs.rs/solana-hook-service-proto" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +bincode = { workspace = true } +bs58 = { workspace = true } +prost = { workspace = true } +serde = { workspace = true } +solana-account-decoder = { workspace = true } +solana-sdk = { workspace = true } +solana-transaction-status = { workspace = true } +tonic = { workspace = true } + +[dev-dependencies] +enum-iterator = { workspace = true } + +[lib] +crate-type = ["lib"] +name = "solana_hook_service_proto" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +tonic-build = { workspace = true } + +# windows users should install the protobuf compiler manually and set the PROTOC +# envar to point to the installed binary +[target."cfg(not(windows))".build-dependencies] +protobuf-src = { workspace = true } diff --git a/hook-service/build.rs b/hook-service/build.rs new file mode 100644 index 00000000000000..e4783db94a7742 --- /dev/null +++ b/hook-service/build.rs @@ -0,0 +1,29 @@ +fn main() -> Result<(), std::io::Error> { + const PROTOC_ENVAR: &str = "PROTOC"; + if std::env::var(PROTOC_ENVAR).is_err() { + #[cfg(not(windows))] + std::env::set_var(PROTOC_ENVAR, protobuf_src::protoc()); + } + + let proto_base_path = std::path::PathBuf::from("proto"); + let proto_files = ["hook_service.proto"]; + let mut protos = Vec::new(); + for proto_file in &proto_files { + let proto = proto_base_path.join(proto_file); + println!("cargo:rerun-if-changed={}", proto.display()); + protos.push(proto); + } + + tonic_build::configure() + .build_client(true) + .build_server(true) + .type_attribute( + "TransactionErrorType", + "#[cfg_attr(test, derive(enum_iterator::Sequence))]", + ) + .type_attribute( + "InstructionErrorType", + "#[cfg_attr(test, derive(enum_iterator::Sequence))]", + ) + .compile(&protos, &[proto_base_path]) +} diff --git a/hook-service/proto/hook_service.proto b/hook-service/proto/hook_service.proto new file mode 100644 index 00000000000000..e8d689d8c282c6 --- /dev/null +++ b/hook-service/proto/hook_service.proto @@ -0,0 +1,93 @@ +syntax = "proto3"; + +package hook_service; + +message SocketAddress { + uint32 ip = 1; + uint32 port = 2; +} + +message EnableTpuRequest { + // Enable (or disable) packets coming from the TPU QUIC server + bool enable_tpu = 1; + // Enable (or disable) packets coming from the TPU forward QUIC server + bool enable_tpu_fwd = 2; +} + +message EnableTpuResponse {} + +message SetTpuAddressRequest { + // Sets the TPU socket address in gossip for this validator + SocketAddress tpu = 1; + // Sets the TPU forward socket address in gossip for this validator + SocketAddress tpu_fwd = 2; +} + +message SetTpuAddressResponse {} + +message Packet { + repeated bytes payload = 1; +} + +message SendPacketsRequest { + bool admin = 1; + repeated Packet packets = 2; +} + +message SendPacketsResponse {} + +message SetShredForwarderAddressRequest { + optional SocketAddress broadcast_address = 1; + optional SocketAddress retransmit_address = 2; +} + +message SetShredForwarderAddressResponse {} + +message SetMaxComputeRequest { + uint64 max_compute_units = 1; +} + +message SetMaxComputeResponse {} + +message Bundle { + bool admin = 1; + repeated Packet packets = 2; + string uuid = 3; +} + +message SendBundleRequest { + repeated Bundle bundles = 1; + uint64 slot_expiration = 2; +} + +message SendBundleResponse {} + +message MarkAccountsForDiscardRequest { + repeated string write_accounts = 1; + repeated string read_accounts = 2; +} + +message MarkAccountsForDiscardResponse {} + +service HookService { + // Enable or disable TPU and TPU forward ports in the validator + rpc EnableTpu(EnableTpuRequest) returns (EnableTpuResponse); + + // Set the TPU and TPU forward addresses + rpc SetTpuAddress(SetTpuAddressRequest) returns (SetTpuAddressResponse); + + // Sends packets into the BankingStage of the validator + rpc SendPackets(stream SendPacketsRequest) returns (SendPacketsResponse); + + // Marks accounts for discard + rpc MarkAccountsForDiscard(MarkAccountsForDiscardRequest) returns (MarkAccountsForDiscardResponse); + + // Sets the shred forwarder addresses + rpc SetShredForwarderAddress(SetShredForwarderAddressRequest) returns (SetShredForwarderAddressResponse); + + // Sets the max compute for the current slot + rpc SetMaxCompute(SetMaxComputeRequest) returns (SetMaxComputeResponse); + + // Sends a bundle stream to the validator + rpc SendBundle(stream SendBundleRequest) returns (stream SendBundleResponse); +} diff --git a/hook-service/src/lib.rs b/hook-service/src/lib.rs new file mode 100644 index 00000000000000..0deb0f6b8c1a4c --- /dev/null +++ b/hook-service/src/lib.rs @@ -0,0 +1,3 @@ +pub mod hook_service { + tonic::include_proto!("hook_service"); +} diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 6c7f691c27b5fa..84f12f6a0578f4 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -44,6 +44,7 @@ solana-genesis-utils = { workspace = true } solana-geyser-plugin-interface = { workspace = true } solana-geyser-plugin-manager = { workspace = true } solana-gossip = { workspace = true } +solana-hook-service-proto = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } solana-metrics = { workspace = true } @@ -64,6 +65,9 @@ solana-version = { workspace = true } solana-vote-program = { workspace = true } symlink = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +tonic = { workspace = true } [dev-dependencies] solana-account-decoder = { workspace = true } diff --git a/validator/src/hook_service.rs b/validator/src/hook_service.rs new file mode 100644 index 00000000000000..242877ef51c87f --- /dev/null +++ b/validator/src/hook_service.rs @@ -0,0 +1,119 @@ +use { + solana_hook_service_proto::hook_service::{ + hook_service_server::{HookService, HookServiceServer}, + EnableTpuRequest, EnableTpuResponse, MarkAccountsForDiscardRequest, + MarkAccountsForDiscardResponse, SendBundleRequest, SendBundleResponse, SendPacketsRequest, + SendPacketsResponse, SetMaxComputeRequest, SetMaxComputeResponse, + SetShredForwarderAddressRequest, SetShredForwarderAddressResponse, SetTpuAddressRequest, + SetTpuAddressResponse, + }, + std::path::{Path, PathBuf}, + thiserror::Error, + tokio::{fs::create_dir_all, net::UnixListener, task::JoinHandle}, + tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}, + tonic::{async_trait, transport::Server, Request, Response, Status, Streaming}, +}; + +#[derive(Debug, Error)] +pub enum HookServiceError { + #[error("UdsPathError")] + UdsPathError, + + #[error("UdsSetupError: {0}")] + UdsSetupError(#[from] std::io::Error), + + #[error("TonicError: {0}")] + TonicError(#[from] tonic::transport::Error), +} + +pub type HookServiceResult = Result; + +pub struct HookGrpcService { + pub handle: JoinHandle>, +} + +impl HookGrpcService { + pub async fn new(uds_path: PathBuf) -> HookServiceResult { + create_dir_all( + Path::new(&uds_path) + .parent() + .ok_or(HookServiceError::UdsPathError)?, + ) + .await?; + + let service = HookServiceServer::new(HookServiceImpl::new()); + + let uds = UnixListener::bind(uds_path)?; + let uds_stream = UnixListenerStream::new(uds); + + let handle = tokio::spawn( + Server::builder() + .add_service(service) + .serve_with_incoming(uds_stream), + ); + + Ok(HookGrpcService { handle }) + } +} + +struct HookServiceImpl {} + +impl HookServiceImpl { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl HookService for HookServiceImpl { + async fn enable_tpu( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn set_tpu_address( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn send_packets( + &self, + _request: Request>, + ) -> Result, Status> { + todo!() + } + + async fn mark_accounts_for_discard( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn set_shred_forwarder_address( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn set_max_compute( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + type SendBundleStream = ReceiverStream>; + + async fn send_bundle( + &self, + _request: Request>, + ) -> Result, Status> { + todo!() + } +} diff --git a/validator/src/lib.rs b/validator/src/lib.rs index e1b9df96b9b03e..9ce01bce60b3d7 100644 --- a/validator/src/lib.rs +++ b/validator/src/lib.rs @@ -20,6 +20,7 @@ pub mod admin_rpc_service; pub mod bootstrap; pub mod cli; pub mod dashboard; +pub mod hook_service; #[cfg(unix)] fn redirect_stderr(filename: &str) {