Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(http): add HTTP interfaces to query backup info (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored and acelyc111 committed Dec 12, 2019
1 parent 33b459e commit ca49482
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 35 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2388,7 +2388,7 @@ void replica_stub::create_child_replica(rpc_address primary_address,
if (child_replica != nullptr) {
ddebug_f("create child replica ({}) succeed", child_gpid);
tasking::enqueue(LPC_PARTITION_SPLIT,
&_tracker,
child_replica->tracker(),
std::bind(&replica::child_init_replica,
child_replica,
parent_gpid,
Expand Down
21 changes: 10 additions & 11 deletions src/dist/replication/meta_server/meta_backup_service.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#include "meta_backup_service.h"

#include <dsn/utility/filesystem.h>
#include <dsn/utility/output_utils.h>
#include <dsn/tool-api/http_server.h>

#include "meta_backup_service.h"
#include "dist/replication/meta_server/meta_service.h"
Expand Down Expand Up @@ -1201,7 +1205,7 @@ void backup_service::start()
start_create_policy_meta_root(after_create_policy_meta_root);
}

void backup_service::add_new_policy(dsn::message_ex *msg)
void backup_service::add_backup_policy(dsn::message_ex *msg)
{
configuration_add_backup_policy_request request;
configuration_add_backup_policy_response response;
Expand Down Expand Up @@ -1366,12 +1370,10 @@ bool backup_service::is_valid_policy_name_unlocked(const std::string &policy_nam
return (iter == _policy_states.end());
}

void backup_service::query_policy(dsn::message_ex *msg)
void backup_service::query_backup_policy(query_backup_policy_rpc rpc)
{
configuration_query_backup_policy_request request;
configuration_query_backup_policy_response response;

::dsn::unmarshall(msg, request);
const configuration_query_backup_policy_request &request = rpc.request();
configuration_query_backup_policy_response &response = rpc.response();

response.err = ERR_OK;

Expand Down Expand Up @@ -1409,7 +1411,7 @@ void backup_service::query_policy(dsn::message_ex *msg)
p_entry.backup_history_count_to_keep = cur_policy.backup_history_count_to_keep;
p_entry.start_time = cur_policy.start_time.to_string();
p_entry.is_disable = cur_policy.is_disable;
response.policys.emplace_back(std::move(p_entry));
response.policys.emplace_back(p_entry);
// acquire backup_infos
std::vector<backup_info> b_infos =
policy_context_ptr->get_backup_infos(request.backup_info_count);
Expand All @@ -1435,12 +1437,9 @@ void backup_service::query_policy(dsn::message_ex *msg)
if (!response.hint_msg.empty()) {
response.__isset.hint_msg = true;
}

_meta_svc->reply_data(msg, response);
msg->release_ref();
}

void backup_service::modify_policy(dsn::message_ex *msg)
void backup_service::modify_backup_policy(dsn::message_ex *msg)
{
configuration_modify_backup_policy_request request;
configuration_modify_backup_policy_response response;
Expand Down
12 changes: 9 additions & 3 deletions src/dist/replication/meta_server/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <sstream>
#include <iomanip> // std::setfill, std::setw
#include <functional>

#include <dsn/dist/block_service.h>
#include <dsn/tool-api/http_server.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>

#include "meta_data.h"
Expand All @@ -18,6 +20,10 @@ class meta_service;
class server_state;
class backup_service;

typedef rpc_holder<configuration_query_backup_policy_request,
configuration_query_backup_policy_response>
query_backup_policy_rpc;

struct backup_info_status
{
enum type
Expand Down Expand Up @@ -314,9 +320,9 @@ class backup_service

const std::string &backup_root() const { return _backup_root; }
const std::string &policy_root() const { return _policy_meta_root; }
void add_new_policy(dsn::message_ex* msg);
void query_policy(dsn::message_ex* msg);
void modify_policy(dsn::message_ex* msg);
void add_backup_policy(dsn::message_ex* msg);
void query_backup_policy(query_backup_policy_rpc rpc);
void modify_backup_policy(dsn::message_ex* msg);

// compose the absolute path(AP) for policy
// input:
Expand Down
65 changes: 63 additions & 2 deletions src/dist/replication/meta_server/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#include <string>

#include <dsn/c/api_layer1.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/utility/config_api.h>
#include <dsn/utility/output_utils.h>

#include "server_load_balancer.h"
#include "server_state.h"
#include "meta_http_service.h"
#include "meta_server_failure_detector.h"
#include "server_load_balancer.h"
#include "server_state.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -515,6 +516,66 @@ void meta_http_service::get_app_envs_handler(const http_request &req, http_respo
resp.status_code = http_status_code::ok;
}

std::string set_to_string(const std::set<int32_t> &s)
{
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
dsn::json::json_encode(writer, s);
return out.str();
}

void meta_http_service::query_backup_policy_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp))
return;

if (_service->_backup_handler == nullptr) {
resp.body = "cold_backup_disabled";
resp.status_code = http_status_code::not_found;
return;
}
auto request = dsn::make_unique<configuration_query_backup_policy_request>();
std::vector<std::string> policy_names;
for (const auto &p : req.query_args) {
if (p.first == "name") {
policy_names.push_back(p.second);
} else {
resp.body = "Invalid parameter";
resp.status_code = http_status_code::bad_request;
return;
}
}
request->policy_names = std::move(policy_names);
query_backup_policy_rpc http_to_rpc(std::move(request), LPC_DEFAULT_CALLBACK);
_service->_backup_handler->query_backup_policy(http_to_rpc);
auto rpc_return = http_to_rpc.response();

dsn::utils::table_printer tp_query_backup_policy;
tp_query_backup_policy.add_title("name");
tp_query_backup_policy.add_column("backup_provider_type");
tp_query_backup_policy.add_column("backup_interval");
tp_query_backup_policy.add_column("app_ids");
tp_query_backup_policy.add_column("start_time");
tp_query_backup_policy.add_column("status");
tp_query_backup_policy.add_column("backup_history_count");
for (const auto &cur_policy : rpc_return.policys) {
tp_query_backup_policy.add_row(cur_policy.policy_name);
tp_query_backup_policy.append_data(cur_policy.backup_provider_type);
tp_query_backup_policy.append_data(cur_policy.backup_interval_seconds);
tp_query_backup_policy.append_data(set_to_string(cur_policy.app_ids));
tp_query_backup_policy.append_data(cur_policy.start_time);
tp_query_backup_policy.append_data(cur_policy.is_disable ? "disabled" : "enabled");
tp_query_backup_policy.append_data(cur_policy.backup_history_count_to_keep);
}
std::ostringstream out;
tp_query_backup_policy.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;

return;
}

bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp)
{
#ifdef DSN_MOCK_TEST
Expand Down
7 changes: 7 additions & 0 deletions src/dist/replication/meta_server/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ class meta_http_service : public http_service
this,
std::placeholders::_1,
std::placeholders::_2));
// GET ip:port/meta/backup_policy
register_handler("backup_policy",
std::bind(&meta_http_service::query_backup_policy_handler,
this,
std::placeholders::_1,
std::placeholders::_2));
}

