Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the problem that there may be some obsolete data left in storage which cannot be deleted (#5660) #5678

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ ColumnFileBig * ColumnFile::tryToBigFile()
return !isBigFile() ? nullptr : static_cast<ColumnFileBig *>(this);
}

/// Try to view this column file as a ColumnFilePersisted.
/// Returns nullptr when `isPersisted()` is false; otherwise returns `this`
/// downcast to ColumnFilePersisted.
ColumnFilePersisted * ColumnFile::tryToColumnFilePersisted()
{
    if (isPersisted())
    {
        return static_cast<ColumnFilePersisted *>(this);
    }
    return nullptr;
}

template <class T>
String columnFilesToString(const T & column_files)
{
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ColumnFileInMemory;
class ColumnFileTiny;
class ColumnFileDeleteRange;
class ColumnFileBig;
class ColumnFilePersisted;
class ColumnFileReader;
using ColumnFileReaderPtr = std::shared_ptr<ColumnFileReader>;

Expand Down Expand Up @@ -97,20 +98,24 @@ class ColumnFile

virtual Type getType() const = 0;

/// Is a ColumnInMemoryFile or not.
/// Is a ColumnFileInMemory or not.
bool isInMemoryFile() const { return getType() == Type::INMEMORY_FILE; }
/// Is a ColumnTinyFile or not.
/// Is a ColumnFileTiny or not.
bool isTinyFile() const { return getType() == Type::TINY_FILE; }
/// Is a ColumnDeleteRangeFile or not.
/// Is a ColumnFileDeleteRange or not.
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; };
/// Is a ColumnBigFile or not.
/// Is a ColumnFileBig or not.
bool isBigFile() const { return getType() == Type::BIG_FILE; };
/// Is a ColumnFilePersisted or not.
/// Every type other than Type::INMEMORY_FILE is reported as persisted.
bool isPersisted() const { return getType() != Type::INMEMORY_FILE; };

ColumnFileInMemory * tryToInMemoryFile();
ColumnFileTiny * tryToTinyFile();
ColumnFileDeleteRange * tryToDeleteRange();
ColumnFileBig * tryToBigFile();

ColumnFilePersisted * tryToColumnFilePersisted();

virtual ColumnFileReaderPtr
getReader(const DMContext & context, const StorageSnapshotPtr & storage_snap, const ColumnDefinesPtr & col_defs) const = 0;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ size_t DeltaValueSpace::getValidCacheRows() const
/// Asks both column-file containers of this delta to record the removal of
/// their column files' pages into `wbs`: first the persisted file set, then
/// the mem-table set.
void DeltaValueSpace::recordRemoveColumnFilesPages(WriteBatches & wbs) const
{
persisted_file_set->recordRemoveColumnFilesPages(wbs);
// The `mem_table_set` may also hold persisted column files; their data
// must be removed as well, otherwise it would be left behind in storage.
mem_table_set->recordRemoveColumnFilesPages(wbs);
}

bool DeltaValueSpace::appendColumnFile(DMContext & /*context*/, const ColumnFilePtr & column_file)
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
return cloned_column_files;
}

/// Walk every column file held by this mem-table set and, for each one that is
/// a ColumnFilePersisted, record the removal of its data into `wbs`.
/// Column files that are not persisted are skipped.
void MemTableSet::recordRemoveColumnFilesPages(WriteBatches & wbs) const
{
    for (const auto & file : column_files)
    {
        auto * persisted = file->tryToColumnFilePersisted();
        if (persisted == nullptr)
            continue;

        persisted->removeData(wbs);
    }
}

void MemTableSet::appendColumnFile(const ColumnFilePtr & column_file)
{
appendColumnFileInner(column_file);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>

ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs);

void recordRemoveColumnFilesPages(WriteBatches & wbs) const;

/// When one of the following methods returns false, the operation has failed because other threads
/// may have updated this instance in the meantime, e.g. this instance has been abandoned.
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,8 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr &
wbs.writeLogAndData();
new_stable->enableDMFilesGC();

SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable);

Expand Down Expand Up @@ -702,6 +704,8 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche
split_info.my_stable->enableDMFilesGC();
split_info.other_stable->enableDMFilesGC();

SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info);

Expand Down Expand Up @@ -1199,6 +1203,8 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
wbs.writeLogAndData();
merged_stable->enableDMFilesGC();

SYNC_FOR("before_Segment::applyMerge"); // pause without holding the lock on segments to be merged

auto left_lock = left->mustGetUpdateLock();
auto right_lock = right->mustGetUpdateLock();

Expand Down
192 changes: 192 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Common/CurrentMetrics.h>
#include <Common/Logger.h>
#include <Common/SyncPoint/Ctl.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/tests/gtest_segment_test_basic.h>
#include <TestUtils/TiFlashTestBasic.h>

