Skip to content

Commit

Permalink
[fix](memory) Refactor LRU cache policy memory tracking (apache#36235)
Browse files Browse the repository at this point in the history
Fix apache#35590, CacheManager get CachePolicy should be used lock.

Add LRUCachePolicyTrackingAllocator and LRUCachePolicyTrackingManual.
tracking memory in LRUCachePolicy::insert, LRUCacheValueBase not need init mem_tracker, so not need to get CachePolicy under lock.
  • Loading branch information
xinyiZzz committed Jul 11, 2024
1 parent 9dff269 commit 220f667
Show file tree
Hide file tree
Showing 26 changed files with 283 additions and 225 deletions.
35 changes: 34 additions & 1 deletion be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,39 @@
#include "runtime/exec_env.h"

namespace doris {
template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(),
_size(b),
_capacity(b),
_use_cache(use_cache),
_page_type(page_type) {
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
} else {
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}

template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
if (_use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(_page_type));
TAllocator::free(_data, _capacity);
} else {
TAllocator::free(_data, _capacity);
}
}
}

template class PageBase<Allocator<true>>;
template class PageBase<Allocator<false>>;

StoragePageCache* StoragePageCache::create_global_cache(size_t capacity,
int32_t index_cache_percentage,
int64_t pk_index_cache_capacity,
Expand Down Expand Up @@ -70,7 +103,7 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
}

auto* cache = _get_page_cache(page_type);
auto* lru_handle = cache->insert_no_tracking(key.encode(), data, data->capacity(), priority);
auto* lru_handle = cache->insert(key.encode(), data, data->capacity(), 0, priority);
*handle = PageCacheHandle(cache, lru_handle);
}

Expand Down
54 changes: 18 additions & 36 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,10 @@ template <typename TAllocator>
class PageBase : private TAllocator, public LRUCacheValueBase {
public:
PageBase() = default;

PageBase(size_t b, const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: LRUCacheValueBase(), _size(b), _capacity(b), _mem_tracker_by_allocator(mem_tracker) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}

PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type);
PageBase(const PageBase&) = delete;
PageBase& operator=(const PageBase&) = delete;

~PageBase() override {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
~PageBase() override;

char* data() { return _data; }
size_t size() { return _size; }
Expand All @@ -73,7 +60,8 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
bool _use_cache;
segment_v2::PageTypePB _page_type;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down Expand Up @@ -105,34 +93,28 @@ class StoragePageCache {
}
};

class DataPageCache : public LRUCachePolicy {
class DataPageCache : public LRUCachePolicyTrackingAllocator {
public:
DataPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity,
LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec,
num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::DATA_PAGE_CACHE, capacity, LRUCacheType::SIZE,
config::data_page_cache_stale_sweep_time_sec, num_shards) {}
};

class IndexPageCache : public LRUCachePolicy {
class IndexPageCache : public LRUCachePolicyTrackingAllocator {
public:
IndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, capacity,
LRUCacheType::SIZE, config::index_page_cache_stale_sweep_time_sec,
num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::INDEXPAGE_CACHE, capacity, LRUCacheType::SIZE,
config::index_page_cache_stale_sweep_time_sec, num_shards) {}
};

