Skip to content

Commit

Permalink
refactor(source): enhance parquet file source (#19221)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Nov 15, 2024
1 parent 15492b2 commit be6c273
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 58 deletions.
63 changes: 50 additions & 13 deletions e2e_test/s3/fs_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ def gen_data(file_num, item_num_per_file):
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')),
'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')),
'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')),
'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')),
'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')),
'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
Expand Down Expand Up @@ -60,8 +66,15 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -128,8 +141,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand All @@ -147,7 +166,7 @@ def _table():
print('Sink into s3 in parquet encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
id bigint primary key,
id bigint primary key,\
name TEXT,
sex bigint,
mark bigint,
Expand All @@ -158,8 +177,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
Expand Down Expand Up @@ -196,8 +221,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -226,8 +257,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
Expand Down
184 changes: 163 additions & 21 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

use std::fmt::Write;

use arrow_53_schema::TimeUnit;
use arrow_array::array;
use arrow_array::cast::AsArray;
use arrow_buffer::OffsetBuffer;
Expand Down Expand Up @@ -512,6 +513,12 @@ pub trait FromArrow {
Time64(Microsecond) => DataType::Time,
Timestamp(Microsecond, None) => DataType::Timestamp,
Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
Timestamp(Second, None) => DataType::Timestamp,
Timestamp(Second, Some(_)) => DataType::Timestamptz,
Timestamp(Millisecond, None) => DataType::Timestamp,
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
Timestamp(Nanosecond, None) => DataType::Timestamp,
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
Interval(MonthDayNano) => DataType::Interval,
Utf8 => DataType::Varchar,
Binary => DataType::Bytea,
Expand Down Expand Up @@ -572,7 +579,6 @@ pub trait FromArrow {
if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
return self.from_extension_array(type_name, array);
}

match array.data_type() {
Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
Expand All @@ -584,12 +590,30 @@ pub trait FromArrow {
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
Timestamp(Second, None) => {
self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Second, Some(_)) => {
self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, None) => {
self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, Some(_)) => {
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, None) => {
self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, Some(_)) => {
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
}
Expand Down Expand Up @@ -692,6 +716,33 @@ pub trait FromArrow {
Ok(ArrayImpl::Time(array.into()))
}

fn from_timestampsecond_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}
fn from_timestampsecond_some_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampms_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_some_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampus_array(
&self,
array: &arrow_array::TimestampMicrosecondArray,
Expand All @@ -706,6 +757,20 @@ pub trait FromArrow {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampns_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampns_some_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_interval_array(
&self,
array: &arrow_array::IntervalMonthDayNanoArray,
Expand Down Expand Up @@ -842,6 +907,44 @@ macro_rules! converts {
}
};
}

macro_rules! converts_with_timeunit {
($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {

impl From<&$ArrayType> for $ArrowType {
fn from(array: &$ArrayType) -> Self {
array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect()
}
}

impl From<&$ArrowType> for $ArrayType {
fn from(array: &$ArrowType) -> Self {
array.iter().map(|o| {
o.map(|v| {
let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit);
timestamp
})
}).collect()
}
}

impl From<&[$ArrowType]> for $ArrayType {
fn from(arrays: &[$ArrowType]) -> Self {
arrays
.iter()
.flat_map(|a| a.iter())
.map(|o| {
o.map(|v| {
<<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
})
})
.collect()
}
}

};
}

converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
Expand All @@ -854,11 +957,19 @@ converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

/// Converts RisingWave value from and into Arrow value.
trait FromIntoArrow {
/// The corresponding element type in the Arrow array.
Expand All @@ -867,6 +978,16 @@ trait FromIntoArrow {
fn into_arrow(self) -> Self::ArrowType;
}

/// Converts RisingWave value from and into Arrow value.
/// Specifically used for converting timestamp types according to timeunit.
trait FromIntoArrowWithUnit {
type ArrowType;
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
type TimestampType;
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}

impl FromIntoArrow for Serial {
type ArrowType = i64;

Expand Down Expand Up @@ -936,34 +1057,55 @@ impl FromIntoArrow for Time {
}
}

impl FromIntoArrow for Timestamp {
impl FromIntoArrowWithUnit for Timestamp {
type ArrowType = i64;
type TimestampType = TimeUnit;

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamp(
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _)
.unwrap()
.naive_utc(),
)
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
match time_unit {
TimeUnit::Second => {
Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
}
TimeUnit::Millisecond => {
Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
}
TimeUnit::Microsecond => {
Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
}
TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
}
}

fn into_arrow(self) -> Self::ArrowType {
self.0
.signed_duration_since(NaiveDateTime::default())
.num_microseconds()
.unwrap()
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
match time_unit {
TimeUnit::Second => self.0.and_utc().timestamp(),
TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
}
}
}

impl FromIntoArrow for Timestamptz {
impl FromIntoArrowWithUnit for Timestamptz {
type ArrowType = i64;

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamptz::from_micros(value)
type TimestampType = TimeUnit;

fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
match time_unit {
TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
TimeUnit::Microsecond => Timestamptz::from_micros(value),
TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
}
}

fn into_arrow(self) -> Self::ArrowType {
self.timestamp_micros()
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
match time_unit {
TimeUnit::Second => self.timestamp(),
TimeUnit::Millisecond => self.timestamp_millis(),
TimeUnit::Microsecond => self.timestamp_micros(),
TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
}
}
}

Expand Down
Loading

0 comments on commit be6c273

Please sign in to comment.