Skip to content

Commit

Permalink
PageStorage: Fix pages are not deleted under some cases (pingcap#5069) (
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Jun 3, 2022
1 parent a547a53 commit df059cc
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 24 deletions.
18 changes: 3 additions & 15 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,9 @@ void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks
}
case PageStorageRunMode::MIX_MODE:
{
// When PageStorage run as Mix Mode.
// We need both get alive pages from V2 and V3 which will feedback for the DM.
// But V2 and V3 won't GC in the same time. So V3 need proxy V2 external pages callback.
// When V3 GC happend, scan the external pages from V3, in remover will scanner all of external pages from V2.
ExternalPageCallbacks mix_mode_callbacks;

mix_mode_callbacks.scanner = callbacks.scanner;
mix_mode_callbacks.remover = [this, callbacks](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
// ns_id won't used on V2
auto v2_valid_page_ids = data_storage_v2->getAliveExternalPageIds(ns_id);
v2_valid_page_ids.insert(valid_ids.begin(), valid_ids.end());
callbacks.remover(path_and_ids_vec, v2_valid_page_ids);
};
mix_mode_callbacks.ns_id = ns_id;
data_storage_v3->registerExternalPagesCallbacks(mix_mode_callbacks);
// We have transformed all pages from V2 to V3 in `restore`, so
// only need to register callbacks for V3.
data_storage_v3->registerExternalPagesCallbacks(callbacks);
break;
}
default:
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ struct PageEntry

String toDebugString() const
{
return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}",
return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, ref: {}, field_offsets_size: {}}}",
file_id,
offset,
size,
checksum,
tag,
ref,
field_offsets.size());
}

Expand Down
26 changes: 18 additions & 8 deletions dbms/src/Storages/Page/V2/PageEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class PageEntriesMixin

protected:
template <bool must_exist = true>
void decreasePageRef(PageId page_id);
void decreasePageRef(PageId page_id, bool keep_tombstone);

void copyEntries(const PageEntriesMixin & rhs)
{
Expand Down Expand Up @@ -370,8 +370,10 @@ void PageEntriesMixin<T>::del(PageId page_id)
const size_t num_erase = page_ref.erase(page_id);
if (num_erase > 0)
{
// decrease origin page's ref counting
decreasePageRef<must_exist>(normal_page_id);
// decrease origin page's ref counting, this method can
// only called by base, so we should remove the entry if
// the ref count down to zero
decreasePageRef<must_exist>(normal_page_id, /*keep_tombstone=*/false);
}
}

Expand All @@ -392,7 +394,9 @@ void PageEntriesMixin<T>::ref(const PageId ref_id, const PageId page_id)
// if RefPage{ref-id} -> Page{normal_page_id} already exists, just ignore
if (ori_ref->second == normal_page_id)
return;
decreasePageRef<true>(ori_ref->second);
// this method can only called by base, so we should remove the entry if
// the ref count down to zero
decreasePageRef<true>(ori_ref->second, /*keep_tombstone=*/false);
}
// build ref
page_ref[ref_id] = normal_page_id;
Expand All @@ -408,7 +412,7 @@ void PageEntriesMixin<T>::ref(const PageId ref_id, const PageId page_id)