std::string path() const override { return "meta"; }
Expand All @@ -56,6 +62,7 @@ class meta_http_service : public http_service
void list_node_handler(const http_request &req, http_response &resp);
void get_cluster_info_handler(const http_request &req, http_response &resp);
void get_app_envs_handler(const http_request &req, http_response &resp);
void query_backup_policy_handler(const http_request &req, http_response &resp);

private:
// set redirect location if current server is not primary
Expand Down
26 changes: 13 additions & 13 deletions src/dist/replication/meta_server/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void meta_service::register_rpc_handlers()
register_rpc_handler(RPC_CM_START_RESTORE, "start_restore", &meta_service::on_start_restore);
register_rpc_handler(
RPC_CM_ADD_BACKUP_POLICY, "add_backup_policy", &meta_service::on_add_backup_policy);
register_rpc_handler(
register_rpc_handler_with_rpc_holder(
RPC_CM_QUERY_BACKUP_POLICY, "query_backup_policy", &meta_service::on_query_backup_policy);
register_rpc_handler(RPC_CM_MODIFY_BACKUP_POLICY,
"modify_backup_policy",
Expand Down Expand Up @@ -731,24 +731,23 @@ void meta_service::on_add_backup_policy(dsn::message_ex *req)
req->add_ref();
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::add_new_policy, _backup_handler.get(), req));
std::bind(&backup_service::add_backup_policy, _backup_handler.get(), req));
}
}

