Skip to content

Commit

Permalink
feat(manual_compaction): meta server support starting compaction (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Dec 17, 2021
1 parent fac96a3 commit e8cee55
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 1 deletion.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

Expand Down
6 changes: 6 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ class replication_ddl_client
error_with<add_new_disk_response> add_new_disk(const rpc_address &target_node,
const std::string &disk_str);

error_with<start_app_manual_compact_response>
start_app_manual_compact(const std::string &app_name,
bool bottommost = false,
const int32_t level = -1,
const int32_t max_count = 0);

error_with<query_app_manual_compact_response>
query_app_manual_compact(const std::string &app_name);

Expand Down
14 changes: 14 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,20 @@ replication_ddl_client::add_new_disk(const rpc_address &target_node, const std::
return resps.begin()->second.get_value();
}

error_with<start_app_manual_compact_response> replication_ddl_client::start_app_manual_compact(
const std::string &app_name, bool bottommost, const int32_t level, const int32_t max_count)
{
auto req = make_unique<start_app_manual_compact_request>();
req->app_name = app_name;
req->__set_trigger_time(dsn_now_s());
req->__set_target_level(level);
req->__set_bottommost(bottommost);
if (max_count > 0) {
req->__set_max_running_count(max_count);
}
return call_rpc_sync(start_manual_compact_rpc(std::move(req), RPC_CM_START_MANUAL_COMPACT));
}

error_with<query_app_manual_compact_response>
replication_ddl_client::query_app_manual_compact(const std::string &app_name)
{
Expand Down
20 changes: 20 additions & 0 deletions src/common/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ struct configuration_update_app_env_response
2:string hint_message;
}

struct start_app_manual_compact_request
{
1:string app_name;
2:optional i64 trigger_time;
3:optional i32 target_level;
4:optional bool bottommost;
5:optional i32 max_running_count;
}

struct start_app_manual_compact_response
{
// Possible error:
// - ERR_APP_NOT_EXIST: app not exist
// - ERR_APP_DROPPED: app has been dropped
// - ERR_OPERATION_DISABLED: app disable manual compaction
// - ERR_INVALID_PARAMETERS: invalid manual compaction parameters
1:dsn.error_code err;
2:string hint_msg;
}

struct query_app_manual_compact_request
{
1:string app_name;
Expand Down
3 changes: 3 additions & 0 deletions src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ typedef rpc_holder<notify_stop_split_request, notify_stop_split_response> notify
typedef rpc_holder<query_child_state_request, query_child_state_response> query_child_state_rpc;

typedef rpc_holder<backup_request, backup_response> backup_rpc;

typedef rpc_holder<start_app_manual_compact_request, start_app_manual_compact_response>
start_manual_compact_rpc;
typedef rpc_holder<query_app_manual_compact_request, query_app_manual_compact_response>
query_manual_compact_rpc;

Expand Down
13 changes: 13 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ void meta_service::register_rpc_handlers()
RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app);
register_rpc_handler_with_rpc_holder(
RPC_CM_QUERY_BACKUP_STATUS, "query_backup_status", &meta_service::on_query_backup_status);
register_rpc_handler_with_rpc_holder(RPC_CM_START_MANUAL_COMPACT,
"start_manual_compact",
&meta_service::on_start_manual_compact);
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_MANUAL_COMPACT_STATUS,
"query_manual_compact_status",
&meta_service::on_query_manual_compact_status);
Expand Down Expand Up @@ -1222,6 +1225,16 @@ size_t meta_service::get_alive_node_count() const
return _alive_set.size();
}

void meta_service::on_start_manual_compact(start_manual_compact_rpc rpc)
{
if (!check_status(rpc)) {
return;
}
tasking::enqueue(LPC_META_STATE_NORMAL,
nullptr,
std::bind(&server_state::on_start_manual_compact, _state.get(), rpc));
}

