Skip to content

Commit

Permalink
fix: support variable scale decimal in avro (#10368)
Browse files Browse the repository at this point in the history
Co-authored-by: idx0-dev <[email protected]>
  • Loading branch information
tabVersion and adevday authored Jun 19, 2023
1 parent 75f6025 commit 608e183
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ itertools = "0.10"
maplit = "1.0.2"
moka = { version = "0.10", features = ["future"] }
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
num-traits = "0.2"
parking_lot = "0.12"
prometheus = { version = "0.13", features = ["process"] }
Expand Down
52 changes: 33 additions & 19 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use risingwave_pb::plan_common::ColumnDesc;
use crate::parser::unified::avro::AvroParseOptions;

const RW_DECIMAL_MAX_PRECISION: usize = 28;
const DBZ_VARIABLE_SCALE_DECIMAL_NAME: &str = "VariableScaleDecimal";
const DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE: &str = "io.debezium.data";

pub(crate) fn avro_field_to_column_desc(
name: &str,
Expand Down Expand Up @@ -71,14 +73,29 @@ fn avro_type_mapping(schema: &Schema) -> Result<DataType> {
Schema::Boolean => DataType::Boolean,
Schema::Float => DataType::Float32,
Schema::Double => DataType::Float64,
Schema::Decimal { .. } => DataType::Decimal,
Schema::Decimal { precision, .. } => {
if precision > &RW_DECIMAL_MAX_PRECISION {
tracing::warn!(
"RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
RW_DECIMAL_MAX_PRECISION,
precision
);
}
DataType::Decimal
}
Schema::Date => DataType::Date,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
Schema::Bytes => DataType::Bytea,
Schema::Enum { .. } => DataType::Varchar,
Schema::Record { fields, .. } => {
Schema::Record { fields, name, .. } => {
if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
&& name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into())
{
return Ok(DataType::Decimal);
}

let struct_fields = fields
.iter()
.map(|f| avro_type_mapping(&f.schema))
Expand Down Expand Up @@ -140,20 +157,24 @@ pub(crate) fn get_field_from_avro_value<'a>(

pub(crate) fn avro_decimal_to_rust_decimal(
avro_decimal: AvroDecimal,
precision: usize,
_precision: usize,
scale: usize,
) -> Result<rust_decimal::Decimal> {
if precision > RW_DECIMAL_MAX_PRECISION {
return Err(RwError::from(ProtocolError(format!(
"only support decimal with max precision {} but given avro decimal with precision {}",
RW_DECIMAL_MAX_PRECISION, precision
))));
}

let negative = !avro_decimal.is_positive();
let bytes = avro_decimal.to_vec_unsigned();

let (lo, mid, hi) = match bytes.len() {
let (lo, mid, hi) = extract_decimal(bytes);
Ok(rust_decimal::Decimal::from_parts(
lo,
mid,
hi,
negative,
scale as u32,
))
}

pub(crate) fn extract_decimal(bytes: Vec<u8>) -> (u32, u32, u32) {
match bytes.len() {
len @ 0..=4 => {
let mut pad = vec![0; 4 - len];
pad.extend_from_slice(&bytes);
Expand All @@ -176,14 +197,7 @@ pub(crate) fn avro_decimal_to_rust_decimal(
(lo, mid, hi)
}
_ => unreachable!(),
};
Ok(rust_decimal::Decimal::from_parts(
lo,
mid,
hi,
negative,
scale as u32,
))
}
}

pub(crate) fn unix_epoch_days() -> i32 {
Expand Down
38 changes: 37 additions & 1 deletion src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@

use std::str::FromStr;

use anyhow::anyhow;
use apache_avro::types::Value;
use apache_avro::Schema;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqFast;

use super::{Access, AccessError, AccessResult};
use crate::parser::avro::util::{
avro_decimal_to_rust_decimal, extract_inner_field_schema, unix_epoch_days,
avro_decimal_to_rust_decimal, extract_decimal, extract_inner_field_schema, unix_epoch_days,
};
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
Expand Down Expand Up @@ -118,6 +120,40 @@ impl<'a> AvroParseOptions<'a> {
.map_err(|_| create_error())?;
ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
}
(Some(DataType::Decimal), Value::Record(fields)) => {
// VariableScaleDecimal has fixed fields, scale(int) and value(bytes)
let find_in_records = |field_name: &str| {
fields
.iter()
.find(|field| field.0 == field_name)
.map(|field| &field.1)
};
let scale = match find_in_records("scale").ok_or(AccessError::Other(anyhow!(
"scale field not found in VariableScaleDecimal"
)))? {
Value::Int(scale) => Ok(*scale),
avro_value => Err(AccessError::Other(anyhow!(
"scale field in VariableScaleDecimal is not int, got {:?}",
avro_value
))),
}?;

let value: BigInt = match find_in_records("value").ok_or(AccessError::Other(
anyhow!("value field not found in VariableScaleDecimal"),
))? {
Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)),
avro_value => Err(AccessError::Other(anyhow!(
"value field in VariableScaleDecimal is not bytes, got {:?}",
avro_value
))),
}?;

let negative = value.sign() == Sign::Minus;
let (lo, mid, hi) = extract_decimal(value.to_bytes_be().1);
let decimal =
rust_decimal::Decimal::from_parts(lo, mid, hi, negative, scale as u32);
ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
}

// ---- Date -----
(Some(DataType::Date) | None, Value::Date(days)) => {
Expand Down

0 comments on commit 608e183

Please sign in to comment.