-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(source): enhance parquet file source #19221
Conversation
src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
Outdated
Show resolved
Hide resolved
fn from_timestampsecond_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
fn from_timestampsecond_some_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampms_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampms_some_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious, we don't need to do any conversion between these types? All of them can be implemented in the same way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also don't get why we need to add different methods to convert timestamp in different units. IIUC, the current conversion logic won't lose any precision. What happened if we just add different units of the timestamp type to converts
macro and rely on the original implementation to do the conversion?
converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our previous code, we only handled timestamps in microseconds, so this conversion is only correct for timestamp(MicroSecond, _)
. As you mentioned in the logic, the seconds part is % 1_000_000
, and the nanoseconds part is also % 1_000_000
. If all four time units are converted using this logic, it will yield incorrect results, as follows:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The correct approach should be as shown in the PR, where different handling is done based on the time unit. For example, for milliseconds, it should be %1000
instead of %100000
:
TimeUnit::Millisecond => Timestamp(
DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1000000) as _)
.unwrap()
.naive_utc(),
),
So I think specifying the time unit is essential.
src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
Outdated
Show resolved
Hide resolved
/// Converts RisingWave value from and into Arrow value. | ||
trait FromIntoArrow { | ||
/// The corresponding element type in the Arrow array. | ||
type ArrowType; | ||
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. | ||
type TimestampType; | ||
fn from_arrow(value: Self::ArrowType) -> Self; | ||
fn into_arrow(self) -> Self::ArrowType; | ||
/// Used for converting timestamp types and will not be used in conversions of other types. | ||
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking, for type conversion, schema (i.e., type information) should be provided, since data itself may not be self-contained.
fn convert(v_from, t_from, t_to) -> v_to
t_from
& t_to
are not always needed though
e.g., our Avro conversion
risingwave/src/connector/codec/src/decoder/avro/mod.rs
Lines 89 to 93 in 9a32e75
pub fn convert_to_datum<'b>( | |
&self, | |
value: &'b Value, | |
type_expected: &DataType, | |
) -> AccessResult<DatumCow<'b>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Therefore, _with_unit
looks a little strange to me. Haven't checked the details of Arrow yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @xiangjinwu @BugenZhao might also be able to comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html
If we need _with_unit
to pass extra info from DataType
, how about the following?
IntervalUnit
i32
length forFixedSizeBinary
- precision and scale for
Decimal{128,256}
To repeat, DataType
should be provided as data itself may not be self-contained.
($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { | ||
|
||
impl From<&$ArrayType> for $ArrowType { | ||
fn from(array: &$ArrayType) -> Self { | ||
array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect() | ||
} | ||
} | ||
|
||
impl From<&$ArrowType> for $ArrayType { | ||
fn from(array: &$ArrowType) -> Self { | ||
array.iter().map(|o| { | ||
o.map(|v| { | ||
let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow_with_unit(v, $time_unit); | ||
timestamp | ||
}) | ||
}).collect() | ||
} | ||
} | ||
|
||
impl From<&[$ArrowType]> for $ArrayType { | ||
fn from(arrays: &[$ArrowType]) -> Self { | ||
arrays | ||
.iter() | ||
.flat_map(|a| a.iter()) | ||
.map(|o| { | ||
o.map(|v| { | ||
<<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v) | ||
}) | ||
}) | ||
.collect() | ||
} | ||
} | ||
|
||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks very specialized. What about putting it in a new macro, like converts_timestamp
?
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. | ||
type TimestampType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed? I guess it's too intrusive to put timestamp
related things in every type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to distinguish the four timestamp types of arrow, instead of just using one i64 to represent them.
Timestamp(Microsecond, None) => DataType::Timestamp, | ||
Timestamp(Microsecond, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Second, None) => DataType::Timestamp, | ||
Timestamp(Second, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Millisecond, None) => DataType::Timestamp, | ||
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz, | ||
Timestamp(Nanosecond, None) => DataType::Timestamp, | ||
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timestamp(_, None) => DataType::Timestamp,
Timestamp(_, Some(_)) => DataType::Timestamptz,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to keep time unit designated as a reminder, there are four types of timeunit that need to be handled. Previously, other types were ignored because of the use of _
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we use concrete type Microsecond
instead of _
so it was ignored not because of the use of _
but the contrast. I don't have a strong opinion on this so listing the types explicitly here is also okay. I am more concerned about having separate logics to handle the different time unit separately given that our original implementation won't lose any precision. See my comment below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't lose any precision
explained in #19221 (comment)
fn from_timestampsecond_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
fn from_timestampsecond_some_array( | ||
&self, | ||
array: &arrow_array::TimestampSecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampms_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamp(array.into())) | ||
} | ||
|
||
fn from_timestampms_some_array( | ||
&self, | ||
array: &arrow_array::TimestampMillisecondArray, | ||
) -> Result<ArrayImpl, ArrayError> { | ||
Ok(ArrayImpl::Timestamptz(array.into())) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also don't get why we need to add different methods to convert timestamp in different units. IIUC, the current conversion logic won't lose any precision. What happened if we just add different units of the timestamp type to converts
macro and rely on the original implementation to do the conversion?
converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);
In order not to invade the |
TimeUnit::Second => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_seconds(), | ||
TimeUnit::Millisecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_milliseconds(), | ||
TimeUnit::Microsecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_microseconds() | ||
.unwrap(), | ||
TimeUnit::Nanosecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_nanoseconds() | ||
.unwrap(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can use DateTime::timestamp_xxx()
here: https://doc.servo.org/chrono/struct.DateTime.html#method.timestamp_micros
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not resolved.
…nto wcy/fix_parquet_type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
TimeUnit::Second => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_seconds(), | ||
TimeUnit::Millisecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_milliseconds(), | ||
TimeUnit::Microsecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_microseconds() | ||
.unwrap(), | ||
TimeUnit::Nanosecond => self | ||
.0 | ||
.signed_duration_since(NaiveDateTime::default()) | ||
.num_nanoseconds() | ||
.unwrap(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not resolved.
} | ||
Timestamp(Millisecond, Some(_)) => { | ||
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Microsecond, None) => { | ||
self.from_timestampus_array(array.as_any().downcast_ref().unwrap()) | ||
} | ||
Timestamp(Microsecond, Some(_)) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not directly related to this PR but I wonder why we ignore the timezone (Some(_)
) from arrow timestamp when constructing Timestamptz
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introduced in #17201
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we have encountered three issues regarding the conversion of the timestamp type:
- When converting Arrow's timestamp type to our timestamp, we should decide whether to convert it to
timestamp
ortimestampz
based on whether there is a timezone (none or some). - When converting Arrow's timestamp type to our timestamp, only microseconds were converted, while the other three time units were not converted.
- In type comparisons, Arrow's
timestamp(_, none)
should match with rw’s timestamp, whiletimestamp(_, some)
should match with rw’stimestamptz
. However, previously, apart from microseconds, the other units did not match.
#17201 fix 1, and this pr fix 2 and 3.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand. Correct me if I am wrong, I think 1 is not fixed completely because we don't use the correct timezone for timestamptz based on the arrow data type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline, indeed this is another bug: during the conversion, the timezone should be handled using Arrow's timezone()
, as we are currently using the default timezone. I will fix this in the next PR. Thanks to @hzxa21 for identifying this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double checked, their is no correctness issue:
When the timezone in Arrow is Some(_)
, the i64 always stores UTC time, which is consistent with PG/RW's timestamptz. Therefore, we only need to distinguish between None and Some(_), as the contents of Some do not affect the actual value; they are merely for display purposes.
refer to https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html#timestamps-with-a-non-empty-timezone
TimeUnit::Second => self.timestamp(), | ||
TimeUnit::Millisecond => self.timestamp_millis(), | ||
TimeUnit::Microsecond => self.timestamp_micros(), | ||
TimeUnit::Nanosecond => self.timestamp_nanos().unwrap_or_default(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use unwrap
instead of unwrap_or_default
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better not to panic when I encounter a problem with the type conversion? It's better to convert to 0 by mistake than to panic a cluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If our TimestampTz
is in microsecond unit, I think we should never overflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, microsecond checked_mul(1_000)
is safe, then unwrap()
safely.
… branch release-2.1 (#19401)
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR primarily focuses on the conversion of the
Timestamp
type. The ArrowTimestamp
includes four types:but previously we only handled one
Microsecond
. This pr makes the comparison of data types more reasonable:risingwave_common::types::DataType::Timestamp
matcharrow_schema_iceberg::DataType::Timestamp(_, _)
.Nanosecond
,Millisecond
,Second
type.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.