From e8cee55915a779f4b85b9809c15dfcd94875ece1 Mon Sep 17 00:00:00 2001 From: HeYuchen Date: Fri, 17 Dec 2021 16:49:41 +0800 Subject: [PATCH] feat(manual_compaction): meta server support starting compaction (#989) --- .../dsn/dist/replication/replication.codes.h | 1 + .../dist/replication/replication_ddl_client.h | 6 + src/client/replication_ddl_client.cpp | 14 ++ src/common/meta_admin.thrift | 20 +++ src/common/replication_common.h | 3 + src/meta/meta_service.cpp | 13 ++ src/meta/meta_service.h | 1 + src/meta/server_state.cpp | 134 ++++++++++++++++++ src/meta/server_state.h | 10 ++ src/meta/test/meta_mauanl_compaction_test.cpp | 89 +++++++++++- 10 files changed, 290 insertions(+), 1 deletion(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index be7bdab457..2cc8cf9c83 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -122,6 +122,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 8bac4c1ecc..760c99357c 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -218,6 +218,12 @@ class replication_ddl_client error_with add_new_disk(const rpc_address &target_node, const std::string &disk_str); + error_with + start_app_manual_compact(const std::string &app_name, + bool bottommost = false, + const int32_t level = -1, + const int32_t max_count = 0); + error_with query_app_manual_compact(const std::string &app_name); diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 9d2a693d52..4416aecba8 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -1684,6 +1684,20 @@ replication_ddl_client::add_new_disk(const rpc_address &target_node, const std:: return resps.begin()->second.get_value(); } +error_with replication_ddl_client::start_app_manual_compact( + const std::string &app_name, bool bottommost, const int32_t level, const int32_t max_count) +{ + auto req = make_unique(); + req->app_name = app_name; + req->__set_trigger_time(dsn_now_s()); + req->__set_target_level(level); + req->__set_bottommost(bottommost); + if (max_count > 0) { + req->__set_max_running_count(max_count); + } + return call_rpc_sync(start_manual_compact_rpc(std::move(req), RPC_CM_START_MANUAL_COMPACT)); +} + error_with replication_ddl_client::query_app_manual_compact(const std::string &app_name) { diff --git a/src/common/meta_admin.thrift b/src/common/meta_admin.thrift index d8b3634fcc..196d7d24f2 100644 --- a/src/common/meta_admin.thrift +++ b/src/common/meta_admin.thrift @@ -237,6 +237,26 @@ struct configuration_update_app_env_response 2:string hint_message; } +struct start_app_manual_compact_request +{ + 1:string app_name; + 2:optional i64 trigger_time; + 3:optional i32 target_level; + 4:optional bool bottommost; + 5:optional i32 max_running_count; +} + +struct start_app_manual_compact_response +{ + // Possible error: + // - ERR_APP_NOT_EXIST: app not exist + // - ERR_APP_DROPPED: app has been dropped + // - ERR_OPERATION_DISABLED: app disable manual compaction + // - ERR_INVALID_PARAMETERS: invalid manual compaction parameters + 1:dsn.error_code err; + 2:string hint_msg; +} + struct query_app_manual_compact_request { 1:string app_name; diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 53bb8ed671..c9f6932f4d 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -51,6 +51,9 @@ typedef rpc_holder notify typedef rpc_holder query_child_state_rpc; typedef rpc_holder backup_rpc; + +typedef rpc_holder + start_manual_compact_rpc; typedef rpc_holder query_manual_compact_rpc; diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index e527c27740..e7cca4dd3f 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -540,6 +540,9 @@ void meta_service::register_rpc_handlers() RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); register_rpc_handler_with_rpc_holder( RPC_CM_QUERY_BACKUP_STATUS, "query_backup_status", &meta_service::on_query_backup_status); + register_rpc_handler_with_rpc_holder(RPC_CM_START_MANUAL_COMPACT, + "start_manual_compact", + &meta_service::on_start_manual_compact); register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, "query_manual_compact_status", &meta_service::on_query_manual_compact_status); @@ -1222,6 +1225,16 @@ size_t meta_service::get_alive_node_count() const return _alive_set.size(); } +void meta_service::on_start_manual_compact(start_manual_compact_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + nullptr, + std::bind(&server_state::on_start_manual_compact, _state.get(), rpc)); +} + void meta_service::on_query_manual_compact_status(query_manual_compact_rpc rpc) { if (!check_status(rpc)) { diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index 48671c7359..561276c939 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -230,6 +230,7 @@ class meta_service : public serverlet void on_query_bulk_load_status(query_bulk_load_rpc rpc); // manual compaction + void on_start_manual_compact(start_manual_compact_rpc rpc); void on_query_manual_compact_status(query_manual_compact_rpc rpc); // common routines diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 110d40858e..3a7d6b1630 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -35,6 +35,7 @@ */ #include +#include #include #include #include @@ -2943,6 +2944,139 @@ bool server_state::validate_target_max_replica_count(int32_t max_replica_count) return valid; } +void server_state::on_start_manual_compact(start_manual_compact_rpc rpc) +{ + const std::string &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + std::map envs; + { + zauto_read_lock l(_lock); + auto app = get_app(app_name); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.hint_msg = + fmt::format("app {} is {}", + app_name, + response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available"); + derror_f("{}", response.hint_msg); + return; + } + envs = app->envs; + } + + auto iter = envs.find(replica_envs::MANUAL_COMPACT_DISABLED); + if (iter != envs.end() && iter->second == "true") { + response.err = ERR_OPERATION_DISABLED; + response.hint_msg = fmt::format("app {} disable manual compaction", app_name); + derror_f("{}", response.hint_msg); + return; + } + + std::vector keys; + std::vector values; + if (!parse_compaction_envs(rpc, keys, values)) { + return; + } + + update_compaction_envs_on_remote_storage(rpc, keys, values); + + // update local manual compaction status + { + zauto_write_lock l(_lock); + auto app = get_app(app_name); + app->helpers->reset_manual_compact_status(); + } +} + +bool server_state::parse_compaction_envs(start_manual_compact_rpc rpc, + std::vector &keys, + std::vector &values) +{ + const auto &request = rpc.request(); + auto &response = rpc.response(); + + int32_t target_level = -1; + if (request.__isset.target_level) { + target_level = request.target_level; + if (target_level < -1) { + response.err = ERR_INVALID_PARAMETERS; + response.hint_msg = fmt::format( + "invalid target_level({}), should in range of [-1, num_levels]", target_level); + derror_f("{}", response.hint_msg); + return false; + } + } + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL); + values.emplace_back(std::to_string(target_level)); + + if (request.__isset.max_running_count) { + if (request.max_running_count < 0) { + response.err = ERR_INVALID_PARAMETERS; + response.hint_msg = + fmt::format("invalid max_running_count({}), should be greater than 0", + request.max_running_count); + derror_f("{}", response.hint_msg); + return false; + } + if (request.max_running_count > 0) { + keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT); + values.emplace_back(std::to_string(request.max_running_count)); + } + } + + std::string bottommost = "skip"; + if (request.__isset.bottommost && request.bottommost) { + bottommost = "force"; + } + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION); + values.emplace_back(bottommost); + + int64_t trigger_time = dsn_now_s(); + if (request.__isset.trigger_time) { + trigger_time = request.trigger_time; + } + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME); + values.emplace_back(std::to_string(trigger_time)); + + return true; +} + +void server_state::update_compaction_envs_on_remote_storage(start_manual_compact_rpc rpc, + const std::vector &keys, + const std::vector &values) +{ + const std::string &app_name = rpc.request().app_name; + std::string app_path = ""; + app_info ainfo; + { + zauto_read_lock l(_lock); + auto app = get_app(app_name); + ainfo = *(reinterpret_cast(app.get())); + app_path = get_app_path(*app); + } + for (auto idx = 0; idx < keys.size(); idx++) { + ainfo.envs[keys[idx]] = values[idx]; + } + do_update_app_info(app_path, ainfo, [this, app_name, keys, values, rpc](error_code ec) { + dassert_f(ec == ERR_OK, "update app_info to remote storage failed with err = {}", ec); + + zauto_write_lock l(_lock); + auto app = get_app(app_name); + std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + for (int idx = 0; idx < keys.size(); idx++) { + app->envs[keys[idx]] = values[idx]; + } + std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '='); + ddebug_f("update manual compaction envs succeed: old_envs = {}, new_envs = {}", + old_envs, + new_envs); + + rpc.response().err = ERR_OK; + rpc.response().hint_msg = "succeed"; + }); +} + void server_state::on_query_manual_compact_status(query_manual_compact_rpc rpc) { const std::string &app_name = rpc.request().app_name; diff --git a/src/meta/server_state.h b/src/meta/server_state.h index a7c060f4cb..51fcab7b7b 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -172,6 +172,7 @@ class server_state void on_query_restore_status(configuration_query_restore_rpc rpc); // manual compaction + void on_start_manual_compact(start_manual_compact_rpc rpc); void on_query_manual_compact_status(query_manual_compact_rpc rpc); // return true if no need to do any actions @@ -298,6 +299,14 @@ class server_state // check whether a max replica count is valid especially for a new app bool validate_target_max_replica_count(int32_t max_replica_count); + // Used for `on_start_manual_compaction` + bool parse_compaction_envs(start_manual_compact_rpc rpc, + std::vector &keys, + std::vector &values); + void update_compaction_envs_on_remote_storage(start_manual_compact_rpc rpc, + const std::vector &keys, + const std::vector &values); + private: friend class bulk_load_service; friend class bulk_load_service_test; @@ -311,6 +320,7 @@ class server_state friend class meta_test_base; friend class test::test_checker; friend class server_state_restore_test; + friend class meta_app_compaction_test; FRIEND_TEST(meta_backup_service_test, test_add_backup_policy); FRIEND_TEST(policy_context_test, test_app_dropped_during_backup); diff --git a/src/meta/test/meta_mauanl_compaction_test.cpp b/src/meta/test/meta_mauanl_compaction_test.cpp index f10b71776f..4188d7af0d 100644 --- a/src/meta/test/meta_mauanl_compaction_test.cpp +++ b/src/meta/test/meta_mauanl_compaction_test.cpp @@ -50,6 +50,59 @@ class meta_app_compaction_test : public meta_test_base } } + error_code start_manual_compaction(std::string app_name, + std::string disable_manual, + bool bottommost = false, + int32_t target_level = -1, + int32_t running_count = 0) + { + if (app_name == APP_NAME) { + auto app = find_app(app_name); + app->envs[replica_envs::MANUAL_COMPACT_DISABLED] = disable_manual; + } + auto request = dsn::make_unique(); + request->app_name = app_name; + if (target_level != -1) { + request->__set_target_level(target_level); + } + if (running_count != 0) { + request->__set_max_running_count(running_count); + } + request->__set_bottommost(bottommost); + + start_manual_compact_rpc rpc(std::move(request), RPC_CM_START_MANUAL_COMPACT); + _ss->on_start_manual_compact(rpc); + _ss->wait_all_task(); + return rpc.response().err; + } + + void check_after_start_compaction(std::string bottommost, + int32_t target_level = -1, + int32_t running_count = 0) + { + auto app = find_app(APP_NAME); + if (app->envs.find(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION) != + app->envs.end()) { + ASSERT_EQ(app->envs[replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION], + bottommost); + } + if (app->envs.find(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL) != app->envs.end()) { + ASSERT_EQ(app->envs[replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL], + std::to_string(target_level)); + } + if (running_count > 0 && + app->envs.find(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT) != + app->envs.end()) { + ASSERT_EQ(app->envs[replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT], + std::to_string(running_count)); + } + for (auto &cc : app->helpers->contexts) { + for (auto &r : cc.serving) { + ASSERT_EQ(r.compact_status, manual_compaction_status::IDLE); + } + } + } + query_app_manual_compact_response query_manual_compaction(int32_t mock_progress) { manual_compaction_status::type status = manual_compaction_status::IDLE; @@ -87,6 +140,40 @@ class meta_app_compaction_test : public meta_test_base int32_t PARTITION_COUNT = 4; }; +TEST_F(meta_app_compaction_test, test_start_compaction) +{ + struct test_case + { + std::string app_name; + std::string disable_compaction; + bool bottommost; + int32_t target_level; + int32_t running_count; + error_code expected_err; + std::string expected_bottommost; + } tests[] = {{"app_not_exist", "false", false, -1, 0, ERR_APP_NOT_EXIST, "skip"}, + {APP_NAME, "true", false, -1, 0, ERR_OPERATION_DISABLED, "skip"}, + {APP_NAME, "false", false, -5, 0, ERR_INVALID_PARAMETERS, "skip"}, + {APP_NAME, "false", false, -1, -1, ERR_INVALID_PARAMETERS, "skip"}, + {APP_NAME, "false", false, -1, 0, ERR_OK, "skip"}, + {APP_NAME, "false", true, -1, 0, ERR_OK, "force"}, + {APP_NAME, "false", false, 1, 0, ERR_OK, "skip"}, + {APP_NAME, "false", true, -1, 1, ERR_OK, "force"}}; + + for (const auto &test : tests) { + auto err = start_manual_compaction(test.app_name, + test.disable_compaction, + test.bottommost, + test.target_level, + test.running_count); + ASSERT_EQ(err, test.expected_err); + if (err == ERR_OK) { + check_after_start_compaction( + test.expected_bottommost, test.target_level, test.running_count); + } + } +} + TEST_F(meta_app_compaction_test, test_query_compaction) { struct test_case @@ -95,7 +182,7 @@ TEST_F(meta_app_compaction_test, test_query_compaction) error_code expected_err; } tests[] = {{-1, ERR_INVALID_STATE}, {0, ERR_OK}, {50, ERR_OK}, {100, ERR_OK}}; - for (auto test : tests) { + for (const auto &test : tests) { auto resp = query_manual_compaction(test.mock_progress); ASSERT_EQ(resp.err, test.expected_err); if (resp.err == ERR_OK) {