diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 5cdf47726c5..70da4ccad80 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 5cdf47726c58e04683c2da3e2892c276fccb3e8a +Subproject commit 70da4ccad80bab4fe9235bd4e1fe203226347a2a diff --git a/dbms/src/Core/SpillHandler.cpp b/dbms/src/Core/SpillHandler.cpp index 0cd7c344959..61693e4a4bd 100644 --- a/dbms/src/Core/SpillHandler.cpp +++ b/dbms/src/Core/SpillHandler.cpp @@ -85,7 +85,7 @@ void SpillHandler::spillBlocks(Blocks && blocks) Stopwatch watch; RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); auto block_size = blocks.size(); - LOG_DEBUG(spiller->logger, "Spilling {} blocks data into temporary file {}", block_size, current_spill_file_name); + LOG_DEBUG(spiller->logger, "Spilling {} blocks data", block_size); FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill); @@ -96,6 +96,8 @@ void SpillHandler::spillBlocks(Blocks && blocks) { if (unlikely(!block || block.rows() == 0)) continue; + /// erase constant column + spiller->removeConstantColumns(block); if (unlikely(writer == nullptr)) { std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile(); diff --git a/dbms/src/Core/Spiller.cpp b/dbms/src/Core/Spiller.cpp index 9694b2fab7f..374cf247e38 100644 --- a/dbms/src/Core/Spiller.cpp +++ b/dbms/src/Core/Spiller.cpp @@ -101,6 +101,25 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 part { RUNTIME_CHECK_MSG(spill_dir.isDirectory(), "Spill dir {} is a file", spill_dir.path()); } + for (size_t i = 0; i < input_schema.columns(); ++i) + { + if (input_schema.getByPosition(i).column != nullptr && input_schema.getByPosition(i).column->isColumnConst()) + const_column_indexes.push_back(i); + } + RUNTIME_CHECK_MSG(const_column_indexes.size() < input_schema.columns(), "Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns"); + header_without_constants = input_schema; + removeConstantColumns(header_without_constants); +} + +void Spiller::removeConstantColumns(Block & block) const +{ + /// note must erase the constant column in reverse order because the index stored in const_column_indexes is based on + /// the original Block, if the column before the index is removed, the index has to be updated or it becomes invalid index + for (auto it = const_column_indexes.rbegin(); it != const_column_indexes.rend(); ++it) // NOLINT + { + RUNTIME_CHECK_MSG(block.getByPosition(*it).column->isColumnConst(), "The {}-th column in block must be constant column", *it); + block.erase(*it); + } } CachedSpillHandlerPtr Spiller::createCachedSpillHandler( @@ -200,7 +219,7 @@ BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_ restore_stream_read_rows.push_back(file->getSpillDetails().rows); if (release_spilled_file_on_restore) file_infos.back().file = std::move(file); - ret.push_back(std::make_shared(std::move(file_infos), input_schema, config.file_provider, spill_version)); + ret.push_back(std::make_shared(std::move(file_infos), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); } } else @@ -221,7 +240,7 @@ BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_ for (UInt64 i = 0; i < spill_file_read_stream_num; ++i) { if (likely(!file_infos[i].empty())) - ret.push_back(std::make_shared(std::move(file_infos[i]), input_schema, config.file_provider, spill_version)); + ret.push_back(std::make_shared(std::move(file_infos[i]), input_schema, header_without_constants, const_column_indexes, config.file_provider, spill_version)); } } for (size_t i = 0; i < spill_file_read_stream_num; ++i) diff --git a/dbms/src/Core/Spiller.h b/dbms/src/Core/Spiller.h index 9892dff8924..2735d05dcbe 100644 --- a/dbms/src/Core/Spiller.h +++ b/dbms/src/Core/Spiller.h @@ -101,6 +101,7 @@ class Spiller bool hasSpilledData() const { return has_spilled_data; }; /// only for test now bool releaseSpilledFileOnRestore() const { return release_spilled_file_on_restore; } + void removeConstantColumns(Block & block) const; private: friend class SpillHandler; @@ -117,6 +118,8 @@ class Spiller const UInt64 partition_num; /// todo remove input_schema if spiller does not rely on BlockInputStream const Block input_schema; + std::vector const_column_indexes; + Block header_without_constants; const LoggerPtr logger; std::mutex spill_finished_mutex; bool spill_finished = false; diff --git a/dbms/src/Core/tests/gtest_spiller.cpp b/dbms/src/Core/tests/gtest_spiller.cpp index 95493cbae39..531ef855fe1 100644 --- a/dbms/src/Core/tests/gtest_spiller.cpp +++ b/dbms/src/Core/tests/gtest_spiller.cpp @@ -53,13 +53,13 @@ class SpillerTest : public testing::Test if (spiller_dir.exists()) spiller_dir.remove(true); } - Blocks generateBlocks(size_t block_num) + Blocks generateBlocks(size_t block_num, const Block & schema) { Blocks ret; for (size_t i = 0; i < block_num; ++i) { ColumnsWithTypeAndName data; - for (const auto & type_and_name : spiller_test_header) + for (const auto & type_and_name : schema) { auto column = type_and_name.type->createColumn(); for (size_t k = 0; k < 100; ++k) @@ -70,6 +70,10 @@ class SpillerTest : public testing::Test } return ret; } + Blocks generateBlocks(size_t block_num) + { + return generateBlocks(block_num, spiller_test_header); + } Blocks generateSortedBlocks(size_t block_num) { Blocks ret; @@ -491,33 +495,136 @@ try } CATCH -TEST_F(SpillerTest, SpillAndRestoreConstantData) +TEST_F(SpillerTest, SpillAllConstantBlock) try { - Spiller spiller(*spill_config_ptr, false, 1, spiller_test_header, logger); - Blocks ret; - ColumnsWithTypeAndName data; - for (const auto & type_and_name : spiller_test_header) - { - auto column = type_and_name.type->createColumnConst(100, Field(static_cast(1))); - data.push_back(ColumnWithTypeAndName(std::move(column), type_and_name.type, type_and_name.name)); - } - ret.emplace_back(data); - auto reference = ret; - spiller.spillBlocks(std::move(ret), 0); - spiller.finishSpill(); - auto block_streams = spiller.restoreBlocks(0, 2); - GTEST_ASSERT_EQ(block_streams.size(), 1); - Blocks restored_blocks; - for (auto & block_stream : block_streams) + auto constant_header = spiller_test_header; + for (auto & type_and_name : constant_header) + type_and_name.column = type_and_name.type->createColumnConst(1, Field(static_cast(1))); + + Spiller spiller(*spill_config_ptr, false, 1, constant_header, logger); + GTEST_FAIL(); +} +catch (Exception & e) +{ + GTEST_ASSERT_EQ(e.message(), "Check const_column_indexes.size() < input_schema.columns() failed: Try to spill blocks containing only constant columns, it is meaningless to spill blocks containing only constant columns"); +} + +TEST_F(SpillerTest, SpillWithConstantSchemaAndNonConstantData) +try +{ + NamesAndTypes names_and_types; + names_and_types.emplace_back("col0", DataTypeFactory::instance().get("Int64")); + names_and_types.emplace_back("col1", DataTypeFactory::instance().get("UInt64")); + + std::vector const_columns_flag = { + true, + false, + }; + + + ColumnsWithTypeAndName columns; + for (size_t i = 0; i < names_and_types.size(); i++) { - for (Block block = block_stream->read(); block; block = block_stream->read()) - restored_blocks.push_back(block); + if (const_columns_flag[i]) + { + /// const column + columns.emplace_back(names_and_types[i].type->createColumnConst(1, Field(static_cast(1))), + names_and_types[i].type, + names_and_types[i].name); + } + else + { + /// normal column + columns.emplace_back(names_and_types[i].type->createColumn(), + names_and_types[i].type, + names_and_types[i].name); + } } - GTEST_ASSERT_EQ(reference.size(), restored_blocks.size()); - for (size_t i = 0; i < reference.size(); ++i) + Block header(columns); + Spiller spiller(*spill_config_ptr, false, 1, header, logger); + auto all_blocks = generateBlocks(20, header); + spiller.spillBlocks(std::move(all_blocks), 0); + GTEST_FAIL(); +} +catch (Exception & e) +{ + GTEST_ASSERT_EQ(e.message().find("Check block.getByPosition(*it).column->isColumnConst() failed: The 0-th column in block must be constant column") != std::string::npos, true); +} + +TEST_F(SpillerTest, SpillAndRestoreConstantData) +try +{ + NamesAndTypes names_and_types; + names_and_types.emplace_back("col0", DataTypeFactory::instance().get("Int64")); + names_and_types.emplace_back("col1", DataTypeFactory::instance().get("UInt64")); + names_and_types.emplace_back("col2", DataTypeFactory::instance().get("Nullable(Int64)")); + names_and_types.emplace_back("col3", DataTypeFactory::instance().get("Nullable(UInt64)")); + names_and_types.emplace_back("col4", DataTypeFactory::instance().get("Int64")); + names_and_types.emplace_back("col5", DataTypeFactory::instance().get("UInt64")); + + std::vector> const_columns_flags = { + {false, false, false, false, false, true}, + {false, true, true, true, true, true}, + {true, false, false, false, false, false}, + {true, true, true, true, true, false}, + {true, false, true, false, true, false}, + {false, true, false, true, false, true}, + {false, true, false, true, false, false}, + {true, false, true, false, true, true}, + }; + + for (const auto & const_columns_flag : const_columns_flags) { - blockEqual(materializeBlock(reference[i]), restored_blocks[i]); + ColumnsWithTypeAndName columns; + for (size_t i = 0; i < names_and_types.size(); i++) + { + if (const_columns_flag[i]) + { + /// const column + columns.emplace_back(names_and_types[i].type->createColumnConst(1, Field(static_cast(1))), + names_and_types[i].type, + names_and_types[i].name); + } + else + { + /// normal column + columns.emplace_back(names_and_types[i].type->createColumn(), + names_and_types[i].type, + names_and_types[i].name); + } + } + Block header(columns); + Spiller spiller(*spill_config_ptr, false, 1, header, logger); + auto all_blocks = generateBlocks(20, header); + for (auto & block : all_blocks) + { + for (size_t i = 0; i < const_columns_flag.size(); i++) + { + if (header.getByPosition(i).column->isColumnConst()) + { + Field constant_field; + header.getByPosition(i).column->get(0, constant_field); + block.getByPosition(i).column = header.getByPosition(i).type->createColumnConst(block.rows(), constant_field); + } + } + } + auto reference = all_blocks; + spiller.spillBlocks(std::move(all_blocks), 0); + spiller.finishSpill(); + auto block_streams = spiller.restoreBlocks(0, 1); + GTEST_ASSERT_EQ(block_streams.size(), 1); + Blocks restored_blocks; + for (auto & block_stream : block_streams) + { + for (Block block = block_stream->read(); block; block = block_stream->read()) + restored_blocks.push_back(block); + } + GTEST_ASSERT_EQ(reference.size(), restored_blocks.size()); + for (size_t i = 0; i < reference.size(); ++i) + { + blockEqual(materializeBlock(reference[i]), restored_blocks[i]); + } } } CATCH diff --git a/dbms/src/DataStreams/SpilledFilesInputStream.cpp b/dbms/src/DataStreams/SpilledFilesInputStream.cpp index 2d4e4bcef09..bef3952ac0c 100644 --- a/dbms/src/DataStreams/SpilledFilesInputStream.cpp +++ b/dbms/src/DataStreams/SpilledFilesInputStream.cpp @@ -22,18 +22,43 @@ namespace FailPoints extern const char random_restore_from_disk_failpoint[]; } // namespace FailPoints -SpilledFilesInputStream::SpilledFilesInputStream(std::vector && spilled_file_infos_, const Block & header_, const FileProviderPtr & file_provider_, Int64 max_supported_spill_version_) +SpilledFilesInputStream::SpilledFilesInputStream( + std::vector && spilled_file_infos_, + const Block & header_, + const Block & header_without_constants_, + const std::vector & const_column_indexes_, + const FileProviderPtr & file_provider_, + Int64 max_supported_spill_version_) : spilled_file_infos(std::move(spilled_file_infos_)) , header(header_) + , header_without_constants(header_without_constants_) + , const_column_indexes(const_column_indexes_) , file_provider(file_provider_) , max_supported_spill_version(max_supported_spill_version_) { RUNTIME_CHECK_MSG(!spilled_file_infos.empty(), "Spilled files must not be empty"); current_reading_file_index = 0; - current_file_stream = std::make_unique(std::move(spilled_file_infos[0]), header, file_provider, max_supported_spill_version); + current_file_stream = std::make_unique(std::move(spilled_file_infos[0]), header_without_constants, file_provider, max_supported_spill_version); } Block SpilledFilesInputStream::readImpl() +{ + auto ret = readInternal(); + if likely (ret) + { + assert(ret.columns() != 0); + size_t rows = ret.rows(); + for (const auto index : const_column_indexes) + { + const auto & col_type_name = header.getByPosition(index); + assert(col_type_name.column->isColumnConst()); + ret.insert(index, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name}); + } + } + return ret; +} + +Block SpilledFilesInputStream::readInternal() { if (unlikely(current_file_stream == nullptr)) return {}; diff --git a/dbms/src/DataStreams/SpilledFilesInputStream.h b/dbms/src/DataStreams/SpilledFilesInputStream.h index 18cf4bd7205..68f83ae3a97 100644 --- a/dbms/src/DataStreams/SpilledFilesInputStream.h +++ b/dbms/src/DataStreams/SpilledFilesInputStream.h @@ -22,7 +22,6 @@ namespace DB { - struct SpilledFileInfo { String path; @@ -35,12 +34,19 @@ struct SpilledFileInfo class SpilledFilesInputStream : public IProfilingBlockInputStream { public: - SpilledFilesInputStream(std::vector && spilled_file_infos, const Block & header, const FileProviderPtr & file_provider, Int64 max_supported_spill_version); + SpilledFilesInputStream( + std::vector && spilled_file_infos, + const Block & header, + const Block & header_without_constants, + const std::vector & const_column_indexes, + const FileProviderPtr & file_provider, + Int64 max_supported_spill_version); Block getHeader() const override; String getName() const override; protected: Block readImpl() override; + Block readInternal(); private: struct SpilledFileStream @@ -68,6 +74,8 @@ class SpilledFilesInputStream : public IProfilingBlockInputStream std::vector spilled_file_infos; size_t current_reading_file_index; Block header; + Block header_without_constants; + std::vector const_column_indexes; FileProviderPtr file_provider; Int64 max_supported_spill_version; std::unique_ptr current_file_stream; diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp index 41b4428d834..e833778c41d 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp +++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp @@ -28,10 +28,8 @@ TiDBTableScan::TiDBTableScan( , pushed_down_filters(is_partition_table_scan ? std::move(table_scan->partition_table_scan().pushed_down_filter_conditions()) : std::move(table_scan->tbl_scan().pushed_down_filter_conditions())) // Only No-partition table need keep order when tablescan executor required keep order. // If keep_order is not set, keep order for safety. - // When keep_order is true, we will not push down filters to tablescan in TiDB. - // So even if keep_order is not set (may be lost), but pushed down filters is not empty, we can't keep order. - , keep_order(!is_partition_table_scan && pushed_down_filters.empty() && (!table_scan->tbl_scan().has_keep_order() || table_scan->tbl_scan().keep_order())) - , is_fast_scan(table_scan->tbl_scan().is_fast_scan()) + , keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order())) + , is_fast_scan(is_partition_table_scan ? table_scan->partition_table_scan().is_fast_scan() : table_scan->tbl_scan().is_fast_scan()) { if (is_partition_table_scan) { @@ -81,6 +79,7 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table for (auto id : partition_table_scan.primary_prefix_column_ids()) tipb_table_scan->add_primary_prefix_column_ids(id); tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan()); + tipb_table_scan->set_keep_order(false); } else { diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp index c354d8e86c3..3683d01b12f 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.cpp @@ -16,7 +16,7 @@ namespace DB { -ExecutionResult PipelineExecutorStatus::toExecutionResult() noexcept +ExecutionResult PipelineExecutorStatus::toExecutionResult() { std::lock_guard lock(mu); return exception_ptr @@ -24,13 +24,13 @@ ExecutionResult PipelineExecutorStatus::toExecutionResult() noexcept : ExecutionResult::success(); } -std::exception_ptr PipelineExecutorStatus::getExceptionPtr() noexcept +std::exception_ptr PipelineExecutorStatus::getExceptionPtr() { std::lock_guard lock(mu); return exception_ptr; } -String PipelineExecutorStatus::getExceptionMsg() noexcept +String PipelineExecutorStatus::getExceptionMsg() { try { @@ -49,13 +49,13 @@ String PipelineExecutorStatus::getExceptionMsg() noexcept } } -void PipelineExecutorStatus::onErrorOccurred(const String & err_msg) noexcept +void PipelineExecutorStatus::onErrorOccurred(const String & err_msg) { DB::Exception e(err_msg); onErrorOccurred(std::make_exception_ptr(e)); } -bool PipelineExecutorStatus::setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept +bool PipelineExecutorStatus::setExceptionPtr(const std::exception_ptr & exception_ptr_) { RUNTIME_ASSERT(exception_ptr_ != nullptr); std::lock_guard lock(mu); @@ -65,12 +65,12 @@ bool PipelineExecutorStatus::setExceptionPtr(const std::exception_ptr & exceptio return true; } -bool PipelineExecutorStatus::isWaitMode() noexcept +bool PipelineExecutorStatus::isWaitMode() { return !result_queue.has_value(); } -void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept +void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exception_ptr_) { if (setExceptionPtr(exception_ptr_)) { @@ -79,7 +79,7 @@ void PipelineExecutorStatus::onErrorOccurred(const std::exception_ptr & exceptio } } -void PipelineExecutorStatus::wait() noexcept +void PipelineExecutorStatus::wait() { { std::unique_lock lock(mu); @@ -89,7 +89,7 @@ void PipelineExecutorStatus::wait() noexcept LOG_DEBUG(log, "query finished and wait done"); } -ResultQueuePtr PipelineExecutorStatus::getConsumedResultQueue() noexcept +ResultQueuePtr PipelineExecutorStatus::getConsumedResultQueue() { std::lock_guard lock(mu); RUNTIME_ASSERT(!isWaitMode()); @@ -98,7 +98,7 @@ ResultQueuePtr PipelineExecutorStatus::getConsumedResultQueue() noexcept return consumed_result_queue; } -void PipelineExecutorStatus::consume(ResultHandler & result_handler) noexcept +void PipelineExecutorStatus::consume(ResultHandler & result_handler) { RUNTIME_ASSERT(result_handler); auto consumed_result_queue = getConsumedResultQueue(); @@ -114,13 +114,13 @@ void PipelineExecutorStatus::consume(ResultHandler & result_handler) noexcept LOG_DEBUG(log, "query finished and consume done"); } -void PipelineExecutorStatus::onEventSchedule() noexcept +void PipelineExecutorStatus::onEventSchedule() { std::lock_guard lock(mu); ++active_event_count; } -void PipelineExecutorStatus::onEventFinish() noexcept +void PipelineExecutorStatus::onEventFinish() { std::lock_guard lock(mu); RUNTIME_ASSERT(active_event_count > 0); @@ -143,12 +143,12 @@ void PipelineExecutorStatus::onEventFinish() noexcept } } -void PipelineExecutorStatus::cancel() noexcept +void PipelineExecutorStatus::cancel() { is_cancelled.store(true, std::memory_order_release); } -ResultQueuePtr PipelineExecutorStatus::toConsumeMode(size_t queue_size) noexcept +ResultQueuePtr PipelineExecutorStatus::toConsumeMode(size_t queue_size) { std::lock_guard lock(mu); RUNTIME_ASSERT(!result_queue.has_value()); diff --git a/dbms/src/Flash/Executor/PipelineExecutorStatus.h b/dbms/src/Flash/Executor/PipelineExecutorStatus.h index 40b27b905fe..ad5bc1702f5 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorStatus.h +++ b/dbms/src/Flash/Executor/PipelineExecutorStatus.h @@ -39,19 +39,19 @@ class PipelineExecutorStatus : private boost::noncopyable : log(Logger::get(req_id)) {} - ExecutionResult toExecutionResult() noexcept; + ExecutionResult toExecutionResult(); - std::exception_ptr getExceptionPtr() noexcept; - String getExceptionMsg() noexcept; + std::exception_ptr getExceptionPtr(); + String getExceptionMsg(); - void onEventSchedule() noexcept; + void onEventSchedule(); - void onEventFinish() noexcept; + void onEventFinish(); - void onErrorOccurred(const String & err_msg) noexcept; - void onErrorOccurred(const std::exception_ptr & exception_ptr_) noexcept; + void onErrorOccurred(const String & err_msg); + void onErrorOccurred(const std::exception_ptr & exception_ptr_); - void wait() noexcept; + void wait(); template void waitFor(const Duration & timeout_duration) @@ -71,7 +71,7 @@ class PipelineExecutorStatus : private boost::noncopyable LOG_DEBUG(log, "query finished and wait done"); } - void consume(ResultHandler & result_handler) noexcept; + void consume(ResultHandler & result_handler); template void consumeFor(ResultHandler & result_handler, const Duration & timeout_duration) @@ -106,14 +106,14 @@ class PipelineExecutorStatus : private boost::noncopyable LOG_DEBUG(log, "query finished and consume done"); } - void cancel() noexcept; + void cancel(); - ALWAYS_INLINE bool isCancelled() noexcept + ALWAYS_INLINE bool isCancelled() { return is_cancelled.load(std::memory_order_acquire); } - ResultQueuePtr toConsumeMode(size_t queue_size) noexcept; + ResultQueuePtr toConsumeMode(size_t queue_size); void update(const TaskProfileInfo & task_profile_info) { @@ -126,12 +126,12 @@ class PipelineExecutorStatus : private boost::noncopyable } private: - bool setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept; + bool setExceptionPtr(const std::exception_ptr & exception_ptr_); // Need to be called under lock. - bool isWaitMode() noexcept; + bool isWaitMode(); - ResultQueuePtr getConsumedResultQueue() noexcept; + ResultQueuePtr getConsumedResultQueue(); private: LoggerPtr log; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 8e7fda62ce9..9c7cdcd8e31 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1025,7 +1025,10 @@ static ReadMode getReadModeImpl(const Context & db_context, bool is_fast_scan, b ReadMode DeltaMergeStore::getReadMode(const Context & db_context, bool is_fast_scan, bool keep_order, const PushDownFilterPtr & filter) { auto read_mode = getReadModeImpl(db_context, is_fast_scan, keep_order); - RUNTIME_CHECK_MSG(!filter || !filter->before_where || read_mode == ReadMode::Bitmap, "Push down filters needs bitmap"); + RUNTIME_CHECK_MSG(!filter || !filter->before_where || read_mode == ReadMode::Bitmap, + "Push down filters needs bitmap, push down filters is empty: {}, read mode: {}", + filter == nullptr || filter->before_where == nullptr, + magic_enum::enum_name(read_mode)); return read_mode; } @@ -1055,11 +1058,13 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread); auto log_tracing_id = getLogTracingId(*dm_context); auto tracing_logger = log->getChild(log_tracing_id); - LOG_DEBUG(tracing_logger, - "Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={}", - keep_order, - db_context.getSettingsRef().dt_enable_read_thread, - enable_read_thread); + LOG_INFO(tracing_logger, + "Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} is_fast_scan={} is_push_down_filter_empty={}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread, + is_fast_scan, + filter == nullptr || filter->before_where == nullptr); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? diff --git a/tests/fullstack-test/issues/issue_7519.test b/tests/fullstack-test/issues/issue_7519.test new file mode 100644 index 00000000000..5366180300c --- /dev/null +++ b/tests/fullstack-test/issues/issue_7519.test @@ -0,0 +1,88 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Preparation. +=> DBGInvoke __init_fail_point() + +mysql> drop table if exists test.t +mysql> create table test.t (x int, a varchar(50), y int, t time) partition by range (x) (partition p0 values less than (5), partition p1 values less than (10)); + +mysql> insert into test.t values (1, 'a', 1, '700:11:11.1234'), (2, 'b', 2, '711:12:12.1234'); +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t values (8, 'c', 8, '500:21:21.1234'); + +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> analyze table test.t; + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select count(*) from test.t; ++----------+ +| count(*) | ++----------+ +| 16385 | ++----------+ + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select * from test.t where x >= 5 and x < 10; ++------+------+------+-----------+ +| x | a | y | t | ++------+------+------+-----------+ +| 8 | c | 8 | 500:21:21 | ++------+------+------+-----------+ + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select x, a, y, hour(t) from test.t where x >= 5 and x < 10; ++------+------+------+---------+ +| x | a | y | hour(t) | ++------+------+------+---------+ +| 8 | c | 8 | 500 | ++------+------+------+---------+ + +=> DBGInvoke __enable_fail_point(force_remote_read_for_batch_cop) + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select count(*) from test.t; ++----------+ +| count(*) | ++----------+ +| 16385 | ++----------+ + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select * from test.t where x >= 5 and x < 10; ++------+------+------+-----------+ +| x | a | y | t | ++------+------+------+-----------+ +| 8 | c | 8 | 500:21:21 | ++------+------+------+-----------+ + +mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select x, a, y, hour(t) from test.t where x >= 5 and x < 10; ++------+------+------+---------+ +| x | a | y | hour(t) | ++------+------+------+---------+ +| 8 | c | 8 | 500 | ++------+------+------+---------+ + +=> DBGInvoke __disable_fail_point(force_remote_read_for_batch_cop) + +# Clean up. +mysql> drop table if exists test.t;