Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(cloud-merge) Supports online capacity expansion and contraction #37484

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions be/src/http/action/clear_file_cache_action.cpp

This file was deleted.

32 changes: 0 additions & 32 deletions be/src/http/action/clear_file_cache_action.h

This file was deleted.

54 changes: 44 additions & 10 deletions be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,59 @@

namespace doris {

const static std::string HEADER_JSON = "application/json";
const static std::string OP = "op";
constexpr static std::string_view HEADER_JSON = "application/json";
constexpr static std::string_view OP = "op";
constexpr static std::string_view SYNC = "sync";
constexpr static std::string_view PATH = "path";
constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
constexpr static std::string_view RELEASED_ELEMENTS = "released_elements";

Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
std::string operation = req->param(OP);
if (operation == "release") {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
std::string operation = req->param(OP.data());
Status st = Status::OK();
if (operation == RELEASE) {
size_t released = 0;
if (req->param("base_path") != "") {
released = io::FileCacheFactory::instance()->try_release(req->param("base_path"));
const std::string& base_path = req->param(BASE_PATH.data());
if (!base_path.empty()) {
released = io::FileCacheFactory::instance()->try_release(base_path);
} else {
released = io::FileCacheFactory::instance()->try_release();
}
EasyJson json;
json["released_elements"] = released;
json[RELEASED_ELEMENTS.data()] = released;
*json_metrics = json.ToString();
return Status::OK();
} else if (operation == CLEAR) {
const std::string& sync = req->param(SYNC.data());
auto ret = io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
} else if (operation == RESET) {
Status st;
std::string capacity = req->param(CAPACITY.data());
int64_t new_capacity = 0;
bool parse = true;
try {
new_capacity = std::stoll(capacity);
} catch (...) {
parse = false;
}
if (!parse || new_capacity <= 0) {
st = Status::InvalidArgument(
"The capacity {} failed to be parsed, the capacity needs to be in "
"the interval (0, INT64_MAX]",
capacity);
} else {
const std::string& path = req->param(PATH.data());
auto ret = io::FileCacheFactory::instance()->reset_capacity(path, new_capacity);
LOG(INFO) << ret;
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
return Status::InternalError("invalid operation: {}", operation);
return st;
}

void FileCacheAction::handle(HttpRequest* req) {
Expand Down
69 changes: 65 additions & 4 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1408,11 +1408,72 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent)
return 0;
}

void BlockFileCache::check_disk_resource_limit(const std::string& path) {
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
using namespace std::chrono;
int64_t space_released = 0;
size_t old_capacity = 0;
std::stringstream ss;
ss << "finish reset_capacity, path=" << _cache_base_path;
auto start_time = steady_clock::time_point();
{
std::lock_guard cache_lock(_mutex);
if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
int64_t need_remove_size = _cur_cache_size - new_capacity;
auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
int64_t queue_released = 0;
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (need_remove_size <= 0) return queue_released;
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell->releasable()) continue;
cell->is_deleted = true;
need_remove_size -= entry_size;
space_released += entry_size;
queue_released += entry_size;
}
return queue_released;
};
int64_t queue_released = remove_blocks(_disposable_queue);
ss << " disposable_queue released " << queue_released;
queue_released = remove_blocks(_normal_queue);
ss << " normal_queue released " << queue_released;
queue_released = remove_blocks(_index_queue);
ss << " index_queue released " << queue_released;
if (need_remove_size >= 0) {
queue_released = 0;
for (auto& [_, key] : _time_to_key) {
for (auto& [_, cell] : _files[key]) {
if (need_remove_size <= 0) break;
cell.is_deleted = true;
need_remove_size -= cell.file_block->range().size();
space_released += cell.file_block->range().size();
queue_released += cell.file_block->range().size();
}
}
ss << " ttl_queue released " << queue_released;
}
_disk_resource_limit_mode = true;
_async_clear_file_cache = true;
ss << " total_space_released=" << space_released;
}
old_capacity = _capacity;
_capacity = new_capacity;
}
auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
<< " use_time=" << static_cast<int64_t>(use_time.count());
ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
LOG(INFO) << ss.str();
return ss.str();
}

