diff --git a/src/fullnode.rs b/src/fullnode.rs index 7c03985e79eb35..eaeb7f3523f41c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -24,7 +24,6 @@ use window; pub struct Fullnode { exit: Arc, thread_hdls: Vec>, - _rpc_service: JsonRpcService, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -191,8 +190,14 @@ impl Fullnode { let mut drone_addr = node.info.contact_info.tpu; drone_addr.set_port(DRONE_PORT); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT); - let _rpc_service = - JsonRpcService::new(&bank, node.info.contact_info.tpu, drone_addr, rpc_addr); + let rpc_service = JsonRpcService::new( + &bank, + node.info.contact_info.tpu, + drone_addr, + rpc_addr, + exit.clone(), + ); + thread_hdls.extend(rpc_service.thread_hdls()); let blob_recycler = BlobRecycler::default(); let window = @@ -261,11 +266,7 @@ impl Fullnode { } } - Fullnode { - exit, - thread_hdls, - _rpc_service, - } + Fullnode { exit, thread_hdls } } //used for notifying many nodes in parallel to exit diff --git a/src/rpc.rs b/src/rpc.rs index 5ecf2ec869419f..ee9a368f465249 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -5,11 +5,13 @@ use bincode::deserialize; use bs58; use jsonrpc_core::*; use jsonrpc_http_server::*; +use service::Service; use signature::{Pubkey, Signature}; use std::mem; use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread::sleep; +use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use transaction::Transaction; @@ -18,7 +20,7 @@ use wallet::request_airdrop; pub const RPC_PORT: u16 = 8899; pub struct JsonRpcService { - server: Option, + thread_hdl: JoinHandle<()>, } impl JsonRpcService { @@ -27,42 +29,51 @@ impl JsonRpcService { transactions_addr: SocketAddr, drone_addr: SocketAddr, rpc_addr: SocketAddr, + exit: Arc, ) -> Self { let request_processor = JsonRpcRequestProcessor::new(bank.clone()); - let mut io = MetaIoHandler::default(); - let rpc = RpcSolImpl; - io.extend_with(rpc.to_delegate()); - - let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { - request_processor: request_processor.clone(), - transactions_addr, - drone_addr, - }).threads(4) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&rpc_addr); - - let server = match server { - Ok(server) => Some(server), - Err(e) => { - warn!( - "JSON RPC service unavailable: error {}. \n\ - Make sure the RPC port {} is not already in use by another application", - e, - rpc_addr.port() - ); - None - } - }; + let thread_hdl = Builder::new() + .name("solana-jsonrpc".to_string()) + .spawn(move || { + let mut io = MetaIoHandler::default(); + let rpc = RpcSolImpl; + io.extend_with(rpc.to_delegate()); + + let server = + ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { + request_processor: request_processor.clone(), + transactions_addr, + drone_addr, + }).threads(4) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr); + if server.is_err() { + warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port()); + return; + } + loop { + if exit.load(Ordering::Relaxed) { + server.unwrap().close(); + break; + } + sleep(Duration::from_millis(100)); + } + () + }) + .unwrap(); + JsonRpcService { thread_hdl } + } +} - JsonRpcService { server } +impl Service for JsonRpcService { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] } - pub fn close(self) { - if let Some(server) = self.server { - server.close(); - } + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() } }