Skip to content

Commit

Permalink
runtime filter: fix timezone error in runtime filter (#8273) (#8275)
Browse files Browse the repository at this point in the history
close #8222
  • Loading branch information
ti-chi-bot authored Oct 31, 2023
1 parent b6b2d1e commit 4effafa
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 16 deletions.
10 changes: 9 additions & 1 deletion dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ void RuntimeFilter::setINValuesSet(const std::shared_ptr<Set> & in_values_set_)
in_values_set = in_values_set_;
}

void RuntimeFilter::setTimezoneInfo(const TimezoneInfo & timezone_info_)
{
timezone_info = timezone_info_;
}

void RuntimeFilter::build()
{
if (!DM::FilterParser::isRSFilterSupportType(target_expr.field_type().tp()))
Expand Down Expand Up @@ -206,11 +211,14 @@ DM::RSOperatorPtr RuntimeFilter::parseToRSOperator(DM::ColumnDefines & columns_t
switch (rf_type)
{
case tipb::IN:
// Note that the elements are added from the block read (after timezone casted).
// Take care of them when parsing to rough set filter.
return DM::FilterParser::parseRFInExpr(
rf_type,
target_expr,
columns_to_read,
in_values_set->getUniqueSetElements());
in_values_set->getUniqueSetElements(),
timezone_info);
case tipb::MIN_MAX:
case tipb::BLOOM_FILTER:
// TODO
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class RuntimeFilter

void setINValuesSet(const std::shared_ptr<Set> & in_values_set_);

void setTimezoneInfo(const TimezoneInfo & timezone_info_);

void build();

void updateValues(const ColumnWithTypeAndName & values, const LoggerPtr & log);
Expand All @@ -85,6 +87,7 @@ class RuntimeFilter
tipb::Expr source_expr;
tipb::Expr target_expr;
const tipb::RuntimeFilterType rf_type;
TimezoneInfo timezone_info;
// thread safe
std::atomic<RuntimeFilterStatus> status = RuntimeFilterStatus::NOT_READY;
// used for failed_reason thread safe
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ void DAGExpressionAnalyzer::appendRuntimeFilterProperties(RuntimeFilterPtr & run
header.insert(ColumnWithTypeAndName(name_and_type.type->createColumn(), name_and_type.type, "_" + toString(1)));
in_values_set->setHeader(header);
runtime_filter->setINValuesSet(in_values_set);
runtime_filter->setTimezoneInfo(context.getTimezoneInfo());
break;
case tipb::MIN_MAX:
case tipb::BLOOM_FILTER:
Expand Down
47 changes: 33 additions & 14 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ ColumnDefine getColumnDefineForColumnExpr(const tipb::Expr & expr, const ColumnD
return columns_to_read[column_index];
}

// convert literal value from timezone specified in cop request to UTC in-place
inline void convertFieldWithTimezone(Field & value, const TimezoneInfo & timezone_info)
{
static const auto & time_zone_utc = DateLUT::instance("UTC");
UInt64 from_time = value.get<UInt64>();
UInt64 result_time = from_time;
if (timezone_info.is_name_based)
convertTimeZone(from_time, result_time, *timezone_info.timezone, time_zone_utc);
else if (timezone_info.timezone_offset != 0)
convertTimeZoneByOffset(from_time, result_time, false, timezone_info.timezone_offset);
value = Field(result_time);
}

enum class OperandType
{
Unknown = 0,
Expand Down Expand Up @@ -190,16 +203,7 @@ inline RSOperatorPtr parseTiCompareExpr( //
false);
// convert literal value from timezone specified in cop request to UTC
if (literal_type == TiDB::TypeDatetime && !timezone_info.is_utc_timezone)
{
static const auto & time_zone_utc = DateLUT::instance("UTC");
UInt64 from_time = value.get<UInt64>();
UInt64 result_time = from_time;
if (timezone_info.is_name_based)
convertTimeZone(from_time, result_time, *timezone_info.timezone, time_zone_utc);
else if (timezone_info.timezone_offset != 0)
convertTimeZoneByOffset(from_time, result_time, false, timezone_info.timezone_offset);
value = Field(result_time);
}
convertFieldWithTimezone(value, timezone_info);
}
values.push_back(value);
}
Expand Down Expand Up @@ -474,7 +478,8 @@ RSOperatorPtr FilterParser::parseRFInExpr(
const tipb::RuntimeFilterType rf_type,
const tipb::Expr & target_expr,
const ColumnDefines & columns_to_read,
const std::set<Field> & setElements)
const std::set<Field> & setElements,
const TimezoneInfo & timezone_info)
{
// todo check if set elements is empty
Attr attr;
Expand All @@ -488,9 +493,23 @@ RSOperatorPtr FilterParser::parseRFInExpr(
return createUnsupported(target_expr.ShortDebugString(), "rf target expr is not column expr", false);
}
auto column_define = cop::getColumnDefineForColumnExpr(target_expr, columns_to_read);
attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type};
std::for_each(setElements.begin(), setElements.end(), [&](Field element) { values.push_back(element); });
return createIn(attr, values);
auto attr = Attr{.col_name = column_define.name, .col_id = column_define.id, .type = column_define.type};
if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone)
{
Fields values;
values.reserve(setElements.size());
std::for_each(setElements.begin(), setElements.end(), [&](Field element) {
// convert literal value from timezone specified in cop request to UTC
cop::convertFieldWithTimezone(element, timezone_info);
values.push_back(element);
});
return createIn(attr, values);
}
else
{
Fields values(setElements.begin(), setElements.end());
return createIn(attr, values);
}
}
case tipb::MIN_MAX:
case tipb::BLOOM_FILTER:
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class FilterParser
tipb::RuntimeFilterType rf_type,
const tipb::Expr & target_expr,
const ColumnDefines & columns_to_read,
const std::set<Field> & setElements);
const std::set<Field> & setElements,
const TimezoneInfo & timezone_info);

