refactor(source): enhance parquet file source #19221

Merged
merged 13 commits on Nov 12, 2024
36 changes: 24 additions & 12 deletions e2e_test/s3/fs_sink.py
@@ -27,8 +27,10 @@ def gen_data(file_num, item_num_per_file):
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')),
'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')),
'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
@@ -60,8 +62,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -128,8 +132,10 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -158,8 +164,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
@@ -196,8 +204,10 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -226,8 +236,10 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
66 changes: 65 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -572,7 +572,6 @@ pub trait FromArrow {
if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
return self.from_extension_array(type_name, array);
}

match array.data_type() {
Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
@@ -584,12 +583,30 @@
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
Timestamp(Second, None) => {
self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Second, Some(_)) => {
self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, None) => {
self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, Some(_)) => {
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
Collaborator: Not directly related to this PR, but I wonder why we ignore the timezone (`Some(_)`) from the arrow timestamp when constructing `Timestamptz`.

Collaborator: Introduced in #17201.

Contributor (author): Actually, we have encountered three issues regarding the conversion of the timestamp type:

  1. When converting Arrow's timestamp type to ours, we should decide whether to convert it to timestamp or timestamptz based on whether there is a timezone (`None` or `Some`).
  2. When converting Arrow's timestamp type to our timestamp, only microseconds were handled; the other three time units were not converted.
  3. In type comparisons, Arrow's `Timestamp(_, None)` should match RW's timestamp, while `Timestamp(_, Some)` should match RW's timestamptz. Previously, apart from microseconds, the other units did not match.

#17201 fixes 1, and this PR fixes 2 and 3.
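
For illustration, a minimal standalone sketch of that dispatch rule (assumed imports and an illustrative helper name, not the PR's actual code — the real logic is the FromArrow match above):

use arrow_schema::{DataType, TimeUnit};

// Whether an arrow timestamp type should map to RW's timestamptz:
// the time unit is irrelevant, only the presence of a timezone matters.
fn maps_to_timestamptz(dt: &DataType) -> bool {
    matches!(dt, DataType::Timestamp(_, Some(_)))
}

fn main() {
    assert!(!maps_to_timestamptz(&DataType::Timestamp(TimeUnit::Second, None)));
    assert!(maps_to_timestamptz(&DataType::Timestamp(
        TimeUnit::Nanosecond,
        Some("+00:00".into())
    )));
}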

Collaborator: I understand. Correct me if I am wrong, but I think 1 is not completely fixed, because we don't use the correct timezone for timestamptz based on the arrow data type.

Contributor (author) (@wcy-fdu, Nov 12, 2024): Discussed offline; this is indeed another bug: during the conversion, the timezone should be taken from Arrow's timezone(), whereas we are currently using the default timezone. I will fix this in the next PR. Thanks to @hzxa21 for identifying this issue.

Contributor (author): Double-checked; there is no correctness issue. When the timezone in Arrow is `Some(_)`, the i64 always stores UTC time, which is consistent with PG/RW's timestamptz. Therefore, we only need to distinguish between `None` and `Some(_)`; the contents of `Some` do not affect the actual value — they are merely for display purposes.
See https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html#timestamps-with-a-non-empty-timezone
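
A minimal sketch of that semantics, assuming arrow-rs's with_timezone builder (which only attaches metadata to the array):

use arrow_array::TimestampMicrosecondArray;

fn main() {
    // The same instant, expressed as microseconds since the UTC epoch.
    let micros = 1_700_000_000_000_000i64;
    let utc = TimestampMicrosecondArray::from(vec![micros]).with_timezone("+00:00");
    let cst = TimestampMicrosecondArray::from(vec![micros]).with_timezone("+08:00");
    // The stored i64 is identical; the timezone is display-only metadata.
    assert_eq!(utc.value(0), cst.value(0));
}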

self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, None) => {
self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, Some(_)) => {
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
}
@@ -692,6 +709,33 @@ pub trait FromArrow {
Ok(ArrayImpl::Time(array.into()))
}

fn from_timestampsecond_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}
fn from_timestampsecond_some_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampms_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_some_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}
Contributor: I am curious — do we not need to do any conversion between these types? Can all of them be implemented in the same way?

Contributor (author): No — I just added this logic and verified that it works. I constructed a parquet file containing timestamps in the four time units, imported it into RisingWave, and it was parsed successfully. [screenshot omitted]

Collaborator: I also don't get why we need to add different methods to convert timestamps in different units. IIUC, the current conversion logic won't lose any precision. What would happen if we just added the other units of the timestamp type to the converts! macro and relied on the original implementation to do the conversion?

converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);

Contributor (author): In our previous code, we only handled timestamps in microseconds, so that conversion is only correct for `Timestamp(Microsecond, _)`. In the logic you mentioned, the seconds part is derived with `/ 1_000_000` and the sub-second part with `% 1_000_000`. If all four time units are converted using this logic, it yields incorrect results, as follows: [screenshot omitted]

Contributor (author): The correct approach is the one taken in this PR, where each time unit is handled separately. For example, for milliseconds it should be `% 1_000` instead of `% 1_000_000`:

 TimeUnit::Millisecond => Timestamp(
                DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1000000) as _)
                    .unwrap()
                    .naive_utc(),
            ),

So I think specifying the time unit is essential.
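
For completeness, a hedged sketch of what unit-aware conversion looks like for all four units (illustrative function name, chrono-based; not the PR's exact code):

use arrow_schema::TimeUnit;
use chrono::{DateTime, NaiveDateTime};

// Convert a raw arrow timestamp value to a NaiveDateTime, splitting the
// integer into whole seconds and a nanosecond remainder per time unit.
fn arrow_ts_to_naive_utc(value: i64, unit: TimeUnit) -> NaiveDateTime {
    let (secs, nsecs) = match unit {
        TimeUnit::Second => (value, 0),
        TimeUnit::Millisecond => (
            value.div_euclid(1_000),
            (value.rem_euclid(1_000) * 1_000_000) as u32,
        ),
        TimeUnit::Microsecond => (
            value.div_euclid(1_000_000),
            (value.rem_euclid(1_000_000) * 1_000) as u32,
        ),
        TimeUnit::Nanosecond => (
            value.div_euclid(1_000_000_000),
            value.rem_euclid(1_000_000_000) as u32,
        ),
    };
    DateTime::from_timestamp(secs, nsecs).unwrap().naive_utc()
}

Using div_euclid/rem_euclid keeps the sub-second remainder non-negative for pre-epoch values, which plain / and % would not.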


fn from_timestampus_array(
&self,
array: &arrow_array::TimestampMicrosecondArray,
@@ -706,6 +750,20 @@
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampns_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampns_some_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_interval_array(
&self,
array: &arrow_array::IntervalMonthDayNanoArray,
@@ -854,8 +912,14 @@ converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

29 changes: 8 additions & 21 deletions src/connector/src/parser/parquet_parser.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use deltalake::parquet::arrow::async_reader::AsyncFileReader;
use futures_async_stream::try_stream;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::bail;
use risingwave_common::types::{Datum, ScalarImpl};
@@ -104,32 +104,19 @@ impl ParquetParser {
crate::source::SourceColumnType::Normal => {
match source_column.is_hidden_addition_col {
false => {
let rw_data_type = &source_column.data_type;
let rw_data_type: &risingwave_common::types::DataType =
&source_column.data_type;
let rw_column_name = &source_column.name;

if let Some(parquet_column) =
record_batch.column_by_name(rw_column_name)
{
let arrow_field = IcebergArrowConvert
.to_arrow_field(rw_column_name, rw_data_type)?;
let converted_arrow_data_type: &arrow_schema_iceberg::DataType =
arrow_field.data_type();
if converted_arrow_data_type == parquet_column.data_type() {
let array_impl = IcebergArrowConvert
.array_from_arrow_array(&arrow_field, parquet_column)?;
let column = Arc::new(array_impl);
chunk_columns.push(column);
} else {
// data type mismatch, this column is set to null.
let mut array_builder = ArrayBuilderImpl::with_type(
column_size,
rw_data_type.clone(),
);

array_builder.append_n_null(record_batch.num_rows());
let res = array_builder.finish();
let column = Arc::new(res);
chunk_columns.push(column);
}
let array_impl = IcebergArrowConvert
.array_from_arrow_array(&arrow_field, parquet_column)?;
let column = Arc::new(array_impl);
chunk_columns.push(column);
} else {
// For columns defined in the source schema but not present in the Parquet file, null values are filled in.
let mut array_builder =
@@ -24,7 +24,7 @@ use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::FileMetaData;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg;
use risingwave_common::array::StreamChunk;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::io::{AsyncRead, BufReader};
@@ -269,10 +269,10 @@ pub fn extract_valid_column_indices(
.iter()
.position(|&name| name == column.name)
.and_then(|pos| {
let arrow_field = IcebergArrowConvert
.to_arrow_field(&column.name, &column.data_type)
.ok()?;
if &arrow_field == converted_arrow_schema.field(pos) {
if is_data_type_matching(
&column.data_type,
converted_arrow_schema.field(pos).data_type(),
) {
Some(pos)
} else {
None
@@ -285,3 +285,85 @@
None => Ok(vec![]),
}
}

/// Checks if a RisingWave data type matches the data type in a Parquet (Arrow) file.
///
/// This function compares the `DataType` from RisingWave with the `DataType` from the
/// Parquet file, returning `true` if they are compatible. Specifically, for `Timestamp`
/// types, it ensures that any of the four Arrow `TimeUnit` variants
/// (`Second`, `Millisecond`, `Microsecond`, and `Nanosecond`) can be matched
/// with the corresponding `Timestamp` type in RisingWave.
pub fn is_data_type_matching(
rw_data_type: &risingwave_common::types::DataType,
arrow_data_type: &arrow_schema_iceberg::DataType,
) -> bool {
match rw_data_type {
risingwave_common::types::DataType::Boolean => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Boolean)
}
risingwave_common::types::DataType::Int16 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int16)
}
risingwave_common::types::DataType::Int32 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int32)
}
risingwave_common::types::DataType::Int64 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int64)
}
risingwave_common::types::DataType::Float32 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float32)
}
risingwave_common::types::DataType::Float64 => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float64)
}
risingwave_common::types::DataType::Decimal => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Decimal128(_, _)
) || matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Decimal256(_, _)
)
}
risingwave_common::types::DataType::Date => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date32)
|| matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date64)
}
risingwave_common::types::DataType::Varchar => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Utf8)
}
risingwave_common::types::DataType::Time => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time32(_))
|| matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time64(_))
}
risingwave_common::types::DataType::Timestamp => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Timestamp(_, _)
)
}
risingwave_common::types::DataType::Timestamptz => {
matches!(
arrow_data_type,
arrow_schema_iceberg::DataType::Timestamp(_, _)
)
}
risingwave_common::types::DataType::Interval => {
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Interval(_))
}
risingwave_common::types::DataType::List(inner_type) => {
if let arrow_schema_iceberg::DataType::List(field_ref) = arrow_data_type {
let inner_rw_type = inner_type.clone();
let inner_arrow_type = field_ref.data_type();
is_data_type_matching(&inner_rw_type, inner_arrow_type)
} else {
false
}
}
risingwave_common::types::DataType::Map(_) => {
// Directly return false for Map types
false
}
_ => false, // Handle other data types as necessary
}
}
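
As a hypothetical usage sketch (abbreviated paths; assumes the helper above is in scope), all four arrow time units now pass the check for both RW timestamp and timestamptz columns:

use arrow_schema_iceberg::{DataType as ArrowType, TimeUnit};
use risingwave_common::types::DataType as RwType;

fn main() {
    for unit in [
        TimeUnit::Second,
        TimeUnit::Millisecond,
        TimeUnit::Microsecond,
        TimeUnit::Nanosecond,
    ] {
        assert!(is_data_type_matching(&RwType::Timestamp, &ArrowType::Timestamp(unit, None)));
        assert!(is_data_type_matching(
            &RwType::Timestamptz,
            &ArrowType::Timestamp(unit, Some("+00:00".into()))
        ));
    }
}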