From 20da23159a591a41d5996a675184d41107b55343 Mon Sep 17 00:00:00 2001 From: Andrew Poelstra Date: Sat, 26 Nov 2022 14:33:05 +0000 Subject: [PATCH] simple_http: store the BufReader rather than bare socket When we create/destroy BufReaders we potentially lose data on the socket. Avoid doing this by keeping the BufReader permanently. --- src/simple_http.rs | 58 +++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/simple_http.rs b/src/simple_http.rs index 93debf44..eb68b6b1 100644 --- a/src/simple_http.rs +++ b/src/simple_http.rs @@ -5,7 +5,7 @@ #[cfg(feature = "proxy")] use socks::Socks5Stream; -use std::io::{BufRead, BufReader, Read, Write}; +use std::io::{BufRead, BufReader, BufWriter, Read, Write}; #[cfg(not(fuzzing))] use std::net::TcpStream; use std::net::{SocketAddr, ToSocketAddrs}; @@ -82,7 +82,7 @@ pub struct SimpleHttpTransport { proxy_addr: net::SocketAddr, #[cfg(feature = "proxy")] proxy_auth: Option<(String, String)>, - sock: Arc>>, + sock: Arc>>>, } impl Default for SimpleHttpTransport { @@ -147,7 +147,7 @@ impl SimpleHttpTransport { // No part of this codebase should panic, so unwrapping a mutex lock is fine let mut sock = self.sock.lock().expect("poisoned mutex"); if sock.is_none() { - *sock = Some({ + *sock = Some(BufReader::new({ #[cfg(feature = "proxy")] { if let Some((username, password)) = &self.proxy_auth { @@ -170,39 +170,39 @@ impl SimpleHttpTransport { stream.set_write_timeout(Some(self.timeout))?; stream } - }) + })); }; // In the immediately preceding block, we made sure that `sock` is non-`None`, // so unwrapping here is fine. - let sock = sock.as_mut().unwrap(); + let mut sock = sock.as_mut().unwrap(); // Serialize the body first so we can set the Content-Length header. let body = serde_json::to_vec(&req)?; // Send HTTP request - sock.write_all(b"POST ")?; - sock.write_all(self.path.as_bytes())?; - sock.write_all(b" HTTP/1.1\r\n")?; - // Write headers - sock.write_all(b"Content-Type: application/json\r\n")?; - sock.write_all(b"Content-Length: ")?; - sock.write_all(body.len().to_string().as_bytes())?; - sock.write_all(b"\r\n")?; - if let Some(ref auth) = self.basic_auth { - sock.write_all(b"Authorization: ")?; - sock.write_all(auth.as_ref())?; + { + let mut sock = BufWriter::new(sock.get_ref()); + sock.write_all(b"POST ")?; + sock.write_all(self.path.as_bytes())?; + sock.write_all(b" HTTP/1.1\r\n")?; + // Write headers + sock.write_all(b"Content-Type: application/json\r\n")?; + sock.write_all(b"Content-Length: ")?; + sock.write_all(body.len().to_string().as_bytes())?; sock.write_all(b"\r\n")?; + if let Some(ref auth) = self.basic_auth { + sock.write_all(b"Authorization: ")?; + sock.write_all(auth.as_ref())?; + sock.write_all(b"\r\n")?; + } + // Write body + sock.write_all(b"\r\n")?; + sock.write_all(&body)?; + sock.flush()?; } - // Write body - sock.write_all(b"\r\n")?; - sock.write_all(&body)?; - sock.flush()?; - - // Receive response - let mut reader = BufReader::new(sock); // Parse first HTTP response header line - let http_response = get_line(&mut reader, Instant::now() + self.timeout)?; + let http_response = get_line(&mut sock, Instant::now() + self.timeout)?; if http_response.len() < 12 { return Err(Error::HttpResponseTooShort { actual: http_response.len(), needed: 12 }); } @@ -226,7 +226,7 @@ impl SimpleHttpTransport { // Parse response header fields let mut content_length = None; loop { - let mut line = get_line(&mut reader, request_deadline)?; + let mut line = get_line(&mut sock, request_deadline)?; if line == "\r\n" { break; } @@ -254,8 +254,8 @@ impl SimpleHttpTransport { let buffer = match content_length { None => { let mut buffer = Vec::with_capacity(INITIAL_RESP_ALLOC as usize); - // `take` consumes `reader` and drops it, unlocking the mutex - reader.take(FINAL_RESP_ALLOC).read_to_end(&mut buffer)?; + // `take` consumes `sock` and drops it, unlocking the mutex + sock.take(FINAL_RESP_ALLOC).read_to_end(&mut buffer)?; buffer }, Some(n) if n > FINAL_RESP_ALLOC => { @@ -266,8 +266,8 @@ impl SimpleHttpTransport { }, Some(n) => { let mut buffer = Vec::with_capacity(INITIAL_RESP_ALLOC as usize); - // `take` consumes `reader` and drops it, unlocking the mutex - let n_read = reader.take(n).read_to_end(&mut buffer)? as u64; + // `take` consumes `sock` and drops it, unlocking the mutex + let n_read = sock.take(n).read_to_end(&mut buffer)? as u64; if n_read < n { return Err(Error::IncompleteResponse { content_length: n, n_read }); }