From 6a305d375bcf9827a35d0ff21c938f4ebd32d8ea Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 22 Sep 2022 10:21:03 +0800 Subject: [PATCH 1/5] Always use readable pathname in DMFileReader (#5958) (#5983) close pingcap/tiflash#5956 --- .../File/DMFileBlockInputStream.cpp | 7 +- .../Storages/DeltaMerge/File/DMFileReader.h | 8 +- .../ReadThread/ColumnSharingCache.cpp | 7 ++ .../ReadThread/ColumnSharingCache.h | 2 + .../DeltaMerge/tests/gtest_segment_reader.cpp | 94 +++++++++++++++++++ 5 files changed, 107 insertions(+), 11 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index bfb5d2cc054..b8c7e83102c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -30,12 +30,7 @@ DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & con DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges) { - if (dmfile->getStatus() != DMFile::Status::READABLE) - throw Exception(fmt::format( - "DMFile [{}] is expected to be in READABLE status, but: {}", - dmfile->fileId(), - DMFile::statusString(dmfile->getStatus())), - ErrorCodes::LOGICAL_ERROR); + RUNTIME_CHECK(dmfile->getStatus() == DMFile::Status::READABLE, dmfile->fileId(), DMFile::statusString(dmfile->getStatus())); // if `rowkey_ranges` is empty, we unconditionally read all packs // `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 2eb14c6d19e..bffa0ef59ce 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -102,13 +102,11 @@ class DMFileReader /// Return false if it is the end of stream. 
bool getSkippedRows(size_t & skip_rows); Block read(); - UInt64 fileId() const - { - return dmfile->fileId(); - } std::string path() const { - return dmfile->path(); + // Status of DMFile can be updated when DMFileReader in used and the pathname will be changed. + // For DMFileReader, always use the readable path. + return DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE); } void addCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp index 13b187167b6..fc541aa9124 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -60,4 +60,11 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st r->addCachedPacks(col_id, start, count, col); } } + +DMFileReader * DMFileReaderPool::get(const std::string & name) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(name); + return itr != readers.end() && !itr->second.empty() ? *(itr->second.begin()) : nullptr; +} } // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index 57d60eae77b..9bf93ade499 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -224,6 +224,8 @@ class DMFileReaderPool void add(DMFileReader & reader); void del(DMFileReader & reader); void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); + // `get` is just for test. 
+ DMFileReader * get(const std::string & name); private: DMFileReaderPool() = default; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp index c06b74757d5..eb81a9edb29 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp @@ -20,8 +20,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -127,6 +129,98 @@ try } CATCH + +TEST_P(DeltaMergeStoreRWTest, DMFileNameChangedInDMFileReadPool) +try +{ + const ColumnDefine col_str_define(2, "col2", std::make_shared()); + const ColumnDefine col_i8_define(3, "i8", std::make_shared()); + { + auto table_column_defines = DMTestEnv::getDefaultColumns(); + table_column_defines->emplace_back(col_str_define); + table_column_defines->emplace_back(col_i8_define); + store = reload(table_column_defines); + } + + constexpr size_t num_rows_write = 128; + // Ensure stable is not empty. + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + block.insert(DB::tests::createColumn( + createNumberStrings(0, num_rows_write), + col_str_define.name, + col_str_define.id)); + block.insert(DB::tests::createColumn( + createSignedNumbers(0, num_rows_write), + col_i8_define.name, + col_i8_define.id)); + store->write(*db_context, db_context->getSettingsRef(), block); + ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()))); + store->mergeDeltaAll(*db_context); + auto stable = store->id_to_segment.begin()->second->getStable(); + ASSERT_EQ(stable->getRows(), num_rows_write); + } + + // Ensure delta is not empty. 
+ { + auto beg = num_rows_write; + auto end = num_rows_write + num_rows_write; + auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false); + block.insert(DB::tests::createColumn( + createNumberStrings(beg, end), + col_str_define.name, + col_str_define.id)); + block.insert(DB::tests::createColumn( + createSignedNumbers(beg, end), + col_i8_define.name, + col_i8_define.id)); + store->write(*db_context, db_context->getSettingsRef(), block); + auto delta = store->id_to_segment.begin()->second->getDelta(); + ASSERT_EQ(delta->getRows(), num_rows_write); + } + + // Check DMFile + const auto & dmfiles = store->id_to_segment.begin()->second->getStable()->getDMFiles(); + ASSERT_EQ(dmfiles.size(), 1); + auto dmfile = dmfiles.front(); + auto readable_path = DMFile::getPathByStatus(dmfile->parentPath(), dmfile->fileId(), DMFile::Status::READABLE); + ASSERT_EQ(dmfile->path(), readable_path); + ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr); + + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* keep_order= */ false, + /* is_fast_scan= */ false, + /* expected_block_size= */ 128)[0]; + auto blk = in->read(); + // DMFileReader is created and add to DMFileReaderPool. + auto * reader = DMFileReaderPool::instance().get(readable_path); + ASSERT_NE(reader, nullptr); + ASSERT_EQ(reader->path(), readable_path); + + // Update DMFile. 
+ ASSERT_TRUE(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()))); + store->mergeDeltaAll(*db_context); + auto stable = store->id_to_segment.begin()->second->getStable(); + ASSERT_EQ(stable->getRows(), 2 * num_rows_write); + + dmfile->remove(db_context->getFileProvider()); + ASSERT_NE(dmfile->path(), readable_path); + + in = nullptr; + ASSERT_EQ(DMFileReaderPool::instance().get(readable_path), nullptr); + } +} +CATCH + } // namespace tests } // namespace DM } // namespace DB From a05f9e722d6476f0a76789262be70b55090f0d4a Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 22 Sep 2022 19:59:03 +0800 Subject: [PATCH 2/5] Disable folly memcpy as default memcpy (#5996) (#6001) close pingcap/tiflash#5949 --- libs/libmemcpy/CMakeLists.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/libmemcpy/CMakeLists.txt b/libs/libmemcpy/CMakeLists.txt index a329d64bb49..ae0f9dc5776 100644 --- a/libs/libmemcpy/CMakeLists.txt +++ b/libs/libmemcpy/CMakeLists.txt @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-option(TIFLASH_FOLLY_MEMCPY_IS_MEMCPY "use folly memcpy as default `memcpy` and `memmove`" ON) +# TODO: optimize for small size memory +option(USE_FOLLY_MEMCPY_AS_MEMCPY "use folly memcpy as default `memcpy` and `memmove`" OFF) set (memcpy_sources) @@ -24,11 +25,11 @@ if (ARCH_LINUX AND TIFLASH_ENABLE_AVX_SUPPORT) list (APPEND memcpy_sources folly/memcpy.S) message (STATUS "`libmemcpy` support Folly memcpy") else () - set (TIFLASH_FOLLY_MEMCPY_IS_MEMCPY OFF) + set (USE_FOLLY_MEMCPY_AS_MEMCPY OFF) endif () if (USE_INTERNAL_MEMCPY) - if (TIFLASH_FOLLY_MEMCPY_IS_MEMCPY) + if (USE_FOLLY_MEMCPY_AS_MEMCPY) message (STATUS "Using Folly memcpy as default `memcpy` and `memmove`") add_definitions(-DFOLLY_MEMCPY_IS_MEMCPY=1) else () From 418fde310b194bf9172456119f55b567eb402780 Mon Sep 17 00:00:00 2001 From: Wish Date: Thu, 22 Sep 2022 23:04:40 +0800 Subject: [PATCH 3/5] storage: GC segments contained by the pack (#6010) Signed-off-by: Wish --- dbms/src/Common/FailPoint.cpp | 2 + .../Storages/DeltaMerge/DeltaMergeStore.cpp | 10 +- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 111 ++-- dbms/src/Storages/DeltaMerge/Segment.cpp | 15 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 62 ++- .../Storages/DeltaMerge/StableValueSpace.h | 109 +++- .../tests/gtest_dm_delta_merge_store.cpp | 158 ------ .../DeltaMerge/tests/gtest_dm_segment.cpp | 2 +- .../tests/gtest_dm_simple_pk_test_basic.cpp | 376 +++++++++++++ .../tests/gtest_dm_simple_pk_test_basic.h | 110 ++++ .../tests/gtest_dm_store_background.cpp | 509 ++++++++++++++++++ .../DeltaMerge/tests/gtest_segment.cpp | 70 --- libs/libsymbolization/CMakeLists.txt | 9 +- 13 files changed, 1229 insertions(+), 314 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.h create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp diff --git a/dbms/src/Common/FailPoint.cpp 
b/dbms/src/Common/FailPoint.cpp index 4b2ffcbe167..9ac51fa0806 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -76,6 +76,8 @@ std::unordered_map> FailPointHelper::f #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ M(gc_skip_update_safe_point) \ + M(gc_skip_merge_delta) \ + M(gc_skip_merge) \ M(force_set_page_file_write_errno) \ M(force_split_io_size_4k) \ M(minimum_block_size_for_cross_join) \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index fc6cbb49795..085d44d89d8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1546,17 +1546,17 @@ DeltaMergeStoreStat DeltaMergeStore::getStat() total_delta_valid_cache_rows += delta->getValidCacheRows(); } - if (stable->getPacks()) + if (stable->getDMFilesPacks()) { stat.total_rows += stable->getRows(); stat.total_size += stable->getBytes(); stat.stable_count += 1; - stat.total_pack_count_in_stable += stable->getPacks(); + stat.total_pack_count_in_stable += stable->getDMFilesPacks(); stat.total_stable_rows += stable->getRows(); stat.total_stable_size += stable->getBytes(); - stat.total_stable_size_on_disk += stable->getBytesOnDisk(); + stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk(); } } @@ -1679,10 +1679,10 @@ SegmentStats DeltaMergeStore::getSegmentStats() stat.size = delta->getBytes() + stable->getBytes(); stat.delete_ranges = delta->getDeletes(); - stat.stable_size_on_disk = stable->getBytesOnDisk(); + stat.stable_size_on_disk = stable->getDMFilesBytesOnDisk(); stat.delta_pack_count = delta->getColumnFileCount(); - stat.stable_pack_count = stable->getPacks(); + stat.stable_pack_count = stable->getDMFilesPacks(); stat.avg_delta_pack_rows = static_cast(delta->getRows()) / stat.delta_pack_count; stat.avg_stable_pack_rows = static_cast(stable->getRows()) / stat.stable_pack_count; diff --git 
a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index ea387529198..b65ec088d38 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -30,6 +30,8 @@ namespace DB namespace FailPoints { extern const char gc_skip_update_safe_point[]; +extern const char gc_skip_merge_delta[]; +extern const char gc_skip_merge[]; extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; } // namespace FailPoints @@ -396,12 +398,22 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte double invalid_data_ratio_threshold, const LoggerPtr & log) { - auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange()); - // Do a quick check about whether the DTFile is completely included in the segment range - if (first_pack_included && last_pack_included) + if (snap->stable->getDMFilesPacks() == 0) { - LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange marking " - "segment as valid data ratio checked because all packs are included, segment={}", + LOG_FMT_TRACE( + log, + "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange skipped segment " + "because the DTFile of stable is empty, segment={}", + seg->info()); + return false; + } + + auto at_least_result = snap->stable->getAtLeastRowsAndBytes(context, seg->getRowKeyRange()); + if (at_least_result.first_pack_intersection == RSResult::All // + && at_least_result.last_pack_intersection == RSResult::All) + { + LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange permanently skipped segment " + "because all packs in DTFiles are fully contained by the segment range, segment={}", seg->info()); seg->setValidDataRatioChecked(); return false; @@ -410,34 +422,34 @@ bool 
shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte std::unordered_set prev_segment_file_ids = getDMFileIDs(prev_seg); std::unordered_set next_segment_file_ids = getDMFileIDs(next_seg); + // Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments. bool contains_invalid_data = false; const auto & dt_files = snap->stable->getDMFiles(); - if (!first_pack_included) + if (at_least_result.first_pack_intersection != RSResult::All) { - auto first_file_id = dt_files[0]->fileId(); - if (prev_segment_file_ids.count(first_file_id) == 0) + auto first_file_id = dt_files.front()->fileId(); + if (prev_seg != nullptr && prev_segment_file_ids.count(first_file_id) == 0) { contains_invalid_data = true; } } - if (!last_pack_included) + if (at_least_result.last_pack_intersection != RSResult::All) { - auto last_file_id = dt_files[dt_files.size() - 1]->fileId(); - if (next_segment_file_ids.count(last_file_id) == 0) + auto last_file_id = dt_files.back()->fileId(); + if (next_seg != nullptr && next_segment_file_ids.count(last_file_id) == 0) { contains_invalid_data = true; } } - // Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments. 
if (!contains_invalid_data) { LOG_FMT_TRACE( log, - "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false because no invalid data, " - "segment={} first_pack_included={} last_pack_included={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]", - seg->simpleInfo(), - first_pack_included, - last_pack_included, + "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false " + "because segment DTFile is shared with a neighbor segment, " + "first_pack_inc={} last_pack_inc={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}] segment={}", + magic_enum::enum_name(at_least_result.first_pack_intersection), + magic_enum::enum_name(at_least_result.last_pack_intersection), fmt::join(prev_segment_file_ids, ","), fmt::join(next_segment_file_ids, ","), [&] { @@ -450,30 +462,53 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte }, ","); return fmt_buf.toString(); - }()); + }(), + seg->info()); + // We do not mark `setValidDataRatioChecked` because neighbor segments' state could change. return false; } - size_t total_rows = 0; - size_t total_bytes = 0; - for (const auto & file : dt_files) - { - total_rows += file->getRows(); - total_bytes += file->getBytes(); - } - auto valid_rows = snap->stable->getRows(); - auto valid_bytes = snap->stable->getBytes(); - - auto check_result = (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold)); + size_t file_rows = snap->stable->getDMFilesRows(); + size_t file_bytes = snap->stable->getDMFilesBytes(); + + // We use at_least_rows|bytes, instead of stable_rows|bytes. The difference is that, at_least_rows|bytes only count packs + // that are fully contained in the segment range, while stable_rows|bytes count packs that are intersected with the segment + // range. 
+ // + // Consider the following case, where the segment only contains one pack: + // │***** ******│ DTFile only contains 1 pack + // │<------>│ Segment + // This kind of data layout may be produced by logical split. In this case, the ratio calculated using at_least_rows would be 0%, + // but the ratio calculated using stable_rows would be 100%. + // We definitely want such a DTFile to be reclaimed, because this segment does not contain any real rows at all! + // + // Of course there are false positives, consider the following case: + // │*************************│ DTFile only contains 1 pack + // │<------------------->│ Segment + // The segment contains most of the data in the DTFile and not much space can be reclaimed after merging the delta. + // We are just wasting disk IO when doing the GC. + // This is currently acceptable, considering that: + // 1) The cost of rewriting the stable of 1 pack is small + // 2) After rewriting, the segment will not need to be rewritten again, as it will look like: + // │*********************│ DTFile only contains 1 pack + // │<------------------->│ Segment + // + // See https://github.com/pingcap/tiflash/pull/6010 for more details.
+ + auto check_result = (at_least_result.rows < file_rows * (1 - invalid_data_ratio_threshold)) // + || (at_least_result.bytes < file_bytes * (1 - invalid_data_ratio_threshold)); LOG_FMT_TRACE( log, "GC - Checking shouldCompactStableWithTooMuchDataOutOfSegmentRange, " - "check_result={} valid_rows={} valid_bytes={} file_rows={} file_bytes={}", + "check_result={} first_pack_inc={} last_pack_inc={} rows_at_least={} bytes_at_least={} file_rows={} file_bytes={} segment={} ", check_result, - valid_rows, - valid_bytes, - total_rows, - total_bytes); + magic_enum::enum_name(at_least_result.first_pack_intersection), + magic_enum::enum_name(at_least_result.last_pack_intersection), + at_least_result.rows, + at_least_result.bytes, + file_rows, + file_bytes, + seg->info()); seg->setValidDataRatioChecked(); return check_result; } @@ -482,6 +517,10 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment) { + fiu_do_on(FailPoints::gc_skip_merge, { + return {}; + }); + auto segment_rows = segment->getEstimatedRows(); auto segment_bytes = segment->getEstimatedBytes(); if (segment_rows >= dm_context->small_segment_rows || segment_bytes >= dm_context->small_segment_bytes) @@ -521,6 +560,10 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point) { + fiu_do_on(FailPoints::gc_skip_merge_delta, { + return {}; + }); + SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 86dbec61db0..684d596b226 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1010,7 
+1010,7 @@ std::optional Segment::prepareSplit(DMContext & dm_context, { // When split point is not specified, there are some preconditions in order to use logical split. if (!dm_context.enable_logical_split // - || segment_snap->stable->getPacks() <= 3 // + || segment_snap->stable->getDMFilesPacks() <= 3 // || segment_snap->delta->getRows() > segment_snap->stable->getRows()) { try_split_mode = SplitMode::Physical; @@ -1565,18 +1565,27 @@ String Segment::simpleInfo() const String Segment::info() const { - return fmt::format("", + return fmt::format("", segment_id, epoch, rowkey_range.toDebugString(), hasAbandoned() ? " abandoned=true" : "", next_segment_id, + delta->getRows(), delta->getBytes(), delta->getDeletes(), + stable->getDMFilesString(), stable->getRows(), - stable->getBytes()); + stable->getBytes(), + + stable->getDMFilesRows(), + stable->getDMFilesBytes(), + stable->getDMFilesPacks()); } String Segment::simpleInfo(const std::vector & segments) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 1c3e8de30ab..4a45a3a1bdf 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -128,17 +128,15 @@ size_t StableValueSpace::getBytes() const return valid_bytes; } -size_t StableValueSpace::getBytesOnDisk() const +size_t StableValueSpace::getDMFilesBytesOnDisk() const { - // If this stable value space is logical splitted, some file may not used, - // and this will return more bytes than actual used. 
size_t bytes = 0; for (const auto & file : files) bytes += file->getBytesOnDisk(); return bytes; } -size_t StableValueSpace::getPacks() const +size_t StableValueSpace::getDMFilesPacks() const { size_t packs = 0; for (const auto & file : files) @@ -146,6 +144,22 @@ size_t StableValueSpace::getPacks() const return packs; } +size_t StableValueSpace::getDMFilesRows() const +{ + size_t rows = 0; + for (const auto & file : files) + rows += file->getRows(); + return rows; +} + +size_t StableValueSpace::getDMFilesBytes() const +{ + size_t bytes = 0; + for (const auto & file : files) + bytes += file->getBytes(); + return bytes; +} + String StableValueSpace::getDMFilesString() { String s; @@ -397,16 +411,17 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & return {approx_rows, approx_bytes}; } -std::pair StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const +StableValueSpace::Snapshot::AtLeastRowsAndBytesResult // +StableValueSpace::Snapshot::getAtLeastRowsAndBytes(const DMContext & context, const RowKeyRange & range) const { + AtLeastRowsAndBytesResult ret{}; + // Usually, this method will be called for some "cold" key ranges. // Loading the index into cache may pollute the cache and make the hot index cache invalid. // So don't refill the cache if the index does not exist. 
- bool first_pack_included = false; - bool last_pack_included = false; - for (size_t i = 0; i < stable->files.size(); i++) + for (size_t file_idx = 0; file_idx < stable->files.size(); ++file_idx) { - const auto & file = stable->files[i]; + const auto & file = stable->files[file_idx]; auto filter = DMFilePackFilter::loadFrom( file, context.db_context.getGlobalContext().getMinMaxIndexCache(), @@ -417,20 +432,37 @@ std::pair StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRa context.db_context.getFileProvider(), context.getReadLimiter(), context.tracing_id); - const auto & use_packs = filter.getUsePacks(); - if (i == 0) + const auto & handle_filter_result = filter.getHandleRes(); + if (file_idx == 0) { // TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity - first_pack_included = use_packs.empty() || use_packs[0]; + if (handle_filter_result.empty()) + ret.first_pack_intersection = RSResult::None; + else + ret.first_pack_intersection = handle_filter_result.front(); } - if (i == stable->files.size() - 1) + if (file_idx == stable->files.size() - 1) { // TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity - last_pack_included = use_packs.empty() || use_packs.back(); + if (handle_filter_result.empty()) + ret.last_pack_intersection = RSResult::None; + else + ret.last_pack_intersection = handle_filter_result.back(); + } + + const auto & pack_stats = file->getPackStats(); + for (size_t pack_idx = 0; pack_idx < pack_stats.size(); ++pack_idx) + { + // Only count packs that are fully contained by the range. 
+ if (handle_filter_result[pack_idx] == RSResult::All) + { + ret.rows += pack_stats[pack_idx].rows; + ret.bytes += pack_stats[pack_idx].bytes; + } } } - return std::make_pair(first_pack_included, last_pack_included); + return ret; } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index f9006058e7a..2bd15e87db4 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -49,13 +50,46 @@ class StableValueSpace : public std::enable_shared_from_this PageId getId() { return id; } void saveMeta(WriteBatch & meta_wb); - const DMFiles & getDMFiles() { return files; } - String getDMFilesString(); size_t getRows() const; size_t getBytes() const; - size_t getBytesOnDisk() const; - size_t getPacks() const; + + /** + * Return the underlying DTFiles. + * DTFiles are not fully included in the segment range will be also included in the result. + * Note: Out-of-range DTFiles may be produced by logical split. + */ + const DMFiles & getDMFiles() const { return files; } + + String getDMFilesString(); + + /** + * Return the total on-disk size of the underlying DTFiles. + * DTFiles are not fully included in the segment range will be also counted in. + * Note: Out-of-range DTFiles may be produced by logical split. + */ + size_t getDMFilesBytesOnDisk() const; + + /** + * Return the total number of packs of the underlying DTFiles. + * Packs that are not included in the segment range will be also counted in. + * Note: Out-of-range packs may be produced by logical split. + */ + size_t getDMFilesPacks() const; + + /** + * Return the total number of rows of the underlying DTFiles. + * Rows from packs that are not included in the segment range will be also counted in. + * Note: Out-of-range rows may be produced by logical split. 
+ */ + size_t getDMFilesRows() const; + + /** + * Return the total size of the data of the underlying DTFiles. + * Rows from packs that are not included in the segment range will be also counted in. + * Note: Out-of-range rows may be produced by logical split. + */ + size_t getDMFilesBytes() const; void enableDMFilesGC(); @@ -111,7 +145,7 @@ class StableValueSpace : public std::enable_shared_from_this : log(&Poco::Logger::get("StableValueSpace::Snapshot")) {} - SnapshotPtr clone() + SnapshotPtr clone() const { auto c = std::make_shared(); c->stable = stable; @@ -127,20 +161,38 @@ class StableValueSpace : public std::enable_shared_from_this return c; } - PageId getId() { return id; } - - size_t getRows() { return valid_rows; } - size_t getBytes() { return valid_bytes; } - - const DMFiles & getDMFiles() { return stable->getDMFiles(); } - - size_t getPacks() - { - size_t packs = 0; - for (auto & file : getDMFiles()) - packs += file->getPacks(); - return packs; - } + PageId getId() const { return id; } + + size_t getRows() const { return valid_rows; } + size_t getBytes() const { return valid_bytes; } + + /** + * Return the underlying DTFiles. + * DTFiles are not fully included in the segment range will be also included in the result. + * Note: Out-of-range DTFiles may be produced by logical split. + */ + const DMFiles & getDMFiles() const { return stable->getDMFiles(); } + + /** + * Return the total number of packs of the underlying DTFiles. + * Packs that are not included in the segment range will be also counted in. + * Note: Out-of-range packs may be produced by logical split. + */ + size_t getDMFilesPacks() const { return stable->getDMFilesPacks(); } + + /** + * Return the total number of rows of the underlying DTFiles. + * Rows from packs that are not included in the segment range will be also counted in. + * Note: Out-of-range rows may be produced by logical split. 
+ */ + size_t getDMFilesRows() const { return stable->getDMFilesRows(); }; + + /** + * Return the total size of the data of the underlying DTFiles. + * Rows from packs that are not included in the segment range will be also counted in. + * Note: Out-of-range rows may be produced by logical split. + */ + size_t getDMFilesBytes() const { return stable->getDMFilesBytes(); }; ColumnCachePtrs & getColumnCaches() { return column_caches; } @@ -156,7 +208,19 @@ class StableValueSpace : public std::enable_shared_from_this RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const; - std::pair isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const; + struct AtLeastRowsAndBytesResult + { + size_t rows = 0; + size_t bytes = 0; + RSResult first_pack_intersection = RSResult::None; + RSResult last_pack_intersection = RSResult::None; + }; + + /** + * Get the rows and bytes calculated from packs that is **fully contained** by the given range. + * If the pack is partially intersected, then it is not counted. + */ + AtLeastRowsAndBytesResult getAtLeastRowsAndBytes(const DMContext & context, const RowKeyRange & range) const; private: Poco::Logger * log; @@ -171,8 +235,9 @@ class StableValueSpace : public std::enable_shared_from_this // Valid rows is not always the sum of rows in file, // because after logical split, two segments could reference to a same file. - UInt64 valid_rows; - UInt64 valid_bytes; + UInt64 valid_rows; /* At most. The actual valid rows may be lower than this value. */ + UInt64 valid_bytes; /* At most. The actual valid bytes may be lower than this value. 
*/ + DMFiles files; StableProperty property; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 16e8c25dbcb..edb34cea2b4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -35,8 +35,6 @@ #include #include -#include "Storages/DeltaMerge/RowKeyRange.h" - namespace DB { namespace FailPoints @@ -3383,162 +3381,6 @@ try CATCH -class DeltaMergeStoreBackgroundTest - : public DB::base::TiFlashStorageTestBasic -{ -public: - void SetUp() override - { - FailPointHelper::enableFailPoint(FailPoints::gc_skip_update_safe_point); - - try - { - TiFlashStorageTestBasic::SetUp(); - setupDMStore(); - // Split into 4 segments. - helper = std::make_unique(*db_context); - helper->prepareSegments(store, 50, DMTestEnv::PkType::CommonHandle); - } - CATCH - } - - void TearDown() override - { - TiFlashStorageTestBasic::TearDown(); - FailPointHelper::disableFailPoint(FailPoints::gc_skip_update_safe_point); - } - - void setupDMStore() - { - auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::CommonHandle); - store = std::make_shared(*db_context, - false, - "test", - DB::base::TiFlashStorageTestBasic::getCurrentFullTestName(), - 101, - *cols, - (*cols)[0], - true, - 1, - DeltaMergeStore::Settings()); - dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); - } - -protected: - std::unique_ptr helper{}; - DeltaMergeStorePtr store; - DMContextPtr dm_context; -}; - -TEST_F(DeltaMergeStoreBackgroundTest, GCWillMergeMultipleSegments) -try -{ - ASSERT_EQ(store->segments.size(), 4); - auto gc_n = store->onSyncGc(1); - ASSERT_EQ(store->segments.size(), 1); - ASSERT_EQ(gc_n, 1); -} -CATCH - -TEST_F(DeltaMergeStoreBackgroundTest, GCOnlyMergeSmallSegments) -try -{ - UInt64 gc_n = 0; - - // Note: initially we have 4 
segments, each segment contains 50 rows. - - ASSERT_EQ(store->segments.size(), 4); - db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 10; - gc_n = store->onSyncGc(100); - ASSERT_EQ(store->segments.size(), 4); - ASSERT_EQ(gc_n, 0); - - // In this case, merge two segments will exceed small_segment_rows, so no merge will happen - db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 55 * 3; - gc_n = store->onSyncGc(100); - ASSERT_EQ(store->segments.size(), 4); - ASSERT_EQ(gc_n, 0); - - // In this case, we will only merge two segments and then stop. - // [50, 50, 50, 50] => [100, 100] - db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; - gc_n = store->onSyncGc(100); - ASSERT_EQ(store->segments.size(), 2); - ASSERT_EQ(gc_n, 2); - helper->resetExpectedRows(); - ASSERT_EQ(helper->rows_by_segments[0], 100); - ASSERT_EQ(helper->rows_by_segments[1], 100); - - gc_n = store->onSyncGc(100); - ASSERT_EQ(store->segments.size(), 2); - ASSERT_EQ(gc_n, 0); - helper->verifyExpectedRowsForAllSegments(); -} -CATCH - -TEST_F(DeltaMergeStoreBackgroundTest, GCMergeAndStop) -try -{ - UInt64 gc_n = 0; - - // Note: initially we have 4 segments, each segment contains 50 rows. - - ASSERT_EQ(store->segments.size(), 4); - - // In this case, we will only merge two segments and then stop. 
- // [50, 50, 50, 50] => [100, 50, 50] - db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; - gc_n = store->onSyncGc(1); - ASSERT_EQ(store->segments.size(), 3); - ASSERT_EQ(gc_n, 1); - helper->resetExpectedRows(); - ASSERT_EQ(helper->rows_by_segments[0], 100); - ASSERT_EQ(helper->rows_by_segments[1], 50); - ASSERT_EQ(helper->rows_by_segments[2], 50); -} -CATCH - -TEST_F(DeltaMergeStoreBackgroundTest, GCMergeWhileFlushing) -try -{ - ASSERT_EQ(store->segments.size(), 4); - - Block block = DMTestEnv::prepareSimpleWriteBlock(0, 500, false, DMTestEnv::PkType::CommonHandle, 10 /* new tso */); - store->write(*db_context, db_context->getSettingsRef(), block); - - // Currently, when there is a flush in progress, the segment merge in GC thread will be blocked. - - auto sp_flush_commit = SyncPointCtl::enableInScope("before_ColumnFileFlushTask::commit"); - auto sp_merge_flush_retry = SyncPointCtl::enableInScope("before_DeltaMergeStore::segmentMerge|retry_flush"); - - auto th_flush = std::async([&]() { - auto result = store->segments.begin()->second->flushCache(*dm_context); - ASSERT_TRUE(result); - }); - - sp_flush_commit.waitAndPause(); - - auto th_gc = std::async([&]() { - auto gc_n = store->onSyncGc(1); - ASSERT_EQ(gc_n, 1); - ASSERT_EQ(store->segments.size(), 1); - }); - - // Expect merge triggered by GC is retrying... because there is a flush in progress. - sp_merge_flush_retry.waitAndPause(); - - // Finish the flush. - sp_flush_commit.next(); - sp_flush_commit.disable(); - th_flush.wait(); - - // The merge in GC should continue without any further retries. 
- sp_merge_flush_retry.next(); - th_gc.wait(); -} -CATCH - - } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index b5de675d2ee..24f79c892c8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -197,7 +197,7 @@ try // flush segment and make sure there is two packs in stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ASSERT_EQ(segment->getStable()->getPacks(), 2); + ASSERT_EQ(segment->getStable()->getDMFilesPacks(), 2); } { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp new file mode 100644 index 00000000000..b2169a20440 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -0,0 +1,376 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ + +namespace FailPoints +{ +extern const char skip_check_segment_update[]; +} // namespace FailPoints + +namespace DM +{ + +namespace tests +{ + +void SimplePKTestBasic::reload() +{ + TiFlashStorageTestBasic::SetUp(); + + version = 0; + + auto cols = DMTestEnv::getDefaultColumns(is_common_handle ? 
DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID); + store = std::make_shared(*db_context, + false, + "test", + DB::base::TiFlashStorageTestBasic::getCurrentFullTestName(), + 101, + *cols, + (*cols)[0], + is_common_handle, + 1, + DeltaMergeStore::Settings()); + dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); +} + +SegmentPtr SimplePKTestBasic::getSegmentAt(Int64 key) const +{ + auto row_key = buildRowKey(key); + std::shared_lock lock(store->read_write_mutex); + auto segment_it = store->segments.upper_bound(row_key.toRowKeyValueRef()); + RUNTIME_CHECK(segment_it != store->segments.end()); + auto segment = segment_it->second; + RUNTIME_CHECK(store->isSegmentValid(lock, segment)); + return segment; +} + +void SimplePKTestBasic::ensureSegmentBreakpoints(const std::vector & breakpoints, bool use_logical_split) +{ + LOG_FMT_INFO( + logger_op, + "ensureSegmentBreakpoints [{}] logical_split={}", + fmt::join(breakpoints, ","), + use_logical_split); + + for (const auto & bp : breakpoints) + { + auto bp_key = buildRowKey(bp); + + while (true) + { + SegmentPtr segment; + { + std::shared_lock lock(store->read_write_mutex); + + auto segment_it = store->segments.upper_bound(bp_key.toRowKeyValueRef()); + RUNTIME_CHECK(segment_it != store->segments.end()); + segment = segment_it->second; + } + // The segment is already split at the boundary + if (compare(segment->getRowKeyRange().getStart(), bp_key.toRowKeyValueRef()) == 0) + break; + auto split_mode = use_logical_split ?
DeltaMergeStore::SegmentSplitMode::Logical : DeltaMergeStore::SegmentSplitMode::Physical; + auto [left, right] = store->segmentSplit(*dm_context, segment, DeltaMergeStore::SegmentSplitReason::ForegroundWrite, bp_key, split_mode); + if (left) + break; + } + } +} + +std::vector SimplePKTestBasic::getSegmentBreakpoints() const +{ + std::vector breakpoints; + std::unique_lock lock(store->read_write_mutex); + for (auto it = std::next(store->segments.cbegin()); it != store->segments.cend(); it++) + { + auto [start, end] = parseRange(it->second->getRowKeyRange()); + breakpoints.push_back(start); + } + return breakpoints; +} + +RowKeyValue SimplePKTestBasic::buildRowKey(Int64 pk) const +{ + if (!is_common_handle) + return RowKeyValue::fromHandle(pk); + + WriteBufferFromOwnString ss; + ::DB::EncodeUInt(static_cast(TiDB::CodecFlagInt), ss); + ::DB::EncodeInt64(pk, ss); + return RowKeyValue{true, std::make_shared(ss.releaseStr()), pk}; +} + +RowKeyRange SimplePKTestBasic::buildRowRange(Int64 start, Int64 end) const +{ + return RowKeyRange(buildRowKey(start), buildRowKey(end), is_common_handle, 1); +} + +std::pair SimplePKTestBasic::parseRange(const RowKeyRange & range) const +{ + Int64 start_key, end_key; + + if (!is_common_handle) + { + start_key = range.getStart().int_value; + end_key = range.getEnd().int_value; + return {start_key, end_key}; + } + + if (range.isStartInfinite()) + { + start_key = std::numeric_limits::min(); + } + else + { + EXPECT_EQ(range.getStart().data[0], TiDB::CodecFlagInt); + size_t cursor = 1; + start_key = DecodeInt64(cursor, String(range.getStart().data, range.getStart().size)); + } + if (range.isEndInfinite()) + { + end_key = std::numeric_limits::max(); + } + else + { + EXPECT_EQ(range.getEnd().data[0], TiDB::CodecFlagInt); + size_t cursor = 1; + end_key = DecodeInt64(cursor, String(range.getEnd().data, range.getEnd().size)); + } + + return {start_key, end_key}; +} + +Block SimplePKTestBasic::prepareWriteBlock(Int64 start_key, Int64 end_key, bool 
is_deleted) +{ + RUNTIME_CHECK(start_key <= end_key); + if (end_key == start_key) + return Block{}; + version++; + return DMTestEnv::prepareSimpleWriteBlock( + start_key, // + end_key, + false, + version, + DMTestEnv::pk_name, + EXTRA_HANDLE_COLUMN_ID, + is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, + is_common_handle, + 1, + true, + is_deleted); +} + +void SimplePKTestBasic::fill(Int64 start_key, Int64 end_key) +{ + LOG_FMT_INFO( + logger_op, + "fill [{}, {})", + start_key, + end_key); + + auto block = prepareWriteBlock(start_key, end_key); + store->write(*db_context, db_context->getSettingsRef(), block); +} + +void SimplePKTestBasic::fillDelete(Int64 start_key, Int64 end_key) +{ + LOG_FMT_INFO( + logger_op, + "fillDelete [{}, {})", + start_key, + end_key); + + auto block = prepareWriteBlock(start_key, end_key, /* delete */ true); + store->write(*db_context, db_context->getSettingsRef(), block); +} + +void SimplePKTestBasic::flush(Int64 start_key, Int64 end_key) +{ + LOG_FMT_INFO( + logger_op, + "flush [{}, {})", + start_key, + end_key); + + auto range = buildRowRange(start_key, end_key); + store->flushCache(*db_context, range, true); +} + +void SimplePKTestBasic::flush() +{ + LOG_FMT_INFO( + logger_op, + "flushAll"); + + auto range = RowKeyRange::newAll(is_common_handle, 1); + store->flushCache(*db_context, range, true); +} + +void SimplePKTestBasic::mergeDelta(Int64 start_key, Int64 end_key) +{ + LOG_FMT_INFO( + logger_op, + "mergeDelta [{}, {})", + start_key, + end_key); + + auto range = buildRowRange(start_key, end_key); + while (!range.none()) + { + auto processed_range = store->mergeDeltaBySegment(*db_context, range.start); + RUNTIME_CHECK(processed_range.has_value()); + range.setStart(processed_range->end); + } +} + +void SimplePKTestBasic::mergeDelta() +{ + LOG_FMT_INFO( + logger_op, + "mergeDeltaAll"); + + flush(); // as mergeDeltaBySegment always flush, so we also flush here. 
+ store->mergeDeltaAll(*db_context); +} + +void SimplePKTestBasic::deleteRange(Int64 start_key, Int64 end_key) +{ + LOG_FMT_INFO( + logger_op, + "deleteRange [{}, {})", + start_key, + end_key); + + auto range = buildRowRange(start_key, end_key); + store->deleteRange(*db_context, db_context->getSettingsRef(), range); +} + +size_t SimplePKTestBasic::getRowsN() +{ + const auto & columns = store->getTableColumns(); + auto in = store->read( + *db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(is_common_handle, 1)}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + "", + /* keep_order= */ false, + /* is_fast_scan= */ false, + /* expected_block_size= */ 1024)[0]; + return getInputStreamNRows(in); +} + +size_t SimplePKTestBasic::getRowsN(Int64 start_key, Int64 end_key) +{ + const auto & columns = store->getTableColumns(); + auto in = store->read( + *db_context, + db_context->getSettingsRef(), + columns, + {buildRowRange(start_key, end_key)}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + "", + /* keep_order= */ false, + /* is_fast_scan= */ false, + /* expected_block_size= */ 1024)[0]; + return getInputStreamNRows(in); +} + +void SimplePKTestBasic::debugDumpAllSegments() const +{ + std::shared_lock lock(store->read_write_mutex); + for (auto [key, segment] : store->segments) + { + UNUSED(key); + LOG_FMT_INFO(logger, "debugDumpAllSegments: {}", segment->info()); + } +} + + +TEST_F(SimplePKTestBasic, FillAndRead) +try +{ + fill(0, 100); + EXPECT_EQ(100, getRowsN(-50, 200)); + EXPECT_EQ(80, getRowsN(20, 200)); + + fillDelete(40, 150); + EXPECT_EQ(40, getRowsN(-50, 200)); +} +CATCH + + +TEST_F(SimplePKTestBasic, SegmentBreakpoints) +try +{ + FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update); + SCOPE_EXIT({ + FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); + }); + + for (auto ch : {true, false}) + { + is_common_handle = 
ch; + reload(); + + { + ASSERT_EQ(store->segments.size(), 1); + auto bps = getSegmentBreakpoints(); + ASSERT_EQ(bps.size(), 0); + } + { + ensureSegmentBreakpoints({100, 10, -40, 500}); + } + { + ASSERT_EQ(store->segments.size(), 5); + auto bps = getSegmentBreakpoints(); + ASSERT_EQ(bps.size(), 4); + ASSERT_EQ(bps[0], -40); + ASSERT_EQ(bps[1], 10); + ASSERT_EQ(bps[2], 100); + ASSERT_EQ(bps[3], 500); + } + { + // One breakpoint is equal to a segment boundary, check whether it does not cause problems. + ensureSegmentBreakpoints({30, 10}); + } + { + ASSERT_EQ(store->segments.size(), 6); + auto bps = getSegmentBreakpoints(); + ASSERT_EQ(bps.size(), 5); + ASSERT_EQ(bps[0], -40); + ASSERT_EQ(bps[1], 10); + ASSERT_EQ(bps[2], 30); + ASSERT_EQ(bps[3], 100); + ASSERT_EQ(bps[4], 500); + } + } +} +CATCH + + +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.h new file mode 100644 index 00000000000..1d349ca7755 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.h @@ -0,0 +1,110 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ + +/** + * This is similar to SegmentTestBasic, but is for the DeltaMergeStore. 
+ * It allows you to write tests easier based on the assumption that the PK is either Int or Int encoded in String. + */ +class SimplePKTestBasic : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + reload(); + } + + void TearDown() override + { + TiFlashStorageTestBasic::TearDown(); + } + +public: + // Lightweight wrappers + + void fill(Int64 start_key, Int64 end_key); + void fillDelete(Int64 start_key, Int64 end_key); + void flush(Int64 start_key, Int64 end_key); + void flush(); + void mergeDelta(Int64 start_key, Int64 end_key); + void mergeDelta(); + void deleteRange(Int64 start_key, Int64 end_key); + size_t getRowsN(); + size_t getRowsN(Int64 start_key, Int64 end_key); + +public: + SegmentPtr getSegmentAt(Int64 key) const; + + /** + * Ensure segments in the store are split at the specified breakpoints. + * This could be used to initialize segments as desired. + */ + void ensureSegmentBreakpoints(const std::vector & breakpoints, bool use_logical_split = false); + + /** + * Returns the breakpoints of all segments in the store. 
+ * + * Example: + * + * Segments | Expected Breakpoints + * -------------------------------------------------- + * [-inf, +inf] | None + * [-inf, 10), [10, +inf) | 10 + * [-inf, 10), [10, 30), [30, +inf) | 10, 30 + */ + std::vector getSegmentBreakpoints() const; + + void debugDumpAllSegments() const; + +protected: + void reload(); + + RowKeyValue buildRowKey(Int64 pk) const; + + RowKeyRange buildRowRange(Int64 start, Int64 end) const; + + std::pair parseRange(const RowKeyRange & range) const; + + Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); + +protected: + DeltaMergeStorePtr store; + DMContextPtr dm_context; + + UInt64 version = 0; + + LoggerPtr logger = Logger::get("SimplePKTestBasic"); + LoggerPtr logger_op = Logger::get("SimplePKTestBasicOperations"); + +protected: + // Below are options + bool is_common_handle = false; +}; +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp new file mode 100644 index 00000000000..2164a1c0a43 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp @@ -0,0 +1,509 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include + +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char gc_skip_update_safe_point[]; +extern const char gc_skip_merge_delta[]; +extern const char gc_skip_merge[]; +extern const char skip_check_segment_update[]; +} // namespace FailPoints + +namespace DM +{ +namespace tests +{ + + +class DeltaMergeStoreGCTest + : public SimplePKTestBasic +{ +public: + void SetUp() override + { + FailPointHelper::enableFailPoint(FailPoints::gc_skip_update_safe_point); + FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update); + SimplePKTestBasic::SetUp(); + global_settings_backup = db_context->getGlobalContext().getSettings(); + } + + void TearDown() override + { + SimplePKTestBasic::TearDown(); + FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); + FailPointHelper::disableFailPoint(FailPoints::gc_skip_update_safe_point); + db_context->getGlobalContext().setSettings(global_settings_backup); + } + +protected: + Settings global_settings_backup; +}; + + +class DeltaMergeStoreGCMergeTest : public DeltaMergeStoreGCTest +{ +public: + void SetUp() override + { + FailPointHelper::enableFailPoint(FailPoints::gc_skip_merge_delta); + DeltaMergeStoreGCTest::SetUp(); + } + + void TearDown() override + { + DeltaMergeStoreGCTest::TearDown(); + FailPointHelper::disableFailPoint(FailPoints::gc_skip_merge_delta); + } +}; + +TEST_F(DeltaMergeStoreGCMergeTest, MergeMultipleSegments) +try +{ + ensureSegmentBreakpoints({0, 10, 40, 100}); + ASSERT_EQ(std::vector({0, 10, 40, 100}), getSegmentBreakpoints()); + + auto gc_n = store->onSyncGc(1); + ASSERT_EQ(std::vector{}, getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 1); + ASSERT_EQ(0, getRowsN()); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeTest, OnlyMergeSmallSegments) +try +{ + UInt64 gc_n = 0; + + ensureSegmentBreakpoints({0, 50, 100, 150, 200}); + fill(-1000, 1000); + ASSERT_EQ(2000, getRowsN()); + + db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 10; + 
gc_n = store->onSyncGc(100); + ASSERT_EQ(std::vector({0, 50, 100, 150, 200}), getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 0); + + // In this case, merge two segments will exceed small_segment_rows, so no merge will happen + db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 55 * 3; + gc_n = store->onSyncGc(100); + ASSERT_EQ(std::vector({0, 50, 100, 150, 200}), getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 0); + + // In this case, we will only merge two segments and then stop. + db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; + gc_n = store->onSyncGc(100); + ASSERT_EQ(std::vector({0, 100, 200}), getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 2); + + gc_n = store->onSyncGc(100); + ASSERT_EQ(std::vector({0, 100, 200}), getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 0); + + ASSERT_EQ(200, getRowsN(0, 200)); + ASSERT_EQ(2000, getRowsN()); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeTest, MergeAndStop) +try +{ + fill(-1000, 1000); + flush(); + mergeDelta(); + + ensureSegmentBreakpoints({0, 50, 100, 150, 200}); + + // In this case, we will only merge two segments and then stop. + db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; + auto gc_n = store->onSyncGc(1); + ASSERT_EQ(std::vector({0, 100, 150, 200}), getSegmentBreakpoints()); + ASSERT_EQ(gc_n, 1); + + ASSERT_EQ(200, getRowsN(0, 200)); + ASSERT_EQ(2000, getRowsN()); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeTest, MergeWhileFlushing) +try +{ + fill(-1000, 1000); + + ensureSegmentBreakpoints({0, 50, 100}); + + // Currently, when there is a flush in progress, the segment merge in GC thread will be blocked. + + auto sp_flush_commit = SyncPointCtl::enableInScope("before_ColumnFileFlushTask::commit"); + auto sp_merge_flush_retry = SyncPointCtl::enableInScope("before_DeltaMergeStore::segmentMerge|retry_flush"); + + auto th_flush = std::async([&]() { + // Flush the first segment that GC will touch with. 
+ flush(-10, 0); + }); + + sp_flush_commit.waitAndPause(); + + auto th_gc = std::async([&]() { + auto gc_n = store->onSyncGc(1); + ASSERT_EQ(gc_n, 1); + ASSERT_EQ(store->segments.size(), 1); + }); + + // Expect merge triggered by GC is retrying... because there is a flush in progress. + sp_merge_flush_retry.waitAndPause(); + + // Finish the flush. + sp_flush_commit.next(); + sp_flush_commit.disable(); + th_flush.wait(); + + // The merge in GC should continue without any further retries. + sp_merge_flush_retry.next(); + th_gc.wait(); +} +CATCH + + +class DeltaMergeStoreGCMergeDeltaTest : public DeltaMergeStoreGCTest +{ +public: + void SetUp() override + { + FailPointHelper::enableFailPoint(FailPoints::gc_skip_merge); + DeltaMergeStoreGCTest::SetUp(); + } + + void TearDown() override + { + DeltaMergeStoreGCTest::TearDown(); + FailPointHelper::disableFailPoint(FailPoints::gc_skip_merge); + } +}; + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, AfterLogicalSplit) +try +{ + db_context->getSettingsRef().dt_segment_stable_pack_rows = 107; // for mergeDelta + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 107; // for GC + + auto gc_n = store->onSyncGc(1); + ASSERT_EQ(0, gc_n); + + fill(0, 1000); + flush(); + mergeDelta(); + + gc_n = store->onSyncGc(1); + ASSERT_EQ(0, gc_n); + + // Segments that are just logically split out should not trigger merge delta at all. + ensureSegmentBreakpoints({500}, /* logical_split */ true); + gc_n = store->onSyncGc(1); + ASSERT_EQ(0, gc_n); + + ASSERT_EQ(2, store->segments.size()); + ASSERT_EQ(1000, getSegmentAt(0)->getStable()->getDMFilesRows()); + ASSERT_EQ(1000, getSegmentAt(500)->getStable()->getDMFilesRows()); + + // Segments that are just logically split out should not trigger merge delta at all.
+ ensureSegmentBreakpoints({150, 500}, /* logical_split */ true); + gc_n = store->onSyncGc(1); + ASSERT_EQ(0, gc_n); + + ASSERT_EQ(3, store->segments.size()); + ASSERT_EQ(1000, getSegmentAt(0)->getStable()->getDMFilesRows()); + ASSERT_EQ(1000, getSegmentAt(300)->getStable()->getDMFilesRows()); + ASSERT_EQ(1000, getSegmentAt(600)->getStable()->getDMFilesRows()); + + // merge delta for right most segment and check again + mergeDelta(1000, 1001); + ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); + + gc_n = store->onSyncGc(100); + ASSERT_EQ(1, gc_n); + + ASSERT_EQ(3, store->segments.size()); + ASSERT_EQ(1000, getSegmentAt(0)->getStable()->getDMFilesRows()); + ASSERT_EQ(350, getSegmentAt(300)->getStable()->getDMFilesRows()); + ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); + + // Trigger GC again, more segments will be merged delta + gc_n = store->onSyncGc(100); + ASSERT_EQ(1, gc_n); + + ASSERT_EQ(3, store->segments.size()); + ASSERT_EQ(150, getSegmentAt(0)->getStable()->getDMFilesRows()); + ASSERT_EQ(350, getSegmentAt(300)->getStable()->getDMFilesRows()); + ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); + + // Trigger GC again, no more merge delta. 
+ gc_n = store->onSyncGc(100); + ASSERT_EQ(0, gc_n); + ASSERT_EQ(3, store->segments.size()); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, SegmentContainsPack) +try +{ + ensureSegmentBreakpoints({400, 500}); + fill(410, 450); + flush(); + mergeDelta(); + + auto gc_n = store->onSyncGc(100); + ASSERT_EQ(0, gc_n); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, SegmentExactlyContainsStable) +try +{ + ensureSegmentBreakpoints({400, 500}); + fill(100, 600); + flush(); + mergeDelta(); + + auto gc_n = store->onSyncGc(100); + ASSERT_EQ(0, gc_n); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, NoPacks) +try +{ + db_context->getSettingsRef().dt_segment_stable_pack_rows = 1; + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 1; + + ensureSegmentBreakpoints({100, 200, 300}); + + auto gc_n = store->onSyncGc(100); + ASSERT_EQ(0, gc_n); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, SegmentContainedByPack) +try +{ + for (auto pack_size : {7, 200}) + { + reload(); + db_context->getSettingsRef().dt_segment_stable_pack_rows = pack_size; // for mergeDelta + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = pack_size; // for GC + + fill(0, 200); + flush(); + mergeDelta(); + + auto pack_n = static_cast(std::ceil(200.0 / static_cast(pack_size))); + EXPECT_EQ(pack_n, getSegmentAt(0)->getStable()->getDMFilesPacks()); + + auto gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + ensureSegmentBreakpoints({10, 190}, /* logical_split */ true); + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + mergeDelta(0, 1); + mergeDelta(190, 191); + + EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(10, getSegmentAt(190)->getStable()->getDMFilesRows()); + + EXPECT_EQ(pack_n, getSegmentAt(50)->getStable()->getDMFilesPacks()); + EXPECT_EQ(200, getSegmentAt(50)->getStable()->getDMFilesRows()); + + if (pack_size == 200) + { + // The segment [10, 190) only overlaps with 1 pack and is 
contained by the pack. + // Even it contains most of the data, it will still be GCed. + gc_n = store->onSyncGc(50); + EXPECT_EQ(1, gc_n); + EXPECT_EQ(1, getSegmentAt(150)->getStable()->getDMFilesPacks()); + EXPECT_EQ(180, getSegmentAt(150)->getStable()->getDMFilesRows()); + + // There should be no more GCs. + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + } + else if (pack_size == 7) + { + // When pack size is small, we will more precisely know that most of the DTFile is still valid. + // So in this case, no GC will happen. + gc_n = store->onSyncGc(50); + EXPECT_EQ(0, gc_n); + } + else + { + FAIL(); + } + } +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, SmallReclaimRatioDoesNotMergeDelta) +try +{ + db_context->getSettingsRef().dt_segment_stable_pack_rows = 7; + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 7; + + fill(0, 400); + flush(); + mergeDelta(); + + auto gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + ensureSegmentBreakpoints({10}, /* logical_split */ true); + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + mergeDelta(0, 1); + EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(400, getSegmentAt(150)->getStable()->getDMFilesRows()); + + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(400, getSegmentAt(150)->getStable()->getDMFilesRows()); +} +CATCH + + +TEST_F(DeltaMergeStoreGCMergeDeltaTest, SimpleBigReclaimRatio) +try +{ + db_context->getSettingsRef().dt_segment_stable_pack_rows = 7; + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 7; + + fill(0, 400); + flush(); + mergeDelta(); + + auto gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + ensureSegmentBreakpoints({10}, /* logical_split */ true); + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + + mergeDelta(100, 101); + EXPECT_EQ(400, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(390, 
getSegmentAt(150)->getStable()->getDMFilesRows()); + + gc_n = store->onSyncGc(100); + EXPECT_EQ(1, gc_n); + EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(390, getSegmentAt(150)->getStable()->getDMFilesRows()); + + // GC again does not introduce new changes + gc_n = store->onSyncGc(100); + EXPECT_EQ(0, gc_n); + EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); + EXPECT_EQ(390, getSegmentAt(150)->getStable()->getDMFilesRows()); +} +CATCH + + +// This test enables GC merge and GC merge delta. +TEST_F(DeltaMergeStoreGCTest, RandomShuffleLogicalSplitAndDeleteRange) +try +{ + // TODO: Better to be fuzz tests, in order to reach edge cases efficiently. + + std::random_device rd; + std::mt19937 random(rd()); + + for (auto pack_size : {1, 7, 10, 200}) + { + for (size_t random_round = 0; random_round < 10; random_round++) + { + LOG_FMT_INFO(logger, "Run round #{} for pack_size = {}", random_round, pack_size); + + // For each pack_size, we randomize N rounds. We should always expect everything is + // reclaimed in each round. + + reload(); + db_context->getSettingsRef().dt_segment_stable_pack_rows = pack_size; // for mergeDelta + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = pack_size; // for GC + + fill(0, 100); + fill(500, 600); + flush(); + mergeDelta(); + + auto operations = std::vector>{ + [&] { ensureSegmentBreakpoints({50}, true); }, + [&] { ensureSegmentBreakpoints({80}, true); }, + [&] { ensureSegmentBreakpoints({100}, true); }, + [&] { ensureSegmentBreakpoints({300}, true); }, + [&] { ensureSegmentBreakpoints({700}, true); }, + [&] { deleteRange(-10, 30); }, + [&] { deleteRange(30, 70); }, + [&] { deleteRange(70, 100); }, + [&] { deleteRange(400, 500); }, + [&] { deleteRange(500, 600); }, + }; + + std::shuffle(std::begin(operations), std::end(operations), random); + + for (const auto & op : operations) + { + op(); + + // There is also a chance to randomly merge some delta.
+ auto merge_delta_ops = std::uniform_int_distribution(0, 2)(random); + for (size_t i = 0; i < merge_delta_ops; i++) + { + auto merge_delta_at = std::uniform_int_distribution(0, 700)(random); + mergeDelta(merge_delta_at, merge_delta_at + 1); + } + } + + // Finally, let's do GCs. We should expect everything are reclaimed within 10 rounds of GC. + for (size_t gc_round = 0; gc_round < 10; gc_round++) + store->onSyncGc(100); + + // Check whether we have reclaimed everything + EXPECT_EQ(store->segments.size(), 1); + EXPECT_EQ(getSegmentAt(0)->getStable()->getDMFilesPacks(), 0); + + // No more GCs are needed. + EXPECT_EQ(0, store->onSyncGc(100)); + } + } +} +CATCH + + +} // namespace tests +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 7c18b32a795..f22cd93a1fc 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -494,76 +494,6 @@ try CATCH -TEST_F(SegmentOperationTest, GCCheckAfterSegmentLogicalSplit) -try -{ - { - SegmentTestOptions options; - options.db_settings.dt_segment_stable_pack_rows = 100; - reloadWithOptions(options); - } - - auto invalid_data_ratio_threshold = dm_context->db_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc; - { - auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; - auto snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, segment, snap, /* prev_seg */ nullptr, /* next_seg */ nullptr, invalid_data_ratio_threshold, log)); - } - - writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 1000); - flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); - mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - { - auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; - auto snap = 
segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, segment, snap, /* prev_seg */ nullptr, /* next_seg */ nullptr, invalid_data_ratio_threshold, log)); - } - - auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID, Segment::SplitMode::Logical); - ASSERT_TRUE(new_seg_id_opt.has_value()); - auto left_segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; - auto right_segment_id = new_seg_id_opt.value(); - { - auto left_segment = segments[left_segment_id]; - auto right_segment = segments[right_segment_id]; - auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ right_segment, invalid_data_ratio_threshold, log)); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ left_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log)); - } - - auto new_seg_id_opt2 = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID, Segment::SplitMode::Logical); - ASSERT_TRUE(new_seg_id_opt2.has_value()); - auto middle_segment_id = new_seg_id_opt2.value(); - { - auto left_segment = segments[left_segment_id]; - auto middle_segment = segments[middle_segment_id]; - auto right_segment = segments[right_segment_id]; - auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - auto middle_snap = middle_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, 
CurrentMetrics::DT_SnapshotOfDeltaMerge); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ middle_segment, invalid_data_ratio_threshold, log)); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, middle_segment, middle_snap, /* prev_seg */ left_segment, /* next_seg */ right_segment, invalid_data_ratio_threshold, log)); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ middle_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log)); - } - - // merge delta left segment and check again - mergeSegmentDelta(left_segment_id); - { - auto left_segment = segments[left_segment_id]; - auto middle_segment = segments[middle_segment_id]; - auto right_segment = segments[right_segment_id]; - auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - auto middle_snap = middle_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ middle_segment, invalid_data_ratio_threshold, log)); - ASSERT_TRUE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, middle_segment, middle_snap, /* prev_seg */ left_segment, /* next_seg */ right_segment, invalid_data_ratio_threshold, log)); - ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ middle_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log)); - } -} -CATCH - - TEST_F(SegmentOperationTest, Issue5570) try { diff --git 
a/libs/libsymbolization/CMakeLists.txt b/libs/libsymbolization/CMakeLists.txt index 78dae44199a..ffd360fc9d4 100644 --- a/libs/libsymbolization/CMakeLists.txt +++ b/libs/libsymbolization/CMakeLists.txt @@ -12,12 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG) - set(_SYMBOLIZATION_BUILD_PROFILE "debug") -else() - set(_SYMBOLIZATION_BUILD_PROFILE "release") - set(_SYMBOLIZATION_BUILD_ARGS --release) -endif() +# Debug-mode symbolization is slow. Let's always build a release. The building is fast anyway. +set(_SYMBOLIZATION_BUILD_PROFILE "release") +set(_SYMBOLIZATION_BUILD_ARGS --release) set(_SYMBOLIZATION_SOURCE_DIR "${TiFlash_SOURCE_DIR}/libs/libsymbolization") set(_SYMBOLIZATION_LIBRARY "${CMAKE_CURRENT_BINARY_DIR}/${_SYMBOLIZATION_BUILD_PROFILE}/${CMAKE_STATIC_LIBRARY_PREFIX}symbolization${CMAKE_STATIC_LIBRARY_SUFFIX}") From 61555a4fc845af51c22c339d2358e900dbdb8ee3 Mon Sep 17 00:00:00 2001 From: Wish Date: Sat, 24 Sep 2022 11:44:07 +0800 Subject: [PATCH 4/5] Update segment stats --- dbms/src/Interpreters/AsynchronousMetrics.cpp | 2 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 200 --------------- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 53 ++-- .../DeltaMerge/DeltaMergeStore_Statistics.cpp | 239 ++++++++++++++++++ dbms/src/Storages/DeltaMerge/Segment.h | 1 + .../tests/gtest_dm_delta_merge_store.cpp | 2 +- .../tests/gtest_dm_storage_delta_merge.cpp | 12 +- dbms/src/Storages/StorageDeltaMerge.cpp | 4 +- .../System/StorageSystemDTSegments.cpp | 62 +++-- .../Storages/System/StorageSystemDTTables.cpp | 2 +- 10 files changed, 321 insertions(+), 256 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp diff --git a/dbms/src/Interpreters/AsynchronousMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp index dfaf5b24943..d416e5a6598 100644 --- a/dbms/src/Interpreters/AsynchronousMetrics.cpp +++ 
b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -179,7 +179,7 @@ void AsynchronousMetrics::update() { if (auto store = dt_storage->getStoreIfInited(); store) { - auto stat = store->getStat(); + auto stat = store->getStoreStats(); calculateMax(max_dt_stable_oldest_snapshot_lifetime, stat.storage_stable_oldest_snapshot_lifetime); calculateMax(max_dt_delta_oldest_snapshot_lifetime, stat.storage_delta_oldest_snapshot_lifetime); calculateMax(max_dt_meta_oldest_snapshot_lifetime, stat.storage_meta_oldest_snapshot_lifetime); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 085d44d89d8..8534322ade8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1497,206 +1497,6 @@ void DeltaMergeStore::restoreStableFiles() } } -static inline DB::PS::V2::PageEntriesVersionSetWithDelta::Snapshot * -toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) -{ - return dynamic_cast(ptr.get()); -} - -DeltaMergeStoreStat DeltaMergeStore::getStat() -{ - std::shared_lock lock(read_write_mutex); - - DeltaMergeStoreStat stat; - - if (shutdown_called.load(std::memory_order_relaxed)) - return stat; - - stat.segment_count = segments.size(); - - Int64 total_placed_rows = 0; - Int64 total_delta_cache_rows = 0; - Float64 total_delta_cache_size = 0; - Int64 total_delta_valid_cache_rows = 0; - for (const auto & [handle, segment] : segments) - { - (void)handle; - const auto & delta = segment->getDelta(); - const auto & stable = segment->getStable(); - - total_placed_rows += delta->getPlacedDeltaRows(); - - if (delta->getColumnFileCount()) - { - stat.total_rows += delta->getRows(); - stat.total_size += delta->getBytes(); - - stat.total_delete_ranges += delta->getDeletes(); - - stat.delta_count += 1; - stat.total_pack_count_in_delta += delta->getColumnFileCount(); - - stat.total_delta_rows += delta->getRows(); - stat.total_delta_size += delta->getBytes(); - - 
stat.delta_index_size += delta->getDeltaIndexBytes(); - - total_delta_cache_rows += delta->getTotalCacheRows(); - total_delta_cache_size += delta->getTotalCacheBytes(); - total_delta_valid_cache_rows += delta->getValidCacheRows(); - } - - if (stable->getDMFilesPacks()) - { - stat.total_rows += stable->getRows(); - stat.total_size += stable->getBytes(); - - stat.stable_count += 1; - stat.total_pack_count_in_stable += stable->getDMFilesPacks(); - - stat.total_stable_rows += stable->getRows(); - stat.total_stable_size += stable->getBytes(); - stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk(); - } - } - - stat.delta_rate_rows = static_cast(stat.total_delta_rows) / stat.total_rows; - stat.delta_rate_segments = static_cast(stat.delta_count) / stat.segment_count; - - stat.delta_placed_rate = static_cast(total_placed_rows) / stat.total_delta_rows; - stat.delta_cache_size = total_delta_cache_size; - stat.delta_cache_rate = static_cast(total_delta_valid_cache_rows) / stat.total_delta_rows; - stat.delta_cache_wasted_rate = static_cast(total_delta_cache_rows - total_delta_valid_cache_rows) / total_delta_valid_cache_rows; - - stat.avg_segment_rows = static_cast(stat.total_rows) / stat.segment_count; - stat.avg_segment_size = static_cast(stat.total_size) / stat.segment_count; - - stat.avg_delta_rows = static_cast(stat.total_delta_rows) / stat.delta_count; - stat.avg_delta_size = static_cast(stat.total_delta_size) / stat.delta_count; - stat.avg_delta_delete_ranges = static_cast(stat.total_delete_ranges) / stat.delta_count; - - stat.avg_stable_rows = static_cast(stat.total_stable_rows) / stat.stable_count; - stat.avg_stable_size = static_cast(stat.total_stable_size) / stat.stable_count; - - stat.avg_pack_count_in_delta = static_cast(stat.total_pack_count_in_delta) / stat.delta_count; - stat.avg_pack_rows_in_delta = static_cast(stat.total_delta_rows) / stat.total_pack_count_in_delta; - stat.avg_pack_size_in_delta = static_cast(stat.total_delta_size) / 
stat.total_pack_count_in_delta; - - stat.avg_pack_count_in_stable = static_cast(stat.total_pack_count_in_stable) / stat.stable_count; - stat.avg_pack_rows_in_stable = static_cast(stat.total_stable_rows) / stat.total_pack_count_in_stable; - stat.avg_pack_size_in_stable = static_cast(stat.total_stable_size) / stat.total_pack_count_in_stable; - - static const String useless_tracing_id("DeltaMergeStore::getStat"); - { - auto snaps_stat = storage_pool->dataReader()->getSnapshotsStat(); - stat.storage_stable_num_snapshots = snaps_stat.num_snapshots; - stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; - stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; - stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr stable_snapshot = storage_pool->dataReader()->getSnapshot(useless_tracing_id); - const auto * concrete_snap = toConcreteSnapshot(stable_snapshot); - if (concrete_snap) - { - if (const auto * const version = concrete_snap->version(); version != nullptr) - { - stat.storage_stable_num_pages = version->numPages(); - stat.storage_stable_num_normal_pages = version->numNormalPages(); - stat.storage_stable_max_page_id = version->maxId(); - } - else - { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=data"); - } - } - } - { - auto snaps_stat = storage_pool->logReader()->getSnapshotsStat(); - stat.storage_delta_num_snapshots = snaps_stat.num_snapshots; - stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; - stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; - stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr log_snapshot = storage_pool->logReader()->getSnapshot(useless_tracing_id); - const auto * concrete_snap = toConcreteSnapshot(log_snapshot); - if (concrete_snap) - { - if (const auto * 
const version = concrete_snap->version(); version != nullptr) - { - stat.storage_delta_num_pages = version->numPages(); - stat.storage_delta_num_normal_pages = version->numNormalPages(); - stat.storage_delta_max_page_id = version->maxId(); - } - else - { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=log"); - } - } - } - { - auto snaps_stat = storage_pool->metaReader()->getSnapshotsStat(); - stat.storage_meta_num_snapshots = snaps_stat.num_snapshots; - stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; - stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; - stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr meta_snapshot = storage_pool->metaReader()->getSnapshot(useless_tracing_id); - const auto * concrete_snap = toConcreteSnapshot(meta_snapshot); - if (concrete_snap) - { - if (const auto * const version = concrete_snap->version(); version != nullptr) - { - stat.storage_meta_num_pages = version->numPages(); - stat.storage_meta_num_normal_pages = version->numNormalPages(); - stat.storage_meta_max_page_id = version->maxId(); - } - else - { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=meta"); - } - } - } - - stat.background_tasks_length = background_tasks.length(); - - return stat; -} - -SegmentStats DeltaMergeStore::getSegmentStats() -{ - std::shared_lock lock(read_write_mutex); - - SegmentStats stats; - for (const auto & [handle, segment] : segments) - { - (void)handle; - - SegmentStat stat; - const auto & delta = segment->getDelta(); - const auto & stable = segment->getStable(); - - stat.segment_id = segment->segmentId(); - stat.range = segment->getRowKeyRange(); - - stat.rows = segment->getEstimatedRows(); - stat.size = delta->getBytes() + stable->getBytes(); - stat.delete_ranges = delta->getDeletes(); - - stat.stable_size_on_disk = stable->getDMFilesBytesOnDisk(); - - 
stat.delta_pack_count = delta->getColumnFileCount(); - stat.stable_pack_count = stable->getDMFilesPacks(); - - stat.avg_delta_pack_rows = static_cast(delta->getRows()) / stat.delta_pack_count; - stat.avg_stable_pack_rows = static_cast(stable->getRows()) / stat.stable_pack_count; - - stat.delta_rate = static_cast(delta->getRows()) / stat.rows; - stat.delta_cache_size = delta->getTotalCacheBytes(); - - stat.delta_index_size = delta->getDeltaIndexBytes(); - - stats.push_back(stat); - } - return stats; -} - SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( DMContext & dm_context, const RowKeyRanges & sorted_ranges, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index f2f6b829874..ffacb52b6b8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -52,31 +52,40 @@ struct ExternalDTFileInfo; inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1; -struct SegmentStat +struct SegmentStats { UInt64 segment_id = 0; RowKeyRange range; - + UInt64 epoch = 0; UInt64 rows = 0; UInt64 size = 0; - UInt64 delete_ranges = 0; - - UInt64 stable_size_on_disk = 0; - - UInt64 delta_pack_count = 0; - UInt64 stable_pack_count = 0; - - Float64 avg_delta_pack_rows = 0; - Float64 avg_stable_pack_rows = 0; Float64 delta_rate = 0; + UInt64 delta_memtable_rows = 0; + UInt64 delta_memtable_size = 0; + UInt64 delta_memtable_column_files = 0; + UInt64 delta_memtable_delete_ranges = 0; + UInt64 delta_persisted_page_id = 0; + UInt64 delta_persisted_rows = 0; + UInt64 delta_persisted_size = 0; + UInt64 delta_persisted_column_files = 0; + UInt64 delta_persisted_delete_ranges = 0; UInt64 delta_cache_size = 0; - UInt64 delta_index_size = 0; + + UInt64 stable_page_id = 0; + UInt64 stable_rows = 0; + UInt64 stable_size = 0; + UInt64 stable_dmfiles = 0; + UInt64 stable_dmfiles_id_0 = 0; + UInt64 stable_dmfiles_rows = 0; + UInt64 stable_dmfiles_size = 0; + UInt64 
stable_dmfiles_size_on_disk = 0; + UInt64 stable_dmfiles_packs = 0; }; -using SegmentStats = std::vector; +using SegmentsStats = std::vector; -struct DeltaMergeStoreStat +struct StoreStats { UInt64 segment_count = 0; @@ -385,16 +394,12 @@ class DeltaMergeStore : private boost::noncopyable SortDescription getPrimarySortDescription() const; void check(const Context & db_context); - DeltaMergeStoreStat getStat(); - SegmentStats getSegmentStats(); - bool isCommonHandle() const - { - return is_common_handle; - } - size_t getRowKeyColumnSize() const - { - return rowkey_column_size; - } + + StoreStats getStoreStats(); + SegmentsStats getSegmentsStats(); + + bool isCommonHandle() const { return is_common_handle; } + size_t getRowKeyColumnSize() const { return rowkey_column_size; } public: /// Methods mainly used by region split. diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp new file mode 100644 index 00000000000..8209ed1b444 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -0,0 +1,239 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include + +namespace DB +{ +namespace DM +{ + +static inline DB::PS::V2::PageEntriesVersionSetWithDelta::Snapshot * +toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) +{ + return dynamic_cast(ptr.get()); +} + +StoreStats DeltaMergeStore::getStoreStats() +{ + std::shared_lock lock(read_write_mutex); + + StoreStats stat; + + if (shutdown_called.load(std::memory_order_relaxed)) + return stat; + + stat.segment_count = segments.size(); + + Int64 total_placed_rows = 0; + Int64 total_delta_cache_rows = 0; + Float64 total_delta_cache_size = 0; + Int64 total_delta_valid_cache_rows = 0; + for (const auto & [handle, segment] : segments) + { + (void)handle; + const auto & delta = segment->getDelta(); + const auto & stable = segment->getStable(); + + total_placed_rows += delta->getPlacedDeltaRows(); + + if (delta->getColumnFileCount()) + { + stat.total_rows += delta->getRows(); + stat.total_size += delta->getBytes(); + + stat.total_delete_ranges += delta->getDeletes(); + + stat.delta_count += 1; + stat.total_pack_count_in_delta += delta->getColumnFileCount(); + + stat.total_delta_rows += delta->getRows(); + stat.total_delta_size += delta->getBytes(); + + stat.delta_index_size += delta->getDeltaIndexBytes(); + + total_delta_cache_rows += delta->getTotalCacheRows(); + total_delta_cache_size += delta->getTotalCacheBytes(); + total_delta_valid_cache_rows += delta->getValidCacheRows(); + } + + if (stable->getDMFilesPacks()) + { + stat.total_rows += stable->getRows(); + stat.total_size += stable->getBytes(); + + stat.stable_count += 1; + stat.total_pack_count_in_stable += stable->getDMFilesPacks(); + + stat.total_stable_rows += stable->getRows(); + stat.total_stable_size += stable->getBytes(); + stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk(); + } + } + + stat.delta_rate_rows = static_cast(stat.total_delta_rows) / stat.total_rows; + stat.delta_rate_segments = static_cast(stat.delta_count) / stat.segment_count; + + 
stat.delta_placed_rate = static_cast(total_placed_rows) / stat.total_delta_rows; + stat.delta_cache_size = total_delta_cache_size; + stat.delta_cache_rate = static_cast(total_delta_valid_cache_rows) / stat.total_delta_rows; + stat.delta_cache_wasted_rate = static_cast(total_delta_cache_rows - total_delta_valid_cache_rows) / total_delta_valid_cache_rows; + + stat.avg_segment_rows = static_cast(stat.total_rows) / stat.segment_count; + stat.avg_segment_size = static_cast(stat.total_size) / stat.segment_count; + + stat.avg_delta_rows = static_cast(stat.total_delta_rows) / stat.delta_count; + stat.avg_delta_size = static_cast(stat.total_delta_size) / stat.delta_count; + stat.avg_delta_delete_ranges = static_cast(stat.total_delete_ranges) / stat.delta_count; + + stat.avg_stable_rows = static_cast(stat.total_stable_rows) / stat.stable_count; + stat.avg_stable_size = static_cast(stat.total_stable_size) / stat.stable_count; + + stat.avg_pack_count_in_delta = static_cast(stat.total_pack_count_in_delta) / stat.delta_count; + stat.avg_pack_rows_in_delta = static_cast(stat.total_delta_rows) / stat.total_pack_count_in_delta; + stat.avg_pack_size_in_delta = static_cast(stat.total_delta_size) / stat.total_pack_count_in_delta; + + stat.avg_pack_count_in_stable = static_cast(stat.total_pack_count_in_stable) / stat.stable_count; + stat.avg_pack_rows_in_stable = static_cast(stat.total_stable_rows) / stat.total_pack_count_in_stable; + stat.avg_pack_size_in_stable = static_cast(stat.total_stable_size) / stat.total_pack_count_in_stable; + + static const String useless_tracing_id("DeltaMergeStore::getStat"); + { + auto snaps_stat = storage_pool->dataReader()->getSnapshotsStat(); + stat.storage_stable_num_snapshots = snaps_stat.num_snapshots; + stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; + stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; + stat.storage_stable_oldest_snapshot_tracing_id = 
snaps_stat.longest_living_from_tracing_id; + PageStorage::SnapshotPtr stable_snapshot = storage_pool->dataReader()->getSnapshot(useless_tracing_id); + const auto * concrete_snap = toConcreteSnapshot(stable_snapshot); + if (concrete_snap) + { + if (const auto * const version = concrete_snap->version(); version != nullptr) + { + stat.storage_stable_num_pages = version->numPages(); + stat.storage_stable_num_normal_pages = version->numNormalPages(); + stat.storage_stable_max_page_id = version->maxId(); + } + else + { + LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=data"); + } + } + } + { + auto snaps_stat = storage_pool->logReader()->getSnapshotsStat(); + stat.storage_delta_num_snapshots = snaps_stat.num_snapshots; + stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; + stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; + stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; + PageStorage::SnapshotPtr log_snapshot = storage_pool->logReader()->getSnapshot(useless_tracing_id); + const auto * concrete_snap = toConcreteSnapshot(log_snapshot); + if (concrete_snap) + { + if (const auto * const version = concrete_snap->version(); version != nullptr) + { + stat.storage_delta_num_pages = version->numPages(); + stat.storage_delta_num_normal_pages = version->numNormalPages(); + stat.storage_delta_max_page_id = version->maxId(); + } + else + { + LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=log"); + } + } + } + { + auto snaps_stat = storage_pool->metaReader()->getSnapshotsStat(); + stat.storage_meta_num_snapshots = snaps_stat.num_snapshots; + stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; + stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; + stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; + PageStorage::SnapshotPtr 
meta_snapshot = storage_pool->metaReader()->getSnapshot(useless_tracing_id); + const auto * concrete_snap = toConcreteSnapshot(meta_snapshot); + if (concrete_snap) + { + if (const auto * const version = concrete_snap->version(); version != nullptr) + { + stat.storage_meta_num_pages = version->numPages(); + stat.storage_meta_num_normal_pages = version->numNormalPages(); + stat.storage_meta_max_page_id = version->maxId(); + } + else + { + LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=meta"); + } + } + } + + stat.background_tasks_length = background_tasks.length(); + + return stat; +} + +SegmentsStats DeltaMergeStore::getSegmentsStats() +{ + std::shared_lock lock(read_write_mutex); + + SegmentsStats stats; + for (const auto & [handle, segment] : segments) + { + UNUSED(handle); + + SegmentStats stat; + const auto & delta = segment->getDelta(); + const auto & delta_memtable = delta->getMemTableSet(); + const auto & delta_persisted = delta->getPersistedFileSet(); + const auto & stable = segment->getStable(); + + stat.segment_id = segment->segmentId(); + stat.range = segment->getRowKeyRange(); + stat.epoch = segment->segmentEpoch(); + stat.rows = segment->getEstimatedRows(); + stat.size = segment->getEstimatedBytes(); + + stat.delta_rate = static_cast(delta->getRows()) / stat.rows; + stat.delta_memtable_rows = delta_memtable->getRows(); + stat.delta_memtable_size = delta_memtable->getBytes(); + stat.delta_memtable_column_files = delta_memtable->getColumnFileCount(); + stat.delta_memtable_delete_ranges = delta_memtable->getDeletes(); + stat.delta_persisted_page_id = delta_persisted->getId(); + stat.delta_persisted_rows = delta_persisted->getRows(); + stat.delta_persisted_size = delta_persisted->getBytes(); + stat.delta_persisted_column_files = delta_persisted->getColumnFileCount(); + stat.delta_persisted_delete_ranges = delta_persisted->getDeletes(); + stat.delta_cache_size = delta->getTotalCacheBytes(); + stat.delta_index_size = 
delta->getDeltaIndexBytes(); + + stat.stable_page_id = stable->getId(); + stat.stable_rows = stable->getRows(); + stat.stable_size = stable->getBytes(); + stat.stable_dmfiles = stable->getDMFiles().size(); + if (stat.stable_dmfiles > 0) + stat.stable_dmfiles_id_0 = stable->getDMFiles().front()->fileId(); + stat.stable_dmfiles_rows = stable->getDMFilesRows(); + stat.stable_dmfiles_size = stable->getDMFilesBytes(); + stat.stable_dmfiles_size_on_disk = stable->getDMFilesBytesOnDisk(); + stat.stable_dmfiles_packs = stable->getDMFilesPacks(); + + stats.emplace_back(stat); + } + return stats; +} + + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 1bda20c8bf4..bd24a2d422b 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -373,6 +373,7 @@ class Segment : private boost::noncopyable PageId segmentId() const { return segment_id; } PageId nextSegmentId() const { return next_segment_id; } + UInt64 segmentEpoch() const { return epoch; }; void check(DMContext & dm_context, const String & when) const; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index edb34cea2b4..7f72be0153b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -2953,7 +2953,7 @@ try store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); num_rows_write_in_total += num_rows_per_write; - auto segment_stats = store->getSegmentStats(); + auto segment_stats = store->getSegmentsStats(); size_t delta_cache_size = 0; for (auto & stat : segment_stats) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 4406cf06289..77b629228ec 
100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -144,7 +144,7 @@ try } auto delta_store = storage->getStore(); size_t total_segment_rows = 0; - auto segment_stats = delta_store->getSegmentStats(); + auto segment_stats = delta_store->getSegmentsStats(); for (auto & stat : segment_stats) { total_segment_rows += stat.rows; @@ -772,11 +772,11 @@ try { write_data(num_rows_write, 1000); num_rows_write += 1000; - if (storage->getStore()->getSegmentStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; } { - ASSERT_GT(storage->getStore()->getSegmentStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(ctx); @@ -793,13 +793,13 @@ try // write more data make sure segments more than 1 for (size_t i = 0; i < 100000; i++) { - if (storage->getStore()->getSegmentStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; write_data(num_rows_write, 1000); num_rows_write += 1000; } { - ASSERT_GT(storage->getStore()->getSegmentStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(ctx); @@ -819,7 +819,7 @@ try // restore the table and make sure there is just one segment left create_table(); { - ASSERT_EQ(storage->getStore()->getSegmentStats().size(), 1); + ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_LT(read_data(), num_rows_write); } storage->drop(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index ef1b74e7679..c3611fb9884 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1362,10 +1362,10 @@ BlockInputStreamPtr StorageDeltaMerge::status() auto & name_col = columns[0]; auto & value_col = columns[1]; - 
DeltaMergeStoreStat stat; + StoreStats stat; if (storeInited()) { - stat = _store->getStat(); + stat = _store->getStoreStats(); } #define INSERT_INT(NAME) \ diff --git a/dbms/src/Storages/System/StorageSystemDTSegments.cpp b/dbms/src/Storages/System/StorageSystemDTSegments.cpp index f84a19a005c..77dd55e0b1f 100644 --- a/dbms/src/Storages/System/StorageSystemDTSegments.cpp +++ b/dbms/src/Storages/System/StorageSystemDTSegments.cpp @@ -42,22 +42,32 @@ StorageSystemDTSegments::StorageSystemDTSegments(const std::string & name_) {"segment_id", std::make_shared()}, {"range", std::make_shared()}, - + {"epoch", std::make_shared()}, {"rows", std::make_shared()}, {"size", std::make_shared()}, - {"delete_ranges", std::make_shared()}, - - {"stable_size_on_disk", std::make_shared()}, - - {"delta_pack_count", std::make_shared()}, - {"stable_pack_count", std::make_shared()}, - - {"avg_delta_pack_rows", std::make_shared()}, - {"avg_stable_pack_rows", std::make_shared()}, {"delta_rate", std::make_shared()}, + {"delta_memtable_rows", std::make_shared()}, + {"delta_memtable_size", std::make_shared()}, + {"delta_memtable_column_files", std::make_shared()}, + {"delta_memtable_delete_ranges", std::make_shared()}, + {"delta_persisted_page_id", std::make_shared()}, + {"delta_persisted_rows", std::make_shared()}, + {"delta_persisted_size", std::make_shared()}, + {"delta_persisted_column_files", std::make_shared()}, + {"delta_persisted_delete_ranges", std::make_shared()}, {"delta_cache_size", std::make_shared()}, {"delta_index_size", std::make_shared()}, + + {"stable_page_id", std::make_shared()}, + {"stable_rows", std::make_shared()}, + {"stable_size", std::make_shared()}, + {"stable_dmfiles", std::make_shared()}, + {"stable_dmfiles_id_0", std::make_shared()}, + {"stable_dmfiles_rows", std::make_shared()}, + {"stable_dmfiles_size", std::make_shared()}, + {"stable_dmfiles_size_on_disk", std::make_shared()}, + {"stable_dmfiles_packs", std::make_shared()}, })); } @@ -93,7 +103,7 @@ 
BlockInputStreams StorageSystemDTSegments::read(const Names & column_names, auto dm_storage = std::dynamic_pointer_cast(storage); const auto & table_info = dm_storage->getTableInfo(); auto table_id = table_info.id; - auto segment_stats = dm_storage->getStore()->getSegmentStats(); + auto segment_stats = dm_storage->getStore()->getSegmentsStats(); for (auto & stat : segment_stats) { size_t j = 0; @@ -111,22 +121,32 @@ BlockInputStreams StorageSystemDTSegments::read(const Names & column_names, res_columns[j++]->insert(stat.segment_id); res_columns[j++]->insert(stat.range.toString()); + res_columns[j++]->insert(stat.epoch); res_columns[j++]->insert(stat.rows); res_columns[j++]->insert(stat.size); - res_columns[j++]->insert(stat.delete_ranges); - - res_columns[j++]->insert(stat.stable_size_on_disk); - - res_columns[j++]->insert(stat.delta_pack_count); - res_columns[j++]->insert(stat.stable_pack_count); - - res_columns[j++]->insert(stat.avg_delta_pack_rows); - res_columns[j++]->insert(stat.avg_stable_pack_rows); res_columns[j++]->insert(stat.delta_rate); + res_columns[j++]->insert(stat.delta_memtable_rows); + res_columns[j++]->insert(stat.delta_memtable_size); + res_columns[j++]->insert(stat.delta_memtable_column_files); + res_columns[j++]->insert(stat.delta_memtable_delete_ranges); + res_columns[j++]->insert(stat.delta_persisted_page_id); + res_columns[j++]->insert(stat.delta_persisted_rows); + res_columns[j++]->insert(stat.delta_persisted_size); + res_columns[j++]->insert(stat.delta_persisted_column_files); + res_columns[j++]->insert(stat.delta_persisted_delete_ranges); res_columns[j++]->insert(stat.delta_cache_size); - res_columns[j++]->insert(stat.delta_index_size); + + res_columns[j++]->insert(stat.stable_page_id); + res_columns[j++]->insert(stat.stable_rows); + res_columns[j++]->insert(stat.stable_size); + res_columns[j++]->insert(stat.stable_dmfiles); + res_columns[j++]->insert(stat.stable_dmfiles_id_0); + res_columns[j++]->insert(stat.stable_dmfiles_rows); + 
res_columns[j++]->insert(stat.stable_dmfiles_size); + res_columns[j++]->insert(stat.stable_dmfiles_size_on_disk); + res_columns[j++]->insert(stat.stable_dmfiles_packs); } } } diff --git a/dbms/src/Storages/System/StorageSystemDTTables.cpp b/dbms/src/Storages/System/StorageSystemDTTables.cpp index b700cfb5324..3e1c37db593 100644 --- a/dbms/src/Storages/System/StorageSystemDTTables.cpp +++ b/dbms/src/Storages/System/StorageSystemDTTables.cpp @@ -145,7 +145,7 @@ BlockInputStreams StorageSystemDTTables::read( auto dm_storage = std::dynamic_pointer_cast(storage); const auto & table_info = dm_storage->getTableInfo(); auto table_id = table_info.id; - auto stat = dm_storage->getStore()->getStat(); + auto stat = dm_storage->getStore()->getStoreStats(); size_t j = 0; res_columns[j++]->insert(database_name); From b8f3badf1a6b34e2186e125902ba79fe15912926 Mon Sep 17 00:00:00 2001 From: Wish Date: Wed, 5 Oct 2022 15:37:43 +0800 Subject: [PATCH 5/5] Address comments Signed-off-by: Wish --- .../DeltaMerge/DeltaMergeStore_Statistics.cpp | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 8209ed1b444..507cd539ba2 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -22,7 +22,7 @@ namespace DB namespace DM { -static inline DB::PS::V2::PageEntriesVersionSetWithDelta::Snapshot * +static inline auto * toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) { return dynamic_cast(ptr.get()); @@ -45,7 +45,7 @@ StoreStats DeltaMergeStore::getStoreStats() Int64 total_delta_valid_cache_rows = 0; for (const auto & [handle, segment] : segments) { - (void)handle; + UNUSED(handle); const auto & delta = segment->getDelta(); const auto & stable = segment->getStable(); @@ -119,8 +119,7 @@ StoreStats DeltaMergeStore::getStoreStats() 
stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; PageStorage::SnapshotPtr stable_snapshot = storage_pool->dataReader()->getSnapshot(useless_tracing_id); - const auto * concrete_snap = toConcreteSnapshot(stable_snapshot); - if (concrete_snap) + if (const auto * concrete_snap = toConcreteSnapshot(stable_snapshot); concrete_snap != nullptr) { if (const auto * const version = concrete_snap->version(); version != nullptr) { @@ -130,7 +129,7 @@ StoreStats DeltaMergeStore::getStoreStats() } else { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=data"); + LOG_ERROR(log, "Can't get any version from current snapshot, type=data"); } } } @@ -141,8 +140,7 @@ StoreStats DeltaMergeStore::getStoreStats() stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; PageStorage::SnapshotPtr log_snapshot = storage_pool->logReader()->getSnapshot(useless_tracing_id); - const auto * concrete_snap = toConcreteSnapshot(log_snapshot); - if (concrete_snap) + if (const auto * concrete_snap = toConcreteSnapshot(log_snapshot); concrete_snap != nullptr) { if (const auto * const version = concrete_snap->version(); version != nullptr) { @@ -152,7 +150,7 @@ StoreStats DeltaMergeStore::getStoreStats() } else { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=log"); + LOG_ERROR(log, "Can't get any version from current snapshot, type=log"); } } } @@ -163,8 +161,7 @@ StoreStats DeltaMergeStore::getStoreStats() stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; PageStorage::SnapshotPtr meta_snapshot = storage_pool->metaReader()->getSnapshot(useless_tracing_id); - const auto * 
concrete_snap = toConcreteSnapshot(meta_snapshot); - if (concrete_snap) + if (const auto * concrete_snap = toConcreteSnapshot(meta_snapshot); concrete_snap != nullptr) { if (const auto * const version = concrete_snap->version(); version != nullptr) { @@ -174,7 +171,7 @@ StoreStats DeltaMergeStore::getStoreStats() } else { - LOG_FMT_ERROR(log, "Can't get any version from current snapshot, type=meta"); + LOG_ERROR(log, "Can't get any version from current snapshot, type=meta"); } } }