diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index c9552b29ee..34ef26e467 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -699,6 +700,75 @@ void meta_http_service::query_bulk_load_handler(const http_request &req, http_re resp.status_code = http_status_code::ok; } +void meta_http_service::start_compaction_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) { + return; + } + + // validate parameters + manual_compaction_info info; + bool ret = json::json_forwarder::decode(req.body, info); + + if (!ret) { + resp.body = "invalid request structure"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.app_name.empty()) { + resp.body = "app_name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.type.empty() || (info.type != "once" && info.type != "periodic")) { + resp.body = "type should ony be 'once' or 'periodic'"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.target_level < -1) { + resp.body = "target_level should be >= -1"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" && + info.bottommost_level_compaction != "force")) { + resp.body = "bottommost_level_compaction should ony be 'skip' or 'force'"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.max_concurrent_running_count < 0) { + resp.body = "max_running_count should be >= 0"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.type == "periodic" && info.trigger_time.empty()) { + resp.body = "trigger_time should not be empty when type is periodic"; + resp.status_code = http_status_code::bad_request; + return; + } + + // create configuration_update_app_env_request + std::vector keys; + std::vector values; + if (info.type == "once") { + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL); + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION); + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME); + } else { + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL); + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION); + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME); + } + values.emplace_back(std::to_string(info.target_level)); + values.emplace_back(info.bottommost_level_compaction); + values.emplace_back(info.type == "once" ? std::to_string(dsn_now_s()) : info.trigger_time); + if (info.max_concurrent_running_count > 0) { + keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT); + values.emplace_back(std::to_string(info.max_concurrent_running_count)); + } + update_app_env(info.app_name, keys, values, resp); +} + bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) { #ifdef DSN_MOCK_TEST @@ -722,5 +792,31 @@ bool meta_http_service::redirect_if_not_primary(const http_request &req, http_re return false; } +void meta_http_service::update_app_env(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp) +{ + configuration_update_app_env_request request; + request.app_name = app_name; + request.op = app_env_operation::APP_ENV_OP_SET; + request.__set_keys(keys); + request.__set_values(values); + + auto rpc_req = dsn::make_unique(request); + update_app_env_rpc rpc(std::move(rpc_req), LPC_META_STATE_NORMAL); + _service->_state->set_app_envs(rpc); + + auto rpc_resp = rpc.response(); + // output as json format + dsn::utils::table_printer tp; + tp.add_row_name_and_data("error", rpc_resp.err.to_string()); + tp.add_row_name_and_data("hint_message", rpc_resp.hint_message); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h index e857df176f..d403cbc0da 100644 --- a/src/meta/meta_http_service.h +++ b/src/meta/meta_http_service.h @@ -15,6 +15,22 @@ namespace replication { NON_MEMBER_JSON_SERIALIZATION( start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path) +struct manual_compaction_info +{ + std::string app_name; + std::string type; // periodic or once + int32_t target_level; // [-1,num_levels] + std::string bottommost_level_compaction; // skip or force + int32_t max_concurrent_running_count; // 0 means no limit + std::string trigger_time; // only used when the type is periodic + DEFINE_JSON_SERIALIZATION(app_name, + type, + target_level, + bottommost_level_compaction, + max_concurrent_running_count, + trigger_time) +}; + class meta_service; class meta_http_service : public http_service { @@ -76,6 +92,13 @@ class meta_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/meta/query_bulk_load?name=temp"); + // request body should be manual_compaction_info + register_handler("app/start_compaction", + std::bind(&meta_http_service::start_compaction_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/meta/start_compaction"); } std::string path() const override { return "meta"; } @@ -89,11 +112,17 @@ class meta_http_service : public http_service void query_duplication_handler(const http_request &req, http_response &resp); void start_bulk_load_handler(const http_request &req, http_response &resp); void query_bulk_load_handler(const http_request &req, http_response &resp); + void start_compaction_handler(const http_request &req, http_response &resp); private: // set redirect location if current server is not primary bool redirect_if_not_primary(const http_request &req, http_response &resp); + void update_app_env(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp); + meta_service *_service; }; diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index b19167a23e..202f8f3877 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -2,6 +2,7 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include #include #include #include @@ -211,6 +212,15 @@ class meta_bulk_load_http_test : public meta_test_base return resp.body; } + http_response test_start_compaction(std::string req_body_json) + { + http_request req; + http_response resp; + req.body = blob::create_from_bytes(std::move(req_body_json)); + _mhs->start_compaction_handler(req, resp); + return resp; + } + void mock_bulk_load_context(const bulk_load_status::type &status) { auto app = find_app(APP_NAME); @@ -247,24 +257,24 @@ TEST_F(meta_bulk_load_http_test, start_bulk_load_request) http_status_code expected_code; std::string expected_response_json; } tests[] = { - {"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})", http_status_code::bad_request, "invalid request structure"}, - {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"","remote_root_path":"bulk_load_root"})", http_status_code::bad_request, "file_provider_type should not be empty"}, - {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})", http_status_code::ok, - "{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"}, + R"({"error":"ERR_OK","hint_msg":""})"}, }; - for (const auto &test : tests) { http_response resp = test_start_bulk_load(test.request_json); ASSERT_EQ(resp.status_code, test.expected_code); - ASSERT_EQ(resp.body, test.expected_response_json); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); } fail::teardown(); } @@ -281,21 +291,57 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request) { std::string app_name; std::string expected_json; - } tests[] = {{APP_NAME, - "{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::" - "BLS_DOWNLOADING\"}\n"}, - {NOT_BULK_LOAD, - "{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::" - "bulk_load_status::BLS_INVALID\"}\n"}, - {NOT_FOUND, - "{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_" - "load_status::BLS_INVALID\"}\n"}}; + } tests[] = { + {APP_NAME, + R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_DOWNLOADING"})"}, + {NOT_BULK_LOAD, + R"({"error":"ERR_INVALID_STATE","app_status":"replication::bulk_load_status::BLS_INVALID"})"}, + {NOT_FOUND, + R"({"error":"ERR_APP_NOT_EXIST","app_status":"replication::bulk_load_status::BLS_INVALID"})"}}; for (const auto &test : tests) { - ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json); + ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json + "\n"); } drop_app(NOT_BULK_LOAD); } +TEST_F(meta_bulk_load_http_test, start_compaction_test) +{ + struct start_compaction_test + { + std::string request_json; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = { + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":"0"})", + http_status_code::bad_request, + "invalid request structure"}, + {R"({"app_name":"test_bulk_load","type":"wrong","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", + http_status_code::bad_request, + "type should ony be 'once' or 'periodic'"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-3,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", + http_status_code::bad_request, + "target_level should be >= -1"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"wrong","max_concurrent_running_count":0,"trigger_time":""})", + http_status_code::bad_request, + "bottommost_level_compaction should ony be 'skip' or 'force'"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":-2,"trigger_time":""})", + http_status_code::bad_request, + "max_running_count should be >= 0"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", + http_status_code::ok, + R"({"error":"ERR_OK","hint_message":""})"}}; + + for (const auto &test : tests) { + http_response resp = test_start_compaction(test.request_json); + ASSERT_EQ(resp.status_code, test.expected_code); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); + } +} + } // namespace replication } // namespace dsn