From 85f192a45755b3f15653fdc0a8fbd788086e125f Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Tue, 15 Jun 2021 18:25:11 +0200 Subject: [PATCH] ARROW-12096: [C++] Allows users to define arrow timestamp unit for Parquet INT96 timestamp Have added functionality in C++ code to allow users to define the arrow timestamp unit when reading parquet INT96 types. This avoids the overflow bug when trying to convert INT96 values which have dates which are out of bounds for Arrow NS Timestamp. See added test: `TestArrowReadWrite.DownsampleDeprecatedInt96` which demonstrates use and expected results. Main discussion of changes in [JIRA Issue ARROW-12096](https://issues.apache.org/jira/browse/ARROW-12096). Closes #10461 from isichei/ARROW-12096 Lead-authored-by: Karik Isichei Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- .../parquet/arrow/arrow_reader_writer_test.cc | 56 +++++++++++++++++++ cpp/src/parquet/arrow/reader_internal.cc | 43 +++++++++----- cpp/src/parquet/arrow/schema.cc | 4 +- cpp/src/parquet/arrow/schema_internal.cc | 19 +++---- cpp/src/parquet/arrow/schema_internal.h | 7 ++- cpp/src/parquet/properties.h | 14 ++++- cpp/src/parquet/types.h | 43 ++++++++++++-- 7 files changed, 150 insertions(+), 36 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 677458ce37eb8..6c82b8dee7892 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -558,6 +558,35 @@ void ReadSingleColumnFileStatistics(std::unique_ptr file_reader, ASSERT_OK(StatisticsAsScalars(*statistics, min, max)); } +void DownsampleInt96RoundTrip(std::shared_ptr arrow_vector_in, + std::shared_ptr arrow_vector_out, + ::arrow::TimeUnit::type unit) { + // Create single input table of NS to be written to parquet with INT96 + auto input_schema = + ::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))}); + auto input = Table::Make(input_schema, {arrow_vector_in}); + + // Create an expected schema for each resulting table (one for each "downsampled" ts) + auto ex_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(unit))}); + auto ex_result = Table::Make(ex_schema, {arrow_vector_out}); + + std::shared_ptr result; + + ArrowReaderProperties arrow_reader_prop; + arrow_reader_prop.set_coerce_int96_timestamp_unit(unit); + + ASSERT_NO_FATAL_FAILURE(DoRoundtrip( + input, input->num_rows(), &result, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result->schema(), + *result->schema(), + /*check_metadata=*/false)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); +} + // Non-template base class for TestParquetIO, to avoid code duplication class ParquetIOTestBase : public ::testing::Test { public: @@ -1671,6 +1700,33 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); } +TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) { + using ::arrow::ArrayFromJSON; + using ::arrow::field; + using ::arrow::schema; + + // Timestamp values at 2000-01-01 00:00:00, + // then with increment unit of 1ns, 1us, 1ms and 1s. + auto a_nano = + ArrayFromJSON(timestamp(TimeUnit::NANO), + "[946684800000000000, 946684800000000001, 946684800000001000, " + "946684800001000000, 946684801000000000]"); + auto a_micro = ArrayFromJSON(timestamp(TimeUnit::MICRO), + "[946684800000000, 946684800000000, 946684800000001, " + "946684800001000, 946684801000000]"); + auto a_milli = ArrayFromJSON( + timestamp(TimeUnit::MILLI), + "[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]"); + auto a_second = + ArrayFromJSON(timestamp(TimeUnit::SECOND), + "[946684800, 946684800, 946684800, 946684800, 946684801]"); + + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_nano, TimeUnit::NANO)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_micro, TimeUnit::MICRO)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_milli, TimeUnit::MILLI)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_second, TimeUnit::SECOND)); +} + TEST(TestArrowReadWrite, CoerceTimestamps) { using ::arrow::ArrayFromVector; using ::arrow::field; diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 1410a5f89e2de..0ffa3e8997007 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) { } Status TransferInt96(RecordReader* reader, MemoryPool* pool, - const std::shared_ptr& type, Datum* out) { + const std::shared_ptr& type, Datum* out, + const ::arrow::TimeUnit::type int96_arrow_time_unit) { int64_t length = reader->values_written(); auto values = reinterpret_cast(reader->values()); ARROW_ASSIGN_OR_RAISE(auto data, @@ -365,7 +366,20 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool, // isn't representable as a 64-bit Unix timestamp. *data_ptr++ = 0; } else { - *data_ptr++ = Int96GetNanoSeconds(values[i]); + switch (int96_arrow_time_unit) { + case ::arrow::TimeUnit::NANO: + *data_ptr++ = Int96GetNanoSeconds(values[i]); + break; + case ::arrow::TimeUnit::MICRO: + *data_ptr++ = Int96GetMicroSeconds(values[i]); + break; + case ::arrow::TimeUnit::MILLI: + *data_ptr++ = Int96GetMilliSeconds(values[i]); + break; + case ::arrow::TimeUnit::SECOND: + *data_ptr++ = Int96GetSeconds(values[i]); + break; + } } } *out = std::make_shared(type, length, std::move(data), @@ -742,20 +756,19 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_type); - switch (timestamp_type.unit()) { - case ::arrow::TimeUnit::MILLI: - case ::arrow::TimeUnit::MICRO: { - result = TransferZeroCopy(reader, value_type); - } break; - case ::arrow::TimeUnit::NANO: { - if (descr->physical_type() == ::parquet::Type::INT96) { - RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result)); - } else { + if (descr->physical_type() == ::parquet::Type::INT96) { + RETURN_NOT_OK( + TransferInt96(reader, pool, value_type, &result, timestamp_type.unit())); + } else { + switch (timestamp_type.unit()) { + case ::arrow::TimeUnit::MILLI: + case ::arrow::TimeUnit::MICRO: + case ::arrow::TimeUnit::NANO: result = TransferZeroCopy(reader, value_type); - } - } break; - default: - return Status::NotImplemented("TimeUnit not supported"); + break; + default: + return Status::NotImplemented("TimeUnit not supported"); + } } } break; default: diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 7610ce176058c..eb7fd628dfc94 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -454,7 +454,9 @@ bool IsDictionaryReadSupported(const ArrowType& type) { ::arrow::Result> GetTypeForNode( int column_index, const schema::PrimitiveNode& primitive_node, SchemaTreeContext* ctx) { - ASSIGN_OR_RAISE(std::shared_ptr storage_type, GetArrowType(primitive_node)); + ASSIGN_OR_RAISE( + std::shared_ptr storage_type, + GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index fbdfa09a04099..064bf4f55cc7e 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -179,9 +179,9 @@ Result> FromInt64(const LogicalType& logical_type) { } } -Result> GetArrowType(Type::type physical_type, - const LogicalType& logical_type, - int type_length) { +Result> GetArrowType( + Type::type physical_type, const LogicalType& logical_type, int type_length, + const ::arrow::TimeUnit::type int96_arrow_time_unit) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -194,7 +194,7 @@ Result> GetArrowType(Type::type physical_type, case ParquetType::INT64: return FromInt64(logical_type); case ParquetType::INT96: - return ::arrow::timestamp(::arrow::TimeUnit::NANO); + return ::arrow::timestamp(int96_arrow_time_unit); case ParquetType::FLOAT: return ::arrow::float32(); case ParquetType::DOUBLE: @@ -211,14 +211,11 @@ Result> GetArrowType(Type::type physical_type, } } -Result> GetArrowType(const schema::PrimitiveNode& primitive) { +Result> GetArrowType( + const schema::PrimitiveNode& primitive, + const ::arrow::TimeUnit::type int96_arrow_time_unit) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), - primitive.type_length()); -} - -Result> GetArrowType(const ColumnDescriptor& descriptor) { - return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(), - descriptor.type_length()); + primitive.type_length(), int96_arrow_time_unit); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index ec0d9571304a9..fb837c3ee6cab 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -40,9 +40,12 @@ Result> GetArrowType(Type::type physical_type int type_length); Result> GetArrowType( - const schema::PrimitiveNode& primitive); + Type::type physical_type, const LogicalType& logical_type, int type_length, + ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); + Result> GetArrowType( - const ColumnDescriptor& descriptor); + const schema::PrimitiveNode& primitive, + ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5018fff9531f5..d217b8efa5294 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -575,7 +575,8 @@ class PARQUET_EXPORT ArrowReaderProperties { read_dict_indices_(), batch_size_(kArrowDefaultBatchSize), pre_buffer_(false), - cache_options_(::arrow::io::CacheOptions::Defaults()) {} + cache_options_(::arrow::io::CacheOptions::Defaults()), + coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {} void set_use_threads(bool use_threads) { use_threads_ = use_threads; } @@ -620,6 +621,16 @@ class PARQUET_EXPORT ArrowReaderProperties { const ::arrow::io::IOContext& io_context() const { return io_context_; } + /// Set timestamp unit to use for deprecated INT96-encoded timestamps + /// (default is NANO). + void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) { + coerce_int96_timestamp_unit_ = unit; + } + + ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const { + return coerce_int96_timestamp_unit_; + } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -627,6 +638,7 @@ class PARQUET_EXPORT ArrowReaderProperties { bool pre_buffer_; ::arrow::io::IOContext io_context_; ::arrow::io::CacheOptions cache_options_; + ::arrow::TimeUnit::type coerce_int96_timestamp_unit_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 4529dbe613325..6bd67f1ee5fec 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -591,15 +591,46 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds)); } -static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { +struct DecodedInt96 { + uint64_t days_since_epoch; + uint64_t nanoseconds; +}; + +static inline DecodedInt96 DecodeInt96Timestamp(const parquet::Int96& i96) { // We do the computations in the unsigned domain to avoid unsigned behaviour // on overflow. - uint64_t days_since_epoch = - i96.value[2] - static_cast(kJulianToUnixEpochDays); - uint64_t nanoseconds = 0; + DecodedInt96 result; + result.days_since_epoch = i96.value[2] - static_cast(kJulianToUnixEpochDays); + result.nanoseconds = 0; + + memcpy(&result.nanoseconds, &i96.value, sizeof(uint64_t)); + return result; +} + +static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { + const auto decoded = DecodeInt96Timestamp(i96); + return static_cast(decoded.days_since_epoch * kNanosecondsPerDay + + decoded.nanoseconds); +} + +static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) { + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t microseconds = decoded.nanoseconds / static_cast(1000); + return static_cast(decoded.days_since_epoch * kMicrosecondsPerDay + + microseconds); +} + +static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) { + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t milliseconds = decoded.nanoseconds / static_cast(1000000); + return static_cast(decoded.days_since_epoch * kMillisecondsPerDay + + milliseconds); +} - memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); - return static_cast(days_since_epoch * kNanosecondsPerDay + nanoseconds); +static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t seconds = decoded.nanoseconds / static_cast(1000000000); + return static_cast(decoded.days_since_epoch * kSecondsPerDay + seconds); } static inline std::string Int96ToString(const Int96& a) {