Skip to content

Commit

Permalink
[BugFix] fix in-memory pk index memory leak (#52903)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha authored Nov 14, 2024
1 parent 8c16a01 commit 20e446e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
28 changes: 21 additions & 7 deletions be/src/storage/primary_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,9 @@ Status PrimaryIndex::insert(uint32_t rssid, const vector<uint32_t>& rowids, cons
auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id);
return _insert_into_persistent_index(rssid, rowids, pks);
} else {
return _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size());
auto st = _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size());
_calc_memory_usage();
return st;
}
}

Expand All @@ -1445,7 +1447,9 @@ Status PrimaryIndex::upsert(uint32_t rssid, uint32_t rowid_start, const Column&
if (_persistent_index != nullptr) {
return _upsert_into_persistent_index(rssid, rowid_start, pks, 0, pks.size(), deletes, stat);
} else {
return _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, 0, pks.size(), deletes);
auto st = _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, 0, pks.size(), deletes);
_calc_memory_usage();
return st;
}
}

Expand All @@ -1455,7 +1459,9 @@ Status PrimaryIndex::upsert(uint32_t rssid, uint32_t rowid_start, const Column&
if (_persistent_index != nullptr) {
return _upsert_into_persistent_index(rssid, rowid_start, pks, idx_begin, idx_end, deletes, nullptr);
} else {
return _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, idx_begin, idx_end, deletes);
auto st = _pkey_to_rssid_rowid->upsert(rssid, rowid_start, pks, idx_begin, idx_end, deletes);
_calc_memory_usage();
return st;
}
}

Expand All @@ -1481,7 +1487,9 @@ Status PrimaryIndex::replace(uint32_t rssid, uint32_t rowid_start, const std::ve
if (_persistent_index != nullptr) {
return _replace_persistent_index_by_indexes(rssid, rowid_start, replace_indexes, pks);
} else {
return _pkey_to_rssid_rowid->replace(rssid, rowid_start, replace_indexes, 0, replace_indexes.size(), pks);
auto st = _pkey_to_rssid_rowid->replace(rssid, rowid_start, replace_indexes, 0, replace_indexes.size(), pks);
_calc_memory_usage();
return st;
}
}

Expand All @@ -1491,7 +1499,9 @@ Status PrimaryIndex::replace(uint32_t rssid, uint32_t rowid_start, const std::ve
if (_persistent_index != nullptr) {
return _replace_persistent_index(rssid, rowid_start, pks, src_rssid, deletes);
} else {
return _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, src_rssid, 0, pks.size(), deletes);
auto st = _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, src_rssid, 0, pks.size(), deletes);
_calc_memory_usage();
return st;
}
}

Expand All @@ -1501,7 +1511,9 @@ Status PrimaryIndex::try_replace(uint32_t rssid, uint32_t rowid_start, const Col
if (_persistent_index != nullptr) {
return _replace_persistent_index(rssid, rowid_start, pks, max_src_rssid, deletes);
} else {
return _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, max_src_rssid, 0, pks.size(), deletes);
auto st = _pkey_to_rssid_rowid->try_replace(rssid, rowid_start, pks, max_src_rssid, 0, pks.size(), deletes);
_calc_memory_usage();
return st;
}
}

Expand All @@ -1511,7 +1523,9 @@ Status PrimaryIndex::erase(const Column& key_col, DeletesMap* deletes) {
auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id);
return _erase_persistent_index(key_col, deletes);
} else {
return _pkey_to_rssid_rowid->erase(key_col, 0, key_col.size(), deletes);
auto st = _pkey_to_rssid_rowid->erase(key_col, 0, key_col.size(), deletes);
_calc_memory_usage();
return st;
}
}

Expand Down
6 changes: 6 additions & 0 deletions be/test/storage/lake/primary_key_publish_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_read_success) {
writer->close();

ASSERT_OK(publish_single_version(_tablet_metadata->id(), 2, txn_id).status());
// update memory usage, should large than zero
EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0);
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(_tablet_metadata->id(), txn_id));

// read at version 2
Expand Down Expand Up @@ -255,6 +257,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_multitime_check_result) {
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
version++;
}
// update memory usage, should large than zero
EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0);
ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
_tablet_mgr->prune_metacache();
// fill delvec cache again
Expand Down Expand Up @@ -394,6 +398,8 @@ TEST_P(LakePrimaryKeyPublishTest, test_publish_multi_segments) {
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
version++;
}
// update memory usage, should large than zero
EXPECT_TRUE(_update_mgr->mem_tracker()->consumption() > 0);
config::write_buffer_size = old_size;
ASSERT_EQ(kChunkSize * 3, read_rows(tablet_id, version));
EXPECT_TRUE(_update_mgr->update_state_mem_tracker()->consumption() == 0);
Expand Down

0 comments on commit 20e446e

Please sign in to comment.