void BlockFileCache::check_disk_resource_limit() {
if (_capacity > _cur_cache_size) {
_disk_resource_limit_mode = false;
}
std::pair<int, int> percent;
int ret = disk_used_percentage(path, &percent);
int ret = disk_used_percentage(_cache_base_path, &percent);
if (ret != 0) {
LOG_ERROR("").tag("file cache path", path).tag("error", strerror(errno));
LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
return;
}
auto [capacity_percentage, inode_percentage] = percent;
Expand Down Expand Up @@ -1452,7 +1513,7 @@ void BlockFileCache::run_background_operation() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
check_disk_resource_limit(_cache_base_path);
check_disk_resource_limit();
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
Expand Down
10 changes: 9 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ class BlockFileCache {
*/
std::string clear_file_cache_async();
std::string clear_file_cache_directly();

/**
* Reset the cache capacity. If the new_capacity is smaller than _capacity, the redundant data will be remove async.
*
* @returns summary message
*/
std::string reset_capacity(size_t new_capacity);

std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
/// For debug.
std::string dump_structure(const UInt128Wrapper& hash);
Expand Down Expand Up @@ -358,7 +366,7 @@ class BlockFileCache {
size_t get_used_cache_size_unlocked(FileCacheType type,
std::lock_guard<std::mutex>& cache_lock) const;

void check_disk_resource_limit(const std::string& path);
void check_disk_resource_limit();

size_t get_available_cache_size_unlocked(FileCacheType type,
std::lock_guard<std::mutex>& cache_lock) const;
Expand Down
19 changes: 17 additions & 2 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
size_t disk_capacity = static_cast<size_t>(
static_cast<size_t>(stat.f_blocks) * static_cast<size_t>(stat.f_bsize) *
(static_cast<double>(config::file_cache_enter_disk_resource_limit_mode_percent) / 100));
if (disk_capacity < file_cache_settings.capacity) {
LOG_INFO("The cache {} config size {} is larger than {}% disk size {}, recalc it.",
if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) {
LOG_INFO("The cache {} config size {} is larger than {}% disk size {} or zero, recalc it.",
cache_base_path, file_cache_settings.capacity,
config::file_cache_enter_disk_resource_limit_mode_percent, disk_capacity);
file_cache_settings =
Expand Down Expand Up @@ -143,5 +143,20 @@ std::vector<std::string> FileCacheFactory::get_base_paths() {
return paths;
}

std::string FileCacheFactory::reset_capacity(const std::string& path, int64_t new_capacity) {
if (path.empty()) {
std::stringstream ss;
for (auto& [_, cache] : _path_to_cache) {
ss << cache->reset_capacity(new_capacity);
}
return ss.str();
} else {
if (auto iter = _path_to_cache.find(path); iter != _path_to_cache.end()) {
return iter->second->reset_capacity(new_capacity);
}
}
return "Unknown the cache path " + path;
}

} // namespace io
} // namespace doris
9 changes: 9 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ class FileCacheFactory {

std::vector<std::string> get_base_paths();

/**
* Clears data of all file cache instances
*
* @param path file cache absolute path
* @param new_capacity
* @return summary message
*/
std::string reset_capacity(const std::string& path, int64_t new_capacity);

FileCacheFactory() = default;
FileCacheFactory& operator=(const FileCacheFactory&) = delete;
FileCacheFactory(const FileCacheFactory&) = delete;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach
size_t normal_percent, size_t disposable_percent,
size_t index_percent) {
io::FileCacheSettings settings;
if (capacity == 0) return settings;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: statement should be inside braces [readability-braces-around-statements]

Suggested change
if (capacity == 0) return settings;
if (capacity == 0) { return settings;
}

settings.capacity = capacity;
settings.max_file_block_size = config::file_cache_each_block_size;
settings.max_query_cache_size = max_query_cache_size;
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
if (value.IsInt64()) {
total_size = value.GetInt64();
} else {
return Status::InvalidArgument("total_size should be int64");
total_size = 0;
}
}
if (config::enable_file_cache_query_limit) {
Expand All @@ -230,13 +230,12 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
if (value.IsInt64()) {
query_limit_bytes = value.GetInt64();
} else {
return Status::InvalidArgument("query_limit should be int64");
query_limit_bytes = 0;
}
}
}
if (total_size <= 0 || (config::enable_file_cache_query_limit && query_limit_bytes <= 0)) {
return Status::InvalidArgument(
"total_size or query_limit should not less than or equal to zero");
if (total_size < 0 || (config::enable_file_cache_query_limit && query_limit_bytes < 0)) {
return Status::InvalidArgument("total_size or query_limit should not less than zero");
}

// percent
Expand Down
14 changes: 4 additions & 10 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "http/action/check_tablet_segment_action.h"
#include "http/action/checksum_action.h"
#include "http/action/clear_cache_action.h"
#include "http/action/clear_file_cache_action.h"
#include "http/action/compaction_action.h"
#include "http/action/config_action.h"
#include "http/action/debug_point_action.h"
Expand Down Expand Up @@ -208,9 +207,6 @@ Status HttpService::start() {
_pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action);

FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);

ConfigAction* update_config_action =
_pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG));
_ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
Expand Down Expand Up @@ -303,9 +299,8 @@ void HttpService::register_local_handler(StorageEngine& engine) {
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_binlog/_download",
download_binlog_action);

ClearFileCacheAction* clear_file_cache_action = _pool.add(new ClearFileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/clear_file_cache",
clear_file_cache_action);
FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/file_cache", file_cache_action);

TabletsDistributionAction* tablets_distribution_action =
_pool.add(new TabletsDistributionAction(_env, engine, TPrivilegeHier::GLOBAL,
Expand Down Expand Up @@ -400,9 +395,8 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
_ev_http_server->register_handler(HttpMethod::GET, "/api/injection_point/{op}",
injection_point_action);
#endif
ClearFileCacheAction* clear_file_cache_action = _pool.add(new ClearFileCacheAction());
_ev_http_server->register_handler(HttpMethod::POST, "/api/clear_file_cache",
clear_file_cache_action);
FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);
auto* show_hotspot_action = _pool.add(new ShowHotspotAction(engine));
_ev_http_server->register_handler(HttpMethod::GET, "/api/hotspot/tablet", show_hotspot_action);

Expand Down
Loading
Loading