Fix query failure when there are timestamp or time columns after a generated column #7469

Merged
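
A minimal sketch of the failing shape (table and column names here are illustrative; the regression tests at the end of this diff exercise the same pattern): before this fix, reading a time or timestamp column placed after a generated column could fail once the scan was pushed down to TiFlash.

mysql> create table test.repro(c1 float, g float generated always as (c1 div 10) virtual, d time, ts timestamp);
mysql> alter table test.repro set tiflash replica 1;
mysql> select d, ts from test.repro where hour(d) = 1;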
56 changes: 33 additions & 23 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -352,11 +352,21 @@ SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_statu

void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = group_builder.getCurrentHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local source, there is no need to execute cast and push down filter, return directly.
/// But we should make sure that the analyzer is initialized before return.
if (remote_read_sources_start_index == 0)
return;
/// handle timezone/duration cast for local table scan.
executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);

executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);

/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
@@ -431,11 +441,20 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = pipeline.firstStream()->getHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local stream, there is no need to execute cast and push down filter, return directly.
/// But we should make sure that the analyzer is initialized before return.
if (remote_read_streams_start_index == 0)
return;
/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle filter conditions for local and remote table scan.
@@ -478,9 +497,7 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
}

void DAGStorageInterpreter::executeCastAfterTableScan(
@@ -1222,12 +1239,10 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
{
Names required_columns_tmp;
required_columns_tmp.reserve(table_scan.getColumnSize());
NamesAndTypes source_columns_tmp;
source_columns_tmp.reserve(table_scan.getColumnSize());
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
String handle_column_name = MutableSupport::tidb_pk_column_name;
@@ -1245,7 +1260,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 returns the handle column
@@ -1256,16 +1270,6 @@
name = MutableSupport::extra_table_id_column_name;
else
name = storage_for_logical_table->getTableInfo().getColumnName(cid);
if (cid == ExtraTableIDColumnID)
{
NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type};
source_columns_tmp.emplace_back(std::move(extra_table_id_column_pair));
}
else
{
auto pair = storage_for_logical_table->getColumns().getPhysical(name);
source_columns_tmp.emplace_back(std::move(pair));
}
required_columns_tmp.emplace_back(std::move(name));
}

@@ -1276,6 +1280,12 @@
}
for (const auto & col : table_scan.getColumns())
{
if (col.hasGeneratedColumnFlag())
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
continue;
}

if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
@@ -1291,7 +1301,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
}
}

return {required_columns_tmp, source_columns_tmp, need_cast_column};
return {required_columns_tmp, need_cast_column};
}

// Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -98,7 +98,7 @@ class DAGStorageInterpreter

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();

std::vector<RemoteRequest> buildRemoteRequests(const DM::ScanContextPtr & scan_context);

@@ -164,7 +164,6 @@ class DAGStorageInterpreter
std::unordered_map<TableID, StorageWithStructureLock> storages_with_structure_lock;
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// For generated column, just need a placeholder, and TiDB will fill this column.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;

2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1087,7 +1087,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
columns_to_read,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
extra_table_id_index,
physical_table_id,
log_tracing_id);
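
The one-line change above matters because, when the pushed-down filter carries an extra cast, the blocks coming out of the read already hold the post-cast column types, so the stream must declare its header from columns_after_cast rather than columns_to_read. A sketch of the symptom this avoids, reusing the table shape from the new fullstack test below (the exact internal types are an assumption; e.g. a time(4) column is read in a storage-level integer representation and cast to a duration type for the computing layer):

mysql> select a, i from test.t where hour(a) = 500;

# The filter references the time column `a`, so extra_cast fires inside the
# storage read; without this change the stream would declare the pre-cast
# schema while producing post-cast blocks, and the query would fail.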
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -31,19 +31,18 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
const ExpressionActionsPtr & before_where_,
const ColumnDefines & filter_columns_,
const String filter_column_name_,
const ExpressionActionsPtr & extra_cast_)
const ExpressionActionsPtr & extra_cast_,
const ColumnDefinesPtr & columns_after_cast_)
: rs_operator(rs_operator_)
, before_where(before_where_)
, filter_column_name(std::move(filter_column_name_))
, filter_columns(std::move(filter_columns_))
, extra_cast(extra_cast_)
, columns_after_cast(columns_after_cast_)
{}

explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
: rs_operator(rs_operator_)
, before_where(nullptr)
, filter_columns({})
, extra_cast(nullptr)
{}

// Rough set operator
@@ -56,6 +55,8 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
ColumnDefines filter_columns;
// The expression actions used to cast the timestamp/datetime column
ExpressionActionsPtr extra_cast;
// If extra_cast is not null, the types of these columns may have been changed by the cast
ColumnDefinesPtr columns_after_cast;
};

} // namespace DB::DM
17 changes: 16 additions & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
@@ -839,7 +839,22 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());

return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast);
auto columns_after_cast = std::make_shared<ColumnDefines>();
if (extra_cast != nullptr)
{
columns_after_cast->reserve(columns_to_read.size());
const auto & current_names_and_types = analyzer->getCurrentInputColumns();
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (table_scan_column_info[i].hasGeneratedColumnFlag())
continue;
auto col = columns_to_read_map.at(table_scan_column_info[i].id);
RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name);
columns_after_cast->push_back(col);
columns_after_cast->back().type = current_names_and_types[i].type;
}
}
return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
}
LOG_DEBUG(tracing_logger, "Push down filter is empty");
return std::make_shared<PushDownFilter>(rs_operator);
50 changes: 50 additions & 0 deletions dbms/src/Storages/StorageDisaggregated.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
@@ -305,4 +306,53 @@ void StorageDisaggregated::filterConditions(DAGExpressionAnalyzer & analyzer, DA
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
{
// If the column is not among the columns of the pushed-down filter, append a cast to the column.
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
std::unordered_set<ColumnID> col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
}
bool has_need_cast_column = false;
for (const auto & col : table_scan.getColumns())
{
if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
else
{
if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
has_need_cast_column = true;
}
else if (col.id != -1 && col.tp == TiDB::TypeTime)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
has_need_cast_column = true;
}
else
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
}
}
ExpressionActionsChain chain;
if (has_need_cast_column && analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
{
ExpressionActionsPtr extra_cast = chain.getLastActions();
chain.finalize();
chain.clear();
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
}
}
} // namespace DB
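
To illustrate which branch of extraCast fires, a sketch reusing the table shape from the new fullstack test, test.t(a time(4), i int):

# `a` appears in the pushed-down filter, so its cast is handled alongside the
# filter itself and extraCast records ExtraCastAfterTSMode::None for it:
mysql> select i from test.t where hour(a) = 500;
# The filter only references `i`, so `a` is untouched by the filter and
# extraCast appends an AppendDurationCast after the local table scan:
mysql> select a from test.t where i = 1;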
50 changes: 2 additions & 48 deletions dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -17,7 +17,6 @@
#include <Common/ThreadManager.h>
#include <Common/TiFlashMetrics.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
@@ -156,56 +155,11 @@ BlockInputStreams StorageDisaggregated::readThroughS3(
{
source_columns.emplace_back(col.name, col.type);
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

// Handle duration type column
// If the column is not in the columns of pushed down filter, append a cast to the column.
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
std::unordered_set<ColumnID> col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
}
bool has_need_cast_column = false;
for (const auto & col : table_scan.getColumns())
{
if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
else
{
if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
has_need_cast_column = true;
}
else if (col.id != -1 && col.tp == TiDB::TypeTime)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
has_need_cast_column = true;
}
else
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
}
}
ExpressionActionsChain chain;
if (has_need_cast_column && analyzer->appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
{
ExpressionActionsPtr extra_cast = chain.getLastActions();
chain.finalize();
chain.clear();
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
}

extraCast(*analyzer, pipeline);
// Handle filter
filterConditions(*analyzer, pipeline);
return pipeline.streams;
}
@@ -0,0 +1,47 @@
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mysql> drop table if exists test.t;
mysql> create table if not exists test.t(a time(4), i int);

# insert more than 8192 rows to make sure filter conditions can be pushed down.
mysql> insert into test.t values('-700:10:10.123456', 1), ('700:11:11.123500', 2), ('600:11:11.123500', 3);
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;

mysql> alter table test.t set tiflash replica 1;

func> wait_table test t

mysql> select * from test.t where a = '500:11:11.123500';
# these queries succeed, but the result sets are empty
mysql> select hour(a), i from test.t where a = '500:11:11.123500';
mysql> select minute(a), i from test.t where a = '500:11:11.123500';
mysql> select second(a), i from test.t where a = '500:11:11.123500';
mysql> select a, i from test.t where hour(a) = 500;
mysql> select a, i from test.t where minute(a) = 13;
mysql> select a, i from test.t where second(a) = 14;

mysql> drop table test.t;
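
# The test above covers the time/duration path; a timestamp analogue of the
# same shape (a sketch only, not part of this PR's test set; names are
# illustrative, and a fixed session time_zone would be needed for
# deterministic results) would be:

mysql> create table test.t_ts(c1 float, g float generated always as (c1 div 10) virtual, ts timestamp);
mysql> alter table test.t_ts set tiflash replica 1;
mysql> select ts from test.t_ts where c1 = 1;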
@@ -13,7 +13,7 @@
# limitations under the License.


mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` float DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` time DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) values (NULL, NULL, NULL, NULL, NULL, NULL);
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;