From 9dfbe220bff88032aa215952358d68bdf850432f Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Thu, 11 Apr 2024 16:10:49 +0800
Subject: [PATCH] fix(sink): fix starrocks doris and clickhouse decimal
 (#15664)

---
 ci/scripts/e2e-clickhouse-sink-test.sh | 16 ++++-----
 ci/scripts/e2e-starrocks-sink-test.sh  |  4 +--
 e2e_test/sink/clickhouse_sink.slt      |  6 ++--
 e2e_test/sink/starrocks_sink.slt       |  4 +--
 src/connector/src/sink/clickhouse.rs   | 29 +++++++--------
 src/connector/src/sink/doris.rs        |  6 ++--
 src/connector/src/sink/encoder/json.rs | 45 +++++++++--------------
 src/connector/src/sink/encoder/mod.rs  |  4 +--
 src/connector/src/sink/starrocks.rs    | 50 ++------------------------
 9 files changed, 54 insertions(+), 110 deletions(-)

diff --git a/ci/scripts/e2e-clickhouse-sink-test.sh b/ci/scripts/e2e-clickhouse-sink-test.sh
index b248d35d49d42..3e234ed5c69bc 100755
--- a/ci/scripts/e2e-clickhouse-sink-test.sh
+++ b/ci/scripts/e2e-clickhouse-sink-test.sh
@@ -31,7 +31,7 @@ sleep 1
 echo "--- create clickhouse table"
 curl https://clickhouse.com/ | sh
 sleep 2
-./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
+./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
 
 echo "--- testing sinks"
 sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
@@ -41,13 +41,13 @@ sleep 5
 
 # check sink destination using shell
 if cat ./query_result.csv | sort | awk -F "," '{
-if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
-  if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
-  if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
-  if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
-  if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
-  if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
-  if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
+if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"" && $5 == 1.1) c1++;
+  if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"" && $5 == 0) c2++;
+  if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"" && $5 == 2.2) c3++;
+  if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"" && $5 == 0) c4++;
+  if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"" && $5 == 3.3) c5++;
+  if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"" && $5 == 4.4) c6++;
+  if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"" && $5 == 0) c7++; }
 END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
   echo "Clickhouse sink check passed"
 else
diff --git a/ci/scripts/e2e-starrocks-sink-test.sh b/ci/scripts/e2e-starrocks-sink-test.sh
index e17eaed33f437..88bde0fb5c6d1 100755
--- a/ci/scripts/e2e-starrocks-sink-test.sh
+++ b/ci/scripts/e2e-starrocks-sink-test.sh
@@ -32,7 +32,7 @@ echo "--- create starrocks table"
 apt-get update -y && apt-get install -y mysql-client
 sleep 2
 mysql -uroot -P 9030 -h starrocks-fe-server -e "CREATE database demo;use demo;
-CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json) ENGINE=OLAP
+CREATE table demo_bhv_table(v1 int,v2 smallint,v3 bigint,v4 float,v5 double,v6 string,v7 date,v8 datetime,v9 boolean,v10 json,v11 decimal(10,5)) ENGINE=OLAP
 PRIMARY KEY(\`v1\`)
 DISTRIBUTED BY HASH(\`v1\`) properties(\"replication_num\" = \"1\");
 CREATE USER 'users'@'%' IDENTIFIED BY '123456';
@@ -46,7 +46,7 @@ mysql -uroot -P 9030 -h starrocks-fe-server -e "select * from demo.demo_bhv_tabl
 
 if cat ./query_result.csv | sed '1d; s/\t/,/g' | awk -F "," '{
-  exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}"); }'; then
+  exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01" && $9 == 0 && $10 = "{"v101": 100}" && $11 == 1.12346); }'; then
   echo "Starrocks sink check passed"
 else
   cat ./query_result.csv
diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt
index 9791f484326d7..2adc70dcf409e 100644
--- a/e2e_test/sink/clickhouse_sink.slt
+++ b/e2e_test/sink/clickhouse_sink.slt
@@ -1,11 +1,11 @@
 statement ok
-CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);
+CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);
 
 statement ok
 CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
 
 statement ok
-CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
+CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH (
     connector = 'clickhouse',
     type = 'append-only',
     force_append_only='true',
@@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4
 );
 
 statement ok
-INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);
+INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3-2', 1, 3.3), (5, 2, '5-2', 2, 4.4), (8, 2, '8-2', 1, 'inf'), (13, 2, '13-2', 2, '-inf'), (21, 2, '21-2', 1, 'nan');
 
 statement ok
 FLUSH;
diff --git a/e2e_test/sink/starrocks_sink.slt b/e2e_test/sink/starrocks_sink.slt
index a1ee1b0ffe039..3b8a3987a8bc2 100644
--- a/e2e_test/sink/starrocks_sink.slt
+++ b/e2e_test/sink/starrocks_sink.slt
@@ -1,5 +1,5 @@
 statement ok
-CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal);
 
 statement ok
 CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
@@ -21,7 +21,7 @@ FROM
 );
 
 statement ok
-INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}');
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01' , false, '{"v101":100}',1.12345678910);
 
 statement ok
 FLUSH;
diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs
index 16407a8f68c6c..cb4c1b64b04ed 100644
--- a/src/connector/src/sink/clickhouse.rs
+++ b/src/connector/src/sink/clickhouse.rs
@@ -29,6 +29,7 @@ use serde::Serialize;
 use serde_derive::Deserialize;
 use serde_with::serde_as;
 use thiserror_ext::AsReport;
+use tracing::warn;
 use with_options::WithOptions;
 
 use super::{DummySinkCommitCoordinator, SinkWriterParam};
@@ -747,27 +748,27 @@ impl ClickHouseFieldWithNull {
             ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_string()),
             ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
             ScalarRefImpl::Decimal(d) => {
-                if let Decimal::Normalized(d) = d {
+                let d = if let Decimal::Normalized(d) = d {
                     let scale =
                         clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
-
-                    let scale = if scale < 0 {
+                    if scale < 0 {
                         d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                     } else {
                         d.mantissa() * 10_i128.pow(scale as u32)
-                    };
-
-                    if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
-                        ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(scale as i32))
-                    } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
-                        ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(scale as i64))
-                    } else {
-                        ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(scale))
                     }
+                } else if clickhouse_schema_feature.can_null {
+                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
+                    return Ok(vec![ClickHouseFieldWithNull::None]);
                 } else {
-                    return Err(SinkError::ClickHouse(
-                        "clickhouse can not support Decimal NAN,-INF and INF".to_string(),
-                    ));
+                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
+                    0_i128
+                };
+                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
+                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
+                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
+                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
+                } else {
+                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                 }
             }
             ScalarRefImpl::Interval(_) => {
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index be3976f30fd60..55b499b652623 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -495,11 +495,9 @@ pub struct DorisField {
     aggregation_type: String,
 }
 impl DorisField {
-    pub fn get_decimal_pre_scale(&self) -> Option<(u8, u8)> {
+    pub fn get_decimal_pre_scale(&self) -> Option<u8> {
         if self.r#type.contains("DECIMAL") {
-            let a = self.precision.clone().unwrap().parse::<u8>().unwrap();
-            let b = self.scale.clone().unwrap().parse::<u8>().unwrap();
-            Some((a, b))
+            Some(self.scale.clone().unwrap().parse::<u8>().unwrap())
         } else {
             None
         }
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 64a06ff70770f..bbd424d5db036 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -24,7 +24,7 @@ use itertools::Itertools;
 use risingwave_common::array::{ArrayError, ArrayResult};
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::row::Row;
-use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText};
+use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
 use risingwave_common::util::iter_util::ZipEqDebug;
 use serde_json::{json, Map, Value};
 use thiserror_ext::AsReport;
@@ -83,7 +83,7 @@ impl JsonEncoder {
     pub fn new_with_doris(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
-        map: HashMap<String, (u8, u8)>,
+        map: HashMap<String, u8>,
     ) -> Self {
         Self {
             schema,
@@ -97,11 +97,7 @@ impl JsonEncoder {
         }
     }
 
-    pub fn new_with_starrocks(
-        schema: Schema,
-        col_indices: Option<Vec<usize>>,
-        map: HashMap<String, (u8, u8)>,
-    ) -> Self {
+    pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
         Self {
             schema,
             col_indices,
@@ -109,7 +105,7 @@ impl JsonEncoder {
             date_handling_mode: DateHandlingMode::String,
             timestamp_handling_mode: TimestampHandlingMode::String,
             timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
-            custom_json_type: CustomJsonType::StarRocks(map),
+            custom_json_type: CustomJsonType::StarRocks,
             kafka_connect: None,
         }
     }
@@ -242,24 +238,17 @@ fn datum_to_json_object(
         (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
             json!(v)
         }
+        // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
         (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
-            CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
-                if !matches!(v, Decimal::Normalized(_)) {
-                    return Err(ArrayError::internal(
-                        "doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
-                    ));
-                }
-                let (p, s) = map.get(&field.name).unwrap();
+            CustomJsonType::Doris(map) => {
+                let s = map.get(&field.name).unwrap();
                 v.rescale(*s as u32);
-                let v_string = v.to_text();
-                let len = v_string.clone().replace(['.', '-'], "").len();
-                if len > *p as usize {
-                    return Err(ArrayError::internal(
-                        format!("rw Decimal's precision is large than doris/starrocks max decimal len is {:?}, doris max is {:?}",v_string.len(),p)));
-                }
-                json!(v_string)
+                json!(v.to_text())
             }
-            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
+            CustomJsonType::Es
+            | CustomJsonType::None
+            | CustomJsonType::BigQuery
+            | CustomJsonType::StarRocks => {
                 json!(v.to_text())
             }
         },
@@ -310,7 +299,7 @@ fn datum_to_json_object(
             json!(v.as_iso_8601())
         }
         (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
-            CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
+            CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(),
             CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(jsonb_ref.to_string())
             }
@@ -357,7 +346,7 @@ fn datum_to_json_object(
                     serde_json::to_string(&map).context("failed to serialize into JSON")?,
                 )
             }
-            CustomJsonType::StarRocks(_) => {
+            CustomJsonType::StarRocks => {
                 return Err(ArrayError::internal(
                     "starrocks can't support struct".to_string(),
                 ));
@@ -470,8 +459,8 @@ fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
 mod tests {
 
     use risingwave_common::types::{
-        DataType, Date, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
-        Timestamp,
+        DataType, Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue,
+        Time, Timestamp,
     };
 
     use super::*;
@@ -639,7 +628,7 @@ mod tests {
         assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
 
         let mut map = HashMap::default();
-        map.insert("aaa".to_string(), (10_u8, 5_u8));
+        map.insert("aaa".to_string(), 5_u8);
         let decimal = datum_to_json_object(
             &Field {
                 data_type: DataType::Decimal,
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 34dc4c8886448..0b6899dbad955 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -139,11 +139,11 @@ pub enum CustomJsonType {
     // Doris's json need date is string.
     // The internal order of the struct should follow the insertion order.
     // The decimal needs verification and calibration.
-    Doris(HashMap<String, (u8, u8)>),
+    Doris(HashMap<String, u8>),
     // Es's json need jsonb is struct
     Es,
     // starrocks' need jsonb is struct
-    StarRocks(HashMap<String, (u8, u8)>),
+    StarRocks,
     // bigquery need null array -> []
     BigQuery,
     None,
diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs
index 324d386f3ef3d..994e4e43d8172 100644
--- a/src/connector/src/sink/starrocks.rs
+++ b/src/connector/src/sink/starrocks.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use bytes::Bytes;
-use itertools::Itertools;
 use mysql_async::prelude::Queryable;
 use mysql_async::Opts;
 use risingwave_common::array::{Op, StreamChunk};
@@ -223,8 +222,7 @@ impl Sink for StarrocksSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
-        )
-        .await?
+        )?
         .into_log_sinker(writer_param.sink_metrics))
     }
 
@@ -293,54 +291,12 @@ impl TryFrom<SinkParam> for StarrocksSink {
 }
 
 impl StarrocksSinkWriter {
-    pub async fn new(
+    pub fn new(
         config: StarrocksConfig,
         schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
     ) -> Result<Self> {
-        let mut decimal_map = HashMap::default();
-        let starrocks_columns = StarrocksSchemaClient::new(
-            config.common.host.clone(),
-            config.common.mysql_port.clone(),
-            config.common.table.clone(),
-            config.common.database.clone(),
-            config.common.user.clone(),
-            config.common.password.clone(),
-        )
-        .await?
-        .get_columns_from_starrocks()
-        .await?;
-
-        for (name, column_type) in &starrocks_columns {
-            if column_type.contains("decimal") {
-                let decimal_all = column_type
-                    .split("decimal(")
-                    .last()
-                    .ok_or_else(|| SinkError::Starrocks("must have last".to_string()))?
-                    .split(')')
-                    .next()
-                    .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
-                    .split(',')
-                    .collect_vec();
-                let length = decimal_all
-                    .first()
-                    .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
-                    .parse::<u8>()
-                    .map_err(|e| {
-                        SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
-                    })?;
-
-                let scale = decimal_all
-                    .last()
-                    .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))?
-                    .parse::<u8>()
-                    .map_err(|e| {
-                        SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report()))
-                    })?;
-                decimal_map.insert(name.to_string(), (length, scale));
-            }
-        }
         let mut fields_name = schema.names_str();
         if !is_append_only {
             fields_name.push(STARROCKS_DELETE_SIGN);
@@ -367,7 +323,7 @@ impl StarrocksSinkWriter {
             inserter_innet_builder: starrocks_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_starrocks(schema, None, decimal_map),
+            row_encoder: JsonEncoder::new_with_starrocks(schema, None),
         })
     }
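
Editor's note (illustration, not part of the patch): the ClickHouse arm above encodes a
decimal in ClickHouse's fixed-point format by rescaling the integer mantissa, truncating any
excess fractional digits; the Doris/StarRocks path instead calls `rescale`, which rounds
(hence the starrocks e2e check expecting 1.12346). A minimal standalone sketch of the
mantissa trick, assuming the `rust_decimal` crate; `to_fixed_point` is a hypothetical helper,
not an API from this patch:

// Editor's sketch: rescale a decimal's integer mantissa to a fixed target
// scale, as the ClickHouse sink arm does. Integer division truncates.
use rust_decimal::Decimal;

fn to_fixed_point(d: Decimal, target_scale: u32) -> i128 {
    // d = mantissa * 10^(-scale); adjust the mantissa to the target scale.
    let diff = target_scale as i32 - d.scale() as i32;
    if diff < 0 {
        // Too many fractional digits: drop the excess (truncating).
        d.mantissa() / 10_i128.pow(diff.unsigned_abs())
    } else {
        // Too few fractional digits: pad with zeros.
        d.mantissa() * 10_i128.pow(diff as u32)
    }
}

fn main() {
    // "1.1" at scale 3 becomes the integer 1100, i.e. the Decimal64(3)
    // value 1.100, matching the clickhouse e2e expectation for column v5.
    let d: Decimal = "1.1".parse().unwrap();
    assert_eq!(to_fixed_point(d, 3), 1100);

    // "1.12345678910" truncates to 1.12345 here, whereas the Doris/StarRocks
    // path rounds via rescale, which is why its e2e check expects 1.12346.
    let d: Decimal = "1.12345678910".parse().unwrap();
    assert_eq!(to_fixed_point(d, 5), 112345);
}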