From 27299afb48971e1d06d6f3395bb1fef195b1d785 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Wed, 3 Jul 2024 15:32:58 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #9176 Signed-off-by: ti-chi-bot --- .../Storages/DeltaMerge/DeltaMergeDefines.h | 13 ++ .../Storages/DeltaMerge/DeltaMergeStore.cpp | 98 ++++++++- ...duration_filter_late_materialization2.test | 190 ++++++++++++++++++ 3 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 tests/fullstack-test/expr/duration_filter_late_materialization2.test diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index c0fe46e6cd7..bcde6249487 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -164,3 +164,16 @@ static constexpr bool DM_RUN_CHECK = true; } // namespace DM } // namespace DB +<<<<<<< HEAD +======= + +template <> +struct fmt::formatter +{ + template + auto format(const DB::DM::ColumnDefine & cd, FormatContext & ctx) const -> decltype(ctx.out()) + { + return fmt::format_to(ctx.out(), "{}/{}", cd.id, cd.type->getName()); + } +}; +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0db885dc960..5773e25ada7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1075,10 +1075,16 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size())); auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter); + const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( +<<<<<<< HEAD physical_table_id, dm_context, columns_to_read, +======= + extra_table_id_index, + final_columns_to_read, +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1098,7 +1104,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, { stream = std::make_shared( read_task_pool, - filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read, + final_columns_to_read, extra_table_id_index, physical_table_id, log_tracing_id); @@ -1109,7 +1115,11 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, dm_context, read_task_pool, after_segment_read, +<<<<<<< HEAD columns_to_read, +======= + final_columns_to_read, +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1122,9 +1132,24 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, } LOG_INFO( tracing_logger, +<<<<<<< HEAD "Read create stream done, pool_id={} num_streams={}", read_task_pool->poolId(), final_num_stream); +======= + "Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " + "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " + "final_columns_to_read={}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread, + is_fast_scan, + filter == nullptr || filter->before_where == nullptr, + read_task_pool->pool_id, + final_num_stream, + columns_to_read, + final_columns_to_read); +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) return res; } @@ -1169,11 +1194,21 @@ SourceOps DeltaMergeStore::readSourceOps( }; GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); +<<<<<<< HEAD size_t final_num_stream = std::max(1, num_streams); auto read_task_pool = std::make_shared( physical_table_id, dm_context, columns_to_read, +======= + size_t final_num_stream + = enable_read_thread ? std::max(1, num_streams) : std::max(1, std::min(num_streams, tasks.size())); + auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter); + const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read; + auto read_task_pool = std::make_shared( + extra_table_id_index, + final_columns_to_read, +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1182,6 +1217,7 @@ SourceOps DeltaMergeStore::readSourceOps( after_segment_read, log_tracing_id, enable_read_thread, +<<<<<<< HEAD final_num_stream); dm_context->scan_context->read_mode = ReadMode::Normal; @@ -1201,6 +1237,66 @@ SourceOps DeltaMergeStore::readSourceOps( LOG_INFO(tracing_logger, "Read create SourceOp done, pool_id={} num_streams={}", read_task_pool->poolId(), final_num_stream); return res; +======= + final_num_stream, + dm_context->scan_context->resource_group_name); + dm_context->scan_context->read_mode = read_mode; + + if (enable_read_thread) + { + for (size_t i = 0; i < final_num_stream; ++i) + { + group_builder.addConcurrency(std::make_unique( + exec_context, + read_task_pool, + final_columns_to_read, + extra_table_id_index, + log_tracing_id, + runtime_filter_list, + rf_max_wait_time_ms)); + } + } + else + { + for (size_t i = 0; i < final_num_stream; ++i) + { + group_builder.addConcurrency(std::make_unique( + exec_context, + dm_context, + read_task_pool, + after_segment_read, + final_columns_to_read, + filter, + start_ts, + expected_block_size, + read_mode, + log_tracing_id)); + } + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique( + exec_context, + log_tracing_id, + final_columns_to_read, + extra_table_id_index, + physical_table_id)); + }); + } + + LOG_INFO( + tracing_logger, + "Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " + "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " + "final_columns_to_read={}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread, + is_fast_scan, + filter == nullptr || filter->before_where == nullptr, + read_task_pool->pool_id, + final_num_stream, + columns_to_read, + final_columns_to_read); +>>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) } Remote::DisaggPhysicalTableReadSnapshotPtr diff --git a/tests/fullstack-test/expr/duration_filter_late_materialization2.test b/tests/fullstack-test/expr/duration_filter_late_materialization2.test new file mode 100644 index 00000000000..2dbfc4c2f90 --- /dev/null +++ b/tests/fullstack-test/expr/duration_filter_late_materialization2.test @@ -0,0 +1,190 @@ +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a time(4), i int); + +# insert more than 8192 rows to make sure filter conditions can be pushed down. +mysql> insert into test.t values('-000:10:10.123456', 1), ('000:11:11.123500', 2), ('000:12:12.123500', 3), ('000:13:13.123500', 4); +mysql> insert into test.t values('-001:10:10.123456', 1), ('001:11:11.123500', 2), ('001:12:12.123500', 3), ('001:13:13.123500', 4); +mysql> insert into test.t values('-002:10:10.123456', 1), ('002:11:11.123500', 2), ('002:12:12.123500', 3), ('002:13:13.123500', 4); +mysql> insert into test.t values('-003:10:10.123456', 1), ('003:11:11.123500', 2), ('003:12:12.123500', 3), ('003:13:13.123500', 4); +mysql> insert into test.t values('-004:10:10.123456', 1), ('004:11:11.123500', 2), ('004:12:12.123500', 3), ('004:13:13.123500', 4); +mysql> insert into test.t values('-005:10:10.123456', 1), ('005:11:11.123500', 2), ('005:12:12.123500', 3), ('005:13:13.123500', 4); +mysql> insert into test.t values('-006:10:10.123456', 1), ('006:11:11.123500', 2), ('006:12:12.123500', 3), ('006:13:13.123500', 4); +mysql> insert into test.t values('-007:10:10.123456', 1), ('007:11:11.123500', 2), ('007:12:12.123500', 3), ('007:13:13.123500', 4); +mysql> insert into test.t values('-008:10:10.123456', 1), ('008:11:11.123500', 2), ('008:12:12.123500', 3), ('008:13:13.123500', 4); +mysql> insert into test.t values('-009:10:10.123456', 1), ('009:11:11.123500', 2), ('009:12:12.123500', 3), ('009:13:13.123500', 4); +mysql> insert into test.t values('-010:10:10.123456', 1), ('010:11:11.123500', 2), ('010:12:12.123500', 3), ('010:13:13.123500', 4); +mysql> insert into test.t values('-011:10:10.123456', 1), ('011:11:11.123500', 2), ('011:12:12.123500', 3), ('011:13:13.123500', 4); +mysql> insert into test.t values('-012:10:10.123456', 1), ('012:11:11.123500', 2), ('012:12:12.123500', 3), ('012:13:13.123500', 4); +mysql> insert into test.t values('-013:10:10.123456', 1), ('013:11:11.123500', 2), ('013:12:12.123500', 3), ('013:13:13.123500', 4); +mysql> insert into test.t values('-014:10:10.123456', 1), ('014:11:11.123500', 2), ('014:12:12.123500', 3), ('014:13:13.123500', 4); +mysql> insert into test.t values('-015:10:10.123456', 1), ('015:11:11.123500', 2), ('015:12:12.123500', 3), ('015:13:13.123500', 4); +mysql> insert into test.t values('-016:10:10.123456', 1), ('016:11:11.123500', 2), ('016:12:12.123500', 3), ('016:13:13.123500', 4); +mysql> insert into test.t values('-017:10:10.123456', 1), ('017:11:11.123500', 2), ('017:12:12.123500', 3), ('017:13:13.123500', 4); +mysql> insert into test.t values('-018:10:10.123456', 1), ('018:11:11.123500', 2), ('018:12:12.123500', 3), ('018:13:13.123500', 4); +mysql> insert into test.t values('-019:10:10.123456', 1), ('019:11:11.123500', 2), ('019:12:12.123500', 3), ('019:13:13.123500', 4); +mysql> insert into test.t values('-020:10:10.123456', 1), ('020:11:11.123500', 2), ('020:12:12.123500', 3), ('020:13:13.123500', 4); +mysql> insert into test.t values('-021:10:10.123456', 1), ('021:11:11.123500', 2), ('021:12:12.123500', 3), ('021:13:13.123500', 4); +mysql> insert into test.t values('-022:10:10.123456', 1), ('022:11:11.123500', 2), ('022:12:12.123500', 3), ('022:13:13.123500', 4); +mysql> insert into test.t values('-023:10:10.123456', 1), ('023:11:11.123500', 2), ('023:12:12.123500', 3), ('023:13:13.123500', 4); +mysql> insert into test.t values('-024:10:10.123456', 1), ('024:11:11.123500', 2), ('024:12:12.123500', 3), ('024:13:13.123500', 4); +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> set tidb_isolation_read_engines='tiflash'; select hour(a), i from test.t where a = '024:11:11.123500'; ++---------+------+ +| hour(a) | i | ++---------+------+ +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | +| 24 | 2 | ++---------+------+ + +mysql> drop table test.t; \ No newline at end of file From 13e4763fecf2bba5c8db81f7c4d2afce58c09715 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Tue, 9 Jul 2024 11:43:42 +0800 Subject: [PATCH 2/2] Resolve conflict --- .../Storages/DeltaMerge/DeltaMergeDefines.h | 3 - .../Storages/DeltaMerge/DeltaMergeStore.cpp | 101 +----------------- 2 files changed, 5 insertions(+), 99 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index bcde6249487..2044d1e4751 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -164,8 +164,6 @@ static constexpr bool DM_RUN_CHECK = true; } // namespace DM } // namespace DB -<<<<<<< HEAD -======= template <> struct fmt::formatter @@ -176,4 +174,3 @@ struct fmt::formatter return fmt::format_to(ctx.out(), "{}/{}", cd.id, cd.type->getName()); } }; ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5773e25ada7..14f9c9be8e3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1077,14 +1077,9 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter); const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( -<<<<<<< HEAD physical_table_id, dm_context, - columns_to_read, -======= - extra_table_id_index, final_columns_to_read, ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1115,11 +1110,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, dm_context, read_task_pool, after_segment_read, -<<<<<<< HEAD - columns_to_read, -======= final_columns_to_read, ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1132,24 +1123,11 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, } LOG_INFO( tracing_logger, -<<<<<<< HEAD - "Read create stream done, pool_id={} num_streams={}", + "Read create stream done, pool_id={} num_streams={} columns_to_read={} final_columns_to_read={}", read_task_pool->poolId(), - final_num_stream); -======= - "Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " - "final_columns_to_read={}", - keep_order, - db_context.getSettingsRef().dt_enable_read_thread, - enable_read_thread, - is_fast_scan, - filter == nullptr || filter->before_where == nullptr, - read_task_pool->pool_id, final_num_stream, columns_to_read, final_columns_to_read); ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) return res; } @@ -1194,21 +1172,12 @@ SourceOps DeltaMergeStore::readSourceOps( }; GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); -<<<<<<< HEAD size_t final_num_stream = std::max(1, num_streams); + const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read; auto read_task_pool = std::make_shared( physical_table_id, dm_context, - columns_to_read, -======= - size_t final_num_stream - = enable_read_thread ? std::max(1, num_streams) : std::max(1, std::min(num_streams, tasks.size())); - auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter); - const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read; - auto read_task_pool = std::make_shared( - extra_table_id_index, final_columns_to_read, ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) filter, max_version, expected_block_size, @@ -1217,7 +1186,6 @@ SourceOps DeltaMergeStore::readSourceOps( after_segment_read, log_tracing_id, enable_read_thread, -<<<<<<< HEAD final_num_stream); dm_context->scan_context->read_mode = ReadMode::Normal; @@ -1229,74 +1197,15 @@ SourceOps DeltaMergeStore::readSourceOps( std::make_unique( exec_status_, read_task_pool, - columns_to_read, + final_columns_to_read, extra_table_id_index, physical_table_id, log_tracing_id)); } - LOG_INFO(tracing_logger, "Read create SourceOp done, pool_id={} num_streams={}", read_task_pool->poolId(), final_num_stream); + LOG_INFO(tracing_logger, "Read create SourceOp done, pool_id={} num_streams={} columns_to_read={} final_columns_to_read={}", + read_task_pool->poolId(), final_num_stream, columns_to_read, final_columns_to_read); return res; -======= - final_num_stream, - dm_context->scan_context->resource_group_name); - dm_context->scan_context->read_mode = read_mode; - - if (enable_read_thread) - { - for (size_t i = 0; i < final_num_stream; ++i) - { - group_builder.addConcurrency(std::make_unique( - exec_context, - read_task_pool, - final_columns_to_read, - extra_table_id_index, - log_tracing_id, - runtime_filter_list, - rf_max_wait_time_ms)); - } - } - else - { - for (size_t i = 0; i < final_num_stream; ++i) - { - group_builder.addConcurrency(std::make_unique( - exec_context, - dm_context, - read_task_pool, - after_segment_read, - final_columns_to_read, - filter, - start_ts, - expected_block_size, - read_mode, - log_tracing_id)); - } - group_builder.transform([&](auto & builder) { - builder.appendTransformOp(std::make_unique( - exec_context, - log_tracing_id, - final_columns_to_read, - extra_table_id_index, - physical_table_id)); - }); - } - - LOG_INFO( - tracing_logger, - "Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} " - "final_columns_to_read={}", - keep_order, - db_context.getSettingsRef().dt_enable_read_thread, - enable_read_thread, - is_fast_scan, - filter == nullptr || filter->before_where == nullptr, - read_task_pool->pool_id, - final_num_stream, - columns_to_read, - final_columns_to_read); ->>>>>>> 567bcb1c67 (Storages: Fix returned column types may not match in late-materialization (#9176)) } Remote::DisaggPhysicalTableReadSnapshotPtr