refactor(connector): replace hyper client implementation with reqwest #16146

Merged · 9 commits · Apr 11, 2024
23 changes: 18 additions & 5 deletions Cargo.lock

(Generated file; diff not rendered by default.)

12 changes: 2 additions & 10 deletions src/connector/Cargo.toml
```diff
@@ -46,7 +46,7 @@ chrono = { version = "0.4", default-features = false, features = [
     "clock",
     "std",
 ] }
-clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
+clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "d38c8b6391af098b724c114e5a4746aedab6ab8e", features = [
     "time",
 ] }
 csv = "1.3"
@@ -60,14 +60,6 @@ gcp-bigquery-client = "0.18.0"
 glob = "0.3"
 google-cloud-pubsub = "0.23"
 http = "0.2"
-hyper = { version = "0.14", features = [
-    "client",
-    "tcp",
-    "http1",
-    "http2",
-    "stream",
-] } # required by clickhouse client
-hyper-tls = "0.5"
 icelake = { workspace = true }
 indexmap = { version = "1.9.3", features = ["serde"] }
 itertools = { workspace = true }
@@ -112,7 +104,7 @@ rdkafka = { workspace = true, features = [
 ] }
 redis = { version = "0.25", features = ["aio", "tokio-comp", "async-std-comp","cluster-async"] }
 regex = "1.4"
-reqwest = { version = "0.12.2", features = ["json"] }
+reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
 risingwave_common_estimate_size = { workspace = true }
 risingwave_jni_core = { workspace = true }
```
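The newly added `stream` feature is what the Doris/StarRocks rework below relies on: it enables `reqwest::Body::wrap_stream`, which turns any stream of byte chunks into a streaming request body. A minimal sketch of what the feature provides (a standalone example, not code from this PR; the URL is a placeholder):

```rust
use std::convert::Infallible;

use bytes::Bytes;
use futures::stream;
use reqwest::Body;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Any `TryStream` of byte chunks can back a request body once the
    // "stream" feature is enabled.
    let chunks = stream::iter([
        Ok::<_, Infallible>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world")),
    ]);
    let body = Body::wrap_stream(chunks);

    let client = reqwest::Client::new();
    let resp = client
        .put("http://localhost:8080/upload") // placeholder URL
        .body(body)
        .send()
        .await?;
    println!("status: {}", resp.status());
    Ok(())
}
```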
13 changes: 2 additions & 11 deletions src/connector/src/sink/clickhouse.rs
```diff
@@ -11,9 +11,9 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 use core::fmt::Debug;
 use std::collections::{HashMap, HashSet};
-use std::time::Duration;
 
 use anyhow::anyhow;
 use clickhouse::insert::Insert;
@@ -191,18 +191,9 @@ impl ClickHouseEngine {
     }
 }
 
-const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
-
```
Review thread on the removed `POOL_IDLE_TIMEOUT`:

Contributor: Is it ok to remove this `POOL_IDLE_TIMEOUT` setting? I guess the original purpose of not using `ClickHouseClient::default()` was to set this config. cc @xxhZs

xxhZs (Contributor, Apr 8, 2024): The corresponding PR is https://github.com/risingwavelabs/risingwave/pull/12041/files. If we do not set it, `POOL_IDLE_TIMEOUT` defaults to 2s in clickhouse.rs; the reason for not using the default here is to support HTTPS URLs.

Contributor: If so, should we leave this code unmodified? @BugenZhao

BugenZhao (Member, author, Apr 8, 2024):

> the reason for not using the default here is to support HTTPS URLs

I didn't find any evidence for this. The reason why #12041 works is explained in ClickHouse/clickhouse-rs#58 (comment); it seems to have nothing to do with `POOL_IDLE_TIMEOUT`. By the way, upstream has submitted a fix (risingwavelabs/clickhouse.rs@3fc12f6), so there's no more need for this workaround. Should we sync the changes instead?

Contributor: We may need to run a test connecting to ClickHouse over HTTPS. If it passes, we can sync the change.

BugenZhao (Member, author): Cherry-picked the fix into our fork of clickhouse.rs.

```diff
 impl ClickHouseCommon {
     pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
-        use hyper_tls::HttpsConnector;
-
-        let https = HttpsConnector::new();
-        let client = hyper::Client::builder()
-            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build::<_, hyper::Body>(https);
-
-        let client = ClickHouseClient::with_http_client(client)
+        let client = ClickHouseClient::default() // hyper(0.14) client inside
             .with_url(&self.url)
             .with_user(&self.user)
             .with_password(&self.password)
```
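With the upstream pool fix cherry-picked into the fork, the custom hyper/hyper-tls plumbing collapses into the stock client. A hedged sketch of the resulting construction path; the builder calls past `with_password` are elided in the diff, so this stops where the diff does:

```rust
use clickhouse::Client as ClickHouseClient;

// Sketch only: the default client (hyper 0.14 inside) is now sufficient;
// no custom HttpsConnector or pool-idle-timeout tuning is required.
fn build_client(url: &str, user: &str, password: &str) -> ClickHouseClient {
    ClickHouseClient::default()
        .with_url(url)
        .with_user(user)
        .with_password(password)
}
```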
46 changes: 17 additions & 29 deletions src/connector/src/sink/doris.rs
```diff
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// TODO: use hyper 1 or reqwest 0.12.2
-
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -22,9 +20,6 @@ use async_trait::async_trait;
 use base64::engine::general_purpose;
 use base64::Engine;
 use bytes::{BufMut, Bytes, BytesMut};
-use hyper::body::Body;
-use hyper::{body, Client, Request};
-use hyper_tls::HttpsConnector;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
@@ -433,46 +428,39 @@ impl DorisSchemaClient {
 
     pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
         let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
-        let builder = Request::get(uri);
 
-        let connector = HttpsConnector::new();
-        let client = Client::builder()
+        let client = reqwest::Client::builder()
             .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build(connector);
+            .build()
+            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
 
-        let request = builder
+        let response = client
+            .get(uri)
             .header(
                 "Authorization",
                 format!(
                     "Basic {}",
                     general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
                 ),
             )
-            .body(Body::empty())
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-
-        let response = client
-            .request(request)
+            .send()
             .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
 
-        let raw_bytes = String::from_utf8(match body::to_bytes(response.into_body()).await {
-            Ok(bytes) => bytes.to_vec(),
-            Err(err) => return Err(SinkError::DorisStarrocksConnect(err.into())),
-        })
-        .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-
-        let json_map: HashMap<String, Value> = serde_json::from_str(&raw_bytes)
+        let json: Value = response
+            .json()
+            .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-        let json_data = if json_map.contains_key("code") && json_map.contains_key("msg") {
-            let data = json_map.get("data").ok_or_else(|| {
-                SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
-            })?;
-            data.to_string()
+        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
+            json.get("data")
+                .ok_or_else(|| {
+                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
+                })?
+                .clone()
         } else {
-            raw_bytes
+            json
         };
-        let schema: DorisSchema = serde_json::from_str(&json_data)
+        let schema: DorisSchema = serde_json::from_value(json_data)
             .context("Can't get schema from json")
             .map_err(SinkError::DorisStarrocksConnect)?;
         Ok(schema)
```
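The net effect of this rewrite: `reqwest` decodes the JSON directly, and the `data` payload of Doris's `{code, msg, data}` envelope is deserialized with `serde_json::from_value` instead of being stringified and re-parsed. A standalone sketch of the same pattern (simplified error handling, illustrative names):

```rust
use serde_json::Value;

// Sketch of the pattern used in `get_schema_from_doris` above: fetch JSON,
// unwrap the {"code", "msg", "data"} envelope when present, and return the
// payload for a later `serde_json::from_value` call.
async fn fetch_payload(client: &reqwest::Client, uri: &str) -> anyhow::Result<Value> {
    let json: Value = client.get(uri).send().await?.json().await?;
    Ok(if json.get("code").is_some() && json.get("msg").is_some() {
        json.get("data")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("Can't find data"))?
    } else {
        json
    })
}
```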
94 changes: 36 additions & 58 deletions src/connector/src/sink/doris_starrocks_connector.rs
```diff
@@ -15,16 +15,15 @@
 use core::mem;
 use core::time::Duration;
 use std::collections::HashMap;
+use std::convert::Infallible;
 
 use anyhow::Context;
 use base64::engine::general_purpose;
 use base64::Engine;
 use bytes::{BufMut, Bytes, BytesMut};
-use http::request::Builder;
-use hyper::body::{Body, Sender};
-use hyper::client::HttpConnector;
-use hyper::{body, Client, Request, StatusCode};
-use hyper_tls::HttpsConnector;
+use futures::StreamExt;
+use reqwest::{redirect, Body, Client, RequestBuilder, StatusCode};
+use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
 use url::Url;
 
@@ -187,33 +186,27 @@ impl InserterInnerBuilder {
         })
     }
 
-    // TODO: use hyper 1 or reqwest 0.12.2
-    fn build_request_and_client(
-        &self,
-        uri: String,
-    ) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
-        let mut builder = Request::put(uri);
-        for (k, v) in &self.header {
-            builder = builder.header(k, v);
-        }
-
-        let connector = HttpsConnector::new();
+    fn build_request(&self, uri: String) -> RequestBuilder {
         let client = Client::builder()
             .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build(connector);
+            .redirect(redirect::Policy::none()) // we handle redirect by ourselves
+            .build()
+            .unwrap();
 
-        (builder, client)
+        let mut builder = client.put(uri);
+        for (k, v) in &self.header {
+            builder = builder.header(k, v);
+        }
+        builder
     }
 
     pub async fn build(&self) -> Result<InserterInner> {
-        let (builder, client) = self.build_request_and_client(self.url.clone());
-        let request_get_url = builder
-            .body(Body::empty())
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-        let resp = client
-            .request(request_get_url)
+        let builder = self.build_request(self.url.clone());
+        let resp = builder
+            .send()
             .await
             .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
+        // TODO: shall we let `reqwest` handle the redirect?
```
Review comment on the redirect TODO:

Contributor: Sounds good. I wonder whether the special-case check below (the TEMPORARY_REDIRECT handling) would be an obstacle to that. cc @xuefengze
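For context, the manual handling this thread refers to works roughly as follows: Doris/StarRocks stream load answers the initial PUT to the frontend with a 307 redirect pointing at a backend (BE) node, and the sink resolves that redirect once, then reuses the BE URL for the actual upload. A hedged sketch of the technique (illustrative names, not the PR's exact code):

```rust
use reqwest::{redirect, Client, StatusCode};

// Sketch: building the client with `Policy::none()` keeps reqwest from
// following the 307 itself, so the `location` header stays visible to us.
async fn resolve_be_url(fe_url: &str) -> anyhow::Result<String> {
    let client = Client::builder()
        .redirect(redirect::Policy::none())
        .build()?;
    let resp = client.put(fe_url).send().await?;
    if resp.status() == StatusCode::TEMPORARY_REDIRECT {
        let location = resp
            .headers()
            .get("location")
            .ok_or_else(|| anyhow::anyhow!("307 without a location header"))?;
        Ok(location.to_str()?.to_owned())
    } else {
        Ok(fe_url.to_owned())
    }
}
```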

```diff
         let mut be_url = if resp.status() == StatusCode::TEMPORARY_REDIRECT {
             resp.headers()
                 .get("location")
@@ -249,23 +242,25 @@ impl InserterInnerBuilder {
             }
         }
 
-        let (builder, client) = self.build_request_and_client(be_url);
-        let (sender, body) = Body::channel();
-        let request = builder
-            .body(body)
-            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
-        let future = client.request(request);
+        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+        let body = Body::wrap_stream(
+            tokio_stream::wrappers::UnboundedReceiverStream::new(receiver).map(Ok::<_, Infallible>),
+        );
+        let builder = self.build_request(be_url).body(body);
 
         let handle: JoinHandle<Result<Vec<u8>>> = tokio::spawn(async move {
-            let response = future
+            let response = builder
+                .send()
                 .await
                 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
             let status = response.status();
-            let raw = body::to_bytes(response.into_body())
+            let raw = response
+                .bytes()
                 .await
                 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?
-                .to_vec();
-            if status == StatusCode::OK && !raw.is_empty() {
+                .into();
+
+            if status == StatusCode::OK {
                 Ok(raw)
             } else {
                 Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
@@ -280,6 +275,8 @@
     }
 }
 
+type Sender = UnboundedSender<Bytes>;
+
 pub struct InserterInner {
     sender: Option<Sender>,
     join_handle: Option<JoinHandle<Result<Vec<u8>>>>,
@@ -301,37 +298,18 @@ impl InserterInner {
 
         let chunk = mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE));
 
-        let is_timed_out = match tokio::time::timeout(
-            SEND_CHUNK_TIMEOUT,
-            self.sender.as_mut().unwrap().send_data(chunk.into()),
-        )
-        .await
-        {
-            Ok(Ok(_)) => return Ok(()),
-            Ok(Err(_)) => false,
-            Err(_) => true,
-        };
-        self.abort()?;
+        if let Err(_e) = self.sender.as_mut().unwrap().send(chunk.freeze()) {
+            self.sender.take();
+            self.wait_handle().await?;
 
-        let res = self.wait_handle().await;
-
-        if is_timed_out {
-            Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!("timeout")))
-        } else {
-            res?;
             Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
                 "channel closed"
            )))
+        } else {
+            Ok(())
         }
     }
 
-    fn abort(&mut self) -> Result<()> {
-        if let Some(sender) = self.sender.take() {
-            sender.abort();
-        }
-        Ok(())
-    }
-
     pub async fn write(&mut self, data: Bytes) -> Result<()> {
         self.buffer.put_slice(&data);
         if self.buffer.len() >= MIN_CHUNK_SIZE {
```
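Summing up the new streaming path in this file: an unbounded mpsc channel feeds a `Body::wrap_stream` request body, the request runs on a spawned task, and `append` simply pushes chunks into the channel. Since `UnboundedSender::send` never blocks, the old `SEND_CHUNK_TIMEOUT`/`abort` dance becomes unnecessary, and a closed channel is the failure signal. A condensed, self-contained sketch of the technique (illustrative names and URL, not the PR's exact code):

```rust
use std::convert::Infallible;

use bytes::Bytes;
use futures::StreamExt;
use reqwest::{Body, Client};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;

// Sketch: start a PUT whose body is fed through a channel. The receiver half
// becomes the request body; the sender half goes to the writer. Dropping the
// sender ends the body, which lets the request complete.
fn streaming_put(
    client: Client,
    url: String,
) -> (mpsc::UnboundedSender<Bytes>, JoinHandle<reqwest::Result<String>>) {
    let (tx, rx) = mpsc::unbounded_channel::<Bytes>();
    let body = Body::wrap_stream(UnboundedReceiverStream::new(rx).map(Ok::<_, Infallible>));
    let handle = tokio::spawn(async move {
        let resp = client.put(url).body(body).send().await?;
        resp.text().await
    });
    (tx, handle)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder URL for illustration only.
    let (tx, handle) = streaming_put(Client::new(), "http://localhost:8040/load".into());
    tx.send(Bytes::from_static(b"row1\n")).ok(); // fails only if the request task is gone
    tx.send(Bytes::from_static(b"row2\n")).ok();
    drop(tx); // finish the body
    println!("{}", handle.await??);
    Ok(())
}
```

Dropping the sender is what signals end-of-stream to the server, which is why the explicit `abort()` helper could be removed along with the timeout logic.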