Skip to content

Commit

Permalink
BlockFileCache refactor
Browse files Browse the repository at this point in the history
Signed-off-by: freemandealer <[email protected]>
  • Loading branch information
freemandealer committed Jul 11, 2024
1 parent 80131d0 commit dfbffcb
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 174 deletions.
257 changes: 83 additions & 174 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,55 +754,92 @@ const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) co
return _normal_queue;
}

bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
auto limit = config::max_ttl_cache_ratio * _capacity;
if ((_cur_ttl_size + size) * 100 > limit) {
return false;
}
// Removes every cell listed in `to_evict` from the cache.
//
// For each cell whose `file_block` is non-null, the block's own mutex is
// locked and remove(file_block, cache_lock, block_lock) is called. Cells with
// a null `file_block` are skipped.
//
// `to_evict`   - cells to remove.
// `cache_lock` - lock guard supplied by the caller; forwarded to remove().
//
// NOTE(review): this listing is a commit diff rendered without +/- markers.
// The statements between the lambda and the std::for_each call use names that
// are not declared in this function (removed_size, size, cur_cache_size),
// redeclare `to_evict`, and return a value from a void function. They look
// like removed lines of the old try_reserve_for_ttl_without_lru body that the
// diff interleaves here -- confirm against the repository.
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
// The block mutex is held for the duration of remove().
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
size_t index_queue_size = _index_queue.get_capacity(cache_lock);
if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 &&
disposable_queue_size == 0 && index_queue_size == 0) {
return false;
}
std::vector<FileBlockCell*> to_evict;
auto collect_eliminate_fragments = [&](LRUQueue& queue) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
// Apply the removal lambda to every collected cell.
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

// Removes every cell listed in `to_evict` from the cache and drops the
// matching entries from `_key_to_time` / `_time_to_key` once a hash has no
// entry left in `_files`.
//
// For each cell whose `file_block` is non-null:
//   1. lock the block's own mutex,
//   2. read the block's hash value,
//   3. call remove(file_block, cache_lock, block_lock),
//   4. if `_files` has no entry for that hash and `_key_to_time` has one,
//      erase the first (time, hash) pair found in `_time_to_key` for the
//      recorded time, then erase the hash from `_key_to_time`.
//
// `to_evict`   - cells to remove.
// `cache_lock` - lock guard supplied by the caller; forwarded to remove().
//
// NOTE(review): this listing is a commit diff rendered without +/- markers.
// The `get_cell(entry_key, entry_offset, cache_lock)` line near the end of the
// lambda uses names that are not declared here and looks like a removed line
// interleaved by the diff -- confirm against the repository.
void BlockFileCache::remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
// Read the hash before remove() is called on the block.
auto hash = cell->file_block->get_hash_value();
remove(file_block, cache_lock, block_lock);
// Only clean the time maps when `_files` no longer has this hash.
if (_files.find(hash) == _files.end()) {
// NOTE(review): the condition repeats the lookup already stored in `iter`;
// `iter != _key_to_time.end()` would avoid the second find().
if (auto iter = _key_to_time.find(hash);
_key_to_time.find(hash) != _key_to_time.end()) {
// Scan the `_time_to_key` entries recorded under this time for the hash.
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == hash) {
_time_to_key_iter.first =
_time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
_key_to_time.erase(hash);
}
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_and_clean_time_maps_if);
}

DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
<< ", offset: " << entry_offset;
// Collects eviction candidates from `queue` without removing anything.
//
// Walks `queue` in its iteration order and stops as soon as
// is_overflow(removed_size, size, cur_cache_size) returns false. For each
// entry the cell is looked up with get_cell(); a releasable cell is appended
// to `to_evict` and its size is added to `removed_size`.
//
// `queue`          - LRU queue to scan.
// `size`           - forwarded to is_overflow() as the needed size.
// `cur_cache_size` - forwarded to is_overflow() as the current cache size.
// `removed_size`   - in/out: running total of the sizes of collected cells.
// `to_evict`       - out: collected cells are appended here.
// `cache_lock`     - lock guard supplied by the caller; forwarded to get_cell().
//
// NOTE(review): this listing is a commit diff rendered without +/- markers.
// `cell_size` is declared twice and there are two releasable() checks (a
// `continue` guard and an `if` block); one of each is presumably a removed
// line interleaved by the diff -- confirm against the repository.
void BlockFileCache::find_evict_candidates(LRUQueue &queue, size_t size, size_t cur_cache_size,
size_t &removed_size,
std::vector<FileBlockCell*> &to_evict,
std::lock_guard<std::mutex> &cache_lock) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
// Enough has been collected once the request no longer overflows.
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);

size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);
DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
<< ", offset: " << entry_offset;

/// It is guaranteed that cell is not removed from cache as long as
/// pointer to corresponding file block is held by any other thread.
if (!cell->releasable()) {
continue;
}
size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);

if (cell->releasable()) {
auto& file_block = cell->file_block;

// The download state is checked while holding the block's mutex.
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
}
}

bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
auto limit = config::max_ttl_cache_ratio * _capacity;
if ((_cur_ttl_size + size) * 100 > limit) {
return false;
}

