Skip to content

Commit

Permalink
[enhancement](file-cache) limit the file cache handle num and init the file cache concurrently (#22919)
Browse files Browse the repository at this point in the history

1. the real value of BE config `file_cache_max_file_reader_cache_size` will be 1/3 of the process's max open file number.
2. use thread pool to create or init the file cache concurrently.
    This solves the issue that when there are many files in the file cache dir, BE startup is very slow because
    it traverses all file cache dirs sequentially.
  • Loading branch information
morningman authored and xiaokang committed Aug 17, 2023
1 parent 3cf4da1 commit c272fa4
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 21 deletions.
20 changes: 19 additions & 1 deletion be/src/io/cache/block/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <glog/logging.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <sys/resource.h>

#include <chrono> // IWYU pragma: keep
#include <filesystem>
#include <utility>
Expand Down Expand Up @@ -176,7 +178,7 @@ std::weak_ptr<FileReader> IFileCache::cache_file_reader(const AccessKeyAndOffset
std::weak_ptr<FileReader> wp;
if (!s_read_only) [[likely]] {
std::lock_guard lock(s_file_reader_cache_mtx);
if (config::file_cache_max_file_reader_cache_size == s_file_reader_cache.size()) {
if (s_file_reader_cache.size() >= _max_file_reader_cache_size) {
s_file_name_to_reader.erase(s_file_reader_cache.back().first);
s_file_reader_cache.pop_back();
}
Expand Down Expand Up @@ -205,5 +207,21 @@ size_t IFileCache::file_reader_cache_size() {
return s_file_name_to_reader.size();
}

// Initialize the process-wide file reader cache limit.
// Caps `_max_file_reader_cache_size` so that cached file handles cannot
// exhaust the process's open-file limit: the effective value is the smaller
// of the BE config `file_cache_max_file_reader_cache_size` and 1/3 of the
// RLIMIT_NOFILE hard limit. Should be called once, at BE startup, before any
// file cache is used.
void IFileCache::init() {
    struct rlimit limit;
    if (getrlimit(RLIMIT_NOFILE, &limit) != 0) {
        // LOG(FATAL) aborts the process under glog; the early return only
        // matters if FATAL is ever downgraded, in which case the default
        // value of _max_file_reader_cache_size is kept.
        LOG(FATAL) << "getrlimit() failed with errno: " << errno;
        return;
    }

    // NOTE(review): this divides the hard limit (rlim_max), but the soft
    // limit (rlim_cur) is what actually bounds open fds at runtime — confirm
    // the hard limit is intended here.
    _max_file_reader_cache_size =
            std::min((uint64_t)config::file_cache_max_file_reader_cache_size, limit.rlim_max / 3);
    LOG(INFO) << "max file reader cache size is: " << _max_file_reader_cache_size
              << ", resource hard limit is: " << limit.rlim_max
              << ", config file_cache_max_file_reader_cache_size is: "
              << config::file_cache_max_file_reader_cache_size;
}

} // namespace io
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/io/cache/block/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,12 @@ class IFileCache {
s_file_name_to_reader;
static inline std::mutex s_file_reader_cache_mtx;
static inline std::atomic_bool s_read_only {false};
static inline uint64_t _max_file_reader_cache_size = 65533;

public:
// should be called when BE starts
static void init();

static void set_read_only(bool read_only);

static bool read_only() { return s_read_only; }
Expand Down
23 changes: 17 additions & 6 deletions be/src/io/cache/block/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ size_t FileCacheFactory::try_release(const std::string& base_path) {
return 0;
}

Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
const FileCacheSettings& file_cache_settings) {
void FileCacheFactory::create_file_cache(const std::string& cache_base_path,
const FileCacheSettings& file_cache_settings,
Status* status) {
if (config::clear_file_cache) {
auto fs = global_local_filesystem();
bool res = false;
Expand All @@ -71,12 +72,22 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,

std::unique_ptr<IFileCache> cache =
std::make_unique<LRUFileCache>(cache_base_path, file_cache_settings);
RETURN_IF_ERROR(cache->initialize());
_path_to_cache[cache_base_path] = cache.get();
_caches.push_back(std::move(cache));
*status = cache->initialize();
if (!status->ok()) {
return;
}

{
// create_file_cache() may be called concurrently,
// so access to the shared containers must be protected by a lock
std::lock_guard<std::mutex> lock(_cache_mutex);
_path_to_cache[cache_base_path] = cache.get();
_caches.push_back(std::move(cache));
}
LOG(INFO) << "[FileCache] path: " << cache_base_path
<< " total_size: " << file_cache_settings.total_size;
return Status::OK();
*status = Status::OK();
return;
}

CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/io/cache/block/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class FileCacheFactory {
public:
static FileCacheFactory& instance();

Status create_file_cache(const std::string& cache_base_path,
const FileCacheSettings& file_cache_settings);
void create_file_cache(const std::string& cache_base_path,
const FileCacheSettings& file_cache_settings, Status* status);

size_t try_release();

Expand All @@ -55,6 +55,8 @@ class FileCacheFactory {
FileCacheFactory(const FileCacheFactory&) = delete;

private:
// protects the following containers
std::mutex _cache_mutex;
std::vector<std::unique_ptr<IFileCache>> _caches;
std::unordered_map<std::string, CloudFileCachePtr> _path_to_cache;
};
Expand Down
8 changes: 6 additions & 2 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "io/fs/path.h"
#include "util/doris_metrics.h"
#include "util/slice.h"
#include "util/stopwatch.hpp"
#include "vec/common/hex.h"

namespace fs = std::filesystem;
Expand Down Expand Up @@ -118,6 +119,8 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
}

Status LRUFileCache::initialize() {
MonotonicStopWatch watch;
watch.start();
std::lock_guard cache_lock(_mutex);
if (!_is_initialized) {
if (fs::exists(_cache_base_path)) {
Expand All @@ -134,17 +137,18 @@ Status LRUFileCache::initialize() {
}
_is_initialized = true;
_cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this);
int64_t cost = watch.elapsed_time() / 1000 / 1000;
LOG(INFO) << fmt::format(
"After initialize file cache path={}, disposable queue size={} elements={}, index "
"queue size={} "
"elements={}, query queue "
"size={} elements={}",
"size={} elements={}, init cost(ms)={}",
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
_disposable_queue.get_elements_num(cache_lock),
_index_queue.get_total_cache_size(cache_lock),
_index_queue.get_elements_num(cache_lock),
_normal_queue.get_total_cache_size(cache_lock),
_normal_queue.get_elements_num(cache_lock));
_normal_queue.get_elements_num(cache_lock), cost);
return Status::OK();
}

Expand Down
32 changes: 28 additions & 4 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ int main(int argc, char** argv) {
}

if (doris::config::enable_file_cache) {
doris::io::IFileCache::init();
std::unordered_set<std::string> cache_path_set;
std::vector<doris::CachePath> cache_paths;
olap_res = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
Expand All @@ -400,16 +401,39 @@ int main(int argc, char** argv) {
<< doris::config::file_cache_path;
exit(-1);
}

std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
doris::ThreadPoolBuilder("FileCacheInitThreadPool")
.set_min_threads(cache_paths.size())
.set_max_threads(cache_paths.size())
.build(&file_cache_init_pool);

std::vector<doris::Status> cache_status;
for (auto& cache_path : cache_paths) {
if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
continue;
}

cache_status.push_back(Status::OK());
RETURN_IF_ERROR(file_cache_init_pool->submit_func(
std::bind(&doris::io::FileCacheFactory::create_file_cache,
&(doris::io::FileCacheFactory::instance()), cache_path.path,
cache_path.init_settings(), &(cache_status.back()))));

cache_path_set.emplace(cache_path.path);
Status st = doris::io::FileCacheFactory::instance().create_file_cache(
cache_path.path, cache_path.init_settings());
if (!st) {
LOG(FATAL) << st;
// Status st = doris::io::FileCacheFactory::instance().create_file_cache(
// cache_path.path, cache_path.init_settings());
// if (!st) {
// LOG(FATAL) << st;
// exit(-1);
// }
}

file_cache_init_pool->wait();
for (int i = 0; i < cache_status.size(); ++i) {
if (!cache_status[i].ok()) {
LOG(FATAL) << "failed to init file cache: " << i << ", err: " << cache_status[i];
exit(-1);
}
}
Expand Down
1 change: 1 addition & 0 deletions be/test/io/cache/file_block_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,7 @@ TEST(LRUFileCache, fd_cache_evict) {
context.cache_type = io::CacheType::NORMAL;
auto key = io::LRUFileCache::hash("key1");
config::file_cache_max_file_reader_cache_size = 2;
IFileCache::init();
{
auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8]
auto segments = fromHolder(holder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ suite("test_truncate_char_or_varchar_columns", "p2") {
// test parquet format
def q01_parquet = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
}
// test orc format
def q01_orc = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
}
// test text format
def q01_text = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
}
sql """ use `multi_catalog`; """
q01_parquet()
Expand All @@ -57,17 +57,17 @@ suite("test_truncate_char_or_varchar_columns", "p2") {
// test parquet format
def q02_parquet = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_parquet order by id """
}
// test orc format
def q02_orc = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_orc order by id """
}
// test text format
def q02_text = {
qt_q01 """ select * from multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from regression.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
qt_q02 """ select city, concat("at ", city, " in ", country) from ${catalog_name}.multi_catalog.test_truncate_char_or_varchar_columns_text order by id """
}
sql """ use `multi_catalog`; """
q02_parquet()
Expand Down

0 comments on commit c272fa4

Please sign in to comment.