From da2d59d27c4c786a18136a236fe1a30531a4fa8b Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 18 Dec 2020 15:50:48 +0800 Subject: [PATCH 1/4] feat(bulk_load): add update compaction envs http interface --- src/meta/meta_http_service.cpp | 95 ++++++++++++++++++++++++ src/meta/meta_http_service.h | 29 ++++++++ src/meta/test/meta_http_service_test.cpp | 53 +++++++++++++ 3 files changed, 177 insertions(+) diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index c9552b29ee..963751777f 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -699,6 +700,74 @@ void meta_http_service::query_bulk_load_handler(const http_request &req, http_re resp.status_code = http_status_code::ok; } +void meta_http_service::start_compaction_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) { + return; + } + + // validate paramters + manual_compaction_info info; + bool ret = json::json_forwarder::decode(req.body, info); + if (!ret) { + resp.body = "invalid request structure"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.app_name.empty()) { + resp.body = "app_name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.type.empty() || (info.type != "once" && info.type != "periodic")) { + resp.body = "type should ony be once or periodic"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.target_level < -1) { + resp.body = "target_level should be greater than -1"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" && + info.bottommost_level_compaction != "force")) { + resp.body = "bottommost_level_compaction should ony be skip or force"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.max_concurrent_running_count < 0) { + resp.body = "max_running_count should be greater than 0"; + resp.status_code = http_status_code::bad_request; + return; + } + if (info.type == "periodic" && info.trigger_time.empty()) { + resp.body = "trigger_time should not be empty when type is periodic"; + resp.status_code = http_status_code::bad_request; + return; + } + + // create configuration_update_app_env_request + std::vector keys; + std::vector values; + if (info.type == "once") { + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL); + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION); + keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME); + } else { + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL); + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION); + keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME); + } + values.emplace_back(std::to_string(info.target_level)); + values.emplace_back(info.bottommost_level_compaction); + values.emplace_back(info.type == "once" ? std::to_string(dsn_now_s()) : info.trigger_time); + if (info.max_concurrent_running_count > 0) { + keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT); + values.emplace_back(std::to_string(info.max_concurrent_running_count)); + } + update_app_env_handler(info.app_name, keys, values, resp); +} + bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) { #ifdef DSN_MOCK_TEST @@ -722,5 +791,31 @@ bool meta_http_service::redirect_if_not_primary(const http_request &req, http_re return false; } +void meta_http_service::update_app_env_handler(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp) +{ + configuration_update_app_env_request request; + request.app_name = app_name; + request.op = app_env_operation::APP_ENV_OP_SET; + request.__set_keys(keys); + request.__set_values(values); + + auto rpc_req = dsn::make_unique(request); + update_app_env_rpc rpc(std::move(rpc_req), LPC_META_STATE_NORMAL); + _service->_state->set_app_envs(rpc); + + auto rpc_resp = rpc.response(); + // output as json format + dsn::utils::table_printer tp; + tp.add_row_name_and_data("error", rpc_resp.err.to_string()); + tp.add_row_name_and_data("hint_message", rpc_resp.hint_message); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h index e857df176f..c1d1c3d406 100644 --- a/src/meta/meta_http_service.h +++ b/src/meta/meta_http_service.h @@ -15,6 +15,22 @@ namespace replication { NON_MEMBER_JSON_SERIALIZATION( start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path) +struct manual_compaction_info +{ + std::string app_name; + std::string type; // periodic or once + int32_t target_level; // [-1,num_levels] + std::string bottommost_level_compaction; // skip or force + int32_t max_concurrent_running_count; // 0 means no limit + std::string trigger_time; // only used when the type is periodic + DEFINE_JSON_SERIALIZATION(app_name, + type, + target_level, + bottommost_level_compaction, + max_concurrent_running_count, + trigger_time) +}; + class meta_service; class meta_http_service : public http_service { @@ -76,6 +92,13 @@ class meta_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/meta/query_bulk_load?name=temp"); + // request body should be manual_compaction_info + register_handler("app/start_compaction", + std::bind(&meta_http_service::start_compaction_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/meta/start_compaction"); } std::string path() const override { return "meta"; } @@ -89,11 +112,17 @@ class meta_http_service : public http_service void query_duplication_handler(const http_request &req, http_response &resp); void start_bulk_load_handler(const http_request &req, http_response &resp); void query_bulk_load_handler(const http_request &req, http_response &resp); + void start_compaction_handler(const http_request &req, http_response &resp); private: // set redirect location if current server is not primary bool redirect_if_not_primary(const http_request &req, http_response &resp); + void update_app_env_handler(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp); + meta_service *_service; }; diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index 7902e17199..94af1ad977 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -214,6 +214,15 @@ class meta_bulk_load_http_test : public meta_test_base return resp.body; } + http_response test_start_compaction(std::string req_body_json) + { + http_request req; + http_response resp; + req.body = blob::create_from_bytes(std::move(req_body_json)); + _mhs->start_compaction_handler(req, resp); + return resp; + } + void mock_bulk_load_context(const bulk_load_status::type &status) { auto app = find_app(APP_NAME); @@ -300,5 +309,49 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request) drop_app(NOT_BULK_LOAD); } +TEST_F(meta_bulk_load_http_test, start_compaction_test) +{ + struct start_compaction_test + { + std::string request_json; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = { + {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" + "level_compaction\":\"skip\",\"max_concurrent_running_count\":\"0\"}", + http_status_code::bad_request, + "invalid request structure"}, + + {"{\"app_name\":\"test_bulk_load\",\"type\":\"wrong\",\"target_level\":-1,\"bottommost_" + "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + http_status_code::bad_request, + "type should ony be once or periodic"}, + + {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-3,\"bottommost_" + "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + http_status_code::bad_request, + "target_level should be greater than -1"}, + + {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" + "level_compaction\":\"wrong\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + http_status_code::bad_request, + "bottommost_level_compaction should ony be skip or force"}, + {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" + "level_compaction\":\"skip\",\"max_concurrent_running_count\":-2,\"trigger_time\":\"\"}", + http_status_code::bad_request, + "max_running_count should be greater than 0"}, + + {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" + "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + http_status_code::ok, + "{\"error\":\"ERR_OK\",\"hint_message\":\"\"}\n"}}; + + for (const auto &test : tests) { + http_response resp = test_start_compaction(test.request_json); + ASSERT_EQ(resp.status_code, test.expected_code); + ASSERT_EQ(resp.body, test.expected_response_json); + } +} + } // namespace replication } // namespace dsn From dff659a935ca5405759512d282204e22bb8168a0 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 22 Dec 2020 17:50:18 +0800 Subject: [PATCH 2/4] update by cr --- src/meta/meta_http_service.cpp | 12 ++++++------ src/meta/meta_http_service.h | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index 963751777f..6a9464bb0b 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -706,7 +706,7 @@ void meta_http_service::start_compaction_handler(const http_request &req, http_r return; } - // validate paramters + // validate parameters manual_compaction_info info; bool ret = json::json_forwarder::decode(req.body, info); if (!ret) { @@ -765,7 +765,7 @@ void meta_http_service::start_compaction_handler(const http_request &req, http_r keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT); values.emplace_back(std::to_string(info.max_concurrent_running_count)); } - update_app_env_handler(info.app_name, keys, values, resp); + update_app_env(info.app_name, keys, values, resp); } bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) @@ -791,10 +791,10 @@ bool meta_http_service::redirect_if_not_primary(const http_request &req, http_re return false; } -void meta_http_service::update_app_env_handler(const std::string &app_name, - const std::vector &keys, - const std::vector &values, - http_response &resp) +void meta_http_service::update_app_env(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp) { configuration_update_app_env_request request; request.app_name = app_name; diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h index c1d1c3d406..d403cbc0da 100644 --- a/src/meta/meta_http_service.h +++ b/src/meta/meta_http_service.h @@ -118,10 +118,10 @@ class meta_http_service : public http_service // set redirect location if current server is not primary bool redirect_if_not_primary(const http_request &req, http_response &resp); - void update_app_env_handler(const std::string &app_name, - const std::vector &keys, - const std::vector &values, - http_response &resp); + void update_app_env(const std::string &app_name, + const std::vector &keys, + const std::vector &values, + http_response &resp); meta_service *_service; }; From 8a1a98bc58075b9e17530e657ffea1c63b9c3a52 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 23 Dec 2020 13:46:56 +0800 Subject: [PATCH 3/4] update by code review --- src/meta/meta_http_service.cpp | 9 ++++--- src/meta/test/meta_http_service_test.cpp | 31 +++++++++--------------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index 6a9464bb0b..34ef26e467 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -709,6 +709,7 @@ void meta_http_service::start_compaction_handler(const http_request &req, http_r // validate parameters manual_compaction_info info; bool ret = json::json_forwarder::decode(req.body, info); + if (!ret) { resp.body = "invalid request structure"; resp.status_code = http_status_code::bad_request; @@ -720,23 +721,23 @@ void meta_http_service::start_compaction_handler(const http_request &req, http_r return; } if (info.type.empty() || (info.type != "once" && info.type != "periodic")) { - resp.body = "type should ony be once or periodic"; + resp.body = "type should ony be 'once' or 'periodic'"; resp.status_code = http_status_code::bad_request; return; } if (info.target_level < -1) { - resp.body = "target_level should be greater than -1"; + resp.body = "target_level should be >= -1"; resp.status_code = http_status_code::bad_request; return; } if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" && info.bottommost_level_compaction != "force")) { - resp.body = "bottommost_level_compaction should ony be skip or force"; + resp.body = "bottommost_level_compaction should ony be 'skip' or 'force'"; resp.status_code = http_status_code::bad_request; return; } if (info.max_concurrent_running_count < 0) { - resp.body = "max_running_count should be greater than 0"; + resp.body = "max_running_count should be >= 0"; resp.status_code = http_status_code::bad_request; return; } diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index 94af1ad977..9f60f54874 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -2,6 +2,7 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include #include #include #include @@ -317,32 +318,22 @@ TEST_F(meta_bulk_load_http_test, start_compaction_test) http_status_code expected_code; std::string expected_response_json; } tests[] = { - {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" - "level_compaction\":\"skip\",\"max_concurrent_running_count\":\"0\"}", + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":"0"})", http_status_code::bad_request, "invalid request structure"}, - - {"{\"app_name\":\"test_bulk_load\",\"type\":\"wrong\",\"target_level\":-1,\"bottommost_" - "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + {R"({"app_name":"test_bulk_load","type":"wrong","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", http_status_code::bad_request, - "type should ony be once or periodic"}, - - {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-3,\"bottommost_" - "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + "type should ony be 'once' or 'periodic'"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-3,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", http_status_code::bad_request, - "target_level should be greater than -1"}, - - {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" - "level_compaction\":\"wrong\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + "target_level should be >= -1"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"wrong","max_concurrent_running_count":0,"trigger_time":""})", http_status_code::bad_request, - "bottommost_level_compaction should ony be skip or force"}, - {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" - "level_compaction\":\"skip\",\"max_concurrent_running_count\":-2,\"trigger_time\":\"\"}", + "bottommost_level_compaction should ony be 'skip' or 'force'"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":-2,"trigger_time":""})", http_status_code::bad_request, - "max_running_count should be greater than 0"}, - - {"{\"app_name\":\"test_bulk_load\",\"type\":\"once\",\"target_level\":-1,\"bottommost_" - "level_compaction\":\"skip\",\"max_concurrent_running_count\":0,\"trigger_time\":\"\"}", + "max_running_count should be >= 0"}, + {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", http_status_code::ok, "{\"error\":\"ERR_OK\",\"hint_message\":\"\"}\n"}}; From 215e3678769a5fec5e3494ecc451497d9cd95d31 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 23 Dec 2020 16:56:00 +0800 Subject: [PATCH 4/4] update --- src/meta/test/meta_http_service_test.cpp | 44 +++++++++++++----------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index 9f60f54874..52233eebe2 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -260,24 +260,24 @@ TEST_F(meta_bulk_load_http_test, start_bulk_load_request) http_status_code expected_code; std::string expected_response_json; } tests[] = { - {"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})", http_status_code::bad_request, "invalid request structure"}, - {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"","remote_root_path":"bulk_load_root"})", http_status_code::bad_request, "file_provider_type should not be empty"}, - {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " - "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + {R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})", http_status_code::ok, - "{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"}, + R"({"error":"ERR_OK","hint_msg":""})"}, }; - for (const auto &test : tests) { http_response resp = test_start_bulk_load(test.request_json); ASSERT_EQ(resp.status_code, test.expected_code); - ASSERT_EQ(resp.body, test.expected_response_json); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); } fail::teardown(); } @@ -294,17 +294,15 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request) { std::string app_name; std::string expected_json; - } tests[] = {{APP_NAME, - "{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::" - "BLS_DOWNLOADING\"}\n"}, - {NOT_BULK_LOAD, - "{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::" - "bulk_load_status::BLS_INVALID\"}\n"}, - {NOT_FOUND, - "{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_" - "load_status::BLS_INVALID\"}\n"}}; + } tests[] = { + {APP_NAME, + R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_DOWNLOADING"})"}, + {NOT_BULK_LOAD, + R"({"error":"ERR_INVALID_STATE","app_status":"replication::bulk_load_status::BLS_INVALID"})"}, + {NOT_FOUND, + R"({"error":"ERR_APP_NOT_EXIST","app_status":"replication::bulk_load_status::BLS_INVALID"})"}}; for (const auto &test : tests) { - ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json); + ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json + "\n"); } drop_app(NOT_BULK_LOAD); @@ -335,12 +333,16 @@ TEST_F(meta_bulk_load_http_test, start_compaction_test) "max_running_count should be >= 0"}, {R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})", http_status_code::ok, - "{\"error\":\"ERR_OK\",\"hint_message\":\"\"}\n"}}; + R"({"error":"ERR_OK","hint_message":""})"}}; for (const auto &test : tests) { http_response resp = test_start_compaction(test.request_json); ASSERT_EQ(resp.status_code, test.expected_code); - ASSERT_EQ(resp.body, test.expected_response_json); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); } }