From 6424707f23869556b0cfe1c7bb92ef7a680e2c29 Mon Sep 17 00:00:00 2001 From: Elsa <111482174+elsa0520@users.noreply.github.com> Date: Mon, 30 Oct 2023 18:51:36 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #8273 Signed-off-by: ti-chi-bot --- dbms/src/DataStreams/RuntimeFilter.cpp | 10 +++++++- dbms/src/DataStreams/RuntimeFilter.h | 3 +++ .../Coprocessor/DAGExpressionAnalyzer.cpp | 1 + .../DeltaMerge/FilterParser/FilterParser.cpp | 23 ++++++++++++++++- .../DeltaMerge/FilterParser/FilterParser.h | 3 ++- tests/fullstack-test/mpp/runtime_filter.test | 25 +++++++++++++++++++ 6 files changed, 62 insertions(+), 3 deletions(-) diff --git a/dbms/src/DataStreams/RuntimeFilter.cpp b/dbms/src/DataStreams/RuntimeFilter.cpp index 70d3c0dea4a..62ed46e5de5 100644 --- a/dbms/src/DataStreams/RuntimeFilter.cpp +++ b/dbms/src/DataStreams/RuntimeFilter.cpp @@ -55,6 +55,11 @@ void RuntimeFilter::setINValuesSet(const std::shared_ptr & in_values_set_) in_values_set = in_values_set_; } +void RuntimeFilter::setTimezoneInfo(const TimezoneInfo & timezone_info_) +{ + timezone_info = timezone_info_; +} + void RuntimeFilter::build() { if (!DM::FilterParser::isRSFilterSupportType(target_expr.field_type().tp())) @@ -206,11 +211,14 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t switch (rf_type) { case tipb::IN: + // Note that the elements are added from the block read (after timezone casted). + // Take care of them when parsing to rough set filter. return DM::FilterParser::parseRFInExpr( rf_type, target_expr, columns_to_read, - in_values_set->getUniqueSetElements()); + in_values_set->getUniqueSetElements(), + timezone_info); case tipb::MIN_MAX: case tipb::BLOOM_FILTER: // TODO diff --git a/dbms/src/DataStreams/RuntimeFilter.h b/dbms/src/DataStreams/RuntimeFilter.h index 6017ee56fc8..8a8ab3a8c96 100644 --- a/dbms/src/DataStreams/RuntimeFilter.h +++ b/dbms/src/DataStreams/RuntimeFilter.h @@ -61,6 +61,8 @@ class RuntimeFilter void setINValuesSet(const std::shared_ptr & in_values_set_); + void setTimezoneInfo(const TimezoneInfo & timezone_info_); + void build(); void updateValues(const ColumnWithTypeAndName & values, const LoggerPtr & log); @@ -85,6 +87,7 @@ class RuntimeFilter tipb::Expr source_expr; tipb::Expr target_expr; const tipb::RuntimeFilterType rf_type; + TimezoneInfo timezone_info; // thread safe std::atomic status = RuntimeFilterStatus::NOT_READY; // used for failed_reason thread safe diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index ae65441bd1c..3e63eee26ba 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -1375,6 +1375,7 @@ void DAGExpressionAnalyzer::appendRuntimeFilterProperties(RuntimeFilterPtr & run header.insert(ColumnWithTypeAndName(name_and_type.type->createColumn(), name_and_type.type, "_" + toString(1))); in_values_set->setHeader(header); runtime_filter->setINValuesSet(in_values_set); + runtime_filter->setTimezoneInfo(context.getTimezoneInfo()); break; case tipb::MIN_MAX: case tipb::BLOOM_FILTER: diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 87c6a702de2..9870f616f5d 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -474,7 +474,8 @@ RSOperatorPtr FilterParser::parseRFInExpr( const tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, const ColumnDefines & columns_to_read, - const std::set & setElements) + const std::set & setElements, + const TimezoneInfo & timezone_info) { // todo check if set elements is empty Attr attr; @@ -488,9 +489,29 @@ RSOperatorPtr FilterParser::parseRFInExpr( return createUnsupported(target_expr.ShortDebugString(), "rf target expr is not column expr", false); } auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read); +<<<<<<< HEAD attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; std::for_each(setElements.begin(), setElements.end(), [&](Field element) { values.push_back(element); }); return createIn(attr, values); +======= + auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; + if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone) + { + Fields values; + values.reserve(setElements.size()); + std::for_each(setElements.begin(), setElements.end(), [&](Field element) { + // convert literal value from timezone specified in cop request to UTC + cop::convertFieldWithTimezone(element, timezone_info); + values.push_back(element); + }); + return createIn(attr, values); + } + else + { + Fields values(setElements.begin(), setElements.end()); + return createIn(attr, values); + } +>>>>>>> ceb7c2490d (runtime filter: fix timezone error in runtime filter (#8273)) } case tipb::MIN_MAX: case tipb::BLOOM_FILTER: diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 83efee678f3..2c6a730e24a 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -54,7 +54,8 @@ class FilterParser tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, const ColumnDefines & columns_to_read, - const std::set & setElements); + const std::set & setElements, + const TimezoneInfo & timezone_info); static bool isRSFilterSupportType(Int32 field_type); diff --git a/tests/fullstack-test/mpp/runtime_filter.test b/tests/fullstack-test/mpp/runtime_filter.test index e561cf502c3..b06fffffba7 100644 --- a/tests/fullstack-test/mpp/runtime_filter.test +++ b/tests/fullstack-test/mpp/runtime_filter.test @@ -30,8 +30,23 @@ mysql> insert into test.t2 values (2,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); mysql> insert into test.t2 values (3,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); mysql> insert into test.t2 values (4,null,null,null,null,null, null, null, null, null, null); +mysql> drop table if exists test.t1_timestamp; +mysql> create table test.t1_timestamp (k1 int, k2 timestamp); +mysql> alter table test.t1_timestamp set tiflash replica 1; +mysql> insert into test.t1_timestamp values (1, '2023-10-20 00:00:00'); + +mysql> drop table if exists test.t2_timestamp; +mysql> create table test.t2_timestamp (k1 int, k2 timestamp); +mysql> alter table test.t2_timestamp set tiflash replica 1; +mysql> insert into test.t2_timestamp values (1, '2023-10-20 00:00:00'); + func> wait_table test t1 func> wait_table test t2 +func> wait_table test t1_timestamp +func> wait_table test t2_timestamp + +mysql> alter table test.t1_timestamp compact tiflash replica; +mysql> alter table test.t2_timestamp compact tiflash replica; # inner join mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select t1_tinyint, t2_tinyint from test.t1, test.t2 where t1.t1_tinyint=t2.t2_tinyint; @@ -62,5 +77,15 @@ mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; se | 1 | +------------+ +# test timestamp column type for issue #8222 +mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select * from test.t1_timestamp, test.t2_timestamp where t1_timestamp.k2=t2_timestamp.k2; ++------+---------------------+------+---------------------+ +| k1 | k2 | k1 | k2 | ++------+---------------------+------+---------------------+ +| 1 | 2023-10-20 00:00:00 | 1 | 2023-10-20 00:00:00 | ++------+---------------------+------+---------------------+ + mysql> drop table test.t1; mysql> drop table test.t2; +mysql> drop table test.t1_timestamp; +mysql> drop table test.t2_timestamp; From 943a3470849134fdd0061cd5376c96c875e1da06 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Mon, 30 Oct 2023 20:52:00 +0800 Subject: [PATCH 2/2] Update FilterParser.cpp to resolve conflicts --- .../DeltaMerge/FilterParser/FilterParser.cpp | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 9870f616f5d..dfd58990c1b 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -106,6 +106,19 @@ ColumnDefine getColumnDefineForColumnExpr(const tipb::Expr & expr, const ColumnD return columns_to_read[column_index]; } +// convert literal value from timezone specified in cop request to UTC in-place +inline void convertFieldWithTimezone(Field & value, const TimezoneInfo & timezone_info) +{ + static const auto & time_zone_utc = DateLUT::instance("UTC"); + UInt64 from_time = value.get(); + UInt64 result_time = from_time; + if (timezone_info.is_name_based) + convertTimeZone(from_time, result_time, *timezone_info.timezone, time_zone_utc); + else if (timezone_info.timezone_offset != 0) + convertTimeZoneByOffset(from_time, result_time, false, timezone_info.timezone_offset); + value = Field(result_time); +} + enum class OperandType { Unknown = 0, @@ -190,16 +203,7 @@ inline RSOperatorPtr parseTiCompareExpr( // false); // convert literal value from timezone specified in cop request to UTC if (literal_type == TiDB::TypeDatetime && !timezone_info.is_utc_timezone) - { - static const auto & time_zone_utc = DateLUT::instance("UTC"); - UInt64 from_time = value.get(); - UInt64 result_time = from_time; - if (timezone_info.is_name_based) - convertTimeZone(from_time, result_time, *timezone_info.timezone, time_zone_utc); - else if (timezone_info.timezone_offset != 0) - convertTimeZoneByOffset(from_time, result_time, false, timezone_info.timezone_offset); - value = Field(result_time); - } + convertFieldWithTimezone(value, timezone_info); } values.push_back(value); } @@ -489,11 +493,6 @@ RSOperatorPtr FilterParser::parseRFInExpr( return createUnsupported(target_expr.ShortDebugString(), "rf target expr is not column expr", false); } auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read); -<<<<<<< HEAD - attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; - std::for_each(setElements.begin(), setElements.end(), [&](Field element) { values.push_back(element); }); - return createIn(attr, values); -======= auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone) { @@ -511,7 +510,6 @@ RSOperatorPtr FilterParser::parseRFInExpr( Fields values(setElements.begin(), setElements.end()); return createIn(attr, values); } ->>>>>>> ceb7c2490d (runtime filter: fix timezone error in runtime filter (#8273)) } case tipb::MIN_MAX: case tipb::BLOOM_FILTER: