diff --git a/dbms/src/DataStreams/RuntimeFilter.cpp b/dbms/src/DataStreams/RuntimeFilter.cpp index 70d3c0dea4a..62ed46e5de5 100644 --- a/dbms/src/DataStreams/RuntimeFilter.cpp +++ b/dbms/src/DataStreams/RuntimeFilter.cpp @@ -55,6 +55,11 @@ void RuntimeFilter::setINValuesSet(const std::shared_ptr & in_values_set_) in_values_set = in_values_set_; } +void RuntimeFilter::setTimezoneInfo(const TimezoneInfo & timezone_info_) +{ + timezone_info = timezone_info_; +} + void RuntimeFilter::build() { if (!DM::FilterParser::isRSFilterSupportType(target_expr.field_type().tp())) @@ -206,11 +211,14 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t switch (rf_type) { case tipb::IN: + // Note that the elements are added from the block read (after timezone casted). + // Take care of them when parsing to rough set filter. return DM::FilterParser::parseRFInExpr( rf_type, target_expr, columns_to_read, - in_values_set->getUniqueSetElements()); + in_values_set->getUniqueSetElements(), + timezone_info); case tipb::MIN_MAX: case tipb::BLOOM_FILTER: // TODO diff --git a/dbms/src/DataStreams/RuntimeFilter.h b/dbms/src/DataStreams/RuntimeFilter.h index 6017ee56fc8..8a8ab3a8c96 100644 --- a/dbms/src/DataStreams/RuntimeFilter.h +++ b/dbms/src/DataStreams/RuntimeFilter.h @@ -61,6 +61,8 @@ class RuntimeFilter void setINValuesSet(const std::shared_ptr & in_values_set_); + void setTimezoneInfo(const TimezoneInfo & timezone_info_); + void build(); void updateValues(const ColumnWithTypeAndName & values, const LoggerPtr & log); @@ -85,6 +87,7 @@ class RuntimeFilter tipb::Expr source_expr; tipb::Expr target_expr; const tipb::RuntimeFilterType rf_type; + TimezoneInfo timezone_info; // thread safe std::atomic status = RuntimeFilterStatus::NOT_READY; // used for failed_reason thread safe diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index ae65441bd1c..3e63eee26ba 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -1375,6 +1375,7 @@ void DAGExpressionAnalyzer::appendRuntimeFilterProperties(RuntimeFilterPtr & run header.insert(ColumnWithTypeAndName(name_and_type.type->createColumn(), name_and_type.type, "_" + toString(1))); in_values_set->setHeader(header); runtime_filter->setINValuesSet(in_values_set); + runtime_filter->setTimezoneInfo(context.getTimezoneInfo()); break; case tipb::MIN_MAX: case tipb::BLOOM_FILTER: diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 87c6a702de2..9870f616f5d 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -474,7 +474,8 @@ RSOperatorPtr FilterParser::parseRFInExpr( const tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, const ColumnDefines & columns_to_read, - const std::set & setElements) + const std::set & setElements, + const TimezoneInfo & timezone_info) { // todo check if set elements is empty Attr attr; @@ -488,9 +489,29 @@ RSOperatorPtr FilterParser::parseRFInExpr( return createUnsupported(target_expr.ShortDebugString(), "rf target expr is not column expr", false); } auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read); +<<<<<<< HEAD attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; std::for_each(setElements.begin(), setElements.end(), [&](Field element) { values.push_back(element); }); return createIn(attr, values); +======= + auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type}; + if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone) + { + Fields values; + values.reserve(setElements.size()); + std::for_each(setElements.begin(), setElements.end(), [&](Field element) { + // convert literal value from timezone specified in cop request to UTC + cop::convertFieldWithTimezone(element, timezone_info); + values.push_back(element); + }); + return createIn(attr, values); + } + else + { + Fields values(setElements.begin(), setElements.end()); + return createIn(attr, values); + } +>>>>>>> ceb7c2490d (runtime filter: fix timezone error in runtime filter (#8273)) } case tipb::MIN_MAX: case tipb::BLOOM_FILTER: diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index 83efee678f3..2c6a730e24a 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -54,7 +54,8 @@ class FilterParser tipb::RuntimeFilterType rf_type, const tipb::Expr & target_expr, const ColumnDefines & columns_to_read, - const std::set & setElements); + const std::set & setElements, + const TimezoneInfo & timezone_info); static bool isRSFilterSupportType(Int32 field_type); diff --git a/tests/fullstack-test/mpp/runtime_filter.test b/tests/fullstack-test/mpp/runtime_filter.test index e561cf502c3..b06fffffba7 100644 --- a/tests/fullstack-test/mpp/runtime_filter.test +++ b/tests/fullstack-test/mpp/runtime_filter.test @@ -30,8 +30,23 @@ mysql> insert into test.t2 values (2,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); mysql> insert into test.t2 values (3,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0); mysql> insert into test.t2 values (4,null,null,null,null,null, null, null, null, null, null); +mysql> drop table if exists test.t1_timestamp; +mysql> create table test.t1_timestamp (k1 int, k2 timestamp); +mysql> alter table test.t1_timestamp set tiflash replica 1; +mysql> insert into test.t1_timestamp values (1, '2023-10-20 00:00:00'); + +mysql> drop table if exists test.t2_timestamp; +mysql> create table test.t2_timestamp (k1 int, k2 timestamp); +mysql> alter table test.t2_timestamp set tiflash replica 1; +mysql> insert into test.t2_timestamp values (1, '2023-10-20 00:00:00'); + func> wait_table test t1 func> wait_table test t2 +func> wait_table test t1_timestamp +func> wait_table test t2_timestamp + +mysql> alter table test.t1_timestamp compact tiflash replica; +mysql> alter table test.t2_timestamp compact tiflash replica; # inner join mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select t1_tinyint, t2_tinyint from test.t1, test.t2 where t1.t1_tinyint=t2.t2_tinyint; @@ -62,5 +77,15 @@ mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; se | 1 | +------------+ +# test timestamp column type for issue #8222 +mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select * from test.t1_timestamp, test.t2_timestamp where t1_timestamp.k2=t2_timestamp.k2; ++------+---------------------+------+---------------------+ +| k1 | k2 | k1 | k2 | ++------+---------------------+------+---------------------+ +| 1 | 2023-10-20 00:00:00 | 1 | 2023-10-20 00:00:00 | ++------+---------------------+------+---------------------+ + mysql> drop table test.t1; mysql> drop table test.t2; +mysql> drop table test.t1_timestamp; +mysql> drop table test.t2_timestamp;