Skip to content

Commit

Permalink
[BugFix] overflow in parsing parquet int96 timestamp (StarRocks#22356)
Browse files Browse the repository at this point in the history
The parquet format would use int96 to store big datetime like `9999-12-31 23:59:59`, which leads to overflow when it is cast to int64.
The parquet reader provides an option to set the unit of int96 timestamp in apache/arrow#10461.
This PR adds a config `parquet_coerce_int96_timestamp_unit` for BE to set the unit of reading parquet int96 timestamp.
With the default value `MICRO`, the maximum datetime value `9999-12-31 23:59:59.999999` in MySQL could be correctly handled.
  • Loading branch information
rickif authored and abc982627271 committed Jun 5, 2023
1 parent 435c8fc commit d74bf66
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
14 changes: 13 additions & 1 deletion be/src/exec/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <utility>

#include "common/config.h"
#include "common/logging.h"
#include "fmt/format.h"
#include "runtime/descriptors.h"
Expand Down Expand Up @@ -73,9 +74,20 @@ Status ParquetReaderWrap::next_selected_row_group() {
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone) {
try {
parquet::ArrowReaderProperties arrow_reader_properties;
/*
* timestamp unit to use for INT96-encoded timestamps in parquet.
* SECOND, MICRO, MILLI, NANO
* We use MICRO second as the unit to parse int96 timestamp, which is the precision of DATETIME/TIMESTAMP in MySQL.
* https://dev.mysql.com/doc/refman/8.0/en/datetime.html
* A DATETIME or TIMESTAMP value can include a trailing fractional seconds part in up to microseconds (6 digits) precision
*/
arrow_reader_properties.set_coerce_int96_timestamp_unit(arrow::TimeUnit::MICRO);

// new file reader for parquet file
auto st = parquet::arrow::FileReader::Make(arrow::default_memory_pool(),
parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
parquet::ParquetFileReader::Open(_parquet, _properties),
arrow_reader_properties, &_reader);
if (!st.ok()) {
LOG(WARNING) << "Failed to create parquet file reader. error: " << st.ToString()
<< ", filename: " << _filename;
Expand Down
32 changes: 28 additions & 4 deletions be/test/exec/parquet_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ class ParquetScannerTest : public ::testing::Test {
}

template <bool is_nullable>
ChunkPtr test_json_column(const std::vector<std::string>& columns_from_file,
ChunkPtr get_chunk(const std::vector<std::string>& columns_from_file,
const std::unordered_map<size_t, ::starrocks::TExpr>& dst_slot_exprs,
std::string specific_file) {
std::string specific_file, size_t expected_rows) {
std::vector<std::string> file_names{std::move(specific_file)};
const std::vector<std::string>& column_names = columns_from_file;

Expand All @@ -297,7 +297,7 @@ class ParquetScannerTest : public ::testing::Test {
}
result = chunk;
};
validate(scanner, 3, check);
validate(scanner, expected_rows, check);

return result;
}
Expand Down Expand Up @@ -574,7 +574,7 @@ TEST_F(ParquetScannerTest, test_to_json) {
std::vector<std::string> column_names{column_name};
std::cerr << "test " << column_name << std::endl;

ChunkPtr chunk = test_json_column<true>(column_names, slot_map, parquet_file_name);
ChunkPtr chunk = get_chunk<true>(column_names, slot_map, parquet_file_name, 3);
ASSERT_EQ(1, chunk->num_columns());

auto col = chunk->columns()[0];
Expand Down Expand Up @@ -619,4 +619,28 @@ TEST_F(ParquetScannerTest, test_arrow_null) {
validate(scanner, 3, check);
}

TEST_F(ParquetScannerTest, int96_timestamp) {
const std::string parquet_file_name = test_exec_dir + "/test_data/parquet_data/int96_timestamp.parquet";
std::vector<std::tuple<std::string, std::vector<std::string>>> test_cases = {
{"col_datetime", {"9999-12-31 23:59:59", "2006-01-02 15:04:05"}}};

std::vector<std::string> columns_from_path;
std::vector<std::string> path_values;
std::unordered_map<size_t, TExpr> slot_map;

for (auto& [column_name, expected] : test_cases) {
std::vector<std::string> column_names{column_name};

ChunkPtr chunk = get_chunk<true>(column_names, slot_map, parquet_file_name, 2);
ASSERT_EQ(1, chunk->num_columns());

auto col = chunk->columns()[0];
for (int i = 0; i < col->size(); i++) {
std::string result = col->debug_item(i);
std::string expect = expected[i];
EXPECT_EQ(expect, result);
}
}
}

} // namespace starrocks
Binary file not shown.

0 comments on commit d74bf66

Please sign in to comment.