void meta_service::on_query_backup_policy(dsn::message_ex *req)
void meta_service::on_query_backup_policy(query_backup_policy_rpc policy_rpc)
{
configuration_query_backup_policy_response response;
RPC_CHECK_STATUS(req, response);
auto &response = policy_rpc.response();
RPC_CHECK_STATUS(policy_rpc.dsn_request(), response);

if (_backup_handler == nullptr) {
derror("meta doesn't enable backup service");
response.err = ERR_SERVICE_NOT_ACTIVE;
reply(req, response);
} else {
req->add_ref();
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::query_policy, _backup_handler.get(), req));
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::query_backup_policy, _backup_handler.get(), policy_rpc));
}
}

Expand All @@ -763,9 +762,10 @@ void meta_service::on_modify_backup_policy(dsn::message_ex *req)
reply(req, response);
} else {
req->add_ref();
tasking::enqueue(LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::modify_policy, _backup_handler.get(), req));
tasking::enqueue(
LPC_DEFAULT_CALLBACK,
nullptr,
std::bind(&backup_service::modify_backup_policy, _backup_handler.get(), req));
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/meta_server/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class meta_service : public serverlet<meta_service>
void on_start_recovery(dsn::message_ex *req);
void on_start_restore(dsn::message_ex *req);
void on_add_backup_policy(dsn::message_ex *req);
void on_query_backup_policy(dsn::message_ex *req);
void on_query_backup_policy(query_backup_policy_rpc policy_rpc);
void on_modify_backup_policy(dsn::message_ex *req);
void on_report_restore_status(dsn::message_ex *req);
void on_query_restore_status(dsn::message_ex *req);
Expand Down Expand Up @@ -217,6 +217,7 @@ class meta_service : public serverlet<meta_service>
friend class meta_test_base;
friend class meta_duplication_service;
friend class meta_http_service_test;
friend class meta_backup_test_base;
friend class meta_http_service;
std::unique_ptr<meta_duplication_service> _dup_svc;

Expand Down
8 changes: 4 additions & 4 deletions src/dist/replication/test/meta_test/unit_test/backup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,9 @@ void meta_service_test_app::backup_service_test()
ASSERT_TRUE(flag);
}

// testing add_new_policy()
// testing add_backup_policy()
{
std::cout << "add_new_policy()..." << std::endl;
std::cout << "add_backup_policy()..." << std::endl;
// create a fake add_backup_policy_request
configuration_add_backup_policy_request req;
req.backup_provider_type = std::string("local_service");
Expand All @@ -679,7 +679,7 @@ void meta_service_test_app::backup_service_test()
auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
LPC_DEFAULT_CALLBACK,
backup_svc,
&backup_service::add_new_policy,
&backup_service::add_backup_policy,
req);
fake_wait_rpc(r, resp);
ASSERT_TRUE(resp.err == ERR_INVALID_PARAMETERS);
Expand All @@ -694,7 +694,7 @@ void meta_service_test_app::backup_service_test()
auto r = fake_rpc_call(RPC_CM_ADD_BACKUP_POLICY,
LPC_DEFAULT_CALLBACK,
backup_svc,
&backup_service::add_new_policy,
&backup_service::add_backup_policy,
req);
fake_wait_rpc(r, resp);
ASSERT_TRUE(resp.err == ERR_OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ meta_function_level_on_start = lively
app_balancer_in_turn = false
only_primary_balancer = false
only_move_primary = false
cold_backup_disabled = false

[replication]
cluster_name = master-cluster
Expand Down
Loading

0 comments on commit ca49482

Please sign in to comment.