diff --git a/src/core/tests/hostname.cpp b/src/core/tests/hostname_test.cpp similarity index 100% rename from src/core/tests/hostname.cpp rename to src/core/tests/hostname_test.cpp diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 9546f2da2a..7e868a3552 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2388,7 +2388,7 @@ void replica_stub::create_child_replica(rpc_address primary_address, if (child_replica != nullptr) { ddebug_f("create child replica ({}) succeed", child_gpid); tasking::enqueue(LPC_PARTITION_SPLIT, - &_tracker, + child_replica->tracker(), std::bind(&replica::child_init_replica, child_replica, parent_gpid, diff --git a/src/dist/replication/meta_server/meta_backup_service.cpp b/src/dist/replication/meta_server/meta_backup_service.cpp index 7caf4e76e5..0736353f40 100644 --- a/src/dist/replication/meta_server/meta_backup_service.cpp +++ b/src/dist/replication/meta_server/meta_backup_service.cpp @@ -1,4 +1,8 @@ +#include "meta_backup_service.h" + #include +#include +#include #include "meta_backup_service.h" #include "dist/replication/meta_server/meta_service.h" @@ -1201,7 +1205,7 @@ void backup_service::start() start_create_policy_meta_root(after_create_policy_meta_root); } -void backup_service::add_new_policy(dsn::message_ex *msg) +void backup_service::add_backup_policy(dsn::message_ex *msg) { configuration_add_backup_policy_request request; configuration_add_backup_policy_response response; @@ -1366,12 +1370,10 @@ bool backup_service::is_valid_policy_name_unlocked(const std::string &policy_nam return (iter == _policy_states.end()); } -void backup_service::query_policy(dsn::message_ex *msg) +void backup_service::query_backup_policy(query_backup_policy_rpc rpc) { - configuration_query_backup_policy_request request; - configuration_query_backup_policy_response response; - - ::dsn::unmarshall(msg, request); + const configuration_query_backup_policy_request &request = rpc.request(); + configuration_query_backup_policy_response &response = rpc.response(); response.err = ERR_OK; @@ -1409,7 +1411,7 @@ void backup_service::query_policy(dsn::message_ex *msg) p_entry.backup_history_count_to_keep = cur_policy.backup_history_count_to_keep; p_entry.start_time = cur_policy.start_time.to_string(); p_entry.is_disable = cur_policy.is_disable; - response.policys.emplace_back(std::move(p_entry)); + response.policys.emplace_back(p_entry); // acquire backup_infos std::vector b_infos = policy_context_ptr->get_backup_infos(request.backup_info_count); @@ -1435,12 +1437,9 @@ void backup_service::query_policy(dsn::message_ex *msg) if (!response.hint_msg.empty()) { response.__isset.hint_msg = true; } - - _meta_svc->reply_data(msg, response); - msg->release_ref(); } -void backup_service::modify_policy(dsn::message_ex *msg) +void backup_service::modify_backup_policy(dsn::message_ex *msg) { configuration_modify_backup_policy_request request; configuration_modify_backup_policy_response response; diff --git a/src/dist/replication/meta_server/meta_backup_service.h b/src/dist/replication/meta_server/meta_backup_service.h index eeebdf89b8..4a386c7f22 100644 --- a/src/dist/replication/meta_server/meta_backup_service.h +++ b/src/dist/replication/meta_server/meta_backup_service.h @@ -4,7 +4,9 @@ #include #include // std::setfill, std::setw #include + #include +#include #include #include "meta_data.h" @@ -18,6 +20,10 @@ class meta_service; class server_state; class backup_service; +typedef rpc_holder + query_backup_policy_rpc; + struct backup_info_status { enum type @@ -314,9 +320,9 @@ class backup_service const std::string &backup_root() const { return _backup_root; } const std::string &policy_root() const { return _policy_meta_root; } - void add_new_policy(dsn::message_ex* msg); - void query_policy(dsn::message_ex* msg); - void modify_policy(dsn::message_ex* msg); + void add_backup_policy(dsn::message_ex* msg); + void query_backup_policy(query_backup_policy_rpc rpc); + void modify_backup_policy(dsn::message_ex* msg); // compose the absolute path(AP) for policy // input: diff --git a/src/dist/replication/meta_server/meta_http_service.cpp b/src/dist/replication/meta_server/meta_http_service.cpp index 1003421e85..94572c84ad 100644 --- a/src/dist/replication/meta_server/meta_http_service.cpp +++ b/src/dist/replication/meta_server/meta_http_service.cpp @@ -5,15 +5,16 @@ #include #include +#include #include #include #include #include -#include "server_load_balancer.h" -#include "server_state.h" #include "meta_http_service.h" #include "meta_server_failure_detector.h" +#include "server_load_balancer.h" +#include "server_state.h" namespace dsn { namespace replication { @@ -515,6 +516,66 @@ void meta_http_service::get_app_envs_handler(const http_request &req, http_respo resp.status_code = http_status_code::ok; } +std::string set_to_string(const std::set &s) +{ + std::stringstream out; + rapidjson::OStreamWrapper wrapper(out); + dsn::json::JsonWriter writer(wrapper); + dsn::json::json_encode(writer, s); + return out.str(); +} + +void meta_http_service::query_backup_policy_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) + return; + + if (_service->_backup_handler == nullptr) { + resp.body = "cold_backup_disabled"; + resp.status_code = http_status_code::not_found; + return; + } + auto request = dsn::make_unique(); + std::vector policy_names; + for (const auto &p : req.query_args) { + if (p.first == "name") { + policy_names.push_back(p.second); + } else { + resp.body = "Invalid parameter"; + resp.status_code = http_status_code::bad_request; + return; + } + } + request->policy_names = std::move(policy_names); + query_backup_policy_rpc http_to_rpc(std::move(request), LPC_DEFAULT_CALLBACK); + _service->_backup_handler->query_backup_policy(http_to_rpc); + auto rpc_return = http_to_rpc.response(); + + dsn::utils::table_printer tp_query_backup_policy; + tp_query_backup_policy.add_title("name"); + tp_query_backup_policy.add_column("backup_provider_type"); + tp_query_backup_policy.add_column("backup_interval"); + tp_query_backup_policy.add_column("app_ids"); + tp_query_backup_policy.add_column("start_time"); + tp_query_backup_policy.add_column("status"); + tp_query_backup_policy.add_column("backup_history_count"); + for (const auto &cur_policy : rpc_return.policys) { + tp_query_backup_policy.add_row(cur_policy.policy_name); + tp_query_backup_policy.append_data(cur_policy.backup_provider_type); + tp_query_backup_policy.append_data(cur_policy.backup_interval_seconds); + tp_query_backup_policy.append_data(set_to_string(cur_policy.app_ids)); + tp_query_backup_policy.append_data(cur_policy.start_time); + tp_query_backup_policy.append_data(cur_policy.is_disable ? "disabled" : "enabled"); + tp_query_backup_policy.append_data(cur_policy.backup_history_count_to_keep); + } + std::ostringstream out; + tp_query_backup_policy.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; + + return; +} + bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) { #ifdef DSN_MOCK_TEST diff --git a/src/dist/replication/meta_server/meta_http_service.h b/src/dist/replication/meta_server/meta_http_service.h index 40bbbbd62b..e6130c2da7 100644 --- a/src/dist/replication/meta_server/meta_http_service.h +++ b/src/dist/replication/meta_server/meta_http_service.h @@ -47,6 +47,12 @@ class meta_http_service : public http_service this, std::placeholders::_1, std::placeholders::_2)); + // GET ip:port/meta/backup_policy + register_handler("backup_policy", + std::bind(&meta_http_service::query_backup_policy_handler, + this, + std::placeholders::_1, + std::placeholders::_2)); } std::string path() const override { return "meta"; } @@ -56,6 +62,7 @@ class meta_http_service : public http_service void list_node_handler(const http_request &req, http_response &resp); void get_cluster_info_handler(const http_request &req, http_response &resp); void get_app_envs_handler(const http_request &req, http_response &resp); + void query_backup_policy_handler(const http_request &req, http_response &resp); private: // set redirect location if current server is not primary diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 2d255d3873..04d2c0a09e 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -354,7 +354,7 @@ void meta_service::register_rpc_handlers() register_rpc_handler(RPC_CM_START_RESTORE, "start_restore", &meta_service::on_start_restore); register_rpc_handler( RPC_CM_ADD_BACKUP_POLICY, "add_backup_policy", &meta_service::on_add_backup_policy); - register_rpc_handler( + register_rpc_handler_with_rpc_holder( RPC_CM_QUERY_BACKUP_POLICY, "query_backup_policy", &meta_service::on_query_backup_policy); register_rpc_handler(RPC_CM_MODIFY_BACKUP_POLICY, "modify_backup_policy", @@ -731,24 +731,23 @@ void meta_service::on_add_backup_policy(dsn::message_ex *req) req->add_ref(); tasking::enqueue(LPC_DEFAULT_CALLBACK, nullptr, - std::bind(&backup_service::add_new_policy, _backup_handler.get(), req)); + std::bind(&backup_service::add_backup_policy, _backup_handler.get(), req)); } } -void meta_service::on_query_backup_policy(dsn::message_ex *req) +void meta_service::on_query_backup_policy(query_backup_policy_rpc policy_rpc) { - configuration_query_backup_policy_response response; - RPC_CHECK_STATUS(req, response); + auto &response = policy_rpc.response(); + RPC_CHECK_STATUS(policy_rpc.dsn_request(), response); if (_backup_handler == nullptr) { derror("meta doesn't enable backup service"); response.err = ERR_SERVICE_NOT_ACTIVE; - reply(req, response); } else { - req->add_ref(); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, - std::bind(&backup_service::query_policy, _backup_handler.get(), req)); + tasking::enqueue( + LPC_DEFAULT_CALLBACK, + nullptr, + std::bind(&backup_service::query_backup_policy, _backup_handler.get(), policy_rpc)); } } @@ -763,9 +762,10 @@ void meta_service::on_modify_backup_policy(dsn::message_ex *req) reply(req, response); } else { req->add_ref(); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - nullptr, - std::bind(&backup_service::modify_policy, _backup_handler.get(), req)); + tasking::enqueue( + LPC_DEFAULT_CALLBACK, + nullptr, + std::bind(&backup_service::modify_backup_policy, _backup_handler.get(), req)); } } diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h index 299976ea0e..7a6f1c94a9 100644 --- a/src/dist/replication/meta_server/meta_service.h +++ b/src/dist/replication/meta_server/meta_service.h @@ -168,7 +168,7 @@ class meta_service : public serverlet void on_start_recovery(dsn::message_ex *req); void on_start_restore(dsn::message_ex *req); void on_add_backup_policy(dsn::message_ex *req); - void on_query_backup_policy(dsn::message_ex *req); + void on_query_backup_policy(query_backup_policy_rpc policy_rpc); void on_modify_backup_policy(dsn::message_ex *req); void on_report_restore_status(dsn::message_ex *req); void on_query_restore_status(dsn::message_ex *req); @@ -217,6 +217,7 @@ class meta_service : public serverlet friend class meta_test_base; friend class meta_duplication_service; friend class meta_http_service_test; + friend class meta_backup_test_base; friend class meta_http_service; std::unique_ptr _dup_svc; diff --git a/src/dist/replication/test/meta_test/unit_test/backup_test.cpp b/src/dist/replication/test/meta_test/unit_test/backup_test.cpp index e68b1826ab..a1425eff34 100644 --- a/src/dist/replication/test/meta_test/unit_test/backup_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/backup_test.cpp @@ -662,9 +662,9 @@ void meta_service_test_app::backup_service_test() ASSERT_TRUE(flag); } - // testing add_new_policy() + // testing add_backup_policy() { - std::cout << "add_new_policy()..." << std::endl; + std::cout << "add_backup_policy()..." << std::endl; // create a fake add_backup_policy_request configuration_add_backup_policy_request req; req.backup_provider_type = std::string("local_service"); @@ -679,7 +679,7 @@ void meta_service_test_app::backup_service_test() auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY, LPC_DEFAULT_CALLBACK, backup_svc, - &backup_service::add_new_policy, + &backup_service::add_backup_policy, req); fake_wait_rpc(r, resp); ASSERT_TRUE(resp.err == ERR_INVALID_PARAMETERS); @@ -694,7 +694,7 @@ void meta_service_test_app::backup_service_test() auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY, LPC_DEFAULT_CALLBACK, backup_svc, - &backup_service::add_new_policy, + &backup_service::add_backup_policy, req); fake_wait_rpc(r, resp); ASSERT_TRUE(resp.err == ERR_OK); diff --git a/src/dist/replication/test/meta_test/unit_test/config-test.ini b/src/dist/replication/test/meta_test/unit_test/config-test.ini index df7ea400aa..399886b094 100644 --- a/src/dist/replication/test/meta_test/unit_test/config-test.ini +++ b/src/dist/replication/test/meta_test/unit_test/config-test.ini @@ -82,6 +82,7 @@ meta_function_level_on_start = lively app_balancer_in_turn = false only_primary_balancer = false only_move_primary = false +cold_backup_disabled = false [replication] cluster_name = master-cluster diff --git a/src/dist/replication/test/meta_test/unit_test/meta_http_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_http_service_test.cpp index edc1adc311..2c46e9fd3f 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_http_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_http_service_test.cpp @@ -64,9 +64,115 @@ class meta_http_service_test : public meta_test_base std::string test_app = "test_meta_http"; }; +class meta_backup_test_base : public meta_test_base +{ +public: + void SetUp() override + { + meta_test_base::SetUp(); + + _ms->_backup_handler = std::make_shared( + _ms.get(), + _ms->_cluster_root + "/backup_meta", + _ms->_cluster_root + "/backup", + [](backup_service *bs) { return std::make_shared(bs); }); + _ms->_backup_handler->start(); + _ms->_backup_handler->backup_option().app_dropped_retry_delay_ms = 500_ms; + _ms->_backup_handler->backup_option().request_backup_period_ms = 20_ms; + _ms->_backup_handler->backup_option().issue_backup_interval_ms = 1000_ms; + const std::string policy_root = "/test"; + dsn::error_code ec; + _ms->_storage + ->create_node( + _policy_root, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code err) { ec = err; }) + ->wait(); + _mhs = dsn::make_unique(_ms.get()); + create_app(test_app); + } + + void add_backup_policy(const std::string &policy_name) + { + static const std::string test_policy_name = policy_name; + const std::string policy_root = "/test"; + + configuration_add_backup_policy_request request; + configuration_add_backup_policy_response response; + + request.policy_name = policy_name; + request.backup_provider_type = "local_service"; + request.backup_interval_seconds = 1; + request.backup_history_count_to_keep = 1; + request.start_time = "12:00"; + request.app_ids.clear(); + request.app_ids.push_back(2); + + auto result = fake_create_policy(_ms->_backup_handler.get(), request); + + fake_wait_rpc(result, response); + // need to fix + ASSERT_EQ(response.err, ERR_OK); + } + + void test_get_backup_policy(const std::string &name, + const std::string &expected_json, + const http_status_code &http_status) + { + http_request req; + http_response resp; + if (!name.empty()) + req.query_args.emplace("name", name); + _mhs->query_backup_policy_handler(req, resp); + ASSERT_EQ(resp.status_code, http_status) << http_status_code_to_string(resp.status_code); + ASSERT_EQ(resp.body, expected_json); + } + +protected: + const std::string _policy_root = "/test"; + + std::unique_ptr _mhs; + std::string test_app = "test_meta_http"; +}; + TEST_F(meta_http_service_test, get_app_from_primary) { test_get_app_from_primary(); } TEST_F(meta_http_service_test, get_app_envs) { test_get_app_envs(); } +TEST_F(meta_backup_test_base, get_backup_policy) +{ + struct http_backup_policy_test + { + std::string name; + std::string expected_json; + http_status_code http_status; + } tests[5] = { + {"", "{}\n", http_status_code::ok}, + {"TEST1", + "{\"TEST1\":{\"name\":\"TEST1\",\"backup_provider_type\":\"local_service\"," + "\"backup_interval\":\"1\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\"," + "\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n", + http_status_code::ok}, + {"TEST2", + "{\"TEST2\":{\"name\":\"TEST2\",\"backup_provider_type\":\"local_service\"," + "\"backup_interval\":\"1\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\"," + "\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n", + http_status_code::ok}, + {"", + "{\"TEST1\":{\"name\":\"TEST1\",\"backup_provider_type\":\"local_service\",\"backup_" + "interval\":\"1\",\"app_ids\":\"[2]\",\"start_time\":\"12:00\",\"status\":\"enabled\"," + "\"backup_history_count\":\"1\"},\"TEST2\":{\"name\":\"TEST2\",\"backup_provider_" + "type\":\"local_service\",\"backup_interval\":\"1\",\"app_ids\":\"[2]\",\"start_" + "time\":\"12:00\",\"status\":\"enabled\",\"backup_history_count\":\"1\"}}\n", + http_status_code::ok}, + {"TEST3", "{}\n", http_status_code::ok}, + }; + test_get_backup_policy(tests[0].name, tests[0].expected_json, tests[0].http_status); + add_backup_policy("TEST1"); + test_get_backup_policy(tests[1].name, tests[1].expected_json, tests[1].http_status); + add_backup_policy("TEST2"); + test_get_backup_policy(tests[2].name, tests[2].expected_json, tests[2].http_status); + test_get_backup_policy(tests[3].name, tests[3].expected_json, tests[3].http_status); + test_get_backup_policy(tests[4].name, tests[4].expected_json, tests[4].http_status); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h b/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h index 87b4f01cbb..1d703d61e3 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h +++ b/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h @@ -179,6 +179,13 @@ fake_rpc_call(dsn::task_code rpc_code, fake_rpc_call( \ RPC_CM_RECALL_APP, LPC_META_STATE_NORMAL, state, &server_state::recall_app, request_data) +#define fake_create_policy(state, request_data) \ + fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY, \ + LPC_DEFAULT_CALLBACK, \ + state, \ + &backup_service::add_backup_policy, \ + request_data) + #define fake_wait_rpc(context, response_data) \ do { \ context->e.wait(); \