Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk_load): add update compaction envs http interface #695

Merged
merged 11 commits into from
Dec 25, 2020
96 changes: 96 additions & 0 deletions src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <dsn/c/api_layer1.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/dist/replication/replica_envs.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/utility/config_api.h>
Expand Down Expand Up @@ -699,6 +700,75 @@ void meta_http_service::query_bulk_load_handler(const http_request &req, http_re
resp.status_code = http_status_code::ok;
}

void meta_http_service::start_compaction_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}

// validate parameters
manual_compaction_info info;
bool ret = json::json_forwarder<manual_compaction_info>::decode(req.body, info);

if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.type.empty() || (info.type != "once" && info.type != "periodic")) {
resp.body = "type should ony be 'once' or 'periodic'";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.target_level < -1) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
resp.body = "target_level should be >= -1";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" &&
info.bottommost_level_compaction != "force")) {
resp.body = "bottommost_level_compaction should ony be 'skip' or 'force'";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.max_concurrent_running_count < 0) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
resp.body = "max_running_count should be >= 0";
resp.status_code = http_status_code::bad_request;
return;
}
if (info.type == "periodic" && info.trigger_time.empty()) {
resp.body = "trigger_time should not be empty when type is periodic";
resp.status_code = http_status_code::bad_request;
return;
}

// create configuration_update_app_env_request
std::vector<std::string> keys;
std::vector<std::string> values;
Comment on lines +751 to +752
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There maybe problems when keys and values' size or order not matched, how about to use std::map ors std::pair?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In struct configuration_update_app_env_request, key and value is std::vector, so I also use it here, besides, function set_app_envs will check if keys.size is equal to values.size, if keys size is not equal to values size, it will set ERR_INVALID_PARAMETERS in response.

if (info.type == "once") {
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME);
} else {
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME);
}
values.emplace_back(std::to_string(info.target_level));
values.emplace_back(info.bottommost_level_compaction);
values.emplace_back(info.type == "once" ? std::to_string(dsn_now_s()) : info.trigger_time);
if (info.max_concurrent_running_count > 0) {
keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT);
values.emplace_back(std::to_string(info.max_concurrent_running_count));
}
update_app_env(info.app_name, keys, values, resp);
}

bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp)
{
#ifdef DSN_MOCK_TEST
Expand All @@ -722,5 +792,31 @@ bool meta_http_service::redirect_if_not_primary(const http_request &req, http_re
return false;
}

void meta_http_service::update_app_env(const std::string &app_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &values,
http_response &resp)
{
configuration_update_app_env_request request;
request.app_name = app_name;
request.op = app_env_operation::APP_ENV_OP_SET;
request.__set_keys(keys);
request.__set_values(values);

auto rpc_req = dsn::make_unique<configuration_update_app_env_request>(request);
update_app_env_rpc rpc(std::move(rpc_req), LPC_META_STATE_NORMAL);
_service->_state->set_app_envs(rpc);

auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_message", rpc_resp.hint_message);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

} // namespace replication
} // namespace dsn
29 changes: 29 additions & 0 deletions src/meta/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ namespace replication {
NON_MEMBER_JSON_SERIALIZATION(
start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path)

struct manual_compaction_info
{
std::string app_name;
std::string type; // periodic or once
int32_t target_level; // [-1,num_levels]
std::string bottommost_level_compaction; // skip or force
int32_t max_concurrent_running_count; // 0 means no limit
std::string trigger_time; // only used when the type is periodic
DEFINE_JSON_SERIALIZATION(app_name,
type,
target_level,
bottommost_level_compaction,
max_concurrent_running_count,
trigger_time)
};

class meta_service;
class meta_http_service : public http_service
{
Expand Down Expand Up @@ -76,6 +92,13 @@ class meta_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/query_bulk_load?name=temp");
// request body should be manual_compaction_info
register_handler("app/start_compaction",
std::bind(&meta_http_service::start_compaction_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/start_compaction");
}

std::string path() const override { return "meta"; }
Expand All @@ -89,11 +112,17 @@ class meta_http_service : public http_service
void query_duplication_handler(const http_request &req, http_response &resp);
void start_bulk_load_handler(const http_request &req, http_response &resp);
void query_bulk_load_handler(const http_request &req, http_response &resp);
void start_compaction_handler(const http_request &req, http_response &resp);

private:
// set redirect location if current server is not primary
bool redirect_if_not_primary(const http_request &req, http_response &resp);

void update_app_env(const std::string &app_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &values,
http_response &resp);

meta_service *_service;
};

Expand Down
84 changes: 65 additions & 19 deletions src/meta/test/meta_http_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <iostream>
#include <gtest/gtest.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/http/http_server.h>
Expand Down Expand Up @@ -214,6 +215,15 @@ class meta_bulk_load_http_test : public meta_test_base
return resp.body;
}

http_response test_start_compaction(std::string req_body_json)
{
http_request req;
http_response resp;
req.body = blob::create_from_bytes(std::move(req_body_json));
_mhs->start_compaction_handler(req, resp);
return resp;
}

void mock_bulk_load_context(const bulk_load_status::type &status)
{
auto app = find_app(APP_NAME);
Expand Down Expand Up @@ -250,24 +260,24 @@ TEST_F(meta_bulk_load_http_test, start_bulk_load_request)
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})",
http_status_code::bad_request,
"invalid request structure"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"","remote_root_path":"bulk_load_root"})",
http_status_code::bad_request,
"file_provider_type should not be empty"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
{R"({"app_name":"test_bulk_load","cluster_name":"onebox","file_provider_type":"local_service","remote_root_path":"bulk_load_root"})",
http_status_code::ok,
"{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"},
R"({"error":"ERR_OK","hint_msg":""})"},
};

for (const auto &test : tests) {
http_response resp = test_start_bulk_load(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
ASSERT_EQ(resp.body, test.expected_response_json);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
fail::teardown();
}
Expand All @@ -284,21 +294,57 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request)
{
std::string app_name;
std::string expected_json;
} tests[] = {{APP_NAME,
"{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::"
"BLS_DOWNLOADING\"}\n"},
{NOT_BULK_LOAD,
"{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::"
"bulk_load_status::BLS_INVALID\"}\n"},
{NOT_FOUND,
"{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_"
"load_status::BLS_INVALID\"}\n"}};
} tests[] = {
{APP_NAME,
R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_DOWNLOADING"})"},
{NOT_BULK_LOAD,
R"({"error":"ERR_INVALID_STATE","app_status":"replication::bulk_load_status::BLS_INVALID"})"},
{NOT_FOUND,
R"({"error":"ERR_APP_NOT_EXIST","app_status":"replication::bulk_load_status::BLS_INVALID"})"}};
for (const auto &test : tests) {
ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json);
ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json + "\n");
}

drop_app(NOT_BULK_LOAD);
}

TEST_F(meta_bulk_load_http_test, start_compaction_test)
{
struct start_compaction_test
{
std::string request_json;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":"0"})",
http_status_code::bad_request,
"invalid request structure"},
{R"({"app_name":"test_bulk_load","type":"wrong","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"type should ony be 'once' or 'periodic'"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-3,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"target_level should be >= -1"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"wrong","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::bad_request,
"bottommost_level_compaction should ony be 'skip' or 'force'"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":-2,"trigger_time":""})",
http_status_code::bad_request,
"max_running_count should be >= 0"},
{R"({"app_name":"test_bulk_load","type":"once","target_level":-1,"bottommost_level_compaction":"skip","max_concurrent_running_count":0,"trigger_time":""})",
http_status_code::ok,
R"({"error":"ERR_OK","hint_message":""})"}};

for (const auto &test : tests) {
http_response resp = test_start_compaction(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
}

} // namespace replication
} // namespace dsn