Skip to content

Commit

Permalink
feat(bulk_load): add update compaction envs http interface (XiaoMi#695)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored and zhangyifan27 committed Dec 25, 2020
1 parent 3a05fbf commit 665c5b4
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 19 deletions.
96 changes: 96 additions & 0 deletions src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <dsn/c/api_layer1.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/dist/replication/replica_envs.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/utility/config_api.h>
Expand Down Expand Up @@ -699,6 +700,75 @@ void meta_http_service::query_bulk_load_handler(const http_request &req, http_re
resp.status_code = http_status_code::ok;
}

void meta_http_service::start_compaction_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}

// validate parameters
manual_compaction_info info;
bool ret = json::json_forwarder<manual_compaction_info>::decode(req.body, info);

if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.type.empty() || (info.type != "once" && info.type != "periodic")) {
resp.body = "type should ony be 'once' or 'periodic'";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.target_level < -1) {
resp.body = "target_level should be >= -1";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" &&
info.bottommost_level_compaction != "force")) {
resp.body = "bottommost_level_compaction should ony be 'skip' or 'force'";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.max_concurrent_running_count < 0) {
resp.body = "max_running_count should be >= 0";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.type == "periodic" && info.trigger_time.empty()) {
resp.body = "trigger_time should not be empty when type is periodic";
resp.status_code = http_status_code::bad_request;
return;
}

// create configuration_update_app_env_request
std::vector<std::string> keys;
std::vector<std::string> values;
if (info.type == "once") {
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME);
} else {
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME);
}
values.emplace_back(std::to_string(info.target_level));
values.emplace_back(info.bottommost_level_compaction);
values.emplace_back(info.type == "once" ? std::to_string(dsn_now_s()) : info.trigger_time);
if (info.max_concurrent_running_count > 0) {
keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT);
values.emplace_back(std::to_string(info.max_concurrent_running_count));
}
update_app_env(info.app_name, keys, values, resp);
}

bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp)
{
#ifdef DSN_MOCK_TEST
Expand All @@ -722,5 +792,31 @@ bool meta_http_service::redirect_if_not_primary(const http_request &req, http_re
return false;
}

void meta_http_service::update_app_env(const std::string &app_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &values,
http_response &resp)
{
configuration_update_app_env_request request;
request.app_name = app_name;
request.op = app_env_operation::APP_ENV_OP_SET;
request.__set_keys(keys);
request.__set_values(values);

auto rpc_req = dsn::make_unique<configuration_update_app_env_request>(request);
update_app_env_rpc rpc(std::move(rpc_req), LPC_META_STATE_NORMAL);
_service->_state->set_app_envs(rpc);

auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_message", rpc_resp.hint_message);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

} // namespace replication
} // namespace dsn
29 changes: 29 additions & 0 deletions src/meta/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ namespace replication {
NON_MEMBER_JSON_SERIALIZATION(
start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path)

struct manual_compaction_info
{
std::string app_name;
std::string type; // periodic or once
int32_t target_level; // [-1,num_levels]
std::string bottommost_level_compaction; // skip or force
int32_t max_concurrent_running_count; // 0 means no limit
std::string trigger_time; // only used when the type is periodic
DEFINE_JSON_SERIALIZATION(app_name,
type,
target_level,
bottommost_level_compaction,
max_concurrent_running_count,
trigger_time)
};

class meta_service;
class meta_http_service : public http_service
{
Expand Down Expand Up @@ -76,6 +92,13 @@ class meta_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/query_bulk_load?name=temp");
// request body should be manual_compaction_info
register_handler("app/start_compaction",
std::bind(&meta_http_service::start_compaction_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/start_compaction");
}

std::string path() const override { return "meta"; }
Expand All @@ -89,11 +112,17 @@ class meta_http_service : public http_service
void query_duplication_handler(const http_request &req, http_response &resp);
void start_bulk_load_handler(const http_request &req, http_response &resp);
void query_bulk_load_handler(const http_request &req, http_response &resp);
void start_compaction_handler(const http_request &req, http_response &resp);

private:
// set redirect location if current server is not primary
bool redirect_if_not_primary(const http_request &req, http_response &resp);

void update_app_env(const std::string &app_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &values,
http_response &resp);

meta_service *_service;
};

Expand Down
84 changes: 65 additions & 19 deletions src/meta/test/meta_http_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <iostream>
#include <gtest/gtest.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/http/http_server.h>
Expand Down Expand Up @@ -211,6 +212,15 @@ class meta_bulk_load_http_test : public meta_test_base
return resp.body;
}

http_response test_start_compaction(std::string req_body_json)
{
http_request req;
http_response resp;
req.body = blob::create_from_bytes(std::move(req_body_json));
_mhs->start_compaction_handler(req, resp);
return resp;
}

void mock_bulk_load_context(const bulk_load_status::type &status)
{
auto app = find_app(APP_NAME);
Expand Down Expand Up @@ -247,24 +257,24 @@ TEST_F(meta_bulk_load_http_test, start_bulk_load_request)
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})",
http_status_code::bad_request,
"invalid request structure"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"","remote_root_path":"bulk_load_root"})",
http_status_code::bad_request,
"file_provider_type should not be empty"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})",
http_status_code::ok,
"{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"},
R"({"error":"ERR_OK","hint_msg":""})"},
};

for (const auto &test : tests) {
http_response resp = test_start_bulk_load(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
ASSERT_EQ(resp.body, test.expected_response_json);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
fail::teardown();
}
Expand All @@ -281,21 +291,57 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request)
{
std::string app_name;
std::string expected_json;
} tests[] = {{APP_NAME,
"{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::"
"BLS_DOWNLOADING\"}\n"},
{NOT_BULK_LOAD,
"{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::"
"bulk_load_status::BLS_INVALID\"}\n"},
{NOT_FOUND,
"{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_"
"load_status::BLS_INVALID\"}\n"}};
} tests[] = {
{APP_NAME,
R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_DOWNLOADING"})"},
{NOT_BULK_LOAD,
R"({"error":"ERR_INVALID_STATE","app_status":"replication::bulk_load_status::BLS_INVALID"})"},
{NOT_FOUND,
R"({"error":"ERR_APP_NOT_EXIST","app_status":"replication::bulk_load_status::BLS_INVALID"})"}};
for (const auto &test : tests) {
ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json);
ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json + "\n");
}

drop_app(NOT_BULK_LOAD);
}

TEST_F(meta_bulk_load_http_test, start_compaction_test)
{
struct start_compaction_test
{
std::string request_json;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":"0"})",
http_status_code::bad_request,
"invalid request structure"},
{R"({"app_name":"test_bulk_load","type":"wrong","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"type should ony be 'once' or 'periodic'"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-3,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"target_level should be >= -1"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"wrong","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"bottommost_level_compaction should ony be 'skip' or 'force'"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":-2,"trigger_time":""})",
http_status_code::bad_request,
"max_running_count should be >= 0"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::ok,
R"({"error":"ERR_OK","hint_message":""})"}};

for (const auto &test : tests) {
http_response resp = test_start_compaction(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
}

} // namespace replication
} // namespace dsn

0 comments on commit 665c5b4

Please sign in to comment.