From 20e446e8e1046eebb9111754360861f7ec8909d9 Mon Sep 17 00:00:00 2001 From: Yixin Luo <18810541851@163.com> Date: Thu, 14 Nov 2024 19:18:29 +0800 Subject: [PATCH] [BugFix] fix in-memory pk index memory leak (#52903) Signed-off-by: luohaha <18810541851@163.com> --- be/src/storage/primary_index.cpp | 28 ++++++++++++++----- .../storage/lake/primary_key_publish_test.cpp | 6 ++++ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/be/src/storage/primary_index.cpp b/be/src/storage/primary_index.cpp index 0641298efafc5..70ec05410d27b 100644 --- a/be/src/storage/primary_index.cpp +++ b/be/src/storage/primary_index.cpp @@ -1427,7 +1427,9 @@ Status PrimaryIndex::insert(uint32_t rssid, const vector& rowids, cons auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id); return _insert_into_persistent_index(rssid, rowids, pks); } else { - return _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size()); + auto st = _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size()); + _calc_memory_usage(); + return st; } } @@ -1445,7 +1447,9 @@ Status PrimaryIndex::upsert(uint32_t rssid, uint32_t rowid_start, const Column& if (_persistent_index != nullptr) { return _upsert_into_persistent_index(rssid, rowid_start, pks, 0, pks.size(), deletes, stat); } else { - return _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, 0, pks.size(), deletes); + auto st = _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, 0, pks.size(), deletes); + _calc_memory_usage(); + return st; } } @@ -1455,7 +1459,9 @@ Status PrimaryIndex::upsert(uint32_t rssid, uint32_t rowid_start, const Column& if (_persistent_index != nullptr) { return _upsert_into_persistent_index(rssid, rowid_start, pks, idx_begin, idx_end, deletes, nullptr); } else { - return _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, idx_begin, idx_end, deletes); + auto st = _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, idx_begin, idx_end, deletes); + _calc_memory_usage(); + return st; } } @@ -1481,7 +1487,9 @@ Status PrimaryIndex::replace(uint32_t rssid, uint32_t rowid_start, const std::ve if (_persistent_index != nullptr) { return _replace_persistent_index_by_indexes(rssid, rowid_start, replace_indexes, pks); } else { - return _pkey_to_rssid_rowid->replace(rssid, rowid_start, replace_indexes, 0, replace_indexes.size(), pks); + auto st = _pkey_to_rssid_rowid->replace(rssid, rowid_start, replace_indexes, 0, replace_indexes.size(), pks); + _calc_memory_usage(); + return st; } } @@ -1491,7 +1499,9 @@ Status PrimaryIndex::replace(uint32_t rssid, uint32_t rowid_start, const std::ve if (_persistent_index != nullptr) { return _replace_persistent_index(rssid, rowid_start, pks, src_rssid, deletes); } else { - return _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, src_rssid, 0, pks.size(), deletes); + auto st = _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, src_rssid, 0, pks.size(), deletes); + _calc_memory_usage(); + return st; } } @@ -1501,7 +1511,9 @@ Status PrimaryIndex::try_replace(uint32_t rssid, uint32_t rowid_start, const Col if (_persistent_index != nullptr) { return _replace_persistent_index(rssid, rowid_start, pks, max_src_rssid, deletes); } else { - return _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, max_src_rssid, 0, pks.size(), deletes); + auto st = _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, max_src_rssid, 0, pks.size(), deletes); + _calc_memory_usage(); + return st; } } @@ -1511,7 +1523,9 @@ Status PrimaryIndex::erase(const Column& key_col, DeletesMap* deletes) { auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id); return _erase_persistent_index(key_col, deletes); } else { - return _pkey_to_rssid_rowid->erase(key_col, 0, key_col.size(), deletes); + auto st = _pkey_to_rssid_rowid->erase(key_col, 0, key_col.size(), deletes); + _calc_memory_usage(); + return st; } } diff --git a/be/test/storage/lake/primary_key_publish_test.cpp b/be/test/storage/lake/primary_key_publish_test.cpp index d874a349aca21..5221be144669a 100644 --- a/be/test/storage/lake/primary_key_publish_test.cpp +++ b/be/test/storage/lake/primary_key_publish_test.cpp @@ -211,6 +211,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_read_success) { writer->close(); ASSERT_OK(publish_single_version(_tablet_metadata->id(), 2, txn_id).status()); + // update memory usage, should large than zero + EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0); EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(_tablet_metadata->id(), txn_id)); // read at version 2 @@ -255,6 +257,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_multitime_check_result) { EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } + // update memory usage, should large than zero + EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0); ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); _tablet_mgr->prune_metacache(); // fill delvec cache again @@ -394,6 +398,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_publish_multi_segments) { EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); version++; } + // update memory usage, should large than zero + EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0); config::write_buffer_size = old_size; ASSERT_EQ(kChunkSize * 3, read_rows(tablet_id, version)); EXPECT_TRUE(_update_mgr->update_state_mem_tracker()->consumption() == 0);