From 6cbe47a80a5169a0227a21be955a183a8db8c796 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 16 Dec 2020 18:33:31 +0800 Subject: [PATCH] feat(bulk_load): add start bulk load http interface --- src/meta/meta_bulk_load_service.cpp | 3 ++ src/meta/meta_http_service.cpp | 56 +++++++++++++++++++++++- src/meta/meta_http_service.h | 12 +++++ src/meta/test/meta_http_service_test.cpp | 42 ++++++++++++++++++ 4 files changed, 112 insertions(+), 1 deletion(-) diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 3f0df95602..cbead0afb8 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -30,6 +30,9 @@ void bulk_load_service::initialize_bulk_load_service() // ThreadPool: THREAD_POOL_META_SERVER void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) { + FAIL_POINT_INJECT_F("meta_on_start_bulk_load", + [=](dsn::string_view) { rpc.response().err = ERR_OK; }); + const auto &request = rpc.request(); auto &response = rpc.response(); response.err = ERR_OK; diff --git a/src/meta/meta_http_service.cpp b/src/meta/meta_http_service.cpp index fa15b35550..c9552b29ee 100644 --- a/src/meta/meta_http_service.cpp +++ b/src/meta/meta_http_service.cpp @@ -5,7 +5,6 @@ #include #include -#include #include #include #include @@ -611,6 +610,61 @@ void meta_http_service::query_duplication_handler(const http_request &req, http_ resp.body = duplication_query_response_to_string(rpc_resp); } +void meta_http_service::start_bulk_load_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) { + return; + } + + if (_service->_bulk_load_svc == nullptr) { + resp.body = "bulk load is not enabled"; + resp.status_code = http_status_code::not_found; + return; + } + + start_bulk_load_request request; + bool ret = json::json_forwarder::decode(req.body, request); + if (!ret) { + resp.body = "invalid request structure"; + resp.status_code = http_status_code::bad_request; + return; + } + if (request.app_name.empty()) { + resp.body = "app_name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + if (request.cluster_name.empty()) { + resp.body = "cluster_name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + if (request.file_provider_type.empty()) { + resp.body = "file_provider_type should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + if (request.remote_root_path.empty()) { + resp.body = "remote_root_path should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + + auto rpc_req = dsn::make_unique(request); + start_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK); + _service->_bulk_load_svc->on_start_bulk_load(rpc); + + auto rpc_resp = rpc.response(); + // output as json format + dsn::utils::table_printer tp; + tp.add_row_name_and_data("error", rpc_resp.err.to_string()); + tp.add_row_name_and_data("hint_msg", rpc_resp.hint_msg); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp) { if (!redirect_if_not_primary(req, resp)) { diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h index e355a14b53..e857df176f 100644 --- a/src/meta/meta_http_service.h +++ b/src/meta/meta_http_service.h @@ -6,11 +6,15 @@ #include +#include #include namespace dsn { namespace replication { +NON_MEMBER_JSON_SERIALIZATION( + start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path) + class meta_service; class meta_http_service : public http_service { @@ -59,6 +63,13 @@ class meta_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/meta/backup_policy"); + // request body should be start_bulk_load_request + register_handler("app/start_bulk_load", + std::bind(&meta_http_service::start_bulk_load_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/meta/start_bulk_load"); register_handler("app/query_bulk_load", std::bind(&meta_http_service::query_bulk_load_handler, this, @@ -76,6 +87,7 @@ class meta_http_service : public http_service void get_app_envs_handler(const http_request &req, http_response &resp); void query_backup_policy_handler(const http_request &req, http_response &resp); void query_duplication_handler(const http_request &req, http_response &resp); + void start_bulk_load_handler(const http_request &req, http_response &resp); void query_bulk_load_handler(const http_request &req, http_response &resp); private: diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index cf947484bb..7902e17199 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "meta/meta_http_service.h" #include "meta/meta_service.h" @@ -195,6 +196,15 @@ class meta_bulk_load_http_test : public meta_test_base meta_test_base::TearDown(); } + http_response test_start_bulk_load(std::string req_body_json) + { + http_request req; + http_response resp; + req.body = blob::create_from_bytes(std::move(req_body_json)); + _mhs->start_bulk_load_handler(req, resp); + return resp; + } + std::string test_query_bulk_load(const std::string &app_name) { http_request req; @@ -230,6 +240,38 @@ class meta_bulk_load_http_test : public meta_test_base std::string APP_NAME = "test_bulk_load"; }; +TEST_F(meta_bulk_load_http_test, start_bulk_load_request) +{ + fail::setup(); + fail::cfg("meta_on_start_bulk_load", "return()"); + struct start_bulk_load_test + { + std::string request_json; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = { + {"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " + "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + http_status_code::bad_request, + "invalid request structure"}, + {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " + "\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}", + http_status_code::bad_request, + "file_provider_type should not be empty"}, + {"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", " + "\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}", + http_status_code::ok, + "{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"}, + }; + + for (const auto &test : tests) { + http_response resp = test_start_bulk_load(test.request_json); + ASSERT_EQ(resp.status_code, test.expected_code); + ASSERT_EQ(resp.body, test.expected_response_json); + } + fail::teardown(); +} + TEST_F(meta_bulk_load_http_test, query_bulk_load_request) { const std::string NOT_BULK_LOAD = "not_bulk_load_app";