From e1435c84ac5c26d64342e885367beda9c30090ca Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Thu, 19 Dec 2024 10:38:26 +0100 Subject: [PATCH] Add option to gzip compress request bodies (#239) (#241) Co-authored-by: Ryan Eno --- Cargo.lock | 5 +-- elasticsearch/Cargo.toml | 10 ++++-- elasticsearch/src/http/transport.rs | 55 +++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6678919..dcf77d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -649,6 +649,7 @@ dependencies = [ "clap", "dyn-clone", "failure", + "flate2", "futures", "http", "lazy_static", @@ -740,9 +741,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", "miniz_oxide 0.8.0", diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index a3eb3d6..223e1f9 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -31,7 +31,10 @@ bytes = "1" dyn-clone = "1" lazy_static = "1" percent-encoding = "2" -reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] } +reqwest = { version = "0.12", default-features = false, features = [ + "gzip", + "json", +] } url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -40,6 +43,7 @@ serde_with = "3" #tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] } void = "1" +flate2 = "^1.0.34" [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "1.0" @@ -48,14 +52,14 @@ features = ["macros", "net", "time", "rt-multi-thread"] [dev-dependencies] chrono = { version = "0.4", features = ["serde"] } -clap = { version = "4", features = ["env"]} +clap = { version = "4", features = ["env"] } failure = "0.1" futures = "0.3" http = "1" axum = "0.7" #hyper = { version = "1", features = ["server", "http1"] } os_type = "2" -regex="1" +regex = "1" #sysinfo = "0.31" textwrap = "0.16" xml-rs = "0.8" diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 42a7136..d44b9b7 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -18,7 +18,10 @@ */ //! HTTP transport and connection components -#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))] +#[cfg(all( + target_arch = "wasm32", + any(feature = "native-tls", feature = "rustls-tls") +))] compile_error!("TLS features are not compatible with the wasm target"); #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] @@ -30,8 +33,8 @@ use crate::{ error::Error, http::{ headers::{ - HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE, - DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT, + HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING, + CONTENT_TYPE, DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT, }, request::Body, response::Response, @@ -40,6 +43,7 @@ use crate::{ }; use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine}; use bytes::BytesMut; +use flate2::{write::GzEncoder, Compression}; use lazy_static::lazy_static; use serde::Serialize; use serde_json::Value; @@ -147,6 +151,7 @@ pub struct TransportBuilder { credentials: Option, #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] cert_validation: Option, + request_body_compression: bool, #[cfg(not(target_arch = "wasm32"))] proxy: Option, #[cfg(not(target_arch = "wasm32"))] @@ -172,6 +177,7 @@ impl TransportBuilder { credentials: None, #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] cert_validation: None, + request_body_compression: false, #[cfg(not(target_arch = "wasm32"))] proxy: None, #[cfg(not(target_arch = "wasm32"))] @@ -215,6 +221,12 @@ impl TransportBuilder { self } + /// Gzip compress the body of requests, adds the `Content-Encoding: gzip` header. + pub fn request_body_compression(mut self, enabled: bool) -> Self { + self.request_body_compression = enabled; + self + } + /// Validation applied to the certificate provided to establish a HTTPS connection. /// By default, full validation is applied. When using a self-signed certificate, /// different validation can be applied. @@ -335,6 +347,7 @@ impl TransportBuilder { Ok(Transport { client, conn_pool: self.conn_pool, + request_body_compression: self.request_body_compression, credentials: self.credentials, send_meta: self.meta_header, }) @@ -381,6 +394,7 @@ impl Connection { pub struct Transport { client: reqwest::Client, credentials: Option, + request_body_compression: bool, conn_pool: Arc, send_meta: bool, } @@ -481,8 +495,7 @@ impl Transport { headers: HeaderMap, query_string: Option<&Q>, body: Option, - #[allow(unused_variables)] - timeout: Option, + #[allow(unused_variables)] timeout: Option, ) -> Result where B: Body, @@ -552,7 +565,17 @@ impl Transport { bytes_mut.split().freeze() }; - request_builder = request_builder.body(bytes); + match self.request_body_compression { + true => { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&bytes)?; + request_builder = request_builder.body(encoder.finish()?); + request_builder = request_builder.header(CONTENT_ENCODING, "gzip"); + } + false => { + request_builder = request_builder.body(bytes); + } + } }; if let Some(q) = query_string { @@ -589,15 +612,17 @@ impl Transport { let connection = self.conn_pool.next(); // Build node info request - let node_request = self.request_builder( - &connection, - Method::Get, - "_nodes/http?filter_path=nodes.*.http", - HeaderMap::default(), - None::<&()>, - None::<()>, - None, - ).unwrap(); + let node_request = self + .request_builder( + &connection, + Method::Get, + "_nodes/http?filter_path=nodes.*.http", + HeaderMap::default(), + None::<&()>, + None::<()>, + None, + ) + .unwrap(); let scheme = connection.url.scheme(); let resp = node_request.send().await.unwrap();