diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index e4ed128326..0fd0a4dc1e 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -349,6 +349,7 @@ class bulk_load_service private: friend class bulk_load_service_test; + friend class meta_bulk_load_http_test; meta_service *_meta_svc; server_state *_state; diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index 4a470460c3..5238a2c591 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -18,6 +18,7 @@ #include "server_load_balancer.h" #include "server_state.h" #include "meta/duplication/meta_duplication_service.h" +#include "meta/meta_bulk_load_service.h" namespace dsn { namespace replication { @@ -610,6 +611,40 @@ void meta_http_service::query_duplication_handler(const http_request &req, http_ resp.body = duplication_query_response_to_string(rpc_resp); } +void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) { + return; + } + + if (_service->_bulk_load_svc == nullptr) { + resp.body = "bulk load is not enabled"; + resp.status_code = http_status_code::not_found; + return; + } + + auto it = req.query_args.find("name"); + if (it == req.query_args.end()) { + resp.body = "name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + + auto rpc_req = dsn::make_unique(); + rpc_req->app_name = it->second; + query_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK); + _service->_bulk_load_svc->on_query_bulk_load_status(rpc); + auto rpc_resp = rpc.response(); + // output as json format + dsn::utils::table_printer tp; + tp.add_row_name_and_data("error", rpc_resp.err.to_string()); + tp.add_row_name_and_data("app_status", dsn::enum_to_string(rpc_resp.app_status)); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) { #ifdef DSN_MOCK_TEST diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h index 209eba147b..e355a14b53 100644 --- a/src/meta/meta_http_service.h +++ b/src/meta/meta_http_service.h @@ -59,6 +59,12 @@ class meta_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/meta/backup_policy"); + register_handler("app/query_bulk_load", + std::bind(&meta_http_service::query_bulk_load_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/meta/query_bulk_load?name=temp"); } std::string path() const override { return "meta"; } @@ -70,6 +76,7 @@ class meta_http_service : public http_service void get_app_envs_handler(const http_request &req, http_response &resp); void query_backup_policy_handler(const http_request &req, http_response &resp); void query_duplication_handler(const http_request &req, http_response &resp); + void query_bulk_load_handler(const http_request &req, http_response &resp); private: // set redirect location if current server is not primary diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index 103363ca53..cf947484bb 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -10,6 +10,7 @@ #include "meta/meta_service.h" #include "meta_test_base.h" #include "meta_service_test_app.h" +#include "meta/meta_bulk_load_service.h" namespace dsn { namespace replication { @@ -176,5 +177,86 @@ TEST_F(meta_backup_test_base, get_backup_policy) test_get_backup_policy(tests[4].name, tests[4].expected_json, tests[4].http_status); } +class meta_bulk_load_http_test : public meta_test_base +{ +public: + void SetUp() override + { + meta_test_base::SetUp(); + FLAGS_enable_http_server = false; + _mhs = dsn::make_unique(_ms.get()); + create_app(APP_NAME); + } + + void TearDown() override + { + drop_app(APP_NAME); + _mhs = nullptr; + meta_test_base::TearDown(); + } + + std::string test_query_bulk_load(const std::string &app_name) + { + http_request req; + http_response resp; + req.query_args.emplace("name", app_name); + _mhs->query_bulk_load_handler(req, resp); + return resp.body; + } + + void mock_bulk_load_context(const bulk_load_status::type &status) + { + auto app = find_app(APP_NAME); + app->is_bulk_loading = true; + const auto app_id = app->app_id; + bulk_svc()._bulk_load_app_id.insert(app_id); + bulk_svc()._apps_in_progress_count[app_id] = app->partition_count; + bulk_svc()._app_bulk_load_info[app_id].status = status; + for (int i = 0; i < app->partition_count; ++i) { + gpid pid = gpid(app_id, i); + bulk_svc()._partition_bulk_load_info[pid].status = status; + } + } + + void reset_local_bulk_load_states() + { + auto app = find_app(APP_NAME); + bulk_svc().reset_local_bulk_load_states(app->app_id, APP_NAME); + app->is_bulk_loading = false; + } + +protected: + std::unique_ptr _mhs; + std::string APP_NAME = "test_bulk_load"; +}; + +TEST_F(meta_bulk_load_http_test, query_bulk_load_request) +{ + const std::string NOT_BULK_LOAD = "not_bulk_load_app"; + const std::string NOT_FOUND = "app_not_exist"; + + create_app(NOT_BULK_LOAD); + mock_bulk_load_context(bulk_load_status::BLS_DOWNLOADING); + + struct query_bulk_load_test + { + std::string app_name; + std::string expected_json; + } tests[] = {{APP_NAME, + "{\"error\":\"ERR_OK\",\"app_status\":\"replication::bulk_load_status::" + "BLS_DOWNLOADING\"}\n"}, + {NOT_BULK_LOAD, + "{\"error\":\"ERR_INVALID_STATE\",\"app_status\":\"replication::" + "bulk_load_status::BLS_INVALID\"}\n"}, + {NOT_FOUND, + "{\"error\":\"ERR_APP_NOT_EXIST\",\"app_status\":\"replication::bulk_" + "load_status::BLS_INVALID\"}\n"}}; + for (const auto &test : tests) { + ASSERT_EQ(test_query_bulk_load(test.app_name), test.expected_json); + } + + drop_app(NOT_BULK_LOAD); +} + } // namespace replication } // namespace dsn