From 03f187462cb7dc2a9d6fbe72f8924798236adc0f Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 22 Feb 2023 17:05:14 +0800 Subject: [PATCH] merge master to join spill branch (#1) * Refine `Spiller` to support append write (#6862) ref pingcap/tiflash#6528 * fix build Signed-off-by: xufei * fix Signed-off-by: xufei * fix error Signed-off-by: xufei --------- Signed-off-by: xufei --- dbms/src/Columns/ColumnAggregateFunction.cpp | 24 +++ dbms/src/Columns/ColumnAggregateFunction.h | 2 + dbms/src/Columns/ColumnString.cpp | 5 - dbms/src/Columns/ColumnString.h | 1 + dbms/src/Columns/IColumn.h | 3 + dbms/src/Common/FailPoint.cpp | 5 +- dbms/src/Core/Block.cpp | 9 ++ dbms/src/Core/Block.h | 2 + dbms/src/Core/SpillConfig.cpp | 6 +- dbms/src/Core/SpillConfig.h | 10 +- dbms/src/Core/SpillHandler.cpp | 100 +++++++++--- dbms/src/Core/SpillHandler.h | 9 +- dbms/src/Core/Spiller.cpp | 152 ++++++++++++++---- dbms/src/Core/Spiller.h | 68 +++++--- dbms/src/Core/tests/gtest_block.cpp | 111 +++++++++++++ dbms/src/Core/tests/gtest_spiller.cpp | 126 +++++++++++++-- dbms/src/DataStreams/copyData.cpp | 24 --- dbms/src/DataStreams/copyData.h | 4 - .../Coprocessor/DAGQueryBlockInterpreter.cpp | 17 +- .../Flash/Coprocessor/InterpreterUtils.cpp | 4 +- .../Planner/Plans/PhysicalAggregation.cpp | 2 +- dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp | 13 +- .../Flash/tests/gtest_spill_aggregation.cpp | 4 +- dbms/src/Flash/tests/gtest_spill_sort.cpp | 4 +- dbms/src/Functions/FunctionsString.cpp | 3 +- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 7 +- .../Interpreters/InterpreterSelectQuery.cpp | 6 +- dbms/src/Interpreters/Join.cpp | 46 +++--- dbms/src/Interpreters/Join.h | 13 +- dbms/src/Interpreters/Settings.h | 6 +- 30 files changed, 591 insertions(+), 195 deletions(-) create mode 100644 dbms/src/Core/tests/gtest_block.cpp diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index fee5c7578b1..46ff4ddf5e9 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -14,11 +14,15 @@ #include #include +#include #include #include #include #include #include +#include +#include +#include #include #include @@ -244,6 +248,26 @@ size_t ColumnAggregateFunction::allocatedBytes() const return res; } +size_t ColumnAggregateFunction::estimateByteSizeForSpill() const +{ + static const std::unordered_set trivial_agg_func_name{"sum", "min", "max", "count", "avg", "first_row", "any"}; + if (trivial_agg_func_name.find(func->getName()) != trivial_agg_func_name.end()) + { + size_t res = func->sizeOfData() * size(); + /// For trivial agg, we can estimate each element's size as `func->sizeofData()`, and + /// if the result is String, use `APPROX_STRING_SIZE` as the average size of the String + if (removeNullable(func->getReturnType())->isString()) + res += size() * ColumnString::APPROX_STRING_SIZE; + return res; + } + else + { + /// For non-trivial agg like uniqXXX/group_concat, can't estimate the memory usage, so just return byteSize(), + /// it will highly overestimates size of a column if it was produced in AggregatingBlockInputStream (it contains size of other columns) + return byteSize(); + } +} + MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const { return create(func, Arenas(1, std::make_shared())); diff --git a/dbms/src/Columns/ColumnAggregateFunction.h b/dbms/src/Columns/ColumnAggregateFunction.h index 0b7ff103146..618e6e52410 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.h +++ b/dbms/src/Columns/ColumnAggregateFunction.h @@ -181,6 +181,8 @@ class ColumnAggregateFunction final : public COWPtrHelper #include - -/// Used in the `reserve` method, when the number of rows is known, but sizes of elements are not. -#define APPROX_STRING_SIZE 64 - - namespace DB { namespace ErrorCodes diff --git a/dbms/src/Columns/ColumnString.h b/dbms/src/Columns/ColumnString.h index eecbc371a39..df40832142e 100644 --- a/dbms/src/Columns/ColumnString.h +++ b/dbms/src/Columns/ColumnString.h @@ -33,6 +33,7 @@ class ColumnString final : public COWPtrHelper { public: using Chars_t = PaddedPODArray; + static const auto APPROX_STRING_SIZE = 64; private: friend class COWPtrHelper; diff --git a/dbms/src/Columns/IColumn.h b/dbms/src/Columns/IColumn.h index e406fa0b7b5..7f136b63cae 100644 --- a/dbms/src/Columns/IColumn.h +++ b/dbms/src/Columns/IColumn.h @@ -325,6 +325,9 @@ class IColumn : public COWPtr /// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined. virtual size_t byteSize() const = 0; + /// Size of the column if it is spilled, the same as byteSize() except for ColumnAggregateFunction + virtual size_t estimateByteSizeForSpill() const { return byteSize(); } + /// Size of column data between [offset, offset+limit) in memory (may be approximate) - for profiling. /// This method throws NOT_IMPLEMENTED exception if it is called with unimplemented subclass. virtual size_t byteSize(size_t /*offset*/, size_t /*limit*/) const diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index af0323a1d67..cd3402ca144 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -22,7 +21,6 @@ #include #include -#include namespace DB { @@ -69,7 +67,8 @@ std::unordered_map> FailPointHelper::f M(exception_after_drop_segment) \ M(exception_between_schema_change_in_the_same_diff) \ M(force_ps_wal_compact) \ - M(pause_before_full_gc_prepare) + M(pause_before_full_gc_prepare) \ + M(exception_during_spill) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 4655fad90e0..62174b84eb7 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -282,6 +282,15 @@ size_t Block::bytes() const return res; } +size_t Block::estimateBytesForSpill() const +{ + size_t res = 0; + for (const auto & elem : data) + res += elem.column->estimateByteSizeForSpill(); + + return res; +} + size_t Block::bytes(size_t offset, size_t limit) const { size_t res = 0; diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 0d337d6d3e2..73184ab48e5 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -111,6 +111,8 @@ class Block /// Approximate number of bytes in memory - for profiling and limits. size_t bytes() const; + size_t estimateBytesForSpill() const; + /// Approximate number of bytes between [offset, offset+limit) in memory - for profiling and limits. size_t bytes(size_t offset, size_t limit) const; diff --git a/dbms/src/Core/SpillConfig.cpp b/dbms/src/Core/SpillConfig.cpp index 7f0a88958a8..d5a63d9adc9 100644 --- a/dbms/src/Core/SpillConfig.cpp +++ b/dbms/src/Core/SpillConfig.cpp @@ -27,11 +27,13 @@ bool needReplace(char c) return std::isspace(c) || String::npos != forbidden_or_unusual_chars.find(c); } } // namespace -SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_) +SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_) : spill_dir(spill_dir_) , spill_id(spill_id_) , spill_id_as_file_name_prefix(spill_id) - , max_spilled_size_per_spill(max_spilled_size_per_spill_) + , max_cached_data_bytes_in_spiller(max_cached_data_bytes_in_spiller_) + , max_spilled_rows_per_file(max_spilled_rows_per_file_) + , max_spilled_bytes_per_file(max_spilled_bytes_per_file_) , file_provider(file_provider_) { RUNTIME_CHECK_MSG(!spill_dir.empty(), "Spiller dir must be non-empty"); diff --git a/dbms/src/Core/SpillConfig.h b/dbms/src/Core/SpillConfig.h index 41e194787fa..d43e6529b79 100644 --- a/dbms/src/Core/SpillConfig.h +++ b/dbms/src/Core/SpillConfig.h @@ -18,18 +18,22 @@ namespace DB { - class FileProvider; using FileProviderPtr = std::shared_ptr; struct SpillConfig { public: - SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_); + SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_); String spill_dir; String spill_id; String spill_id_as_file_name_prefix; - size_t max_spilled_size_per_spill; + /// soft limit of the max cached data bytes in spiller(used in Spiller::spillBlocksUsingBlockInputStream) + size_t max_cached_data_bytes_in_spiller; + /// soft limit of the max rows per spilled file + UInt64 max_spilled_rows_per_file; + /// soft limit of the max bytes per spilled file + UInt64 max_spilled_bytes_per_file; FileProviderPtr file_provider; }; } // namespace DB diff --git a/dbms/src/Core/SpillHandler.cpp b/dbms/src/Core/SpillHandler.cpp index fc778b19ff2..c6febdb1754 100644 --- a/dbms/src/Core/SpillHandler.cpp +++ b/dbms/src/Core/SpillHandler.cpp @@ -12,19 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include namespace DB { +namespace FailPoints +{ +extern const char exception_during_spill[]; +} // namespace FailPoints -SpillHandler::SpillWriter::SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version) - : file_buf(file_provider, file_name, EncryptionPath(file_name, "")) +SpillHandler::SpillWriter::SpillWriter(const FileProviderPtr & file_provider, const String & file_name, bool append_write, const Block & header, size_t spill_version) + : file_buf(file_provider, file_name, EncryptionPath(file_name, ""), true, nullptr, DBMS_DEFAULT_BUFFER_SIZE, append_write ? O_APPEND | O_WRONLY : -1) , compressed_buf(file_buf) { - /// note this implicitly assumes that a SpillWriter will always write to a new file, - /// if we support append write, don't need to write the spill version again - writeVarUInt(spill_version, compressed_buf); + if (!append_write) + writeVarUInt(spill_version, compressed_buf); out = std::make_unique(compressed_buf, spill_version, header); out->writePrefix(); } @@ -44,43 +48,74 @@ void SpillHandler::SpillWriter::write(const Block & block) out->write(block); } -SpillHandler::SpillHandler(Spiller * spiller_, std::unique_ptr && spilled_file, size_t partition_id_) +SpillHandler::SpillHandler(Spiller * spiller_, size_t partition_id_) : spiller(spiller_) , partition_id(partition_id_) + , current_spilled_file_index(-1) + , writer(nullptr) +{} + +std::pair SpillHandler::setUpNextSpilledFile() { + assert(writer == nullptr); + auto [spilled_file, append_write] = spiller->getOrCreateSpilledFile(partition_id); + if (append_write) + prev_spill_details.merge(spilled_file->getSpillDetails()); current_spill_file_name = spilled_file->path(); - current_spilled_file_index = 0; spilled_files.push_back(std::move(spilled_file)); + current_spilled_file_index = spilled_files.size() - 1; + writer = std::make_unique(spiller->config.file_provider, current_spill_file_name, append_write, spiller->input_schema, spiller->spill_version); + return std::make_pair(spilled_files[current_spilled_file_index]->getSpillDetails().rows, spilled_files[current_spilled_file_index]->getSpillDetails().data_bytes_uncompressed); +} + +bool SpillHandler::isSpilledFileFull(UInt64 spilled_rows, UInt64 spilled_bytes) +{ + return (spiller->config.max_spilled_rows_per_file > 0 && spilled_rows >= spiller->config.max_spilled_rows_per_file) || (spiller->config.max_spilled_bytes_per_file > 0 && spilled_bytes >= spiller->config.max_spilled_bytes_per_file); } void SpillHandler::spillBlocks(const Blocks & blocks) { - /// todo 1. set max_file_size and spill to new file if needed - /// 2. check the disk usage + /// todo check the disk usage if (unlikely(blocks.empty())) return; - RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id); try { Stopwatch watch; - RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); auto block_size = blocks.size(); LOG_INFO(spiller->logger, "Spilling {} blocks data into temporary file {}", block_size, current_spill_file_name); + + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill); + size_t total_rows = 0; - if (unlikely(writer == nullptr)) - { - writer = std::make_unique(spiller->config.file_provider, current_spill_file_name, blocks[0].cloneEmpty(), spiller->spill_version); - } + size_t rows_in_file = 0; + size_t bytes_in_file = 0; for (const auto & block : blocks) { - total_rows += block.rows(); + if (unlikely(!block || block.rows() == 0)) + continue; + if (unlikely(writer == nullptr)) + { + std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile(); + } + auto rows = block.rows(); + total_rows += rows; + rows_in_file += rows; + bytes_in_file += block.estimateBytesForSpill(); writer->write(block); + if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file)) + { + spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite()); + spilled_files[current_spilled_file_index]->markFull(); + writer = nullptr; + } } double cost = watch.elapsedSeconds(); time_cost += cost; LOG_INFO(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost); - RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler is finished.", spiller->config.spill_id); - RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id); + RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); return; } catch (...) @@ -88,22 +123,32 @@ void SpillHandler::spillBlocks(const Blocks & blocks) /// mark the spill handler invalid writer = nullptr; spilled_files.clear(); - current_spilled_file_index = -1; + current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX; throw Exception(fmt::format("Failed to spill blocks to disk for file {}, error: {}", current_spill_file_name, getCurrentExceptionMessage(false, false))); } } void SpillHandler::finish() { - if (likely(writer != nullptr)) + /// it is guaranteed that once current_spilled_file_index >= 0, at least one block is written to spilled_files[current_spilled_file_index] + if (likely(current_spilled_file_index >= 0)) { - auto spill_details = writer->finishWrite(); - spilled_files[current_spilled_file_index]->updateSpillDetails(spill_details); + if (writer != nullptr) + { + spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite()); + auto current_spill_details = spilled_files[current_spilled_file_index]->getSpillDetails(); + if (!spiller->enable_append_write || isSpilledFileFull(current_spill_details.rows, current_spill_details.data_bytes_uncompressed)) + { + /// always mark full if enable_append_write is false here, since if enable_append_write is false, all the files are treated as full file + spilled_files[current_spilled_file_index]->markFull(); + } + } auto gen_spill_detail_info = [&]() { SpillDetails details{0, 0, 0}; for (Int64 i = 0; i <= current_spilled_file_index; i++) details.merge(spilled_files[i]->getSpillDetails()); + details.subtract(prev_spill_details); return fmt::format("Commit spilled data, details: spill {} rows in {:.3f} sec," " {:.3f} MiB uncompressed, {:.3f} MiB compressed, {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, " "compression rate: {:.3f} ({:.3f} rows/sec., {:.3f} MiB/sec. uncompressed, {:.3f} MiB/sec. compressed)", @@ -121,11 +166,16 @@ void SpillHandler::finish() LOG_DEBUG(spiller->logger, gen_spill_detail_info()); std::unique_lock lock(spiller->spilled_files[partition_id]->spilled_files_mutex); for (auto & spilled_file : spilled_files) - spiller->spilled_files[partition_id]->spilled_files.push_back(std::move(spilled_file)); + { + if (!spilled_file->isFull()) + spiller->spilled_files[partition_id]->mutable_spilled_files.push_back(std::move(spilled_file)); + else + spiller->spilled_files[partition_id]->immutable_spilled_files.push_back(std::move(spilled_file)); + } spilled_files.clear(); spiller->has_spilled_data = true; - current_spilled_file_index = -1; - RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); + current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX; + RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id); } } diff --git a/dbms/src/Core/SpillHandler.h b/dbms/src/Core/SpillHandler.h index 73238182ba4..6de16ba8d89 100644 --- a/dbms/src/Core/SpillHandler.h +++ b/dbms/src/Core/SpillHandler.h @@ -22,7 +22,6 @@ namespace DB { - class IBlockOutputStream; /// SpillHandler is used to spill blocks, currently hidden behind `Spiller::spillBlocks` @@ -32,15 +31,17 @@ class IBlockOutputStream; class SpillHandler { public: - SpillHandler(Spiller * spiller_, std::unique_ptr && spilled_file, size_t partition_id_); + SpillHandler(Spiller * spiller_, size_t partition_id_); void spillBlocks(const Blocks & blocks); void finish(); private: + std::pair setUpNextSpilledFile(); + bool isSpilledFileFull(UInt64 spilled_rows, UInt64 spilled_bytes); class SpillWriter { public: - SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version); + SpillWriter(const FileProviderPtr & file_provider, const String & file_name, bool append_write, const Block & header, size_t spill_version); SpillDetails finishWrite(); void write(const Block & block); @@ -57,6 +58,8 @@ class SpillHandler String current_spill_file_name; std::unique_ptr writer; double time_cost = 0; + SpillDetails prev_spill_details; + static const Int64 INVALID_CURRENT_SPILLED_FILE_INDEX = -10; }; } // namespace DB diff --git a/dbms/src/Core/Spiller.cpp b/dbms/src/Core/Spiller.cpp index 9ca5bfef44c..1ec45140600 100644 --- a/dbms/src/Core/Spiller.cpp +++ b/dbms/src/Core/Spiller.cpp @@ -47,7 +47,7 @@ SpilledFile::~SpilledFile() } } -Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, size_t partition_num_, const Block & input_schema_, const LoggerPtr & logger_, Int64 spill_version_, bool release_spilled_file_on_restore_) +Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, UInt64 partition_num_, const Block & input_schema_, const LoggerPtr & logger_, Int64 spill_version_, bool release_spilled_file_on_restore_) : config(config_) , is_input_sorted(is_input_sorted_) , partition_num(partition_num_) @@ -56,8 +56,10 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, size_t part , spill_version(spill_version_) , release_spilled_file_on_restore(release_spilled_file_on_restore_) { - for (size_t i = 0; i < partition_num; ++i) + for (UInt64 i = 0; i < partition_num; ++i) spilled_files.push_back(std::make_unique()); + /// if is_input_sorted is true, can not append write because it will break the sort property + enable_append_write = !is_input_sorted && (config.max_spilled_bytes_per_file != 0 || config.max_spilled_rows_per_file != 0); Poco::File spill_dir(config.spill_dir); if (!spill_dir.exists()) { @@ -70,14 +72,39 @@ Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, size_t part } } -void Spiller::spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, size_t partition_id, const std::function & is_cancelled) +namespace +{ +/// bytes_threshold == 0 means no limit, and will read all data +std::vector readDataForSpill(IBlockInputStream & from, size_t bytes_threshold, const std::function & is_cancelled) +{ + std::vector ret; + size_t current_return_size = 0; + + while (Block block = from.read()) + { + if unlikely (is_cancelled()) + return {}; + ret.push_back(std::move(block)); + current_return_size += ret.back().estimateBytesForSpill(); + if (bytes_threshold > 0 && current_return_size >= bytes_threshold) + break; + } + + if unlikely (is_cancelled()) + return {}; + + return ret; +} +} // namespace + +void Spiller::spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, UInt64 partition_id, const std::function & is_cancelled) { auto spill_handler = createSpillHandler(partition_id); block_in.readPrefix(); Blocks spill_blocks; while (true) { - spill_blocks = readData(block_in, config.max_spilled_size_per_spill, is_cancelled); + spill_blocks = readDataForSpill(block_in, config.max_cached_data_bytes_in_spiller, is_cancelled); if (spill_blocks.empty()) break; spill_handler.spillBlocks(spill_blocks); @@ -89,42 +116,79 @@ void Spiller::spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, siz spill_handler.finish(); } -SpillHandler Spiller::createSpillHandler(size_t partition_id) +std::pair, bool> Spiller::getOrCreateSpilledFile(UInt64 partition_id) +{ + RUNTIME_CHECK_MSG(isSpillFinished() == false, "{}: spill after the spiller is finished.", config.spill_id); + std::unique_ptr spilled_file = nullptr; + if (enable_append_write) + { + auto & partition_spilled_files = spilled_files[partition_id]; + std::lock_guard partition_lock(partition_spilled_files->spilled_files_mutex); + if (!partition_spilled_files->mutable_spilled_files.empty()) + { + spilled_file = std::move(partition_spilled_files->mutable_spilled_files.back()); + partition_spilled_files->mutable_spilled_files.pop_back(); + } + } + if (spilled_file == nullptr) + { + auto spilled_file_name = nextSpillFileName(partition_id); + spilled_file = std::make_unique(spilled_file_name, config.file_provider); + RUNTIME_CHECK_MSG(!spilled_file->exists(), "Duplicated spilled file: {}, should not happens", spilled_file_name); + return std::make_pair(std::move(spilled_file), false); + } + else + { + RUNTIME_CHECK_MSG(spilled_file->exists(), "Missed spilled file: {}, should not happens", spilled_file->path()); + return std::make_pair(std::move(spilled_file), true); + } +} + +SpillHandler Spiller::createSpillHandler(UInt64 partition_id) { RUNTIME_CHECK_MSG(partition_id < partition_num, "{}: partition id {} exceeds partition num {}.", config.spill_id, partition_id, partition_num); - RUNTIME_CHECK_MSG(spill_finished == false, "{}: spill after the spiller is finished.", config.spill_id); - auto spilled_file_name = nextSpillFileName(partition_id); - auto spilled_file = std::make_unique(spilled_file_name, config.file_provider); - RUNTIME_CHECK_MSG(!spilled_file->exists(), "Duplicated spilled file: {}, should not happens", spilled_file_name); - return SpillHandler(this, std::move(spilled_file), partition_id); + RUNTIME_CHECK_MSG(isSpillFinished() == false, "{}: spill after the spiller is finished.", config.spill_id); + return SpillHandler(this, partition_id); } -void Spiller::spillBlocks(const Blocks & blocks, size_t partition_id) +void Spiller::spillBlocks(const Blocks & blocks, UInt64 partition_id) { + if (blocks.empty()) + return; auto spiller_handler = createSpillHandler(partition_id); spiller_handler.spillBlocks(blocks); spiller_handler.finish(); } -BlockInputStreams Spiller::restoreBlocks(size_t partition_id, size_t max_stream_size) +BlockInputStreams Spiller::restoreBlocks(UInt64 partition_id, UInt64 max_stream_size, bool append_dummy_read_stream) { RUNTIME_CHECK_MSG(partition_id < partition_num, "{}: partition id {} exceeds partition num {}.", config.spill_id, partition_id, partition_num); - RUNTIME_CHECK_MSG(spill_finished, "{}: restore before the spiller is finished.", config.spill_id); + RUNTIME_CHECK_MSG(isSpillFinished(), "{}: restore before the spiller is finished.", config.spill_id); std::lock_guard partition_lock(spilled_files[partition_id]->spilled_files_mutex); + RUNTIME_CHECK_MSG(spilled_files[partition_id]->mutable_spilled_files.empty(), "{}: the mutable spilled files must be empty when restore.", config.spill_id); + auto & partition_spilled_files = spilled_files[partition_id]->immutable_spilled_files; + if (max_stream_size == 0) - max_stream_size = spilled_files[partition_id]->spilled_files.size(); - if (is_input_sorted && spilled_files[partition_id]->spilled_files.size() > max_stream_size) - LOG_WARNING(logger, "sorted spilled data restore does not take max_stream_size into account"); + max_stream_size = partition_spilled_files.size(); + if (is_input_sorted && partition_spilled_files.size() > max_stream_size) + { + LOG_WARNING(logger, "Sorted spilled data restore does not take max_stream_size into account"); + } + SpillDetails details{0, 0, 0}; BlockInputStreams ret; + UInt64 spill_file_read_stream_num = is_input_sorted ? partition_spilled_files.size() : std::min(max_stream_size, partition_spilled_files.size()); + std::vector restore_stream_read_rows; + if (is_input_sorted) { - for (auto & file : spilled_files[partition_id]->spilled_files) + for (auto & file : partition_spilled_files) { RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); details.merge(file->getSpillDetails()); std::vector file_infos; file_infos.emplace_back(file->path()); + restore_stream_read_rows.push_back(file->getSpillDetails().rows); if (release_spilled_file_on_restore) file_infos.back().file = std::move(file); ret.push_back(std::make_shared(std::move(file_infos), input_schema, config.file_provider, spill_version)); @@ -132,20 +196,20 @@ BlockInputStreams Spiller::restoreBlocks(size_t partition_id, size_t max_stream_ } else { - // size_t return_stream_num = std::min(max_stream_size, spilled_files[partition_id]->spilled_files.size()); - size_t return_stream_num = max_stream_size; - std::vector> file_infos(return_stream_num); + std::vector> file_infos(spill_file_read_stream_num); + restore_stream_read_rows.resize(spill_file_read_stream_num, 0); // todo balance based on SpilledRows - for (size_t i = 0; i < spilled_files[partition_id]->spilled_files.size(); ++i) + for (size_t i = 0; i < partition_spilled_files.size(); ++i) { - auto & file = spilled_files[partition_id]->spilled_files[i]; + auto & file = partition_spilled_files[i]; RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path()); details.merge(file->getSpillDetails()); - file_infos[i % return_stream_num].push_back(file->path()); + file_infos[i % spill_file_read_stream_num].push_back(file->path()); + restore_stream_read_rows[i % spill_file_read_stream_num] += file->getSpillDetails().rows; if (release_spilled_file_on_restore) - file_infos[i % return_stream_num].back().file = std::move(file); + file_infos[i % spill_file_read_stream_num].back().file = std::move(file); } - for (size_t i = 0; i < return_stream_num; ++i) + for (UInt64 i = 0; i < spill_file_read_stream_num; ++i) { if (likely(i < file_infos.size() && !file_infos[i].empty())) ret.push_back(std::make_shared(std::move(file_infos[i]), input_schema, config.file_provider, spill_version)); @@ -153,28 +217,48 @@ BlockInputStreams Spiller::restoreBlocks(size_t partition_id, size_t max_stream_ ret.push_back(std::make_shared(input_schema)); } } - LOG_INFO(logger, "Will restore {} rows from {} files of size {:.3f} MiB compressed, {:.3f} MiB uncompressed using {} streams.", details.rows, spilled_files[partition_id]->spilled_files.size(), (details.data_bytes_compressed / 1048576.0), (details.data_bytes_uncompressed / 1048576.0), ret.size()); + for (size_t i = 0; i < spill_file_read_stream_num; ++i) + LOG_TRACE(logger, "Restore {} rows from {}-th stream", restore_stream_read_rows[i], i); + LOG_INFO(logger, "Will restore {} rows from {} files of size {:.3f} MiB compressed, {:.3f} MiB uncompressed using {} streams.", details.rows, spilled_files[partition_id]->immutable_spilled_files.size(), (details.data_bytes_compressed / 1048576.0), (details.data_bytes_uncompressed / 1048576.0), ret.size()); if (release_spilled_file_on_restore) { /// clear the spilled_files so we can safely assume that the element in spilled_files is always not nullptr - spilled_files[partition_id]->spilled_files.clear(); + partition_spilled_files.clear(); + } + if (ret.empty()) + ret.push_back(std::make_shared(input_schema)); + if (append_dummy_read_stream) + { + /// if append_dummy_read_stream = true, make sure at least `max_stream_size`'s streams are returned, will be used in join + for (UInt64 i = ret.size(); i < max_stream_size; ++i) + ret.push_back(std::make_shared(input_schema)); } -// if (ret.empty()) -// ret.push_back(std::make_shared(input_schema)); return ret; } -size_t Spiller::spilledRows(size_t partition_id) +void Spiller::finishSpill() +{ + std::lock_guard lock(spill_finished_mutex); + spill_finished = true; + for (auto & partition_spilled_files : spilled_files) + { + partition_spilled_files->makeAllSpilledFilesImmutable(); + } +} + +UInt64 Spiller::spilledRows(UInt64 partition_id) { RUNTIME_CHECK_MSG(partition_id < partition_num, "{}: partition id {} exceeds partition num {}.", config.spill_id, partition_id, partition_num); - RUNTIME_CHECK_MSG(spill_finished, "{}: spilledBlockDataSize must be called when the spiller is finished.", config.spill_id); - size_t ret = 0; - for (auto & file : spilled_files[partition_id]->spilled_files) + RUNTIME_CHECK_MSG(isSpillFinished(), "{}: spilledBlockDataSize must be called when the spiller is finished.", config.spill_id); + UInt64 ret = 0; + + std::lock_guard partition_lock(spilled_files[partition_id]->spilled_files_mutex); + for (auto & file : spilled_files[partition_id]->immutable_spilled_files) ret += file->getSpilledRows(); return ret; } -String Spiller::nextSpillFileName(size_t partition_id) +String Spiller::nextSpillFileName(UInt64 partition_id) { Int64 index = tmp_file_index.fetch_add(1); return fmt::format("{}tmp_{}_partition_{}_{}", config.spill_dir, config.spill_id_as_file_name_prefix, partition_id, index); diff --git a/dbms/src/Core/Spiller.h b/dbms/src/Core/Spiller.h index e27e5ec96e1..1f14bb50fec 100644 --- a/dbms/src/Core/Spiller.h +++ b/dbms/src/Core/Spiller.h @@ -20,7 +20,6 @@ namespace DB { - class IBlockInputStream; using BlockInputStreamPtr = std::shared_ptr; using BlockInputStreams = std::vector; @@ -28,11 +27,11 @@ class SpillHandler; struct SpillDetails { - size_t rows; - size_t data_bytes_uncompressed; - size_t data_bytes_compressed; + UInt64 rows = 0; + UInt64 data_bytes_uncompressed = 0; + UInt64 data_bytes_compressed = 0; SpillDetails() = default; - SpillDetails(size_t rows_, size_t data_bytes_uncompressed_, size_t data_bytes_compressed_) + SpillDetails(UInt64 rows_, UInt64 data_bytes_uncompressed_, UInt64 data_bytes_compressed_) : rows(rows_) , data_bytes_uncompressed(data_bytes_uncompressed_) , data_bytes_compressed(data_bytes_compressed_) @@ -43,54 +42,86 @@ struct SpillDetails data_bytes_uncompressed += other.data_bytes_uncompressed; data_bytes_compressed += other.data_bytes_compressed; } + void subtract(const SpillDetails & other) + { + assert(rows >= other.rows); + rows -= other.rows; + assert(data_bytes_uncompressed >= other.data_bytes_uncompressed); + data_bytes_uncompressed -= other.data_bytes_uncompressed; + assert(data_bytes_compressed >= other.data_bytes_compressed); + data_bytes_compressed -= other.data_bytes_compressed; + } }; class SpilledFile : public Poco::File { public: SpilledFile(const String & file_name, const FileProviderPtr & file_provider_); ~SpilledFile() override; - size_t getSpilledRows() const { return details.rows; } + UInt64 getSpilledRows() const { return details.rows; } const SpillDetails & getSpillDetails() const { return details; } void updateSpillDetails(const SpillDetails & other_details) { details.merge(other_details); } + void markFull() { is_full = true; } + bool isFull() const { return is_full; } private: SpillDetails details; + bool is_full = false; FileProviderPtr file_provider; }; struct SpilledFiles { std::mutex spilled_files_mutex; - std::vector> spilled_files; + /// immutable spilled files mean the file can not be append + std::vector> immutable_spilled_files; + /// mutable spilled files means the next spill can append to the files + std::vector> mutable_spilled_files; + void makeAllSpilledFilesImmutable() + { + std::lock_guard lock(spilled_files_mutex); + for (auto & mutable_file : mutable_spilled_files) + { + mutable_file->markFull(); + immutable_spilled_files.push_back(std::move(mutable_file)); + } + mutable_spilled_files.clear(); + } }; class Spiller { public: - Spiller(const SpillConfig & config, bool is_input_sorted, size_t partition_num, const Block & input_schema, const LoggerPtr & logger, Int64 spill_version = 1, bool release_spilled_file_on_restore = true); - void spillBlocks(const Blocks & blocks, size_t partition_id); + Spiller(const SpillConfig & config, bool is_input_sorted, UInt64 partition_num, const Block & input_schema, const LoggerPtr & logger, Int64 spill_version = 1, bool release_spilled_file_on_restore = true); + void spillBlocks(const Blocks & blocks, UInt64 partition_id); /// spill blocks by reading from BlockInputStream, this is more memory friendly compared to spillBlocks - void spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, size_t partition_id, const std::function & is_cancelled); + void spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, UInt64 partition_id, const std::function & is_cancelled); /// max_stream_size == 0 means the spiller choose the stream size automatically - BlockInputStreams restoreBlocks(size_t partition_id, size_t max_stream_size = 0); - size_t spilledRows(size_t partition_id); - void finishSpill() { spill_finished = true; }; + BlockInputStreams restoreBlocks(UInt64 partition_id, UInt64 max_stream_size = 0, bool append_dummy_read_stream = false); + UInt64 spilledRows(UInt64 partition_id); + void finishSpill(); bool hasSpilledData() const { return has_spilled_data; }; /// only for test now bool releaseSpilledFileOnRestore() const { return release_spilled_file_on_restore; } private: friend class SpillHandler; - String nextSpillFileName(size_t partition_id); - SpillHandler createSpillHandler(size_t partition_id); + String nextSpillFileName(UInt64 partition_id); + SpillHandler createSpillHandler(UInt64 partition_id); + std::pair, bool> getOrCreateSpilledFile(UInt64 partition_id); + bool isSpillFinished() + { + std::lock_guard lock(spill_finished_mutex); + return spill_finished; + } - const SpillConfig config; + SpillConfig config; const bool is_input_sorted; - const size_t partition_num; + const UInt64 partition_num; /// todo remove input_schema if spiller does not rely on BlockInputStream const Block input_schema; const LoggerPtr logger; - std::atomic spill_finished{false}; + std::mutex spill_finished_mutex; + bool spill_finished = false; std::atomic has_spilled_data{false}; static std::atomic tmp_file_index; std::vector> spilled_files; @@ -100,6 +131,7 @@ class Spiller /// file can be released on restore since it is only read once, but in the future if SharedScan(shared cte) need spill, /// the data may be restored multiple times and release_spilled_file_on_restore need to be set to false. const bool release_spilled_file_on_restore; + bool enable_append_write = false; }; using SpillerPtr = std::unique_ptr; diff --git a/dbms/src/Core/tests/gtest_block.cpp b/dbms/src/Core/tests/gtest_block.cpp new file mode 100644 index 00000000000..940bb69d8eb --- /dev/null +++ b/dbms/src/Core/tests/gtest_block.cpp @@ -0,0 +1,111 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace tests +{ +class BlockTest : public testing::Test +{ +}; + +TEST_F(BlockTest, TestEstimateBytesForSpillNormalColumn) +try +{ + ColumnsWithTypeAndName columns; + std::vector all_types{"Int64", "Int32", "UInt64", "UInt32", "Decimal(5,2)", "Decimal(10,2)", "Decimal(20,2)", "Decimal(40,2)", "MyDate", "MyDateTime", "String", "FixedString(10)"}; + for (auto & type_name : all_types) + { + DataTypePtr types[2]; + types[0] = DataTypeFactory::instance().get(type_name); + types[1] = makeNullable(types[0]); + for (auto & type : types) + { + auto column = type->createColumn(); + for (size_t i = 0; i < 10; i++) + column->insertDefault(); + columns.emplace_back(std::move(column), type); + } + } + Block block(columns); + ASSERT_TRUE(block.bytes() == block.estimateBytesForSpill()); +} +CATCH + +TEST_F(BlockTest, TestEstimateBytesForSpillColumnAggregateFunction) +try +{ + DB::registerAggregateFunctions(); + ArenaPtr pool = std::make_shared(); + pool->alloc(1024 * 1024); + /// case 1, agg function not allocate memory in arena + std::vector types{"Int64", "String", "Nullable(Int64)", "Nullable(String)"}; + std::vector data_size{16, ColumnString::APPROX_STRING_SIZE * 2, 24, ColumnString::APPROX_STRING_SIZE * 2 + 8}; + for (size_t i = 0; i < types.size(); ++i) + { + auto agg_data_type = DataTypeFactory::instance().get(fmt::format("AggregateFunction(Min, {})", types[i])); + auto agg_column = agg_data_type->createColumn(); + auto agg_func = typeid_cast(agg_column.get())->getAggregateFunction(); + auto size_of_aggregate_states = agg_func->sizeOfData(); + auto align_aggregate_states = agg_func->alignOfData(); + typeid_cast(agg_column.get())->addArena(pool); + for (size_t j = 0; j < 10; ++j) + { + auto * aggregate_data = pool->alignedAlloc(size_of_aggregate_states, align_aggregate_states); + agg_func->create(aggregate_data); + agg_column->insertData(reinterpret_cast(&aggregate_data), data_size[i]); + } + ColumnsWithTypeAndName columns; + columns.emplace_back(std::move(agg_column), agg_data_type); + Block block(columns); + ASSERT_NE(block.estimateBytesForSpill(), block.bytes()); + ASSERT_EQ(block.estimateBytesForSpill(), data_size[i] * 10); + } + /// case 2, agg function allocate memory in arena + for (size_t i = 0; i < types.size(); ++i) + { + auto agg_data_type = DataTypeFactory::instance().get(fmt::format("AggregateFunction(uniqExact, {})", types[i])); + auto agg_column = agg_data_type->createColumn(); + auto agg_func = typeid_cast(agg_column.get())->getAggregateFunction(); + typeid_cast(agg_column.get())->addArena(pool); + auto size_of_aggregate_states = agg_func->sizeOfData(); + auto align_aggregate_states = agg_func->alignOfData(); + for (size_t j = 0; j < 10; ++j) + { + auto * aggregate_data = pool->alignedAlloc(size_of_aggregate_states, align_aggregate_states); + agg_func->create(aggregate_data); + agg_column->insertData(reinterpret_cast(&aggregate_data), data_size[i]); + } + ColumnsWithTypeAndName columns; + columns.emplace_back(std::move(agg_column), agg_data_type); + Block block(columns); + ASSERT_EQ(block.estimateBytesForSpill(), block.bytes()); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Core/tests/gtest_spiller.cpp b/dbms/src/Core/tests/gtest_spiller.cpp index 18ac0cd0545..7f0ba13434c 100644 --- a/dbms/src/Core/tests/gtest_spiller.cpp +++ b/dbms/src/Core/tests/gtest_spiller.cpp @@ -42,7 +42,7 @@ class SpillerTest : public testing::Test spiller_test_header = Block(names_and_types); auto key_manager = std::make_shared(false); auto file_provider = std::make_shared(key_manager, false); - spill_config_ptr = std::make_shared(spill_dir, "test", 1024ULL * 1024 * 1024, file_provider); + spill_config_ptr = std::make_shared(spill_dir, "test", 1024ULL * 1024 * 1024, 0, 0, file_provider); } void TearDown() override { @@ -85,9 +85,9 @@ class SpillerTest : public testing::Test } return ret; } - static void verifyRestoreBlocks(Spiller & spiller, size_t restore_partition_id, size_t restore_max_stream_size, size_t expected_stream_size, const Blocks & expected_blocks) + static void verifyRestoreBlocks(Spiller & spiller, size_t restore_partition_id, size_t restore_max_stream_size, size_t expected_stream_size, const Blocks & expected_blocks, bool append_dummy_read_stream = false) { - auto block_streams = spiller.restoreBlocks(restore_partition_id, restore_max_stream_size); + auto block_streams = spiller.restoreBlocks(restore_partition_id, restore_max_stream_size, append_dummy_read_stream); if (expected_stream_size > 0) { GTEST_ASSERT_EQ(block_streams.size(), expected_stream_size); @@ -138,6 +138,24 @@ catch (Exception & e) GTEST_ASSERT_EQ(e.message(), "Check partition_id < partition_num failed: test: partition id 30 exceeds partition num 20."); } +TEST_F(SpillerTest, ExceptionDuringSpill) +try +{ + FailPointHelper::enableFailPoint("exception_during_spill"); + Spiller spiller(*spill_config_ptr, false, 1, spiller_test_header, logger); + try + { + spiller.spillBlocks(generateBlocks(10), 0); + GTEST_FAIL(); + } + catch (Exception & e) + { + GTEST_ASSERT_EQ(std::strstr(e.message().c_str(), "exception_during_spill") != nullptr, true); + GTEST_ASSERT_EQ(spiller.hasSpilledData(), false); + } +} +CATCH + TEST_F(SpillerTest, SpillAfterFinish) try { @@ -148,7 +166,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check spill_finished == false failed: test: spill after the spiller is finished."); + GTEST_ASSERT_EQ(e.message(), "Check isSpillFinished() == false failed: test: spill after the spiller is finished."); } TEST_F(SpillerTest, InvalidPartitionIdInRestore) @@ -173,7 +191,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check spill_finished failed: test: restore before the spiller is finished."); + GTEST_ASSERT_EQ(e.message(), "Check isSpillFinished() failed: test: restore before the spiller is finished."); } TEST_F(SpillerTest, SpilledBlockDataSize) @@ -226,7 +244,7 @@ try std::vector> spillers; spillers.push_back(std::make_unique(*spill_config_ptr, false, 2, spiller_test_header, logger)); auto spiller_config_with_small_max_spill_size = *spill_config_ptr; - spiller_config_with_small_max_spill_size.max_spilled_size_per_spill = spill_config_ptr->max_spilled_size_per_spill / 1000; + spiller_config_with_small_max_spill_size.max_cached_data_bytes_in_spiller = spill_config_ptr->max_cached_data_bytes_in_spiller / 1000; spillers.push_back(std::make_unique(spiller_config_with_small_max_spill_size, false, 2, spiller_test_header, logger)); for (auto & spiller : spillers) @@ -263,7 +281,7 @@ try std::vector> spillers; spillers.push_back(std::make_unique(*spill_config_ptr, false, 1, spiller_test_header, logger, 1, false)); auto new_spill_path = fmt::format("{}{}_{}", spill_config_ptr->spill_dir, "release_file_on_restore_test", rand()); - SpillConfig new_spill_config(new_spill_path, spill_config_ptr->spill_id, spill_config_ptr->max_spilled_size_per_spill, spill_config_ptr->file_provider); + SpillConfig new_spill_config(new_spill_path, spill_config_ptr->spill_id, spill_config_ptr->max_cached_data_bytes_in_spiller, 0, 0, spill_config_ptr->file_provider); Poco::File new_spiller_dir(new_spill_config.spill_dir); /// remove spiller dir if exists if (new_spiller_dir.exists()) @@ -321,7 +339,7 @@ try std::vector> spillers; spillers.push_back(std::make_unique(*spill_config_ptr, true, 2, spiller_test_header, logger)); auto spiller_config_with_small_max_spill_size = *spill_config_ptr; - spiller_config_with_small_max_spill_size.max_spilled_size_per_spill = spill_config_ptr->max_spilled_size_per_spill / 1000; + spiller_config_with_small_max_spill_size.max_cached_data_bytes_in_spiller = spill_config_ptr->max_cached_data_bytes_in_spiller / 1000; spillers.push_back(std::make_unique(spiller_config_with_small_max_spill_size, true, 2, spiller_test_header, logger)); for (auto & spiller : spillers) @@ -353,6 +371,96 @@ try } CATCH +TEST_F(SpillerTest, RestoreWithAppendDummyReadStream) +try +{ + auto spiller_config_for_append_write = *spill_config_ptr; + + /// append_dummy_read = false + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1000000000; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + Blocks all_blocks; + auto blocks = generateBlocks(20); + spiller.spillBlocks(blocks, 0); + spiller.spillBlocks(blocks, 0); + spiller.finishSpill(); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + verifyRestoreBlocks(spiller, 0, 20, 1, all_blocks, false); + } + /// append_dummy_read = true + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1000000000; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + Blocks all_blocks; + auto blocks = generateBlocks(20); + spiller.spillBlocks(blocks, 0); + spiller.spillBlocks(blocks, 0); + spiller.finishSpill(); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + verifyRestoreBlocks(spiller, 0, 20, 20, all_blocks, true); + } +} +CATCH + +TEST_F(SpillerTest, AppendWrite) +try +{ + auto spiller_config_for_append_write = *spill_config_ptr; + + /// case 1, multiple spill write to the same file + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1000000000; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + Blocks all_blocks; + auto blocks = generateBlocks(50); + spiller.spillBlocks(blocks, 0); + spiller.spillBlocks(blocks, 0); + spiller.finishSpill(); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + all_blocks.insert(all_blocks.end(), blocks.begin(), blocks.end()); + verifyRestoreBlocks(spiller, 0, 2, 1, all_blocks); + } + /// case 2, one spill write to multiple files + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + auto all_blocks = generateBlocks(20); + spiller.spillBlocks(all_blocks, 0); + spiller.finishSpill(); + verifyRestoreBlocks(spiller, 0, 0, 20, all_blocks); + } + /// case 3, spill empty blocks to existing spilled file + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1000000000; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + Blocks all_blocks = generateBlocks(20); + spiller.spillBlocks(all_blocks, 0); + Blocks empty_blocks; + spiller.spillBlocks(empty_blocks, 0); + BlocksList empty_blocks_list; + BlocksListBlockInputStream block_input_stream(std::move(empty_blocks_list)); + spiller.spillBlocksUsingBlockInputStream(block_input_stream, 0, []() { return false; }); + spiller.finishSpill(); + verifyRestoreBlocks(spiller, 0, 2, 1, all_blocks); + } + /// case 4, spill empty blocks to new spilled file + { + spiller_config_for_append_write.max_spilled_rows_per_file = 1000000000; + Spiller spiller(spiller_config_for_append_write, false, 1, spiller_test_header, logger); + Blocks empty_blocks; + spiller.spillBlocks(empty_blocks, 0); + BlocksList empty_blocks_list; + BlocksListBlockInputStream block_input_stream(std::move(empty_blocks_list)); + spiller.spillBlocksUsingBlockInputStream(block_input_stream, 0, []() { return false; }); + spiller.finishSpill(); + ASSERT_TRUE(spiller.hasSpilledData() == false); + } +} +CATCH + TEST_F(SpillerTest, SpillAndMeetCancelled) try { @@ -362,7 +470,7 @@ try total_block_size += block.bytes(); auto spiller_config_with_small_max_spill_size = *spill_config_ptr; - spiller_config_with_small_max_spill_size.max_spilled_size_per_spill = total_block_size / 50; + spiller_config_with_small_max_spill_size.max_cached_data_bytes_in_spiller = total_block_size / 50; Spiller spiller(spiller_config_with_small_max_spill_size, false, 1, spiller_test_header, logger); BlocksList block_list; block_list.insert(block_list.end(), blocks.begin(), blocks.end()); diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp index 20bf4a00028..5b18d24b8e7 100644 --- a/dbms/src/DataStreams/copyData.cpp +++ b/dbms/src/DataStreams/copyData.cpp @@ -19,10 +19,8 @@ namespace DB { - namespace { - bool isAtomicSet(std::atomic * val) { return ((val != nullptr) && val->load(std::memory_order_seq_cst)); @@ -77,28 +75,6 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic readData(IBlockInputStream & from, size_t max_return_size, const std::function & is_cancelled) -{ - std::vector ret; - - size_t current_return_size = 0; - while (Block block = from.read()) - { - if (is_cancelled()) - break; - ret.push_back(block); - current_return_size += ret.back().bytes(); - if (max_return_size > 0 && current_return_size >= max_return_size) - break; - } - - if (is_cancelled()) - return {}; - - return ret; -} - - void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled) { copyDataImpl(from, to, is_cancelled, doNothing); diff --git a/dbms/src/DataStreams/copyData.h b/dbms/src/DataStreams/copyData.h index f97e12e9742..1d5202c5f72 100644 --- a/dbms/src/DataStreams/copyData.h +++ b/dbms/src/DataStreams/copyData.h @@ -20,7 +20,6 @@ namespace DB { - class IBlockInputStream; class IBlockOutputStream; class Block; @@ -30,9 +29,6 @@ class Block; */ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled = nullptr); -/// max_return_size == 0 mean read all data -std::vector readData(IBlockInputStream & from, size_t max_return_size, const std::function & is_cancelled); - void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled); void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function & is_cancelled, const std::function & progress); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index cec463041bc..515d20bb45e 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -263,8 +263,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & const Settings & settings = context.getSettingsRef(); size_t max_block_size_for_cross_join = settings.max_block_size; - size_t max_spilled_size_per_spill = settings.max_spilled_size_per_spill; - size_t max_join_bytes = settings.max_join_bytes; + SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size_for_cross_join = 1; }); JoinPtr join_ptr = std::make_shared( @@ -275,6 +275,9 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & log->identifier(), enableFineGrainedShuffle(fine_grained_shuffle_count), fine_grained_shuffle_count, + settings.max_join_bytes, + build_spill_config, + probe_spill_config, tiflash_join.join_key_collators, probe_filter_column_name, build_filter_column_name, @@ -282,11 +285,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & other_eq_filter_from_in_column_name, other_condition_expr, max_block_size_for_cross_join, - match_helper_name, - max_spilled_size_per_spill, - max_join_bytes, - context.getTemporaryPath(), - context.getFileProvider()); + match_helper_name); recordJoinExecuteInfo(tiflash_join.build_side_index, join_ptr); @@ -400,9 +399,10 @@ void DAGQueryBlockInterpreter::executeAggregation( executeExpression(pipeline, expression_actions_ptr, log, "before aggregation"); Block before_agg_header = pipeline.firstStream()->getHeader(); + const Settings & settings = context.getSettingsRef(); AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); - SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), context.getSettingsRef().max_spilled_size_per_spill, context.getFileProvider()); + SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, @@ -430,7 +430,6 @@ void DAGQueryBlockInterpreter::executeAggregation( else if (pipeline.streams.size() > 1) { /// If there are several sources, then we perform parallel aggregation - const Settings & settings = context.getSettingsRef(); BlockInputStreamPtr stream = std::make_shared( pipeline.streams, BlockInputStreams{}, diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index d2e18a36e00..a6d85e36ed4 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -124,7 +124,7 @@ void orderStreams( settings.max_block_size, limit, getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()), - SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()), + SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()), log->identifier()); stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo)); }); @@ -142,7 +142,7 @@ void orderStreams( limit, settings.max_bytes_before_external_sort, // todo use identifier_executor_id as the spill id - SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()), + SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()), log->identifier()); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index 6efdf523b6a..9aacf49e967 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -94,7 +94,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont Block before_agg_header = pipeline.firstStream()->getHeader(); AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header); - SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), context.getSettingsRef().max_spilled_size_per_spill, context.getFileProvider()); + SpillConfig spill_config(context.getTemporaryPath(), fmt::format("{}_aggregation", log->identifier()), context.getSettingsRef().max_cached_data_bytes_in_spiller, context.getSettingsRef().max_spilled_rows_per_file, context.getSettingsRef().max_spilled_bytes_per_file, context.getFileProvider()); auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index 8f387482f4e..1fe9c122cf5 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -117,8 +117,8 @@ PhysicalPlanNodePtr PhysicalJoin::build( const Settings & settings = context.getSettingsRef(); size_t max_block_size_for_cross_join = settings.max_block_size; - size_t max_spilled_size_per_spill = settings.max_spilled_size_per_spill; - size_t max_join_bytes = settings.max_join_bytes; + SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_build", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("{}_hash_join_0_probe", log->identifier()), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size_for_cross_join = 1; }); JoinPtr join_ptr = std::make_shared( @@ -129,6 +129,9 @@ PhysicalPlanNodePtr PhysicalJoin::build( log->identifier(), fine_grained_shuffle.enable(), fine_grained_shuffle.stream_count, + settings.max_join_bytes, + build_spill_config, + probe_spill_config, tiflash_join.join_key_collators, probe_filter_column_name, build_filter_column_name, @@ -136,11 +139,7 @@ PhysicalPlanNodePtr PhysicalJoin::build( other_eq_filter_from_in_column_name, other_condition_expr, max_block_size_for_cross_join, - match_helper_name, - max_spilled_size_per_spill, - max_join_bytes, - context.getTemporaryPath(), - context.getFileProvider()); + match_helper_name); recordJoinExecuteInfo(dag_context, executor_id, build_plan->execId(), join_ptr); diff --git a/dbms/src/Flash/tests/gtest_spill_aggregation.cpp b/dbms/src/Flash/tests/gtest_spill_aggregation.cpp index 90c7ff6cdfd..9a1744164f0 100644 --- a/dbms/src/Flash/tests/gtest_spill_aggregation.cpp +++ b/dbms/src/Flash/tests/gtest_spill_aggregation.cpp @@ -70,8 +70,8 @@ try ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true)); /// test parallel aggregation ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true)); - /// enable spill and use small max_spilled_size_per_spill - context.context.setSetting("max_spilled_size_per_spill", Field(static_cast(total_data_size / 200))); + /// enable spill and use small max_cached_data_bytes_in_spiller + context.context.setSetting("max_cached_data_bytes_in_spiller", Field(static_cast(total_data_size / 200))); /// test single thread aggregation ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true)); /// test parallel aggregation diff --git a/dbms/src/Flash/tests/gtest_spill_sort.cpp b/dbms/src/Flash/tests/gtest_spill_sort.cpp index 78b7e75de6b..381105fddc5 100644 --- a/dbms/src/Flash/tests/gtest_spill_sort.cpp +++ b/dbms/src/Flash/tests/gtest_spill_sort.cpp @@ -64,8 +64,8 @@ try // don't use `executeAndAssertColumnsEqual` since it takes too long to run /// todo use ASSERT_COLUMNS_EQ_R once TiFlash support final TopN ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams)); - /// enable spill and use small max_spilled_size_per_spill - context.context.setSetting("max_spilled_size_per_spill", Field(static_cast(total_data_size / 100))); + /// enable spill and use small max_cached_data_bytes_in_spiller + context.context.setSetting("max_cached_data_bytes_in_spiller", Field(static_cast(total_data_size / 100))); ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams)); } CATCH diff --git a/dbms/src/Functions/FunctionsString.cpp b/dbms/src/Functions/FunctionsString.cpp index b8677e2af72..3f55e98b193 100644 --- a/dbms/src/Functions/FunctionsString.cpp +++ b/dbms/src/Functions/FunctionsString.cpp @@ -4388,7 +4388,6 @@ class FunctionSpace : public IFunction // tidb mysql.MaxBlobWidth space max input : space(MAX_BLOB_WIDTH+1) will return NULL static constexpr auto MAX_BLOB_WIDTH = 16777216; - static const auto APPROX_STRING_SIZE = 64; FunctionSpace() = default; @@ -4529,7 +4528,7 @@ class FunctionSpace : public IFunction ColumnString::Offsets & res_offsets) { ColumnString::Offset res_offset = 0; - res_data.reserve(val_num * APPROX_STRING_SIZE); + res_data.reserve(val_num * ColumnString::APPROX_STRING_SIZE); const auto & col_vector_space_num_value = col_vector_space_num->getData(); for (size_t row = 0; row < val_num; ++row) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 434143ab21e..4804f03fc7a 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -2175,6 +2175,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty if (!subquery_for_set.join) { + SpillConfig build_spill_config(context.getTemporaryPath(), fmt::format("hash_join_0_build"), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); + SpillConfig probe_spill_config(context.getTemporaryPath(), fmt::format("hash_join_0_probe"), settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); JoinPtr join = std::make_shared( join_key_names_left, join_key_names_right, @@ -2182,7 +2184,10 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty join_params.strictness, "" /*req_id=*/, false /*enable_fine_grained_shuffle_*/, - 0 /*fine_grained_shuffle_count_*/); + 0 /*fine_grained_shuffle_count_*/, + settings.max_join_bytes, + build_spill_config, + probe_spill_config); Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end()); for (const auto & name_type : columns_added_by_join) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 9745b084878..3260a6453ea 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -918,7 +918,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre */ bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; - SpillConfig spill_config(context.getTemporaryPath(), "aggregation", settings.max_spilled_size_per_spill, context.getFileProvider()); + SpillConfig spill_config(context.getTemporaryPath(), "aggregation", settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()); Aggregator::Params params(header, keys, aggregates, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, false, spill_config, settings.max_block_size); /// If there are several sources, then we perform parallel aggregation @@ -989,7 +989,7 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool fi const Settings & settings = context.getSettingsRef(); - Aggregator::Params params(header, keys, aggregates, SpillConfig(context.getTemporaryPath(), "aggregation", settings.max_spilled_size_per_spill, context.getFileProvider()), settings.max_block_size); + Aggregator::Params params(header, keys, aggregates, SpillConfig(context.getTemporaryPath(), "aggregation", settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()), settings.max_block_size); pipeline.firstStream() = std::make_shared( pipeline.streams, @@ -1089,7 +1089,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline) settings.max_block_size, limit, settings.max_bytes_before_external_sort, - SpillConfig(context.getTemporaryPath(), "sort", settings.max_spilled_size_per_spill, context.getFileProvider()), + SpillConfig(context.getTemporaryPath(), "sort", settings.max_cached_data_bytes_in_spiller, settings.max_spilled_rows_per_file, settings.max_spilled_bytes_per_file, context.getFileProvider()), /*req_id=*/""); } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index abe361a5351..5f587f1ddc7 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -103,6 +103,10 @@ ColumnRawPtrs getKeyColumns(const Names & key_names, const Block & block) return key_columns; } +SpillConfig createSpillConfigWithNewSpillId(const SpillConfig & config, const String & new_spill_id) +{ + return SpillConfig(config.spill_dir, new_spill_id, config.max_cached_data_bytes_in_spiller, config.max_spilled_rows_per_file, config.max_spilled_bytes_per_file, config.file_provider); +} } // namespace const std::string Join::match_helper_prefix = "__left-semi-join-match-helper"; @@ -123,6 +127,9 @@ Join::Join( const String & req_id, bool enable_fine_grained_shuffle_, size_t fine_grained_shuffle_count_, + size_t max_join_bytes_, + const SpillConfig & build_spill_config_, + const SpillConfig & probe_spill_config_, const TiDB::TiDBCollators & collators_, const String & left_filter_column_, const String & right_filter_column_, @@ -131,10 +138,6 @@ Join::Join( ExpressionActionsPtr other_condition_ptr_, size_t max_block_size_, const String & match_helper_name, - size_t max_spilled_size_per_spill_, - size_t max_join_bytes_, - String tmp_path_, - FileProviderPtr file_provider_, JoinMemoryInfo join_memory_info_, size_t restore_round_) : restore_round(restore_round_) @@ -156,12 +159,11 @@ Join::Join( , other_condition_ptr(other_condition_ptr_) , original_strictness(strictness) , max_block_size_for_cross_join(max_block_size_) - , max_spilled_size_per_spill(max_spilled_size_per_spill_) , max_join_bytes(max_join_bytes_) + , build_spill_config(build_spill_config_) + , probe_spill_config(probe_spill_config_) , log(Logger::get(req_id)) , join_memory_info(join_memory_info_) - , tmp_path(tmp_path_) - , file_provider(file_provider_) , enable_fine_grained_shuffle(enable_fine_grained_shuffle_) , fine_grained_shuffle_count(fine_grained_shuffle_count_) { @@ -607,6 +609,10 @@ std::shared_ptr Join::createRestoreJoin() log->identifier(), false, 0, + // todo update max_join_bytes based on the restore concurrency + max_join_bytes, + createSpillConfigWithNewSpillId(build_spill_config, fmt::format("{}_hash_join_{}_build", log->identifier(), restore_round + 1)), + createSpillConfigWithNewSpillId(probe_spill_config, fmt::format("{}_hash_join_{}_probe", log->identifier(), restore_round + 1)), collators, left_filter_column, right_filter_column, @@ -615,10 +621,6 @@ std::shared_ptr Join::createRestoreJoin() other_condition_ptr, max_block_size_for_cross_join, match_helper_name, - max_spilled_size_per_spill, - max_join_bytes, - tmp_path, - file_provider, join_memory_info, restore_round + 1); } @@ -630,11 +632,8 @@ void Join::initBuild(const Block & sample_block, size_t build_concurrency_) throw Exception("Logical error: Join has been initialized", ErrorCodes::LOGICAL_ERROR); initialized = true; setBuildConcurrencyAndInitPool(build_concurrency_); - build_spiller = std::make_unique(SpillConfig(tmp_path, fmt::format("{}_hash_join_{}_build", log->identifier(), restore_round), max_spilled_size_per_spill, file_provider), - false, - build_concurrency_, - sample_block, - log); + /// todo fix wrong header(sample_block) + build_spiller = std::make_unique(build_spill_config, false, build_concurrency_, sample_block, log); /// Choose data structure to use for JOIN. initMapImpl(chooseMethod(getKeyColumns(key_names_right, sample_block), key_sizes)); setSampleBlock(sample_block); @@ -644,11 +643,8 @@ void Join::initProbe(const Block & sample_block, size_t probe_concurrency_) { std::unique_lock lock(rwlock); setProbeConcurrency(probe_concurrency_); - probe_spiller = std::make_unique(SpillConfig(tmp_path, fmt::format("{}_hash_join_{}_probe", log->identifier(), restore_round), max_spilled_size_per_spill, file_provider), - false, - build_concurrency, - sample_block, - log); + /// todo fix wrong header(sample_block) + probe_spiller = std::make_unique(probe_spill_config, false, build_concurrency, sample_block, log); } namespace @@ -2617,8 +2613,8 @@ std::tuple Join::getOneRestor auto spilled_partition_index = spilled_partition_indexes.front(); RUNTIME_CHECK_MSG(partitions[spilled_partition_index].spill, "should not restore unspilled partition."); LOG_DEBUG(log, "partition {}, round {}", spilled_partition_index, restore_round); - restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, probe_concurrency); - restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, probe_concurrency); + restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, probe_concurrency, true); + restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, probe_concurrency, true); RUNTIME_CHECK_MSG(!restore_build_streams.empty(), "restore streams should not be empty"); auto build_stream = get_back_stream(restore_build_streams); auto probe_stream = get_back_stream(restore_probe_streams); @@ -2657,7 +2653,7 @@ void Join::dispatchProbeBlock(Block & block, std::list void Join::trySpillBuildPartition(size_t partition_index, bool force) { if (partitions[partition_index].spill - && ((force && partitions[partition_index].build_partition.bytes) || partitions[partition_index].build_partition.bytes >= max_spilled_size_per_spill)) + && ((force && partitions[partition_index].build_partition.bytes) || partitions[partition_index].build_partition.bytes >= build_spill_config.max_cached_data_bytes_in_spiller)) { build_spiller->spillBlocks(partitions[partition_index].build_partition.original_blocks, partition_index); releaseBuildPartitionBlocks(partition_index); @@ -2674,7 +2670,7 @@ void Join::trySpillBuildPartitions(bool force) void Join::trySpillProbePartition(size_t partition_index, bool force) { - if (partitions[partition_index].spill && ((force && partitions[partition_index].probe_partition.bytes) || partitions[partition_index].probe_partition.bytes >= max_spilled_size_per_spill)) + if (partitions[partition_index].spill && ((force && partitions[partition_index].probe_partition.bytes) || partitions[partition_index].probe_partition.bytes >= probe_spill_config.max_cached_data_bytes_in_spiller)) { probe_spiller->spillBlocks(partitions[partition_index].probe_partition.blocks, partition_index); releaseProbePartitionBlocks(partition_index); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 3a6c525d8e7..d9847e78f77 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -144,6 +144,9 @@ class Join const String & req_id, bool enable_fine_grained_shuffle_, size_t fine_grained_shuffle_count_, + size_t max_join_bytes_, + const SpillConfig & build_spill_config_, + const SpillConfig & probe_spill_config_, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators, const String & left_filter_column = "", const String & right_filter_column = "", @@ -152,10 +155,6 @@ class Join ExpressionActionsPtr other_condition_ptr = nullptr, size_t max_block_size = 0, const String & match_helper_name = "", - size_t max_spilled_size_per_spill_ = 1024ULL * 1024 * 1024, - size_t max_join_bytes_ = 0, - String tmp_path = "", - FileProviderPtr file_provider = nullptr, JoinMemoryInfo join_memory_info = JoinMemoryInfo(), size_t restore_round = 0); @@ -462,8 +461,9 @@ class Join std::list spilled_partition_indexes; - size_t max_spilled_size_per_spill; size_t max_join_bytes; + SpillConfig build_spill_config; + SpillConfig probe_spill_config; BlockInputStreams restore_build_streams; BlockInputStreams restore_probe_streams; @@ -501,9 +501,6 @@ class Join Block totals; JoinMemoryInfo join_memory_info; - String tmp_path; - FileProviderPtr file_provider; - /** Protect state for concurrent use in insertFromBlock and joinBlock. * Note that these methods could be called simultaneously only while use of StorageJoin, * and StorageJoin only calls these two methods. diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index e994063cccd..94afe9ce518 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -293,13 +293,13 @@ struct Settings M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \ - \ M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \ M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \ - M(SettingUInt64, max_spilled_size_per_spill, 1024ULL * 1024 * 1024, "Max spilled data size per spill, 1GB as the default value.") \ M(SettingUInt64, max_join_bytes, 0, "max bytes used by join") \ - \ + M(SettingUInt64, max_cached_data_bytes_in_spiller, 1024ULL * 1024 * 100, "Max cached data bytes in spiller before spilling, 100MB as the default value, 0 means no limit") \ + M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 mean no limit.") \ + M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \ M(SettingBool, enable_planner, true, "Enable planner") \ M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ M(SettingUInt64, pipeline_task_thread_pool_size, 0, "The size of task thread pool. 0 means using number_of_logical_cpu_cores.") \