Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk-load): add query bulk load http interface (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed Dec 28, 2020
1 parent 630a2ef commit 2191e17
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ class bulk_load_service

private:
friend class bulk_load_service_test;
friend class meta_bulk_load_http_test;

meta_service *_meta_svc;
server_state *_state;
Expand Down
35 changes: 35 additions & 0 deletions src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "server_load_balancer.h"
#include "server_state.h"
#include "meta/duplication/meta_duplication_service.h"
#include "meta/meta_bulk_load_service.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -610,6 +611,40 @@ void meta_http_service::query_duplication_handler(const http_request &req, http_
resp.body = duplication_query_response_to_string(rpc_resp);
}

void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}

if (_service->_bulk_load_svc == nullptr) {
resp.body = "bulk load is not enabled";
resp.status_code = http_status_code::not_found;
return;
}

auto it = req.query_args.find("name");
if (it == req.query_args.end()) {
resp.body = "name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}

auto rpc_req = dsn::make_unique<query_bulk_load_request>();
rpc_req->app_name = it->second;
query_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK);
_service->_bulk_load_svc->on_query_bulk_load_status(rpc);
auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("app_status", dsn::enum_to_string(rpc_resp.app_status));
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp)
{
#ifdef DSN_MOCK_TEST
Expand Down
7 changes: 7 additions & 0 deletions src/meta/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class meta_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/backup_policy");
register_handler("app/query_bulk_load",
std::bind(&meta_http_service::query_bulk_load_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/query_bulk_load?name=temp");
}

std::string path() const override { return "meta"; }
Expand All @@ -70,6 +76,7 @@ class meta_http_service : public http_service
void get_app_envs_handler(const http_request &req, http_response &resp);
void query_backup_policy_handler(const http_request &req, http_response &resp);
void query_duplication_handler(const http_request &req, http_response &resp);
void query_bulk_load_handler(const http_request &req, http_response &resp);

private:
// set redirect location if current server is not primary
Expand Down
82 changes: 82 additions & 0 deletions src/meta/test/meta_http_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "meta/meta_service.h"
#include "meta_test_base.h"
#include "meta_service_test_app.h"
#include "meta/meta_bulk_load_service.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -176,5 +177,86 @@ TEST_F(meta_backup_test_base, get_backup_policy)
test_get_backup_policy(tests[4].name, tests[4].expected_json, tests[4].http_status);
}

class meta_bulk_load_http_test : public meta_test_base
{
public:
void SetUp() override
{
meta_test_base::SetUp();
FLAGS_enable_http_server = false;
_mhs = dsn::make_unique<meta_http_service>(_ms.get());
create_app(APP_NAME);
}

void TearDown() override
{
drop_app(APP_NAME);
_mhs = nullptr;
meta_test_base::TearDown();
}

std::string test_query_bulk_load(const std::string &app_name)
{
http_request req;
http_response resp;
req.query_args.emplace("name", app_name);
_mhs->query_bulk_load_handler(req, resp);
return resp.body;
}

void mock_bulk_load_context(const bulk_load_status::type &status)
{
auto app = find_app(APP_NAME);
app->is_bulk_loading = true;
const auto app_id = app->app_id;
bulk_svc()._bulk_load_app_id.insert(app_id);
bulk_svc()._apps_in_progress_count[app_id] = app->partition_count;
bulk_svc()._app_bulk_load_info[app_id].status = status;
for (int i = 0; i < app->partition_count; ++i) {
gpid pid = gpid(app_id, i);
bulk_svc()._partition_bulk_load_info[pid].status = status;
}
}

void reset_local_bulk_load_states()
{
auto app = find_app(APP_NAME);
bulk_svc().reset_local_bulk_load_states(app->app_id, APP_NAME);
app->is_bulk_loading = false;
}

protected:
std::unique_ptr<meta_http_service> _mhs;
std::string APP_NAME = "test_bulk_load";
};

TEST_F(meta_bulk_load_http_test, query_bulk_load_request)
{
const std::string NOT_BULK_LOAD = "not_bulk_load_app";
const std::string NOT_FOUND = "app_not_exist";

create_app(NOT_BULK_LOAD);
mock_bulk_load_context(bulk_load_status::BLS_DOWNLOADING);

struct query_bulk_load_test
{
std::string app_name;
std::string expected_json;
} tests[] = {{APP_NAME,
"{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::"
"BLS_DOWNLOADING\"}\n"},
{NOT_BULK_LOAD,
"{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::"
"bulk_load_status::BLS_INVALID\"}\n"},
{NOT_FOUND,
"{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_"
"load_status::BLS_INVALID\"}\n"}};
for (const auto &test : tests) {
ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json);
}

drop_app(NOT_BULK_LOAD);
}

} // namespace replication
} // namespace dsn

0 comments on commit 2191e17

Please sign in to comment.