Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
reuse socket with Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
raphjaph committed Oct 27, 2022
1 parent 76256ae commit a6b32aa
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 39 deletions.
12 changes: 6 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use crate::util::HashableValue;
/// An interface for a transport over which to use the JSONRPC protocol.
pub trait Transport: Send + Sync + 'static {
/// Send an RPC request over the transport.
fn send_request(&mut self, _: Request) -> Result<Response, Error>;
fn send_request(&self, _: Request) -> Result<Response, Error>;
/// Send a batch of RPC requests over the transport.
fn send_batch(&mut self, _: &[Request]) -> Result<Vec<Response>, Error>;
fn send_batch(&self, _: &[Request]) -> Result<Vec<Response>, Error>;
/// Format the target of this transport.
/// I.e. the URL/socket/...
fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl Client {
}

/// Sends a request to a client
pub fn send_request(&mut self, request: Request) -> Result<Response, Error> {
pub fn send_request(&self, request: Request) -> Result<Response, Error> {
self.transport.send_request(request)
}

Expand Down Expand Up @@ -124,7 +124,7 @@ impl Client {
/// To construct the arguments, one can use one of the shorthand methods
/// [`crate::arg`] or [`crate::try_arg`].
pub fn call<R: for<'a> serde::de::Deserialize<'a>>(
&mut self,
&self,
method: &str,
args: &[Box<RawValue>],
) -> Result<R, Error> {
Expand Down Expand Up @@ -158,10 +158,10 @@ mod tests {

struct DummyTransport;
impl Transport for DummyTransport {
fn send_request(&mut self, _: Request) -> Result<Response, Error> {
fn send_request(&self, _: Request) -> Result<Response, Error> {
Err(Error::NonceMismatch)
}
fn send_batch(&mut self, _: &[Request]) -> Result<Vec<Response>, Error> {
fn send_batch(&self, _: &[Request]) -> Result<Vec<Response>, Error> {
Ok(vec![])
}
fn fmt_target(&self, _: &mut fmt::Formatter) -> fmt::Result {
Expand Down
75 changes: 45 additions & 30 deletions src/simple_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
#[cfg(feature = "proxy")]
use socks::Socks5Stream;
use std::io::{BufRead, BufReader, Write};
#[cfg(not(feature = "proxy"))]
use std::net::TcpStream;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{error, fmt, io, net, thread};

Expand All @@ -27,7 +27,7 @@ pub const DEFAULT_PROXY_PORT: u16 = 9050;

/// Simple HTTP transport that implements the necessary subset of HTTP for
/// running a bitcoind RPC client.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SimpleHttpTransport {
addr: net::SocketAddr,
path: String,
Expand All @@ -38,7 +38,7 @@ pub struct SimpleHttpTransport {
proxy_addr: net::SocketAddr,
#[cfg(feature = "proxy")]
proxy_auth: Option<(String, String)>,
sock: Option<TcpStream>,
sock: Arc<Mutex<Option<TcpStream>>>,
}

impl Default for SimpleHttpTransport {
Expand All @@ -58,7 +58,7 @@ impl Default for SimpleHttpTransport {
),
#[cfg(feature = "proxy")]
proxy_auth: None,
sock: None,
sock: Arc::new(Mutex::new(None)),
}
}
}
Expand All @@ -74,36 +74,51 @@ impl SimpleHttpTransport {
Builder::new()
}

fn request<R>(&mut self, req: impl serde::Serialize) -> Result<R, Error>
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
where
R: for<'a> serde::de::Deserialize<'a>,
{
match self.try_request(req) {
Ok(response) => Ok(response),
Err(err) => {
*self.sock.lock().unwrap() = None;
Err(err)
}
}
}

fn try_request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
where
R: for<'a> serde::de::Deserialize<'a>,
{
// Open connection
let request_deadline = Instant::now() + self.timeout;
#[cfg(feature = "proxy")]
let mut sock = if let Some((username, password)) = &self.proxy_auth {
Socks5Stream::connect_with_password(
self.proxy_addr,
self.addr,
username.as_str(),
password.as_str(),
)?
.into_inner()
} else {
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
};
let mut sock = self.sock.lock().unwrap();
if sock.is_none() {
#[cfg(feature = "proxy")]
{
*sock = Some(if let Some((username, password)) = &self.proxy_auth {
Socks5Stream::connect_with_password(
self.proxy_addr,
self.addr,
username.as_str(),
password.as_str(),
)?
.into_inner()
} else {
Socks5Stream::connect(self.proxy_addr, self.addr)?.into_inner()
});
}

#[cfg(not(feature = "proxy"))]
if self.sock.is_none() {
let sock = TcpStream::connect_timeout(
&net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1)), 8080),
Duration::from_secs(15),
)?;
sock.set_read_timeout(Some(self.timeout))?;
sock.set_write_timeout(Some(self.timeout))?;
self.sock = Some(sock);
#[cfg(not(feature = "proxy"))]
{
let stream = TcpStream::connect_timeout(&self.addr, Duration::from_secs(15))?;
stream.set_read_timeout(Some(self.timeout))?;
stream.set_write_timeout(Some(self.timeout))?;
*sock = Some(stream);
}
}
let sock = self.sock.as_mut().unwrap();
let sock = sock.as_mut().unwrap();

// Serialize the body first so we can set the Content-Length header.
let body = serde_json::to_vec(&req)?;
Expand Down Expand Up @@ -346,11 +361,11 @@ fn check_url(url: &str) -> Result<(SocketAddr, String), Error> {
}

impl Transport for SimpleHttpTransport {
fn send_request(&mut self, req: Request) -> Result<Response, crate::Error> {
fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
Ok(self.request(req)?)
}

fn send_batch(&mut self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
Ok(self.request(reqs)?)
}

Expand All @@ -360,7 +375,7 @@ impl Transport for SimpleHttpTransport {
}

/// Builder for simple bitcoind `SimpleHttpTransport`s
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Builder {
tp: SimpleHttpTransport,
}
Expand Down
6 changes: 3 additions & 3 deletions src/simple_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ impl TcpTransport {
}

impl Transport for TcpTransport {
fn send_request(&mut self, req: Request) -> Result<Response, crate::Error> {
fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
Ok(self.request(req)?)
}

fn send_batch(&mut self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
Ok(self.request(reqs)?)
}

Expand Down Expand Up @@ -151,7 +151,7 @@ mod tests {
addr,
timeout: Some(time::Duration::from_secs(5)),
};
let mut client = Client::with_transport(transport);
let client = Client::with_transport(transport);

client.send_request(dummy_req.clone()).unwrap()
});
Expand Down

0 comments on commit a6b32aa

Please sign in to comment.