class PKIndexPageCache : public LRUCachePolicy {
class PKIndexPageCache : public LRUCachePolicyTrackingAllocator {
public:
PKIndexPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity,
LRUCacheType::SIZE,
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {
init_mem_tracker_by_allocator(lru_cache_type_string(LRUCacheType::SIZE));
}
: LRUCachePolicyTrackingAllocator(
CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, capacity, LRUCacheType::SIZE,
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
};

static constexpr uint32_t kDefaultNumShards = 16;
Expand Down Expand Up @@ -169,7 +151,7 @@ class StoragePageCache {
segment_v2::PageTypePB page_type, bool in_memory = false);

std::shared_ptr<MemTrackerLimiter> mem_tracker(segment_v2::PageTypePB page_type) {
return _get_page_cache(page_type)->mem_tracker_by_allocator();
return _get_page_cache(page_type)->mem_tracker();
}

private:
Expand All @@ -183,7 +165,7 @@ class StoragePageCache {
// delete bitmap in unique key with mow
std::unique_ptr<PKIndexPageCache> _pk_index_page_cache;

LRUCachePolicy* _get_page_cache(segment_v2::PageTypePB page_type) {
LRUCachePolicyTrackingAllocator* _get_page_cache(segment_v2::PageTypePB page_type) {
switch (page_type) {
case segment_v2::DATA_PAGE: {
return _data_page_cache.get();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
* @return Status
*/
Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) override {
bool _use_cache, segment_v2::PageTypePB page_type) override {
size_t num_elements, compressed_size, num_element_after_padding;
int size_of_element;

Expand Down Expand Up @@ -67,7 +67,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder {
decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE +
num_element_after_padding * size_of_element + size_of_tail;
std::unique_ptr<DataPage> decoded_page =
std::make_unique<DataPage>(decoded_slice.size, mem_tracker);
std::make_unique<DataPage>(decoded_slice.size, _use_cache, page_type);
decoded_slice.data = decoded_page->data();

if constexpr (USED_IN_DICT_ENCODING) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/encoding_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum EncodingTypePB : int;
class DataPagePreDecoder {
public:
virtual Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) = 0;
bool _use_cache, segment_v2::PageTypePB page_type) = 0;
virtual ~DataPagePreDecoder() = default;
};

Expand Down
10 changes: 3 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto* lru_handle = LRUCachePolicy::insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), bitmap->getSizeInBytes(),
CachePriority::NORMAL);
auto* lru_handle = LRUCachePolicyTrackingManual::insert(
key.encode(), (void*)cache_value_ptr.release(), bitmap->getSizeInBytes(),
bitmap->getSizeInBytes(), CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(this, lru_handle);
}

int64_t InvertedIndexQueryCache::mem_consumption() {
return LRUCachePolicy::mem_consumption();
}

} // namespace doris::segment_v2
42 changes: 20 additions & 22 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ class InvertedIndexSearcherCache {
size_t size = 0;
int64_t last_visit_time;

CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE) {}
CacheValue() = default;
explicit CacheValue(IndexSearcherPtr searcher, size_t mem_size, int64_t visit_time)
: LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE),
index_searcher(std::move(searcher)) {
: index_searcher(std::move(searcher)) {
size = mem_size;
last_visit_time = visit_time;
}
Expand Down Expand Up @@ -100,23 +99,23 @@ class InvertedIndexSearcherCache {
private:
InvertedIndexSearcherCache() = default;

class InvertedIndexSearcherCachePolicy : public LRUCachePolicy {
class InvertedIndexSearcherCachePolicy : public LRUCachePolicyTrackingManual {
public:
InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
uint32_t element_count_capacity)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
element_count_capacity, true) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
capacity, LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec,
num_shards, element_count_capacity, true) {}
InvertedIndexSearcherCachePolicy(size_t capacity, uint32_t num_shards,
uint32_t element_count_capacity,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, true) {}
: LRUCachePolicyTrackingManual(
CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE, capacity,
LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
num_shards, element_count_capacity, cache_value_time_extractor,
cache_value_check_timestamp, true) {}
};
// Insert a cache entry by key.
// And the cache entry will be returned in handle.
Expand Down Expand Up @@ -180,8 +179,10 @@ class InvertedIndexCacheHandle {

class InvertedIndexQueryCacheHandle;

class InvertedIndexQueryCache : public LRUCachePolicy {
class InvertedIndexQueryCache : public LRUCachePolicyTrackingManual {
public:
using LRUCachePolicyTrackingManual::insert;

// cache key
struct CacheKey {
io::Path index_path; // index file path
Expand All @@ -208,14 +209,12 @@ class InvertedIndexQueryCache : public LRUCachePolicy {

class CacheValue : public LRUCacheValueBase {
public:
CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE) {}

std::shared_ptr<roaring::Roaring> bitmap;
};

// Create global instance of this class
static InvertedIndexQueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
InvertedIndexQueryCache* res = new InvertedIndexQueryCache(capacity, num_shards);
auto* res = new InvertedIndexQueryCache(capacity, num_shards);
return res;
}

Expand All @@ -228,16 +227,15 @@ class InvertedIndexQueryCache : public LRUCachePolicy {
InvertedIndexQueryCache() = delete;

InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity,
LRUCacheType::SIZE, config::inverted_index_cache_stale_sweep_time_sec,
num_shards) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE,
capacity, LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec,
num_shards) {}

bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);

void insert(const CacheKey& key, std::shared_ptr<roaring::Roaring> bitmap,
InvertedIndexQueryCacheHandle* handle);

int64_t mem_consumption();
};

class InvertedIndexQueryCacheHandle {
Expand Down
14 changes: 4 additions & 10 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,9 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
opts.file_reader->path().native());
}

std::shared_ptr<MemTrackerLimiter> page_mem_tracker;
if (opts.use_page_cache && cache) {
page_mem_tracker = cache->mem_tracker(opts.type);
} else {
page_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}

// hold compressed page at first, reset to decompressed page later
std::unique_ptr<DataPage> page = std::make_unique<DataPage>(page_size, page_mem_tracker);
std::unique_ptr<DataPage> page =
std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
Slice page_slice(page->data(), page_size);
{
SCOPED_RAW_TIMER(&opts.stats->io_ns);
Expand Down Expand Up @@ -190,7 +184,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
}
SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
footer->uncompressed_size() + footer_size + 4, page_mem_tracker);
footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

// decompress page body
Slice compressed_body(page_slice.data, body_size);
Expand Down Expand Up @@ -218,7 +212,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
if (pre_decoder) {
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice, footer->data_page_footer().nullmap_size() + footer_size + 4,
page_mem_tracker));
opts.use_page_cache, opts.type));
}
}

Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// eliminating the need for frequent allocation and deallocation during usage.
// This caching mechanism proves immensely advantageous, particularly in scenarios
// with high concurrency, where queries are executed simultaneously.
class SchemaCache : public LRUCachePolicy {
class SchemaCache : public LRUCachePolicyTrackingManual {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };

Expand Down Expand Up @@ -104,17 +104,16 @@ class SchemaCache : public LRUCachePolicy {

class CacheValue : public LRUCacheValueBase {
public:
CacheValue() : LRUCacheValueBase(CachePolicy::CacheType::SCHEMA_CACHE) {}

Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};

SchemaCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::SCHEMA_CACHE, capacity,
LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}

private:
static constexpr char SCHEMA_DELIMITER = '-';
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle*

void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value,
SegmentCacheHandle* handle) {
auto* lru_handle =
LRUCachePolicy::insert(key.encode(), &value, value.segment->meta_mem_usage(),
value.segment->meta_mem_usage(), CachePriority::NORMAL);
auto* lru_handle = LRUCachePolicyTrackingManual::insert(
key.encode(), &value, value.segment->meta_mem_usage(), value.segment->meta_mem_usage(),
CachePriority::NORMAL);
handle->push_segment(this, lru_handle);
}

Expand Down
Loading

0 comments on commit 220f667

Please sign in to comment.