static bool isRSFilterSupportType(Int32 field_type);

Expand Down
25 changes: 25 additions & 0 deletions tests/fullstack-test/mpp/runtime_filter.test
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,23 @@ mysql> insert into test.t2 values (2,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0);
mysql> insert into test.t2 values (3,3,3,3,3333333,3, 1, 3, 3.0, 3.00, 3.0);
mysql> insert into test.t2 values (4,null,null,null,null,null, null, null, null, null, null);

mysql> drop table if exists test.t1_timestamp;
mysql> create table test.t1_timestamp (k1 int, k2 timestamp);
mysql> alter table test.t1_timestamp set tiflash replica 1;
mysql> insert into test.t1_timestamp values (1, '2023-10-20 00:00:00');

mysql> drop table if exists test.t2_timestamp;
mysql> create table test.t2_timestamp (k1 int, k2 timestamp);
mysql> alter table test.t2_timestamp set tiflash replica 1;
mysql> insert into test.t2_timestamp values (1, '2023-10-20 00:00:00');

func> wait_table test t1
func> wait_table test t2
func> wait_table test t1_timestamp
func> wait_table test t2_timestamp

mysql> alter table test.t1_timestamp compact tiflash replica;
mysql> alter table test.t2_timestamp compact tiflash replica;

# inner join
mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select t1_tinyint, t2_tinyint from test.t1, test.t2 where t1.t1_tinyint=t2.t2_tinyint;
Expand Down Expand Up @@ -62,5 +77,15 @@ mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; se
| 1 |
+------------+

# test timestamp column type for issue #8222
mysql> set @@tidb_isolation_read_engines='tiflash'; set tidb_enforce_mpp = 1; set tidb_runtime_filter_mode="LOCAL"; select * from test.t1_timestamp, test.t2_timestamp where t1_timestamp.k2=t2_timestamp.k2;
+------+---------------------+------+---------------------+
| k1 | k2 | k1 | k2 |
+------+---------------------+------+---------------------+
| 1 | 2023-10-20 00:00:00 | 1 | 2023-10-20 00:00:00 |
+------+---------------------+------+---------------------+

mysql> drop table test.t1;
mysql> drop table test.t2;
mysql> drop table test.t1_timestamp;
mysql> drop table test.t2_timestamp;

0 comments on commit 4effafa

Please sign in to comment.