void meta_service::on_query_manual_compact_status(query_manual_compact_rpc rpc)
{
if (!check_status(rpc)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class meta_service : public serverlet<meta_service>
void on_query_bulk_load_status(query_bulk_load_rpc rpc);

// manual compaction
void on_start_manual_compact(start_manual_compact_rpc rpc);
void on_query_manual_compact_status(query_manual_compact_rpc rpc);

// common routines
Expand Down
134 changes: 134 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*/

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replica_envs.h>
#include <dsn/utility/factory_store.h>
#include <dsn/utility/string_conv.h>
#include <dsn/tool-api/task.h>
Expand Down Expand Up @@ -2943,6 +2944,139 @@ bool server_state::validate_target_max_replica_count(int32_t max_replica_count)
return valid;
}

void server_state::on_start_manual_compact(start_manual_compact_rpc rpc)
{
const std::string &app_name = rpc.request().app_name;
auto &response = rpc.response();

std::map<std::string, std::string> envs;
{
zauto_read_lock l(_lock);
auto app = get_app(app_name);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.hint_msg =
fmt::format("app {} is {}",
app_name,
response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available");
derror_f("{}", response.hint_msg);
return;
}
envs = app->envs;
}

auto iter = envs.find(replica_envs::MANUAL_COMPACT_DISABLED);
if (iter != envs.end() && iter->second == "true") {
response.err = ERR_OPERATION_DISABLED;
response.hint_msg = fmt::format("app {} disable manual compaction", app_name);
derror_f("{}", response.hint_msg);
return;
}

std::vector<std::string> keys;
std::vector<std::string> values;
if (!parse_compaction_envs(rpc, keys, values)) {
return;
}

update_compaction_envs_on_remote_storage(rpc, keys, values);

// update local manual compaction status
{
zauto_write_lock l(_lock);
auto app = get_app(app_name);
app->helpers->reset_manual_compact_status();
}
}

bool server_state::parse_compaction_envs(start_manual_compact_rpc rpc,
std::vector<std::string> &keys,
std::vector<std::string> &values)
{
const auto &request = rpc.request();
auto &response = rpc.response();

int32_t target_level = -1;
if (request.__isset.target_level) {
target_level = request.target_level;
if (target_level < -1) {
response.err = ERR_INVALID_PARAMETERS;
response.hint_msg = fmt::format(
"invalid target_level({}), should in range of [-1, num_levels]", target_level);
derror_f("{}", response.hint_msg);
return false;
}
}
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL);
values.emplace_back(std::to_string(target_level));

if (request.__isset.max_running_count) {
if (request.max_running_count < 0) {
response.err = ERR_INVALID_PARAMETERS;
response.hint_msg =
fmt::format("invalid max_running_count({}), should be greater than 0",
request.max_running_count);
derror_f("{}", response.hint_msg);
return false;
}
if (request.max_running_count > 0) {
keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT);
values.emplace_back(std::to_string(request.max_running_count));
}
}

std::string bottommost = "skip";
if (request.__isset.bottommost && request.bottommost) {
bottommost = "force";
}
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
values.emplace_back(bottommost);

int64_t trigger_time = dsn_now_s();
if (request.__isset.trigger_time) {
trigger_time = request.trigger_time;
}
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME);
values.emplace_back(std::to_string(trigger_time));

return true;
}

void server_state::update_compaction_envs_on_remote_storage(start_manual_compact_rpc rpc,
const std::vector<std::string> &keys,
const std::vector<std::string> &values)
{
const std::string &app_name = rpc.request().app_name;
std::string app_path = "";
app_info ainfo;
{
zauto_read_lock l(_lock);
auto app = get_app(app_name);
ainfo = *(reinterpret_cast<app_info *>(app.get()));
app_path = get_app_path(*app);
}
for (auto idx = 0; idx < keys.size(); idx++) {
ainfo.envs[keys[idx]] = values[idx];
}
do_update_app_info(app_path, ainfo, [this, app_name, keys, values, rpc](error_code ec) {
dassert_f(ec == ERR_OK, "update app_info to remote storage failed with err = {}", ec);

zauto_write_lock l(_lock);
auto app = get_app(app_name);
std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');
for (int idx = 0; idx < keys.size(); idx++) {
app->envs[keys[idx]] = values[idx];
}
std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');
ddebug_f("update manual compaction envs succeed: old_envs = {}, new_envs = {}",
old_envs,
new_envs);

rpc.response().err = ERR_OK;
rpc.response().hint_msg = "succeed";
});
}

void server_state::on_query_manual_compact_status(query_manual_compact_rpc rpc)
{
const std::string &app_name = rpc.request().app_name;
Expand Down
10 changes: 10 additions & 0 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class server_state
void on_query_restore_status(configuration_query_restore_rpc rpc);

// manual compaction
void on_start_manual_compact(start_manual_compact_rpc rpc);
void on_query_manual_compact_status(query_manual_compact_rpc rpc);

// return true if no need to do any actions
Expand Down Expand Up @@ -298,6 +299,14 @@ class server_state
// check whether a max replica count is valid especially for a new app
bool validate_target_max_replica_count(int32_t max_replica_count);

// Used for `on_start_manual_compaction`
bool parse_compaction_envs(start_manual_compact_rpc rpc,
std::vector<std::string> &keys,
std::vector<std::string> &values);
void update_compaction_envs_on_remote_storage(start_manual_compact_rpc rpc,
const std::vector<std::string> &keys,
const std::vector<std::string> &values);

private:
friend class bulk_load_service;
friend class bulk_load_service_test;
Expand All @@ -311,6 +320,7 @@ class server_state
friend class meta_test_base;
friend class test::test_checker;
friend class server_state_restore_test;
friend class meta_app_compaction_test;

FRIEND_TEST(meta_backup_service_test, test_add_backup_policy);
FRIEND_TEST(policy_context_test, test_app_dropped_during_backup);
Expand Down
Loading

0 comments on commit e8cee55

Please sign in to comment.