template <typename T>
template <bool must_exist>
void PageEntriesMixin<T>::decreasePageRef(const PageId page_id)
void PageEntriesMixin<T>::decreasePageRef(const PageId page_id, bool keep_tombstone)
{
auto iter = normal_pages.find(page_id);
if constexpr (must_exist)
Expand All @@ -421,8 +425,11 @@ void PageEntriesMixin<T>::decreasePageRef(const PageId page_id)
if (iter != normal_pages.end())
{
auto & entry = iter->second;
entry.ref -= 1;
if (entry.ref == 0)
if (entry.ref > 0)
{
entry.ref -= 1;
}
if (!keep_tombstone && entry.ref == 0)
{
normal_pages.erase(iter);
}
Expand Down Expand Up @@ -620,7 +627,10 @@ class PageEntriesForDelta : public PageEntriesMixin<PageEntriesForDelta>
{
ref_deletions.insert(page_id);
}
decreasePageRef<false>(page_id);
// If this is the base version, we should remove the entry if
// the ref count down to zero. Otherwise it is the delta version
// we should keep a tombstone.
decreasePageRef<false>(page_id, /*keep_tombstone=*/!this->isBase());
}
for (auto it : rhs.page_ref)
{
Expand Down
126 changes: 126 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Poco/Logger.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/MockDiskDelegator.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <common/logger_useful.h>
#include <fmt/ranges.h>
#include <gtest/gtest.h>

namespace DB
Expand Down Expand Up @@ -568,6 +571,129 @@ try
}
CATCH


TEST_F(PageStorageMixedTest, RefV2External2)
try
{
auto logger = DB::Logger::get("PageStorageMixedTest");
{
WriteBatch batch;
batch.putExternal(100, 0);
batch.putRefPage(101, 100);
batch.delPage(100);
batch.putExternal(102, 0);
page_writer_v2->write(std::move(batch), nullptr);
}

ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE);
{
WriteBatch batch;
batch.putExternal(100, 0);
batch.putRefPage(101, 100);
batch.delPage(100);
batch.putExternal(102, 0);
page_writer_mix->writeIntoV3(std::move(batch), nullptr);
}
{
auto snap = storage_pool_mix->log_storage_v2->getSnapshot("zzz"); // must hold
// after transform to v3, delete these from v2
WriteBatch batch;
batch.delPage(100);
batch.delPage(101);
batch.delPage(102);
page_writer_mix->writeIntoV2(std::move(batch), nullptr);
}

{
LOG_FMT_INFO(logger, "first check alive id in v2");
auto alive_dt_ids_in_v2 = storage_pool_mix->log_storage_v2->getAliveExternalPageIds(TEST_NAMESPACE_ID);
EXPECT_EQ(alive_dt_ids_in_v2.size(), 0);

storage_pool_mix->log_storage_v3->gc(false, nullptr, nullptr);
auto alive_dt_ids_in_v3 = storage_pool_mix->log_storage_v3->getAliveExternalPageIds(TEST_NAMESPACE_ID);
ASSERT_EQ(alive_dt_ids_in_v3.size(), 2);
auto iter = alive_dt_ids_in_v3.begin();
EXPECT_EQ(*iter, 100);
iter++;
EXPECT_EQ(*iter, 102);
}

{
LOG_FMT_INFO(logger, "remove 100, create 105");
StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write
// write delete again
WriteBatch batch;
batch.delPage(100);
batch.putExternal(105, 0);
page_writer_mix->write(std::move(batch), nullptr);
LOG_FMT_INFO(logger, "done");
}
{
LOG_FMT_INFO(logger, "remove 101, create 106");
StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write
// write delete again
WriteBatch batch;
batch.delPage(101);
batch.putExternal(106, 0);
page_writer_mix->write(std::move(batch), nullptr);
LOG_FMT_INFO(logger, "done");
}
{
LOG_FMT_INFO(logger, "remove 102, create 107");
StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write
// write delete again
WriteBatch batch;
batch.delPage(102);
batch.putExternal(107, 0);
page_writer_mix->write(std::move(batch), nullptr);
LOG_FMT_INFO(logger, "done");
}

{
LOG_FMT_INFO(logger, "second check alive id in v2");
auto alive_dt_ids_in_v2 = storage_pool_mix->log_storage_v2->getAliveExternalPageIds(TEST_NAMESPACE_ID);
EXPECT_EQ(alive_dt_ids_in_v2.size(), 0) << fmt::format("{}", alive_dt_ids_in_v2);

storage_pool_mix->log_storage_v3->gc(false, nullptr, nullptr);
auto alive_dt_ids_in_v3 = storage_pool_mix->log_storage_v3->getAliveExternalPageIds(TEST_NAMESPACE_ID);
ASSERT_EQ(alive_dt_ids_in_v3.size(), 3) << fmt::format("{}", alive_dt_ids_in_v3);
auto iter = alive_dt_ids_in_v3.begin();
EXPECT_EQ(*iter, 105);
iter++;
EXPECT_EQ(*iter, 106);
iter++;
EXPECT_EQ(*iter, 107);
}
{
LOG_FMT_INFO(logger, "third check alive id in v2");
auto alive_dt_ids_in_v2 = storage_pool_mix->log_storage_v2->getAliveExternalPageIds(TEST_NAMESPACE_ID);
EXPECT_EQ(alive_dt_ids_in_v2.size(), 0) << fmt::format("{}", alive_dt_ids_in_v2);

storage_pool_mix->log_storage_v3->gc(false, nullptr, nullptr);
auto alive_dt_ids_in_v3 = storage_pool_mix->log_storage_v3->getAliveExternalPageIds(TEST_NAMESPACE_ID);
ASSERT_EQ(alive_dt_ids_in_v3.size(), 3) << fmt::format("{}", alive_dt_ids_in_v3);
auto iter = alive_dt_ids_in_v3.begin();
EXPECT_EQ(*iter, 105);
iter++;
EXPECT_EQ(*iter, 106);
iter++;
EXPECT_EQ(*iter, 107);
}

{
// cleanup v3
WriteBatch batch;
batch.delPage(100);
batch.delPage(101);
batch.delPage(102);
batch.delPage(105);
batch.delPage(106);
batch.delPage(107);
page_writer_mix->write(std::move(batch), nullptr);
}
}
CATCH

TEST_F(PageStorageMixedTest, ReadWithSnapshotAfterMergeDelta)
try
{
Expand Down

0 comments on commit df059cc

Please sign in to comment.