From 0a2b0a74d3301cda87f75471f5b4facffda8f615 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 21 Sep 2022 12:37:19 -0400 Subject: [PATCH] Add serialization of `ScalarValue::Binary` and `ScalarValue::LargeBinary`, `ScalarValue::Time64` (#3534) --- datafusion/proto/proto/datafusion.proto | 16 ++++++-- datafusion/proto/src/from_proto.rs | 52 ++++++++++++++++--------- datafusion/proto/src/lib.rs | 8 ++++ datafusion/proto/src/to_proto.rs | 40 ++++++++++--------- 4 files changed, 76 insertions(+), 40 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 7ec4705b03b6..6553653aec40 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -761,6 +761,9 @@ message ScalarValue{ int64 interval_daytime_value = 25; ScalarTimestampValue timestamp_value = 26; ScalarDictionaryValue dictionary_value = 27; + bytes binary_value = 28; + bytes large_binary_value = 29; + int64 time64_value = 30; } } @@ -788,16 +791,21 @@ enum PrimitiveScalarType{ UTF8 = 11; LARGE_UTF8 = 12; DATE32 = 13; - TIME_MICROSECOND = 14; - TIME_NANOSECOND = 15; + TIMESTAMP_MICROSECOND = 14; + TIMESTAMP_NANOSECOND = 15; NULL = 16; DECIMAL128 = 17; DATE64 = 20; - TIME_SECOND = 21; - TIME_MILLISECOND = 22; + TIMESTAMP_SECOND = 21; + TIMESTAMP_MILLISECOND = 22; INTERVAL_YEARMONTH = 23; INTERVAL_DAYTIME = 24; + + BINARY = 25; + LARGE_BINARY = 26; + + TIME64 = 27; } message ScalarType{ diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 95bfeb819a0c..d77a46e2e463 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -218,20 +218,25 @@ impl From for DataType { protobuf::PrimitiveScalarType::Float64 => DataType::Float64, protobuf::PrimitiveScalarType::Utf8 => DataType::Utf8, protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8, + protobuf::PrimitiveScalarType::Binary => DataType::Binary, + protobuf::PrimitiveScalarType::LargeBinary => DataType::LargeBinary, protobuf::PrimitiveScalarType::Date32 => DataType::Date32, - protobuf::PrimitiveScalarType::TimeMicrosecond => { - DataType::Time64(TimeUnit::Microsecond) - } - protobuf::PrimitiveScalarType::TimeNanosecond => { + protobuf::PrimitiveScalarType::Time64 => { DataType::Time64(TimeUnit::Nanosecond) } + protobuf::PrimitiveScalarType::TimestampMicrosecond => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + protobuf::PrimitiveScalarType::TimestampNanosecond => { + DataType::Timestamp(TimeUnit::Nanosecond, None) + } protobuf::PrimitiveScalarType::Null => DataType::Null, protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0), protobuf::PrimitiveScalarType::Date64 => DataType::Date64, - protobuf::PrimitiveScalarType::TimeSecond => { + protobuf::PrimitiveScalarType::TimestampSecond => { DataType::Timestamp(TimeUnit::Second, None) } - protobuf::PrimitiveScalarType::TimeMillisecond => { + protobuf::PrimitiveScalarType::TimestampMillisecond => { DataType::Timestamp(TimeUnit::Millisecond, None) } protobuf::PrimitiveScalarType::IntervalYearmonth => { @@ -643,15 +648,20 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue { PrimitiveScalarType::Float64 => Self::Float64(None), PrimitiveScalarType::Utf8 => Self::Utf8(None), PrimitiveScalarType::LargeUtf8 => Self::LargeUtf8(None), + PrimitiveScalarType::Binary => Self::Binary(None), + PrimitiveScalarType::LargeBinary => Self::LargeBinary(None), PrimitiveScalarType::Date32 => Self::Date32(None), - PrimitiveScalarType::TimeMicrosecond => { + PrimitiveScalarType::Time64 => Self::Time64(None), + PrimitiveScalarType::TimestampMicrosecond => { Self::TimestampMicrosecond(None, None) } - PrimitiveScalarType::TimeNanosecond => Self::TimestampNanosecond(None, None), + PrimitiveScalarType::TimestampNanosecond => { + Self::TimestampNanosecond(None, None) + } PrimitiveScalarType::Decimal128 => Self::Decimal128(None, 0, 0), PrimitiveScalarType::Date64 => Self::Date64(None), - PrimitiveScalarType::TimeSecond => Self::TimestampSecond(None, None), - PrimitiveScalarType::TimeMillisecond => { + PrimitiveScalarType::TimestampSecond => Self::TimestampSecond(None, None), + PrimitiveScalarType::TimestampMillisecond => { Self::TimestampMillisecond(None, None) } PrimitiveScalarType::IntervalYearmonth => Self::IntervalYearMonth(None), @@ -749,6 +759,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { ) } Value::Date64Value(v) => Self::Date64(Some(*v)), + Value::Time64Value(v) => Self::Time64(Some(*v)), Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)), Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)), Value::TimestampValue(v) => { @@ -792,6 +803,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { Self::Dictionary(Box::new(index_type), Box::new(value)) } + Value::BinaryValue(v) => Self::Binary(Some(v.clone())), + Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())), }) } } @@ -1419,7 +1432,7 @@ fn typechecked_scalar_value_conversion( value: Some(protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(v)), }), - PrimitiveScalarType::TimeMicrosecond, + PrimitiveScalarType::TimestampMicrosecond, ) => ScalarValue::TimestampMicrosecond(Some(*v), unwrap_timezone(timezone)), ( Value::TimestampValue(protobuf::ScalarTimestampValue { @@ -1427,14 +1440,14 @@ fn typechecked_scalar_value_conversion( value: Some(protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(v)), }), - PrimitiveScalarType::TimeNanosecond, + PrimitiveScalarType::TimestampNanosecond, ) => ScalarValue::TimestampNanosecond(Some(*v), unwrap_timezone(timezone)), ( Value::TimestampValue(protobuf::ScalarTimestampValue { timezone, value: Some(protobuf::scalar_timestamp_value::Value::TimeSecondValue(v)), }), - PrimitiveScalarType::TimeSecond, + PrimitiveScalarType::TimestampSecond, ) => ScalarValue::TimestampSecond(Some(*v), unwrap_timezone(timezone)), ( Value::TimestampValue(protobuf::ScalarTimestampValue { @@ -1442,7 +1455,7 @@ fn typechecked_scalar_value_conversion( value: Some(protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(v)), }), - PrimitiveScalarType::TimeMillisecond, + PrimitiveScalarType::TimestampMillisecond, ) => ScalarValue::TimestampMillisecond(Some(*v), unwrap_timezone(timezone)), (Value::Utf8Value(v), PrimitiveScalarType::Utf8) => { ScalarValue::Utf8(Some(v.to_owned())) @@ -1469,10 +1482,11 @@ fn typechecked_scalar_value_conversion( PrimitiveScalarType::Utf8 => ScalarValue::Utf8(None), PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None), PrimitiveScalarType::Date32 => ScalarValue::Date32(None), - PrimitiveScalarType::TimeMicrosecond => { + PrimitiveScalarType::Time64 => ScalarValue::Time64(None), + PrimitiveScalarType::TimestampMicrosecond => { ScalarValue::TimestampMicrosecond(None, None) } - PrimitiveScalarType::TimeNanosecond => { + PrimitiveScalarType::TimestampNanosecond => { ScalarValue::TimestampNanosecond(None, None) } PrimitiveScalarType::Null => { @@ -1484,10 +1498,10 @@ fn typechecked_scalar_value_conversion( ScalarValue::Decimal128(None, 0, 0) } PrimitiveScalarType::Date64 => ScalarValue::Date64(None), - PrimitiveScalarType::TimeSecond => { + PrimitiveScalarType::TimestampSecond => { ScalarValue::TimestampSecond(None, None) } - PrimitiveScalarType::TimeMillisecond => { + PrimitiveScalarType::TimestampMillisecond => { ScalarValue::TimestampMillisecond(None, None) } PrimitiveScalarType::IntervalYearmonth => { @@ -1496,6 +1510,8 @@ fn typechecked_scalar_value_conversion( PrimitiveScalarType::IntervalDaytime => { ScalarValue::IntervalDayTime(None) } + PrimitiveScalarType::Binary => ScalarValue::Binary(None), + PrimitiveScalarType::LargeBinary => ScalarValue::LargeBinary(None), }; scalar_value } else { diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index f34da705f94a..dfe2bbaaa45f 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -402,6 +402,10 @@ mod roundtrip_tests { ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))), ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(i32::MAX)), + ScalarValue::Date32(None), + ScalarValue::Time64(Some(0)), + ScalarValue::Time64(Some(i64::MAX)), + ScalarValue::Time64(None), ScalarValue::TimestampNanosecond(Some(0), None), ScalarValue::TimestampNanosecond(Some(i64::MAX), None), ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())), @@ -459,6 +463,10 @@ mod roundtrip_tests { Box::new(DataType::Int32), Box::new(ScalarValue::Utf8(None)), ), + ScalarValue::Binary(Some(b"bar".to_vec())), + ScalarValue::Binary(None), + ScalarValue::LargeBinary(Some(b"bar".to_vec())), + ScalarValue::LargeBinary(None), ]; for test_case in should_pass.into_iter() { diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index ed0b5ec0871c..e29b8ec68964 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1098,7 +1098,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| { + create_proto_scalar(val, PrimitiveScalarType::TimestampMicrosecond, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1110,7 +1110,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| { + create_proto_scalar(val, PrimitiveScalarType::TimestampNanosecond, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1145,7 +1145,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimeSecond, |s| { + create_proto_scalar(val, PrimitiveScalarType::TimestampSecond, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1155,7 +1155,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimeMillisecond, |s| { + create_proto_scalar(val, PrimitiveScalarType::TimestampMillisecond, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1180,19 +1180,21 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { value: Some(Value::NullValue(PrimitiveScalarType::Null as i32)), }, - datafusion::scalar::ScalarValue::Binary(_) => { - // not yet implemented (TODO file ticket) - return Err(Error::invalid_scalar_value(val)); + scalar::ScalarValue::Binary(val) => { + create_proto_scalar(val, PrimitiveScalarType::Binary, |s| { + Value::BinaryValue(s.to_owned()) + }) } - - datafusion::scalar::ScalarValue::LargeBinary(_) => { - // not yet implemented (TODO file ticket) - return Err(Error::invalid_scalar_value(val)); + scalar::ScalarValue::LargeBinary(val) => { + create_proto_scalar(val, PrimitiveScalarType::LargeBinary, |s| { + Value::LargeBinaryValue(s.to_owned()) + }) } - datafusion::scalar::ScalarValue::Time64(_) => { - // not yet implemented (TODO file ticket) - return Err(Error::invalid_scalar_value(val)); + datafusion::scalar::ScalarValue::Time64(v) => { + create_proto_scalar(v, PrimitiveScalarType::Time64, |v| { + Value::Time64Value(*v) + }) } datafusion::scalar::ScalarValue::IntervalMonthDayNano(_) => { @@ -1335,10 +1337,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { DataType::Date32 => Self::Scalar(PrimitiveScalarType::Date32 as i32), DataType::Time64(time_unit) => match time_unit { TimeUnit::Microsecond => { - Self::Scalar(PrimitiveScalarType::TimeMicrosecond as i32) + Self::Scalar(PrimitiveScalarType::TimestampMicrosecond as i32) } TimeUnit::Nanosecond => { - Self::Scalar(PrimitiveScalarType::TimeNanosecond as i32) + Self::Scalar(PrimitiveScalarType::TimestampNanosecond as i32) } _ => { return Err(Error::invalid_time_unit(time_unit)); @@ -1379,8 +1381,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { DataType::Float64 => PrimitiveScalarType::Float64, DataType::Date32 => PrimitiveScalarType::Date32, DataType::Time64(time_unit) => match time_unit { - TimeUnit::Microsecond => PrimitiveScalarType::TimeMicrosecond, - TimeUnit::Nanosecond => PrimitiveScalarType::TimeNanosecond, + TimeUnit::Microsecond => { + PrimitiveScalarType::TimestampMicrosecond + } + TimeUnit::Nanosecond => PrimitiveScalarType::TimestampNanosecond, _ => { return Err(Error::invalid_time_unit(time_unit)); }