Fix query failure when there are timestamp or time columns after a generated column (#7469) #7483

Merged
54 changes: 24 additions & 30 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -341,24 +341,25 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

if (!table_scan.getPushedDownFilters().empty())
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = pipeline.firstStream()->getHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local stream, there is no need to execute cast and push down filter, return directly.
/// But we should make sure that the analyzer is initialized before return.
if (remote_read_streams_start_index == 0)
{
/// If there are duration type pushed down filters, type of columns may be changed.
const auto table_scan_output_header = pipeline.firstStream()->getHeader();
for (const auto & col : table_scan_output_header)
{
const auto & name = col.name;
auto it = std::find_if(source_columns.begin(), source_columns.end(), [&name](const auto & c) { return c.name == name; });
if (it != source_columns.end() && it->type != col.type)
it->type = col.type;
}
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());
if (filter_conditions.hasValue())
recordProfileStreams(pipeline, filter_conditions.executor_id);
return;
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle filter conditions for local and remote table scan.
@@ -401,7 +402,7 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();
std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
}

void DAGStorageInterpreter::executeCastAfterTableScan(
@@ -1001,12 +1002,10 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version)
return storages_with_lock;
}

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
{
Names required_columns_tmp;
required_columns_tmp.reserve(table_scan.getColumnSize());
NamesAndTypes source_columns_tmp;
source_columns_tmp.reserve(table_scan.getColumnSize());
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
String handle_column_name = MutableSupport::tidb_pk_column_name;
@@ -1024,7 +1023,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
@@ -1035,16 +1033,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
name = MutableSupport::extra_table_id_column_name;
else
name = storage_for_logical_table->getTableInfo().getColumnName(cid);
if (cid == ExtraTableIDColumnID)
{
NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type};
source_columns_tmp.emplace_back(std::move(extra_table_id_column_pair));
}
else
{
auto pair = storage_for_logical_table->getColumns().getPhysical(name);
source_columns_tmp.emplace_back(std::move(pair));
}
required_columns_tmp.emplace_back(std::move(name));
}

@@ -1055,6 +1043,12 @@
}
for (const auto & col : table_scan.getColumns())
{
if (col.hasGeneratedColumnFlag())
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
continue;
}

if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
@@ -1070,7 +1064,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
}
}

return {required_columns_tmp, source_columns_tmp, need_cast_column};
return {required_columns_tmp, need_cast_column};
}

// Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
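The hunks above carry the core of the fix: executeGeneratedColumnPlaceholder now runs before the analyzer captures its source columns from the stream header, and getColumnsForTableScan gives every generated column an explicit ExtraCastAfterTSMode::None entry so the cast-mode vector stays index-aligned with the scan's column list. A minimal sketch of that alignment rule, using simplified stand-in types rather than the real TiFlash API:

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

enum class ExtraCast { None, TimeZoneCast, DurationCast };

struct Col
{
    int64_t id;         // -1 denotes the handle column
    bool generated;     // placeholder column, filled by TiDB later
    bool is_timestamp;  // TiDB::TypeTimestamp
    bool is_time;       // TiDB::TypeTime
};

// One mode per table-scan column, index-aligned with scan_cols.
std::vector<ExtraCast> buildCastModes(const std::vector<Col> & scan_cols,
                                      const std::unordered_set<int64_t> & filter_col_ids)
{
    std::vector<ExtraCast> modes;
    modes.reserve(scan_cols.size());
    for (const auto & col : scan_cols)
    {
        // The fix: a generated column still claims a slot, so timestamp
        // or time columns that come after it keep the right cast mode.
        if (col.generated)
            modes.push_back(ExtraCast::None);
        // Columns referenced by pushed-down filters are already cast
        // inside the storage layer, so no second cast is appended here.
        else if (filter_col_ids.contains(col.id))
            modes.push_back(ExtraCast::None);
        else if (col.id != -1 && col.is_timestamp)
            modes.push_back(ExtraCast::TimeZoneCast);
        else if (col.id != -1 && col.is_time)
            modes.push_back(ExtraCast::DurationCast);
        else
            modes.push_back(ExtraCast::None);
    }
    return modes;
}
```

With the placeholder stage inserted first, the later cast stage sees the same column layout this vector was built against, which is what the reordering in executeImpl ensures.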
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -82,7 +82,7 @@ class DAGStorageInterpreter

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();

std::vector<RemoteRequest> buildRemoteRequests(const DM::ScanContextPtr & scan_context);

@@ -136,7 +136,6 @@ class DAGStorageInterpreter
std::unordered_map<TableID, StorageWithStructureLock> storages_with_structure_lock;
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// For generated column, just need a placeholder, and TiDB will fill this column.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1086,7 +1086,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
filter && filter->before_where ? filter->columns_after_cast : columns_to_read,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
extra_table_id_index,
physical_table_id,
log_tracing_id);
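This one-line change pairs with the PushDownFilter changes below: columns_after_cast becomes a shared pointer that is populated only when an extra cast actually exists, so the read path has to key its schema choice on extra_cast rather than before_where. A sketch of that selection rule, using hypothetical simplified types in place of the real ones:

```cpp
#include <memory>
#include <string>
#include <vector>

using ColumnDefines = std::vector<std::string>;           // stand-in for DM::ColumnDefines
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;  // the new pointer member

struct Filter
{
    bool has_before_where = false;        // a filter expression was pushed down
    bool has_extra_cast = false;          // a timestamp/duration cast was pushed down
    ColumnDefinesPtr columns_after_cast;  // populated only when has_extra_cast is true
};

const ColumnDefines & chooseReadColumns(const Filter * filter,
                                        const ColumnDefines & columns_to_read)
{
    // Keying this on has_before_where (the old condition) could select a
    // columns_after_cast that was never filled in; it is the cast, not the
    // filter, that changes the column types.
    if (filter && filter->has_extra_cast)
        return *filter->columns_after_cast;
    return columns_to_read;
}
```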
10 changes: 3 additions & 7 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -32,21 +32,17 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
const ColumnDefines & filter_columns_,
const String filter_column_name_,
const ExpressionActionsPtr & extra_cast_,
const ColumnDefines & columns_after_cast_)
const ColumnDefinesPtr & columns_after_cast_)
: rs_operator(rs_operator_)
, before_where(beofre_where_)
, filter_column_name(std::move(filter_column_name_))
, filter_columns(std::move(filter_columns_))
, extra_cast(extra_cast_)
, columns_after_cast(std::move(columns_after_cast_))
, columns_after_cast(columns_after_cast_)
{}

explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
: rs_operator(rs_operator_)
, before_where(nullptr)
, filter_columns({})
, extra_cast(nullptr)
, columns_after_cast({})
{}

// Rough set operator
@@ -60,7 +56,7 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
// The expression actions used to cast the timestamp/datetime column
ExpressionActionsPtr extra_cast;
// If the extra_cast is not null, the types of the columns may be changed
ColumnDefines columns_after_cast;
ColumnDefinesPtr columns_after_cast;
};

} // namespace DB::DM
24 changes: 13 additions & 11 deletions dbms/src/Storages/StorageDeltaMerge.cpp
@@ -839,19 +839,21 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());

ColumnDefines columns_after_cast;
columns_after_cast.reserve(columns_to_read.size());
const auto & source_columns = analyzer->getCurrentInputColumns();
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
auto columns_after_cast = std::make_shared<ColumnDefines>();
if (extra_cast != nullptr)
{
if (table_scan_column_info[i].hasGeneratedColumnFlag())
continue;
auto it = columns_to_read_map.at(table_scan_column_info[i].id);
RUNTIME_CHECK(it.name == source_columns[i].name);
columns_after_cast.push_back(std::move(it));
columns_after_cast.back().type = source_columns[i].type;
columns_after_cast->reserve(columns_to_read.size());
const auto & current_names_and_types = analyzer->getCurrentInputColumns();
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (table_scan_column_info[i].hasGeneratedColumnFlag())
continue;
auto col = columns_to_read_map.at(table_scan_column_info[i].id);
RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name);
columns_after_cast->push_back(col);
columns_after_cast->back().type = current_names_and_types[i].type;
}
}

return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
}
LOG_DEBUG(tracing_logger, "Push down filter is empty");
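buildPushDownFilter above now constructs columns_after_cast only when extra_cast is non-null, and the plain RUNTIME_CHECK becomes RUNTIME_CHECK_MSG so a mismatch reports both names. A compact sketch of that zip-and-verify step, again with simplified stand-in types rather than the real API:

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

struct ScanCol
{
    std::string name;
    bool generated;
};

// Walk the scan columns and the analyzer's columns in lockstep; generated
// placeholders are skipped, and a name mismatch fails with both names shown.
std::vector<std::string> pairColumnsAfterCast(const std::vector<ScanCol> & scan_cols,
                                              const std::vector<std::string> & analyzer_cols)
{
    std::vector<std::string> after_cast;
    after_cast.reserve(scan_cols.size());
    for (std::size_t i = 0; i < scan_cols.size(); ++i)
    {
        if (scan_cols[i].generated)
            continue; // TiDB fills these; they carry no cast bookkeeping
        // Both lists are indexed by scan position, so a mismatch means the
        // earlier alignment logic broke; surface it immediately.
        if (scan_cols[i].name != analyzer_cols[i])
            throw std::runtime_error("Column name mismatch, expect: " + scan_cols[i].name
                                     + ", actual: " + analyzer_cols[i]);
        after_cast.push_back(analyzer_cols[i]);
    }
    return after_cast;
}
```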
50 changes: 50 additions & 0 deletions dbms/src/Storages/StorageDisaggregated.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
@@ -303,4 +304,53 @@ void StorageDisaggregated::filterConditions(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
{
// If the column is not in the columns of pushed down filter, append a cast to the column.
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
std::unordered_set<ColumnID> col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
}
bool has_need_cast_column = false;
for (const auto & col : table_scan.getColumns())
{
if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
else
{
if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
has_need_cast_column = true;
}
else if (col.id != -1 && col.tp == TiDB::TypeTime)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
has_need_cast_column = true;
}
else
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
}
}
ExpressionActionsChain chain;
if (has_need_cast_column && analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
{
ExpressionActionsPtr extra_cast = chain.getLastActions();
chain.finalize();
chain.clear();
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
}
}
} // namespace DB
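The new extraCast helper above is lifted out of readThroughS3 (see the matching deletions in StorageDisaggregatedRemote.cpp below) so any disaggregated read path can reuse it. Its final loop follows a common TiFlash pattern: build the cast actions once, then wrap every stream in the pipeline. A self-contained sketch of that pattern with stand-in types:

```cpp
#include <functional>
#include <memory>
#include <vector>

struct Block {};

// Stand-in for ExpressionActionsPtr: a callable that rewrites a block in place.
using Actions = std::function<void(Block &)>;

struct Stream
{
    virtual ~Stream() = default;
    virtual Block read() = 0;
};

// Stand-in for ExpressionBlockInputStream: applies the actions to every block.
struct ExpressionStream : Stream
{
    std::shared_ptr<Stream> child;
    Actions actions;

    ExpressionStream(std::shared_ptr<Stream> child_, Actions actions_)
        : child(std::move(child_))
        , actions(std::move(actions_))
    {}

    Block read() override
    {
        Block block = child->read();
        actions(block); // e.g. timezone or duration casts
        return block;
    }
};

// Mirrors the loop at the end of extraCast: compute the cast actions once,
// then give every stream in the pipeline the same cast stage, so downstream
// operators see uniform column types.
void wrapWithCast(std::vector<std::shared_ptr<Stream>> & streams, const Actions & cast)
{
    for (auto & stream : streams)
        stream = std::make_shared<ExpressionStream>(stream, cast);
}
```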
50 changes: 2 additions & 48 deletions dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -17,7 +17,6 @@
#include <Common/ThreadManager.h>
#include <Common/TiFlashMetrics.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
@@ -156,56 +155,11 @@ BlockInputStreams StorageDisaggregated::readThroughS3(
{
source_columns.emplace_back(col.name, col.type);
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

// Handle duration type column
// If the column is not in the columns of pushed down filter, append a cast to the column.
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
std::unordered_set<ColumnID> col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
}
bool has_need_cast_column = false;
for (const auto & col : table_scan.getColumns())
{
if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
else
{
if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
has_need_cast_column = true;
}
else if (col.id != -1 && col.tp == TiDB::TypeTime)
{
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
has_need_cast_column = true;
}
else
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
}
}
ExpressionActionsChain chain;
if (has_need_cast_column && analyzer->appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
{
ExpressionActionsPtr extra_cast = chain.getLastActions();
chain.finalize();
chain.clear();
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
}

extraCast(*analyzer, pipeline);
// Handle filter
filterConditions(*analyzer, pipeline);
return pipeline.streams;
}
@@ -44,4 +44,4 @@ mysql> select a, i from test.t where hour(a) = 500;
mysql> select a, i from test.t where minute(a) = 13;
mysql> select a, i from test.t where second(a) = 14;

mysql> drop table test.t;
mysql> drop table test.t;
@@ -13,7 +13,7 @@
# limitations under the License.


mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` float DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` time DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) values (NULL, NULL, NULL, NULL, NULL, NULL);
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;
mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;