size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
size_t index_queue_size = _index_queue.get_capacity(cache_lock);
if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 &&
disposable_queue_size == 0 && index_queue_size == 0) {
return false;
}
std::vector<FileBlockCell*> to_evict;
auto collect_eliminate_fragments = [&](LRUQueue& queue) {
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
};
if (disposable_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
Expand All @@ -813,7 +850,7 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size,
if (index_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
}
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
remove_file_blocks(to_evict, cache_lock);
if (is_overflow(removed_size, size, cur_cache_size)) {
return false;
}
Expand All @@ -829,54 +866,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex
size_t cur_cache_size = _cur_cache_size;

std::vector<FileBlockCell*> to_evict;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);

DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
<< ", offset: " << entry_offset;

size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);

if (cell->releasable()) {
auto& file_block = cell->file_block;

std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);

removed_size += cell_size;
}
}

auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
auto hash = cell->file_block->get_hash_value();
remove(file_block, cache_lock, block_lock);
if (_files.find(hash) == _files.end()) {
if (auto iter = _key_to_time.find(hash);
_key_to_time.find(hash) != _key_to_time.end()) {
auto _time_to_key_iter = _time_to_key.equal_range(iter->second);
while (_time_to_key_iter.first != _time_to_key_iter.second) {
if (_time_to_key_iter.first->second == hash) {
_time_to_key_iter.first =
_time_to_key.erase(_time_to_key_iter.first);
break;
}
_time_to_key_iter.first++;
}
_key_to_time.erase(hash);
}
}
}
};

std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
remove_file_blocks_and_clean_time_maps(to_evict, cache_lock);

return !is_overflow(removed_size, size, cur_cache_size);
} else {
Expand Down Expand Up @@ -1069,13 +1060,7 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
}
}
}
auto remove_file_block_if = [&](FileBlockCell* cell) {
if (FileBlockSPtr file_block = cell->file_block; file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};
std::for_each(to_remove.begin(), to_remove.end(), remove_file_block_if);
remove_file_blocks(to_remove, cache_lock);
}
}

Expand Down Expand Up @@ -1141,15 +1126,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
}
}
}
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};

std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
remove_file_blocks(to_evict, cache_lock);

return !is_overflow(removed_size, size, cur_cache_size);
}
Expand All @@ -1168,37 +1145,9 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
std::vector<FileBlockCell*> to_evict;
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
DCHECK(cell) << "Cache became inconsistent. Key: " << entry_key.to_string()
<< ", offset: " << entry_offset;

size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);

if (cell->releasable()) {
auto& file_block = cell->file_block;

std::lock_guard segment_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
}
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
}
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard segment_lock(file_block->_mutex);
remove(file_block, cache_lock, segment_lock);
}
};

std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);

remove_file_blocks(to_evict, cache_lock);
return !is_overflow(removed_size, size, cur_cache_size);
}

Expand Down Expand Up @@ -1238,38 +1187,8 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
size_t cur_cache_size = _cur_cache_size;

std::vector<FileBlockCell*> to_evict;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);

DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
<< ", offset: " << entry_offset;

size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);

if (cell->releasable()) {
auto& file_block = cell->file_block;

std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);

removed_size += cell_size;
}
}

auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};

std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
remove_file_blocks(to_evict, cache_lock);

if (is_overflow(removed_size, size, cur_cache_size)) {
return false;
Expand Down Expand Up @@ -1660,12 +1579,6 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
auto remove_file_block_if = [&](FileBlockCell* cell) {
if (FileBlockSPtr file_block = cell->file_block; file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock);
}
};
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
size_t index_queue_size = _index_queue.get_capacity(cache_lock);
Expand All @@ -1684,18 +1597,14 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
size_t cell_size = cell->size();
DCHECK(entry_size == cell_size);

/// It is guaranteed that cell is not removed from cache as long as
/// pointer to corresponding file block is hold by any other thread.
if (cell->releasable()) {
auto& file_block = cell->file_block;

if (!cell->releasable()) {
continue;
std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
auto& file_block = cell->file_block;

std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
removed_size += cell_size;
}
};
if (disposable_queue_size != 0) {
Expand All @@ -1707,7 +1616,7 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
if (index_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
}
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
remove_file_blocks(to_evict, cache_lock);

return !_disk_resource_limit_mode || removed_size >= size;
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,15 @@ class BlockFileCache {

bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const;

// Removes each given cell from the cache: for every cell with a non-null
// file_block, locks the block's mutex and calls remove(). The second argument
// is the caller's lock guard, which is forwarded to remove().
void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&);

// Removes each given cell from the cache (locking the block's mutex and
// calling remove()), and, when _files no longer has an entry for the block's
// hash, also erases that hash from _key_to_time and its pair from _time_to_key.
void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);

// Scans `queue` and appends releasable cells to `to_evict`, adding each
// collected cell's size to `removed_size`, until
// is_overflow(removed_size, size, cur_cache_size) returns false or the queue
// is exhausted. Nothing is removed from the cache by this call.
void find_evict_candidates(LRUQueue &queue, size_t size, size_t cur_cache_size,
size_t &removed_size,
std::vector<FileBlockCell*> &to_evict,
std::lock_guard<std::mutex> &cache_lock);
// info
std::string _cache_base_path;
size_t _capacity = 0;
Expand Down

0 comments on commit dfbffcb

Please sign in to comment.