diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 608e98edef2416..1c626cbe7e2b98 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -754,14 +754,8 @@ const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) co return _normal_queue; } -bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, - std::lock_guard& cache_lock) { - size_t removed_size = 0; - size_t cur_cache_size = _cur_cache_size; - auto limit = config::max_ttl_cache_ratio * _capacity; - if ((_cur_ttl_size + size) * 100 > limit) { - return false; - } +void BlockFileCache::remove_file_blocks(std::vector& to_evict, + std::lock_guard& cache_lock) { auto remove_file_block_if = [&](FileBlockCell* cell) { FileBlockSPtr file_block = cell->file_block; if (file_block) { @@ -769,33 +763,54 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, remove(file_block, cache_lock, block_lock); } }; - size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); - size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); - size_t index_queue_size = _index_queue.get_capacity(cache_lock); - if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 && - disposable_queue_size == 0 && index_queue_size == 0) { - return false; - } - std::vector to_evict; - auto collect_eliminate_fragments = [&](LRUQueue& queue) { - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; + std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); +} + +void BlockFileCache::remove_file_blocks_and_clean_time_maps(std::vector& to_evict, + std::lock_guard& cache_lock) { + auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) { + FileBlockSPtr file_block = cell->file_block; + if (file_block) { + std::lock_guard block_lock(file_block->_mutex); + auto hash = cell->file_block->get_hash_value(); + remove(file_block, cache_lock, block_lock); + if (_files.find(hash) == _files.end()) { + if (auto iter = _key_to_time.find(hash); + _key_to_time.find(hash) != _key_to_time.end()) { + auto _time_to_key_iter = _time_to_key.equal_range(iter->second); + while (_time_to_key_iter.first != _time_to_key_iter.second) { + if (_time_to_key_iter.first->second == hash) { + _time_to_key_iter.first = + _time_to_key.erase(_time_to_key_iter.first); + break; + } + _time_to_key_iter.first++; + } + _key_to_time.erase(hash); + } } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); + } + }; + std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_and_clean_time_maps_if); +} - DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() - << ", offset: " << entry_offset; +void BlockFileCache::find_evict_candidates(LRUQueue &queue, size_t size, size_t cur_cache_size, + size_t &removed_size, + std::vector &to_evict, + std::lock_guard &cache_lock) { + for (const auto& [entry_key, entry_offset, entry_size] : queue) { + if (!is_overflow(removed_size, size, cur_cache_size)) { + break; + } + auto* cell = get_cell(entry_key, entry_offset, cache_lock); - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); + DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string() + << ", offset: " << entry_offset; - /// It is guaranteed that cell is not removed from cache as long as - /// pointer to corresponding file block is hold by any other thread. - if (!cell->releasable()) { - continue; - } + size_t cell_size = cell->size(); + DCHECK(entry_size == cell_size); + if (cell->releasable()) { auto& file_block = cell->file_block; std::lock_guard block_lock(file_block->_mutex); @@ -803,6 +818,28 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, to_evict.push_back(cell); removed_size += cell_size; } + } +} + +bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, + std::lock_guard& cache_lock) { + size_t removed_size = 0; + size_t cur_cache_size = _cur_cache_size; + auto limit = config::max_ttl_cache_ratio * _capacity; + if ((_cur_ttl_size + size) * 100 > limit) { + return false; + } + + size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); + size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); + size_t index_queue_size = _index_queue.get_capacity(cache_lock); + if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 && + disposable_queue_size == 0 && index_queue_size == 0) { + return false; + } + std::vector to_evict; + auto collect_eliminate_fragments = [&](LRUQueue& queue) { + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); }; if (disposable_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE)); @@ -813,7 +850,7 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, if (index_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::INDEX)); } - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + remove_file_blocks(to_evict, cache_lock); if (is_overflow(removed_size, size, cur_cache_size)) { return false; } @@ -829,54 +866,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard to_evict; - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - - DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() - << ", offset: " << entry_offset; - - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); - - if (cell->releasable()) { - auto& file_block = cell->file_block; - - std::lock_guard block_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - - removed_size += cell_size; - } - } - - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - auto hash = cell->file_block->get_hash_value(); - remove(file_block, cache_lock, block_lock); - if (_files.find(hash) == _files.end()) { - if (auto iter = _key_to_time.find(hash); - _key_to_time.find(hash) != _key_to_time.end()) { - auto _time_to_key_iter = _time_to_key.equal_range(iter->second); - while (_time_to_key_iter.first != _time_to_key_iter.second) { - if (_time_to_key_iter.first->second == hash) { - _time_to_key_iter.first = - _time_to_key.erase(_time_to_key_iter.first); - break; - } - _time_to_key_iter.first++; - } - _key_to_time.erase(hash); - } - } - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); + remove_file_blocks_and_clean_time_maps(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); } else { @@ -1069,13 +1060,7 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { } } } - auto remove_file_block_if = [&](FileBlockCell* cell) { - if (FileBlockSPtr file_block = cell->file_block; file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - std::for_each(to_remove.begin(), to_remove.end(), remove_file_block_if); + remove_file_blocks(to_remove, cache_lock); } } @@ -1141,15 +1126,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval( } } } - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); } @@ -1168,37 +1145,9 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size( std::vector to_evict; for (FileCacheType cache_type : other_cache_types) { auto& queue = get_queue(cache_type); - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - DCHECK(cell) << "Cache became inconsistent. Key: " << entry_key.to_string() - << ", offset: " << entry_offset; - - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); - - if (cell->releasable()) { - auto& file_block = cell->file_block; - - std::lock_guard segment_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - removed_size += cell_size; - } - } + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); } - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard segment_lock(file_block->_mutex); - remove(file_block, cache_lock, segment_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); - + remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); } @@ -1238,38 +1187,8 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, size_t cur_cache_size = _cur_cache_size; std::vector to_evict; - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - - DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() - << ", offset: " << entry_offset; - - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); - - if (cell->releasable()) { - auto& file_block = cell->file_block; - - std::lock_guard block_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - - removed_size += cell_size; - } - } - - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock); if (is_overflow(removed_size, size, cur_cache_size)) { return false; @@ -1660,12 +1579,6 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { bool BlockFileCache::try_reserve_for_lazy_load(size_t size, std::lock_guard& cache_lock) { size_t removed_size = 0; - auto remove_file_block_if = [&](FileBlockCell* cell) { - if (FileBlockSPtr file_block = cell->file_block; file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); size_t index_queue_size = _index_queue.get_capacity(cache_lock); @@ -1684,18 +1597,14 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size, size_t cell_size = cell->size(); DCHECK(entry_size == cell_size); - /// It is guaranteed that cell is not removed from cache as long as - /// pointer to corresponding file block is hold by any other thread. + if (cell->releasable()) { + auto& file_block = cell->file_block; - if (!cell->releasable()) { - continue; + std::lock_guard block_lock(file_block->_mutex); + DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); + to_evict.push_back(cell); + removed_size += cell_size; } - auto& file_block = cell->file_block; - - std::lock_guard block_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - removed_size += cell_size; } }; if (disposable_queue_size != 0) { @@ -1707,7 +1616,7 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size, if (index_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::INDEX)); } - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + remove_file_blocks(to_evict, cache_lock); return !_disk_resource_limit_mode || removed_size >= size; } diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 8f45d0f7800e03..f78b0bf4ae77b3 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -384,6 +384,15 @@ class BlockFileCache { bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const; + void remove_file_blocks(std::vector&, std::lock_guard&); + + void remove_file_blocks_and_clean_time_maps(std::vector&, + std::lock_guard&); + + void find_evict_candidates(LRUQueue &queue, size_t size, size_t cur_cache_size, + size_t &removed_size, + std::vector &to_evict, + std::lock_guard &cache_lock); // info std::string _cache_base_path; size_t _capacity = 0;