[BugFix] overflow in parsing parquet int96 timestamp (StarRocks#22356)
The Parquet format may use int96 to store a big datetime such as `9999-12-31 23:59:59`, which overflows when the value is cast to an int64 nanosecond timestamp.
The Parquet reader provides an option to set the unit of int96 timestamps (apache/arrow#10461).
This PR adds a BE config, `parquet_coerce_int96_timestamp_unit`, to set the unit used when reading Parquet int96 timestamps.
With the default value `MICRO`, the maximum DATETIME value in MySQL, `9999-12-31 23:59:59.999999`, is handled correctly.
rickif committed May 10, 2023
1 parent 26fbfb1 commit 658ecf2
Showing 3 changed files with 42 additions and 6 deletions.
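Why NANO overflows while MICRO does not: int64 nanoseconds since the Unix epoch span only about 292 years, so year 9999 is out of range, while int64 microseconds span far more. A standalone arithmetic sketch (not part of this commit):

// Standalone sketch: why NANO overflows int64 for 9999-12-31 23:59:59
// while MICRO does not. Not StarRocks code.
#include <cstdint>
#include <iostream>
#include <limits>

int main() {
    // Seconds from the Unix epoch to 9999-12-31 23:59:59 UTC.
    const int64_t seconds = 253402300799LL;
    const int64_t max64 = std::numeric_limits<int64_t>::max();
    // max64 nanoseconds is roughly 292 years from the epoch, i.e. ~year 2262.
    // Compare in the seconds domain to avoid overflowing the check itself.
    std::cout << "fits as NANO:  " << (seconds <= max64 / 1000000000) << std::endl; // prints 0
    std::cout << "fits as MICRO: " << (seconds <= max64 / 1000000) << std::endl;    // prints 1
    return 0;
}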
14 changes: 13 additions & 1 deletion be/src/exec/vectorized/parquet_reader.cpp
@@ -58,9 +58,21 @@ Status ParquetReaderWrap::next_selected_row_group() {
 Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
                                               const std::string& timezone) {
     try {
+        parquet::ArrowReaderProperties arrow_reader_properties;
+        /*
+         * Timestamp unit to use for INT96-encoded timestamps in Parquet:
+         * SECOND, MILLI, MICRO, or NANO.
+         * We use MICRO as the unit, matching the precision of DATETIME/TIMESTAMP in MySQL:
+         * https://dev.mysql.com/doc/refman/8.0/en/datetime.html
+         * "A DATETIME or TIMESTAMP value can include a trailing fractional seconds part in up to microseconds (6 digits) precision."
+         */
+        arrow_reader_properties.set_coerce_int96_timestamp_unit(arrow::TimeUnit::MICRO);
+
         // new file reader for parquet file
         auto st = parquet::arrow::FileReader::Make(arrow::default_memory_pool(),
-                                                   parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
+                                                   parquet::ParquetFileReader::Open(_parquet, _properties),
+                                                   arrow_reader_properties, &_reader);
+
         if (!st.ok()) {
            LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
            return Status::InternalError("Failed to create file reader");
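For reference, the same Arrow option in isolation — a minimal sketch against the Apache Arrow C++ API; the path and function name are illustrative, not StarRocks code:

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>

// Minimal sketch: read a Parquet file while coercing INT96 timestamps to
// microseconds, mirroring the reader change above.
arrow::Status ReadWithCoercedInt96(const std::string& path, std::shared_ptr<arrow::Table>* table) {
    ARROW_ASSIGN_OR_RAISE(auto infile, arrow::io::ReadableFile::Open(path));

    parquet::ArrowReaderProperties props;
    // The default coerces INT96 to NANO, which overflows for year 9999.
    props.set_coerce_int96_timestamp_unit(arrow::TimeUnit::MICRO);

    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
            arrow::default_memory_pool(), parquet::ParquetFileReader::Open(infile), props, &reader));
    return reader->ReadTable(table);
}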
Binary file not shown.
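The binary file is the new test fixture `int96_timestamp.parquet` used by the test below. The commit does not show how it was produced; a hedged sketch of how such a fixture could be generated with Arrow C++, where `enable_deprecated_int96_timestamps()` forces the legacy INT96 physical type:

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/writer.h>

// Hypothetical generator for an INT96 fixture holding the two values the
// test below expects; not taken from this commit.
arrow::Status WriteInt96Fixture(const std::string& path) {
    arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MICRO),
                                    arrow::default_memory_pool());
    ARROW_RETURN_NOT_OK(builder.Append(253402300799000000LL)); // 9999-12-31 23:59:59
    ARROW_RETURN_NOT_OK(builder.Append(1136214245000000LL));   // 2006-01-02 15:04:05
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());

    auto schema = arrow::schema({arrow::field("col_datetime", arrow::timestamp(arrow::TimeUnit::MICRO))});
    auto table = arrow::Table::Make(schema, {array});

    ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::FileOutputStream::Open(path));
    // Write timestamps with the legacy INT96 physical type.
    auto arrow_props =
            parquet::ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
    return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink,
                                      /*chunk_size=*/1024, parquet::default_writer_properties(), arrow_props);
}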
34 changes: 29 additions & 5 deletions be/test/exec/vectorized/parquet_scanner_test.cpp
@@ -287,9 +287,9 @@ class ParquetScannerTest : public ::testing::Test {
     }
 
     template <bool is_nullable>
-    ChunkPtr test_json_column(const std::vector<std::string>& columns_from_file,
-                              const std::unordered_map<size_t, ::starrocks::TExpr>& dst_slot_exprs,
-                              std::string specific_file) {
+    ChunkPtr get_chunk(const std::vector<std::string>& columns_from_file,
+                       const std::unordered_map<size_t, ::starrocks::TExpr>& dst_slot_exprs, std::string specific_file,
+                       size_t expected_rows) {
         std::vector<std::string> file_names{specific_file};
         std::vector<std::string> column_names = columns_from_file;
@@ -312,7 +312,7 @@
             }
             result = chunk;
         };
-        validate(scanner, 36865, check);
+        validate(scanner, expected_rows, check);
 
         return result;
     }
@@ -523,7 +523,7 @@ TEST_F(ParquetScannerTest, test_to_json) {
         std::vector<std::string> column_names{column_name};
         std::cerr << "test " << column_name << std::endl;
 
-        ChunkPtr chunk = test_json_column<true>(column_names, slot_map, parquet_file_name);
+        ChunkPtr chunk = get_chunk<true>(column_names, slot_map, parquet_file_name, 36865);
         ASSERT_EQ(1, chunk->num_columns());
 
         auto col = chunk->columns()[0];
@@ -568,4 +568,28 @@ TEST_F(ParquetScannerTest, test_arrow_null) {
     validate(scanner, 3, check);
 }
 
+TEST_F(ParquetScannerTest, int96_timestamp) {
+    const std::string parquet_file_name = test_exec_dir + "/test_data/parquet_data/int96_timestamp.parquet";
+    std::vector<std::tuple<std::string, std::vector<std::string>>> test_cases = {
+            {"col_datetime", {"9999-12-31 23:59:59", "2006-01-02 15:04:05"}}};
+
+    std::vector<std::string> columns_from_path;
+    std::vector<std::string> path_values;
+    std::unordered_map<size_t, TExpr> slot_map;
+
+    for (auto& [column_name, expected] : test_cases) {
+        std::vector<std::string> column_names{column_name};
+
+        ChunkPtr chunk = get_chunk<true>(column_names, slot_map, parquet_file_name, 2);
+        ASSERT_EQ(1, chunk->num_columns());
+
+        auto col = chunk->columns()[0];
+        for (int i = 0; i < col->size(); i++) {
+            std::string result = col->debug_item(i);
+            std::string expect = expected[i];
+            EXPECT_EQ(expect, result);
+        }
+    }
+}
+
 } // namespace starrocks::vectorized
