diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 195ed5c53c2..23ed756da27 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1055,6 +1055,13 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex segment = segment_it->second; } + if (!segment->flushCache(*dm_context)) + { + // If the flush failed, it means there are parallel updates to the segment in the background. + // In this case, we try again. + continue; + } + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); if (new_segment) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index d46e1b7aa36..b7913c44a2c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3762,6 +3762,55 @@ try CATCH +// Verify that unflushed data will also be compacted. +TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, Flush) +try +{ + { + // Write data to first 3 segments and flush. + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } + { + // Write new data to segment[1] without flush. + auto newly_written_rows = helper->rows_by_segments[1]; + Block block = DMTestEnv::prepareSimpleWriteBlock(helper->rows_by_segments[0], helper->rows_by_segments[0] + newly_written_rows, false, pk_type, 10 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_GT(segment1->getDelta()->getUnsavedRows(), 0); + } + { + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + ASSERT_NE(result, std::nullopt); + + segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(*result, segment1->getRowKeyRange()); + + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); + + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } +} +CATCH + + } // namespace tests } // namespace DM } // namespace DB