From aa65f0c080011ddcb197d24d8aae233b602d5fb8 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Fri, 18 Aug 2023 14:05:33 +0800 Subject: [PATCH] [fix](multi-catalog) conversion of compatible numerical types (#23113) Hive support schema change, but doesn't rewrite the parquet file, so the physical type of parquet file may not equal the logical type of table schema. --- .../parquet/fix_length_dict_decoder.hpp | 12 +++--- .../parquet/fix_length_plain_decoder.cpp | 24 +++++++++--- .../format/parquet/fix_length_plain_decoder.h | 2 +- .../hive/test_hive_schema_change.out | 7 ++++ .../hive/test_hive_schema_change.groovy | 37 +++++++++++++++++++ 5 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 regression-test/data/external_table_p2/hive/test_hive_schema_change.out create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_schema_change.groovy diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp index bb95fb426f14408..c368868fd887062 100644 --- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp +++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp @@ -74,10 +74,10 @@ class FixLengthDictDecoder final : public BaseDictDecoder { TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { -#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ - case NUMERIC_TYPE: \ - if constexpr (std::is_same_v) { \ - return _decode_numeric(doris_column, select_vector); \ +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (!std::is_same_v) { \ + return _decode_numeric(doris_column, select_vector); \ } FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) #undef DISPATCH @@ -177,7 +177,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder { } protected: - template + template Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { auto& column_data = static_cast&>(*doris_column).get_data(); size_t data_index = column_data.size(); @@ -189,7 +189,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder { case ColumnSelectVector::CONTENT: { for (size_t i = 0; i < run_length; ++i) { column_data[data_index++] = - static_cast(_dict_items[_indexes[dict_index++]]); + static_cast(_dict_items[_indexes[dict_index++]]); } break; } diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp index 3fe58e6a5dd8ba5..f4e24ca4ab8babe 100644 --- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp +++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp @@ -75,9 +75,23 @@ Status FixLengthPlainDecoder::_decode_values(MutableColumnPtr& doris_column, Dat } TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { -#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ - case NUMERIC_TYPE: \ - return _decode_numeric(doris_column, select_vector); +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if (_physical_type == tparquet::Type::INT32) { \ + return _decode_numeric(doris_column, \ + select_vector); \ + } else if (_physical_type == tparquet::Type::INT64) { \ + return _decode_numeric(doris_column, \ + select_vector); \ + } else if (_physical_type == tparquet::Type::FLOAT) { \ + return _decode_numeric(doris_column, \ + select_vector); \ + } else if (_physical_type == tparquet::Type::DOUBLE) { \ + return _decode_numeric(doris_column, \ + select_vector); \ + } else { \ + break; \ + } FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) #undef DISPATCH case TypeIndex::Date: @@ -207,7 +221,7 @@ Status FixLengthPlainDecoder::_decode_string(MutableColumnPtr& doris_column, } return Status::OK(); } -template +template Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { auto& column_data = static_cast&>(*doris_column).get_data(); @@ -219,7 +233,7 @@ Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column, case ColumnSelectVector::CONTENT: { for (size_t i = 0; i < run_length; ++i) { char* buf_start = _data->data + _offset; - column_data[data_index++] = *(Numeric*)buf_start; + column_data[data_index++] = *(PhysicalType*)buf_start; _offset += _type_length; } break; diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h index 3204d54464244a1..0b3a6e19453cec0 100644 --- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h +++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h @@ -47,7 +47,7 @@ class FixLengthPlainDecoder final : public Decoder { Status skip_values(size_t num_values) override; protected: - template + template Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); template diff --git a/regression-test/data/external_table_p2/hive/test_hive_schema_change.out b/regression-test/data/external_table_p2/hive/test_hive_schema_change.out new file mode 100644 index 000000000000000..fde929309bdb530 --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_schema_change.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !schema_change -- +1 1 1.0 1.0 1.0099999904632568 1.10101 +2 2 2.0 2.0 2.0199999809265137 2.20202 +3 3 3.0 3.0 3.0299999713897705 3.30303 +123 6334 7898763.0 1.2837483628455E13 0.010101010091602802 0.09238498728784825 + diff --git a/regression-test/suites/external_table_p2/hive/test_hive_schema_change.groovy b/regression-test/suites/external_table_p2/hive/test_hive_schema_change.groovy new file mode 100644 index 000000000000000..fbc1e40d892eefb --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_schema_change.groovy @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_schema_change", "p2,external,hive,external_remote,external_remote_hive") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_hive_schema_change" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + sql """ switch ${catalog_name} """ + sql """ use `default` """ + qt_schema_change """ select * from schema_change order by tinyint_col """ + sql """ drop catalog ${catalog_name} """ + } +}