Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
simple_http: store the BufReader rather than bare socket
Browse files Browse the repository at this point in the history
When we create/destroy BufReaders we potentially lose data on the socket.
Avoid doing this by keeping the BufReader permanently.
  • Loading branch information
apoelstra committed Nov 26, 2022
1 parent 8d2ec83 commit 20da231
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions src/simple_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#[cfg(feature = "proxy")]
use socks::Socks5Stream;
use std::io::{BufRead, BufReader, Read, Write};
use std::io::{BufRead, BufReader, BufWriter, Read, Write};
#[cfg(not(fuzzing))]
use std::net::TcpStream;
use std::net::{SocketAddr, ToSocketAddrs};
Expand Down Expand Up @@ -82,7 +82,7 @@ pub struct SimpleHttpTransport {
proxy_addr: net::SocketAddr,
#[cfg(feature = "proxy")]
proxy_auth: Option<(String, String)>,
sock: Arc<Mutex<Option<TcpStream>>>,
sock: Arc<Mutex<Option<BufReader<TcpStream>>>>,
}

impl Default for SimpleHttpTransport {
Expand Down Expand Up @@ -147,7 +147,7 @@ impl SimpleHttpTransport {
// No part of this codebase should panic, so unwrapping a mutex lock is fine
let mut sock = self.sock.lock().expect("poisoned mutex");
if sock.is_none() {
*sock = Some({
*sock = Some(BufReader::new({
#[cfg(feature = "proxy")]
{
if let Some((username, password)) = &self.proxy_auth {
Expand All @@ -170,39 +170,39 @@ impl SimpleHttpTransport {
stream.set_write_timeout(Some(self.timeout))?;
stream
}
})
}));
};
// In the immediately preceding block, we made sure that `sock` is non-`None`,
// so unwrapping here is fine.
let sock = sock.as_mut().unwrap();
let mut sock = sock.as_mut().unwrap();

// Serialize the body first so we can set the Content-Length header.
let body = serde_json::to_vec(&req)?;

// Send HTTP request
sock.write_all(b"POST ")?;
sock.write_all(self.path.as_bytes())?;
sock.write_all(b" HTTP/1.1\r\n")?;
// Write headers
sock.write_all(b"Content-Type: application/json\r\n")?;
sock.write_all(b"Content-Length: ")?;
sock.write_all(body.len().to_string().as_bytes())?;
sock.write_all(b"\r\n")?;
if let Some(ref auth) = self.basic_auth {
sock.write_all(b"Authorization: ")?;
sock.write_all(auth.as_ref())?;
{
let mut sock = BufWriter::new(sock.get_ref());
sock.write_all(b"POST ")?;
sock.write_all(self.path.as_bytes())?;
sock.write_all(b" HTTP/1.1\r\n")?;
// Write headers
sock.write_all(b"Content-Type: application/json\r\n")?;
sock.write_all(b"Content-Length: ")?;
sock.write_all(body.len().to_string().as_bytes())?;
sock.write_all(b"\r\n")?;
if let Some(ref auth) = self.basic_auth {
sock.write_all(b"Authorization: ")?;
sock.write_all(auth.as_ref())?;
sock.write_all(b"\r\n")?;
}
// Write body
sock.write_all(b"\r\n")?;
sock.write_all(&body)?;
sock.flush()?;
}
// Write body
sock.write_all(b"\r\n")?;
sock.write_all(&body)?;
sock.flush()?;

// Receive response
let mut reader = BufReader::new(sock);

// Parse first HTTP response header line
let http_response = get_line(&mut reader, Instant::now() + self.timeout)?;
let http_response = get_line(&mut sock, Instant::now() + self.timeout)?;
if http_response.len() < 12 {
return Err(Error::HttpResponseTooShort { actual: http_response.len(), needed: 12 });
}
Expand All @@ -226,7 +226,7 @@ impl SimpleHttpTransport {
// Parse response header fields
let mut content_length = None;
loop {
let mut line = get_line(&mut reader, request_deadline)?;
let mut line = get_line(&mut sock, request_deadline)?;
if line == "\r\n" {
break;
}
Expand Down Expand Up @@ -254,8 +254,8 @@ impl SimpleHttpTransport {
let buffer = match content_length {
None => {
let mut buffer = Vec::with_capacity(INITIAL_RESP_ALLOC as usize);
// `take` consumes `reader` and drops it, unlocking the mutex
reader.take(FINAL_RESP_ALLOC).read_to_end(&mut buffer)?;
// `take` consumes `sock` and drops it, unlocking the mutex
sock.take(FINAL_RESP_ALLOC).read_to_end(&mut buffer)?;
buffer
},
Some(n) if n > FINAL_RESP_ALLOC => {
Expand All @@ -266,8 +266,8 @@ impl SimpleHttpTransport {
},
Some(n) => {
let mut buffer = Vec::with_capacity(INITIAL_RESP_ALLOC as usize);
// `take` consumes `reader` and drops it, unlocking the mutex
let n_read = reader.take(n).read_to_end(&mut buffer)? as u64;
// `take` consumes `sock` and drops it, unlocking the mutex
let n_read = sock.take(n).read_to_end(&mut buffer)? as u64;
if n_read < n {
return Err(Error::IncompleteResponse { content_length: n, n_read });
}
Expand Down

0 comments on commit 20da231

Please sign in to comment.