From 8cf927189c593b90c627f27e540bfa9a73521d1d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 16:02:37 +0800 Subject: [PATCH 1/8] refactor(metrics): replace hyper implementation with axum --- Cargo.lock | 31 ++-------- Cargo.toml | 1 + src/common/common_service/Cargo.toml | 3 +- .../common_service/src/metrics_manager.rs | 62 +++++++++---------- src/meta/Cargo.toml | 2 +- src/meta/dashboard/Cargo.toml | 2 +- 6 files changed, 39 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1043302b89b89..4e5c2ebcf0d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5207,12 +5207,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "http-range-header" version = "0.4.0" @@ -9419,6 +9413,7 @@ name = "risingwave_common_service" version = "1.7.0-alpha" dependencies = [ "async-trait", + "axum 0.7.4", "futures", "hyper 0.14.27", "madsim-tokio", @@ -9430,7 +9425,7 @@ dependencies = [ "thiserror", "thiserror-ext", "tower", - "tower-http 0.4.4", + "tower-http", "tracing", "workspace-hack", ] @@ -10134,7 +10129,7 @@ dependencies = [ "tokio-retry", "tokio-stream", "tower", - "tower-http 0.5.2", + "tower-http", "tracing", "url", "uuid", @@ -13145,24 +13140,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" -dependencies = [ - "bitflags 2.5.0", - "bytes", - "futures-core", - "futures-util", - "http 0.2.9", - "http-body 0.4.5", - "http-range-header 0.3.1", - "pin-project-lite", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-http" version = "0.5.2" @@ -13177,7 +13154,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "http-body-util", - "http-range-header 0.4.0", + "http-range-header", "httpdate", "mime", "mime_guess", diff --git a/Cargo.toml b/Cargo.toml index 6128eabfeddeb..67aa014821134 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [ ] } aws-endpoint = "0.60" aws-types = "1" +axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain etcd-client = { package = "madsim-etcd-client", version = "0.4" } futures-async-stream = "0.2.9" hytra = "0.1" diff --git a/src/common/common_service/Cargo.toml b/src/common/common_service/Cargo.toml index 4f39105be207a..f2fa832eaad55 100644 --- a/src/common/common_service/Cargo.toml +++ b/src/common/common_service/Cargo.toml @@ -16,6 +16,7 @@ normal = ["workspace-hack"] [dependencies] async-trait = "0.1" +axum = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" # required by tonic prometheus = { version = "0.13" } @@ -27,7 +28,7 @@ thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] } tonic = { workspace = true } tower = { version = "0.4", features = ["util", "load-shed"] } -tower-http = { version = "0.4", features = ["add-extension", "cors"] } +tower-http = { version = "0.5", features = ["add-extension"] } tracing = "0.1" [target.'cfg(not(madsim))'.dependencies] diff --git a/src/common/common_service/src/metrics_manager.rs b/src/common/common_service/src/metrics_manager.rs index 2a284294cf7df..f6d31c98ef31b 100644 --- a/src/common/common_service/src/metrics_manager.rs +++ b/src/common/common_service/src/metrics_manager.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; use std::ops::Deref; use std::sync::OnceLock; -use hyper::{Body, Request, Response}; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; +use axum::{Extension, Router}; use prometheus::{Encoder, Registry, TextEncoder}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use thiserror_ext::AsReport; -use tower::make::Shared; -use tower::ServiceBuilder; +use tokio::net::TcpListener; use tower_http::add_extension::AddExtensionLayer; use tracing::{error, info, warn}; @@ -31,28 +31,28 @@ impl MetricsManager { pub fn boot_metrics_service(listen_addr: String) { static METRICS_SERVICE_LISTEN_ADDR: OnceLock = OnceLock::new(); let new_listen_addr = listen_addr.clone(); - let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { - let listen_addr_clone = listen_addr.clone(); - tokio::spawn(async move { - info!( - "Prometheus listener for Prometheus is set up on http://{}", - listen_addr - ); - let listen_socket_addr: SocketAddr = listen_addr.parse().unwrap(); - let service = ServiceBuilder::new() - .layer(AddExtensionLayer::new( - GLOBAL_METRICS_REGISTRY.deref().clone(), - )) - .service_fn(Self::metrics_service); - // TODO: use axum server - let serve_future = - hyper::Server::bind(&listen_socket_addr).serve(Shared::new(service)); - if let Err(err) = serve_future.await { - error!(error = %err.as_report(), "metrics service exited with error"); - } + let current_listen_addr = + METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| { + let listen_addr_clone = listen_addr.clone(); + #[cfg(not(madsim))] // no need in simulation test + tokio::spawn(async move { + info!( + "Prometheus listener for Prometheus is set up on http://{}", + listen_addr + ); + + let service = Router::new().fallback(Self::metrics_service).layer( + AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()), + ); + + let serve_future = + axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service); + if let Err(err) = serve_future.await { + error!(error = %err.as_report(), "metrics service exited with error"); + } + }); + listen_addr_clone }); - listen_addr_clone - }); if new_listen_addr != *current_listen_addr { warn!( "unable to listen port {} for metrics service. Currently listening on {}", @@ -62,17 +62,15 @@ impl MetricsManager { } #[expect(clippy::unused_async, reason = "required by service_fn")] - async fn metrics_service(req: Request) -> Result, hyper::Error> { - let registry = req.extensions().get::().unwrap(); + async fn metrics_service(Extension(registry): Extension) -> impl IntoResponse { let encoder = TextEncoder::new(); let mut buffer = vec![]; let mf = registry.gather(); encoder.encode(&mf, &mut buffer).unwrap(); - let response = Response::builder() - .header(hyper::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap(); - Ok(response) + Response::builder() + .header(axum::http::header::CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() } } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 29c1c9520cf52..fb757462adbc5 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -85,7 +85,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [target.'cfg(not(madsim))'.dependencies] -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } tower-http = { version = "0.5", features = [ "add-extension", "cors", diff --git a/src/meta/dashboard/Cargo.toml b/src/meta/dashboard/Cargo.toml index e848c8015cbb1..9c7e59fab315c 100644 --- a/src/meta/dashboard/Cargo.toml +++ b/src/meta/dashboard/Cargo.toml @@ -10,7 +10,7 @@ repository = { workspace = true } [dependencies] anyhow = "1" -axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain +axum = { workspace = true } axum-embed = "0.1" bytes = "1" hyper = "1" From 86c3adf975dd6152c6c03f7d1d12f631e2698c25 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 16:12:46 +0800 Subject: [PATCH 2/8] remove hyper dependency on dashboard proxy Signed-off-by: Bugen Zhao --- Cargo.lock | 1 - src/meta/dashboard/Cargo.toml | 1 - src/meta/dashboard/src/proxy.rs | 20 +++++++++----------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e5c2ebcf0d55..f4b1716d1a20d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10146,7 +10146,6 @@ dependencies = [ "bytes", "cargo-emit", "dircpy", - "hyper 1.2.0", "mime_guess", "npm_rs", "reqwest 0.12.2", diff --git a/src/meta/dashboard/Cargo.toml b/src/meta/dashboard/Cargo.toml index 9c7e59fab315c..a0bd8e96b0b85 100644 --- a/src/meta/dashboard/Cargo.toml +++ b/src/meta/dashboard/Cargo.toml @@ -13,7 +13,6 @@ anyhow = "1" axum = { workspace = true } axum-embed = "0.1" bytes = "1" -hyper = "1" mime_guess = "2" reqwest = "0.12.2" rust-embed = { version = "8", features = ["interpolate-folder-path", "mime-guess"] } diff --git a/src/meta/dashboard/src/proxy.rs b/src/meta/dashboard/src/proxy.rs index f63a4f63c5fe3..3dc7ac2625241 100644 --- a/src/meta/dashboard/src/proxy.rs +++ b/src/meta/dashboard/src/proxy.rs @@ -16,12 +16,10 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use anyhow::anyhow; -use axum::http::StatusCode; +use axum::http::{header, HeaderMap, StatusCode, Uri}; use axum::response::{IntoResponse, Response}; use axum::Router; use bytes::Bytes; -use hyper::header::CONTENT_TYPE; -use hyper::{HeaderMap, Uri}; use thiserror_ext::AsReport as _; use url::Url; @@ -37,21 +35,21 @@ impl IntoResponse for CachedResponse { fn into_response(self) -> Response { let guess = mime_guess::from_path(self.uri.path()); let mut headers = HeaderMap::new(); - if let Some(x) = self.headers.get(hyper::header::ETAG) { - headers.insert(hyper::header::ETAG, x.clone()); + if let Some(x) = self.headers.get(header::ETAG) { + headers.insert(header::ETAG, x.clone()); } - if let Some(x) = self.headers.get(hyper::header::CACHE_CONTROL) { - headers.insert(hyper::header::CACHE_CONTROL, x.clone()); + if let Some(x) = self.headers.get(header::CACHE_CONTROL) { + headers.insert(header::CACHE_CONTROL, x.clone()); } - if let Some(x) = self.headers.get(hyper::header::EXPIRES) { - headers.insert(hyper::header::EXPIRES, x.clone()); + if let Some(x) = self.headers.get(header::EXPIRES) { + headers.insert(header::EXPIRES, x.clone()); } if let Some(x) = guess.first() { if x.type_() == "image" && x.subtype() == "svg" { - headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap()); + headers.insert(header::CONTENT_TYPE, "image/svg+xml".parse().unwrap()); } else { headers.insert( - CONTENT_TYPE, + header::CONTENT_TYPE, format!("{}/{}", x.type_(), x.subtype()).parse().unwrap(), ); } From 5b2825b3f96dc107d7a082ffe75aeae78763287c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 16:47:53 +0800 Subject: [PATCH 3/8] doris Signed-off-by: Bugen Zhao --- src/connector/src/sink/doris.rs | 46 ++++++++++++--------------------- 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index be3976f30fd60..b6b869fcb4141 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO: use hyper 1 or reqwest 0.12.2 - use std::collections::HashMap; use std::sync::Arc; @@ -22,9 +20,6 @@ use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; -use hyper::body::Body; -use hyper::{body, Client, Request}; -use hyper_tls::HttpsConnector; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -433,14 +428,14 @@ impl DorisSchemaClient { pub async fn get_schema_from_doris(&self) -> Result { let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table); - let builder = Request::get(uri); - let connector = HttpsConnector::new(); - let client = Client::builder() + let client = reqwest::Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); + .build() + .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let request = builder + let response = client + .get(uri) .header( "Authorization", format!( @@ -448,31 +443,24 @@ impl DorisSchemaClient { general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) ), ) - .body(Body::empty()) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - - let response = client - .request(request) + .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await { - Ok(bytes) => bytes.to_vec(), - Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())), - }) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - - let json_map: HashMap = serde_json::from_str(&raw_bytes) + let json: Value = response + .json() + .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") { - let data = json_map.get("data").ok_or_else(|| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data")) - })?; - data.to_string() + let json_data = if json.get("code").is_some() && json.get("msg").is_some() { + json.get("data") + .ok_or_else(|| { + SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data")) + })? + .clone() } else { - raw_bytes + json }; - let schema: DorisSchema = serde_json::from_str(&json_data) + let schema: DorisSchema = serde_json::from_value(json_data) .context("Can't get schema from json") .map_err(SinkError::DorisStarrocksConnect)?; Ok(schema) From 7b59b918369956643ec23bb0275542837efe0cf8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 17:23:51 +0800 Subject: [PATCH 4/8] doris starrocks connector Signed-off-by: Bugen Zhao --- Cargo.lock | 17 +++- src/connector/Cargo.toml | 3 +- .../src/sink/doris_starrocks_connector.rs | 92 +++++++------------ 3 files changed, 51 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4b1716d1a20d..9add4ee34a56c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8878,7 +8878,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.3.0", "web-sys", "webpki-roots", "winreg", @@ -8919,10 +8919,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams 0.4.0", "web-sys", "winreg", ] @@ -13793,6 +13795,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.201.0" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index e8b3b9c9d131f..840510ed4b20e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -65,7 +65,6 @@ hyper = { version = "0.14", features = [ "tcp", "http1", "http2", - "stream", ] } # required by clickhouse client hyper-tls = "0.5" icelake = { workspace = true } @@ -111,7 +110,7 @@ rdkafka = { workspace = true, features = [ ] } redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp"] } regex = "1.4" -reqwest = { version = "0.12.2", features = ["json"] } +reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 04d591b820786..15b19bac8b89f 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -15,16 +15,15 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; +use std::convert::Infallible; use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; -use http::request::Builder; -use hyper::body::{Body, Sender}; -use hyper::client::HttpConnector; -use hyper::{body, Client, Request, StatusCode}; -use hyper_tls::HttpsConnector; +use futures::StreamExt; +use reqwest::{Body, Client, RequestBuilder, StatusCode}; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use url::Url; @@ -187,31 +186,23 @@ impl InserterInnerBuilder { }) } - // TODO: use hyper 1 or reqwest 0.12.2 - fn build_request_and_client( - &self, - uri: String, - ) -> (Builder, Client>) { - let mut builder = Request::put(uri); - for (k, v) in &self.header { - builder = builder.header(k, v); - } - - let connector = HttpsConnector::new(); + fn build_request_and_client(&self, uri: String) -> RequestBuilder { let client = Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); + .build() + .unwrap(); - (builder, client) + let mut builder = client.put(uri); + for (k, v) in &self.header { + builder = builder.header(k, v); + } + builder } pub async fn build(&self) -> Result { - let (builder, client) = self.build_request_and_client(self.url.clone()); - let request_get_url = builder - .body(Body::empty()) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let resp = client - .request(request_get_url) + let builder = self.build_request_and_client(self.url.clone()); + let resp = builder + .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { @@ -249,23 +240,25 @@ impl InserterInnerBuilder { } } - let (builder, client) = self.build_request_and_client(be_url); - let (sender, body) = Body::channel(); - let request = builder - .body(body) - .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; - let future = client.request(request); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let body = Body::wrap_stream( + tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), + ); + let builder = self.build_request_and_client(be_url).body(body); let handle: JoinHandle>> = tokio::spawn(async move { - let response = future + let response = builder + .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; let status = response.status(); - let raw = body::to_bytes(response.into_body()) + let raw = response + .bytes() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))? - .to_vec(); - if status == StatusCode::OK && !raw.is_empty() { + .into(); + + if status == StatusCode::OK { Ok(raw) } else { Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( @@ -280,6 +273,8 @@ impl InserterInnerBuilder { } } +type Sender = UnboundedSender; + pub struct InserterInner { sender: Option, join_handle: Option>>>, @@ -301,37 +296,18 @@ impl InserterInner { let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)); - let is_timed_out = match tokio::time::timeout( - SEND_CHUNK_TIMEOUT, - self.sender.as_mut().unwrap().send_data(chunk.into()), - ) - .await - { - Ok(Ok(_)) => return Ok(()), - Ok(Err(_)) => false, - Err(_) => true, - }; - self.abort()?; + if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) { + self.sender.take(); + self.wait_handle().await?; - let res = self.wait_handle().await; - - if is_timed_out { - Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout"))) - } else { - res?; Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "channel closed" ))) + } else { + Ok(()) } } - fn abort(&mut self) -> Result<()> { - if let Some(sender) = self.sender.take() { - sender.abort(); - } - Ok(()) - } - pub async fn write(&mut self, data: Bytes) -> Result<()> { self.buffer.put_slice(&data); if self.buffer.len() >= MIN_CHUNK_SIZE { From 48e38c0c2afdb8d026d91b33ed8ba06f81547ee8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 4 Apr 2024 17:27:03 +0800 Subject: [PATCH 5/8] use default client for clickhouse client Signed-off-by: Bugen Zhao --- Cargo.lock | 2 -- src/connector/Cargo.toml | 7 ------- src/connector/src/sink/clickhouse.rs | 13 ++----------- 3 files changed, 2 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9add4ee34a56c..f28a0d74bd961 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9570,8 +9570,6 @@ dependencies = [ "glob", "google-cloud-pubsub", "http 0.2.9", - "hyper 0.14.27", - "hyper-tls 0.5.0", "icelake", "indexmap 1.9.3", "itertools 0.12.1", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 840510ed4b20e..511513f206ecd 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -60,13 +60,6 @@ gcp-bigquery-client = "0.18.0" glob = "0.3" google-cloud-pubsub = "0.23" http = "0.2" -hyper = { version = "0.14", features = [ - "client", - "tcp", - "http1", - "http2", -] } # required by clickhouse client -hyper-tls = "0.5" icelake = { workspace = true } indexmap = { version = "1.9.3", features = ["serde"] } itertools = "0.12" diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 16407a8f68c6c..72d9edceda9cd 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -11,9 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + use core::fmt::Debug; use std::collections::{HashMap, HashSet}; -use std::time::Duration; use anyhow::anyhow; use clickhouse::insert::Insert; @@ -190,18 +190,9 @@ impl ClickHouseEngine { } } -const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5); - impl ClickHouseCommon { pub(crate) fn build_client(&self) -> ConnectorResult { - use hyper_tls::HttpsConnector; - - let https = HttpsConnector::new(); - let client = hyper::Client::builder() - .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build::<_, hyper::Body>(https); - - let client = ClickHouseClient::with_http_client(client) + let client = ClickHouseClient::default() // hyper(0.14) client inside .with_url(&self.url) .with_user(&self.user) .with_password(&self.password) From 9dc3caa07596106d84dcd3ec1075656a79069165 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 9 Apr 2024 14:21:12 +0800 Subject: [PATCH 6/8] bump clickhouse fork rev Signed-off-by: Bugen Zhao --- Cargo.lock | 4 ++-- src/connector/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f28a0d74bd961..4a40f55eb2b1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2231,7 +2231,7 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "clickhouse" version = "0.11.5" -source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=622501c1c98c80baaf578c716d6903dde947804e#622501c1c98c80baaf578c716d6903dde947804e" +source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=d38c8b6391af098b724c114e5a4746aedab6ab8e#d38c8b6391af098b724c114e5a4746aedab6ab8e" dependencies = [ "bstr", "bytes", @@ -2253,7 +2253,7 @@ dependencies = [ [[package]] name = "clickhouse-derive" version = "0.1.1" -source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=622501c1c98c80baaf578c716d6903dde947804e#622501c1c98c80baaf578c716d6903dde947804e" +source = "git+https://github.com/risingwavelabs/clickhouse.rs?rev=d38c8b6391af098b724c114e5a4746aedab6ab8e#d38c8b6391af098b724c114e5a4746aedab6ab8e" dependencies = [ "proc-macro2", "quote", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 511513f206ecd..05ed73befaa35 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -46,7 +46,7 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } -clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ +clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [ "time", ] } csv = "1.3" From f43dcb19fc84a81ccf2c32f7ee7c6c0f47ff02a0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 11 Apr 2024 13:43:49 +0800 Subject: [PATCH 7/8] fix redirection Signed-off-by: Bugen Zhao --- src/connector/src/sink/doris_starrocks_connector.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 15b19bac8b89f..6c045c63beb47 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -22,7 +22,7 @@ use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; use futures::StreamExt; -use reqwest::{Body, Client, RequestBuilder, StatusCode}; +use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode}; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use url::Url; @@ -186,9 +186,10 @@ impl InserterInnerBuilder { }) } - fn build_request_and_client(&self, uri: String) -> RequestBuilder { + fn build_request(&self, uri: String) -> RequestBuilder { let client = Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .redirect(redirect::Policy::none()) // we handle redirect by ourselves .build() .unwrap(); @@ -200,11 +201,12 @@ impl InserterInnerBuilder { } pub async fn build(&self) -> Result { - let builder = self.build_request_and_client(self.url.clone()); + let builder = self.build_request(self.url.clone()); let resp = builder .send() .await .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?; + // TODO: shall we let `reqwest` handle the redirect? let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT { resp.headers() .get("location") @@ -244,7 +246,7 @@ impl InserterInnerBuilder { let body = Body::wrap_stream( tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>), ); - let builder = self.build_request_and_client(be_url).body(body); + let builder = self.build_request(be_url).body(body); let handle: JoinHandle>> = tokio::spawn(async move { let response = builder From 2f1f8a3996ed82e08214ab8309ad78fb65df99bf Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 11 Apr 2024 16:57:28 +0800 Subject: [PATCH 8/8] fix snowflake Signed-off-by: Bugen Zhao --- src/connector/src/sink/snowflake_connector.rs | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index e5e37deb14652..30c74045441a2 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -22,14 +22,10 @@ use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::Client as S3Client; use aws_types::region::Region; use bytes::Bytes; -use http::header; -use http::request::Builder; -use hyper::body::Body; -use hyper::client::HttpConnector; -use hyper::{Client, Request, StatusCode}; -use hyper_tls::HttpsConnector; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; +use reqwest::{header, Client, RequestBuilder, StatusCode}; use serde::{Deserialize, Serialize}; +use thiserror_ext::AsReport; use super::doris_starrocks_connector::POOL_IDLE_TIMEOUT; use super::{Result, SinkError}; @@ -148,21 +144,19 @@ impl SnowflakeHttpClient { Ok(jwt_token) } - fn build_request_and_client(&self) -> (Builder, Client>) { - let builder = Request::post(self.url.clone()); - - let connector = HttpsConnector::new(); + fn build_request_and_client(&self) -> RequestBuilder { let client = Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(connector); + .build() + .unwrap(); - (builder, client) + client.post(&self.url) } /// NOTE: this function should ONLY be called *after* /// uploading files to remote external staged storage, i.e., AWS S3 pub async fn send_request(&self, file_suffix: String) -> Result<()> { - let (builder, client) = self.build_request_and_client(); + let builder = self.build_request_and_client(); // Generate the jwt_token let jwt_token = self.generate_jwt_token()?; @@ -172,19 +166,13 @@ impl SnowflakeHttpClient { .header( "X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT", - ); - - let request = builder - .body(Body::from(generate_s3_file_name( - self.s3_path.clone(), - file_suffix, - ))) - .map_err(|err| SinkError::Snowflake(err.to_string()))?; + ) + .body(generate_s3_file_name(self.s3_path.clone(), file_suffix)); - let response = client - .request(request) + let response = builder + .send() .await - .map_err(|err| SinkError::Snowflake(err.to_string()))?; + .map_err(|err| SinkError::Snowflake(err.to_report_string()))?; if response.status() != StatusCode::OK { return Err(SinkError::Snowflake(format!(