Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk_load): add start bulk load http interface #693

Merged
merged 4 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void bulk_load_service::initialize_bulk_load_service()
// ThreadPool: THREAD_POOL_META_SERVER
void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc)
{
FAIL_POINT_INJECT_F("meta_on_start_bulk_load",
[=](dsn::string_view) { rpc.response().err = ERR_OK; });

const auto &request = rpc.request();
auto &response = rpc.response();
response.err = ERR_OK;
Expand Down
56 changes: 55 additions & 1 deletion src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <string>

#include <dsn/c/api_layer1.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/duplication_common.h>
Expand Down Expand Up @@ -611,6 +610,61 @@ void meta_http_service::query_duplication_handler(const http_request &req, http_
resp.body = duplication_query_response_to_string(rpc_resp);
}

void meta_http_service::start_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}

if (_service->_bulk_load_svc == nullptr) {
resp.body = "bulk load is not enabled";
resp.status_code = http_status_code::not_found;
return;
}

start_bulk_load_request request;
bool ret = json::json_forwarder<start_bulk_load_request>::decode(req.body, request);
if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.cluster_name.empty()) {
resp.body = "cluster_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.file_provider_type.empty()) {
resp.body = "file_provider_type should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.remote_root_path.empty()) {
resp.body = "remote_root_path should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this simple args validate put client side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In current implmetation, client and server both will validate arguments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will implement HTTP argument validation in later PR. I actually did it in my staging branch. But haven't ready for review yet. Got no time to make it steady.


auto rpc_req = dsn::make_unique<start_bulk_load_request>(request);
start_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK);
_service->_bulk_load_svc->on_start_bulk_load(rpc);

auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_msg", rpc_resp.hint_msg);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
Expand Down
12 changes: 12 additions & 0 deletions src/meta/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

#include <algorithm>

#include <dsn/cpp/json_helper.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can move this line to the corresponding cpp file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update is for further pull request, I will add structure in .h file using json serilization.

Copy link
Contributor

@levy5307 levy5307 Dec 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think you can add this line in the future pull request, because it's meaningless for this pr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still useful for this pull request, I use NON_MEMBER_JSON_SERIALIZATION marco to make structure start_bulk_load_request json-serializable. As your comment shows, I can add this in meta_http_service.cpp, but considering it will finally move into .h file, I did it in this pull request.

#include <dsn/http/http_server.h>

namespace dsn {
namespace replication {

NON_MEMBER_JSON_SERIALIZATION(
start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path)

class meta_service;
class meta_http_service : public http_service
{
Expand Down Expand Up @@ -59,6 +63,13 @@ class meta_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/backup_policy");
// request body should be start_bulk_load_request
register_handler("app/start_bulk_load",
std::bind(&meta_http_service::start_bulk_load_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/start_bulk_load");
register_handler("app/query_bulk_load",
std::bind(&meta_http_service::query_bulk_load_handler,
this,
Expand All @@ -76,6 +87,7 @@ class meta_http_service : public http_service
void get_app_envs_handler(const http_request &req, http_response &resp);
void query_backup_policy_handler(const http_request &req, http_response &resp);
void query_duplication_handler(const http_request &req, http_response &resp);
void start_bulk_load_handler(const http_request &req, http_response &resp);
void query_bulk_load_handler(const http_request &req, http_response &resp);

private:
Expand Down
42 changes: 42 additions & 0 deletions src/meta/test/meta_http_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <gtest/gtest.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/http/http_server.h>
#include <dsn/utility/fail_point.h>

#include "meta/meta_http_service.h"
#include "meta/meta_service.h"
Expand Down Expand Up @@ -195,6 +196,15 @@ class meta_bulk_load_http_test : public meta_test_base
meta_test_base::TearDown();
}

http_response test_start_bulk_load(std::string req_body_json)
{
http_request req;
http_response resp;
req.body = blob::create_from_bytes(std::move(req_body_json));
_mhs->start_bulk_load_handler(req, resp);
return resp;
}

std::string test_query_bulk_load(const std::string &app_name)
{
http_request req;
Expand Down Expand Up @@ -230,6 +240,38 @@ class meta_bulk_load_http_test : public meta_test_base
std::string APP_NAME = "test_bulk_load";
};

TEST_F(meta_bulk_load_http_test, start_bulk_load_request)
{
fail::setup();
fail::cfg("meta_on_start_bulk_load", "return()");
struct start_bulk_load_test
{
std::string request_json;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::bad_request,
"invalid request structure"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::bad_request,
"file_provider_type should not be empty"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::ok,
"{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"},
};

for (const auto &test : tests) {
http_response resp = test_start_bulk_load(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
ASSERT_EQ(resp.body, test.expected_response_json);
}
fail::teardown();
}

TEST_F(meta_bulk_load_http_test, query_bulk_load_request)
{
const std::string NOT_BULK_LOAD = "not_bulk_load_app";
Expand Down