#include <future>

namespace DB
{
Expand All @@ -28,6 +31,13 @@ class SegmentOperationTest : public SegmentTestBasic
{
protected:
static void SetUpTestCase() {}

// Per-test setup: obtains the logger used by the LOG_DEBUG calls in this fixture's tests.
// NOTE(review): this override does not chain to a base-class SetUp(); presumably storage
// initialization is left to reloadWithOptions(), which each test calls — confirm against
// SegmentTestBasic.
void SetUp() override
{
log = DB::Logger::get("SegmentOperationTest");
}

DB::LoggerPtr log;
};

TEST_F(SegmentOperationTest, Issue4956)
Expand Down Expand Up @@ -83,6 +93,188 @@ try
}
CATCH

/// Writes and ingests into a segment while a concurrent merge-delta is paused right
/// before it is applied, then deletes everything and checks (after a forced GC) that
/// the log storage holds no pages and the data storage holds exactly one page.
TEST_F(SegmentOperationTest, WriteDuringSegmentMergeDelta)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    {
        LOG_DEBUG(log, "beginSegmentMergeDelta");

        // Start a segment merge-delta and suspend it before applyMergeDelta.
        // `std::launch::async` is mandatory here: with the default policy the task may be
        // deferred until the future is waited on, and `waitAndPause()` would never return.
        auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta");
        auto th_seg_merge_delta = std::async(std::launch::async, [&]() {
            mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
        });
        sp_seg_merge_delta_apply.waitAndPause();

        LOG_DEBUG(log, "pausedBeforeApplyMergeDelta");

        // non-flushed column files
        writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        sp_seg_merge_delta_apply.next();
        // `get()` instead of `wait()`: an exception thrown by the background merge-delta
        // is rethrown here and fails the test, rather than being silently discarded.
        th_seg_merge_delta.get();

        LOG_DEBUG(log, "finishApplyMergeDelta");
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

/// Writes and ingests into a segment while a concurrent split is paused right before
/// it is applied, merges the two halves back, then deletes everything and checks
/// (after a forced GC) that the log storage holds no pages and the data storage
/// holds exactly one page.
TEST_F(SegmentOperationTest, WriteDuringSegmentSplit)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    {
        LOG_DEBUG(log, "beginSegmentSplit");

        // Start a segment split and suspend it before applySplit.
        // `std::launch::async` is mandatory here: with the default policy the task may be
        // deferred until the future is waited on, and `waitAndPause()` would never return.
        auto sp_seg_split_apply = SyncPointCtl::enableInScope("before_Segment::applySplit");
        // The split result is handed back through the future, so it is never read
        // uninitialized and the assertion on it runs on the test thread.
        auto th_seg_split = std::async(std::launch::async, [&]() {
            return splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
        });
        sp_seg_split_apply.waitAndPause();

        LOG_DEBUG(log, "pausedBeforeApplySplit");

        // non-flushed column files
        writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        sp_seg_split_apply.next();
        // `get()` also rethrows any exception raised by the background split.
        auto new_seg_id_opt = th_seg_split.get();
        ASSERT_TRUE(new_seg_id_opt.has_value());

        LOG_DEBUG(log, "finishApplySplit");
        mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id_opt.value());
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

/// Writes and ingests into the right-hand segment while a concurrent merge of two
/// segments is paused right before it is applied, then deletes everything and checks
/// (after a forced GC) that the log storage holds no pages and the data storage
/// holds exactly one page.
TEST_F(SegmentOperationTest, WriteDuringSegmentMerge)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID);
    ASSERT_TRUE(new_seg_id_opt.has_value());
    auto new_seg_id = new_seg_id_opt.value();

    {
        LOG_DEBUG(log, "beginSegmentMerge");

        // Start a segment merge and suspend it before applyMerge.
        // `std::launch::async` is mandatory here: with the default policy the task may be
        // deferred until the future is waited on, and `waitAndPause()` would never return.
        auto sp_seg_merge_apply = SyncPointCtl::enableInScope("before_Segment::applyMerge");
        auto th_seg_merge = std::async(std::launch::async, [&]() {
            mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id, /* check_rows */ false);
        });
        sp_seg_merge_apply.waitAndPause();

        LOG_DEBUG(log, "pausedBeforeApplyMerge");

        // non-flushed column files
        writeSegment(new_seg_id, 100);
        ingestDTFileIntoSegment(new_seg_id, 100);
        sp_seg_merge_apply.next();
        // `get()` instead of `wait()`: an exception thrown by the background merge
        // is rethrown here and fails the test, rather than being silently discarded.
        th_seg_merge.get();

        LOG_DEBUG(log, "finishApplyMerge");
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

// run in CI weekly
TEST_F(SegmentOperationTest, DISABLED_TestSegmentRandomForCI)
try
Expand Down
Loading