From 39bec0d97ad5260af236d620445ec1b78252458a Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 18 Dec 2020 15:30:58 +0800 Subject: [PATCH 1/5] feat(bulk_load): add query replica status http interface --- src/replica/replica.cpp | 27 ++++++++++++++++++ src/replica/replica.h | 12 ++++++++ src/replica/replica_http_service.cpp | 42 ++++++++++++++++++++++++++++ src/replica/replica_http_service.h | 7 +++++ src/replica/replica_stub.cpp | 12 ++++++++ src/replica/replica_stub.h | 5 ++++ src/replica/test/replica_test.cpp | 32 +++++++++++++++++++++ 7 files changed, 137 insertions(+) diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 208942eed0..866a784f76 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -443,6 +443,33 @@ std::string replica::query_compact_state() const return _app->query_compact_state(); } +const char *manual_compaction_status_to_string(manual_compaction_status status) +{ + switch (status) { + case CompactionFinish: + return "CompactionFinish"; + case CompactionRunning: + return "CompactionRunning"; + case CompactionQueue: + return "CompactionQueue"; + default: + dassert(false, ""); + } + return "wrongCompactionStatus"; +} + +manual_compaction_status replica::get_compact_status() const +{ + std::string compact_state = query_compact_state(); + if (compact_state.find("recent start at") != std::string::npos) { + return CompactionRunning; + } else if (compact_state.find("recent enqueue at") != std::string::npos) { + return CompactionQueue; + } else { + return CompactionFinish; + } +} + // Replicas on the server which serves for the same table will share the same perf-counter. // For example counter `table.level.RPC_RRDB_RRDB_MULTI_PUT.latency(ns)@test_table` is shared by // all the replicas for `test_table`. diff --git a/src/replica/replica.h b/src/replica/replica.h index 84529ae086..3a4cb60d27 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -78,6 +78,14 @@ namespace test { class test_checker; } +enum manual_compaction_status +{ + CompactionFinish = 0, + CompactionRunning, + CompactionQueue +}; +const char *manual_compaction_status_to_string(manual_compaction_status status); + class replica : public serverlet, public ref_counter, public replica_base { public: @@ -392,8 +400,12 @@ class replica : public serverlet, public ref_counter, public replica_ba void update_restore_progress(uint64_t f_size); + // Used for remote command std::string query_compact_state() const; + // Used for http interface + manual_compaction_status get_compact_status() const; + void init_table_level_latency_counters(); void on_detect_hotkey(const detect_hotkey_request &req, /*out*/ detect_hotkey_response &resp); diff --git a/src/replica/replica_http_service.cpp b/src/replica/replica_http_service.cpp index 7396974e1f..b0626b80ae 100644 --- a/src/replica/replica_http_service.cpp +++ b/src/replica/replica_http_service.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "replica_http_service.h" #include "duplication/duplication_sync_timer.h" @@ -55,5 +56,46 @@ void replica_http_service::query_duplication_handler(const http_request &req, ht resp.body = json.dump(); } +void replica_http_service::query_compaction_handler(const http_request &req, http_response &resp) +{ + auto it = req.query_args.find("app_id"); + if (it == req.query_args.end()) { + resp.body = "app_id should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + + int32_t app_id = -1; + if (!buf2int32(it->second, app_id) || app_id < 0) { + resp.body = fmt::format("invalid app_id={}", it->second); + resp.status_code = http_status_code::bad_request; + return; + } + + std::unordered_map partition_compaction_status; + _stub->query_app_compact_status(app_id, partition_compaction_status); + + int32_t running_count = 0; + int32_t queue_count = 0; + int32_t finish_count = 0; + for (const auto &kv : partition_compaction_status) { + if (kv.second == CompactionRunning) { + running_count++; + } else if (kv.second == CompactionQueue) { + queue_count++; + } else if (kv.second == CompactionFinish) { + finish_count++; + } + } + dsn::utils::table_printer tp("status"); + tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionRunning), running_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionQueue), queue_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionFinish), finish_count); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_http_service.h b/src/replica/replica_http_service.h index 8f191e6e0b..ec02125447 100644 --- a/src/replica/replica_http_service.h +++ b/src/replica/replica_http_service.h @@ -20,11 +20,18 @@ class replica_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/replica/duplication?appid="); + register_handler("compaction", + std::bind(&replica_http_service::query_compaction_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/replica/compaction?app_id="); } std::string path() const override { return "replica"; } void query_duplication_handler(const http_request &req, http_response &resp); + void query_compaction_handler(const http_request &req, http_response &resp); private: replica_stub *_stub; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index a9e709d775..f5f1d201c9 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2808,5 +2808,17 @@ void replica_stub::on_detect_hotkey(detect_hotkey_rpc rpc) response.err_hint = fmt::format("not find the replica {} \n", request.pid); } } + +void replica_stub::query_app_compact_status( + int32_t app_id, std::unordered_map &status) +{ + zauto_read_lock l(_replicas_lock); + for (auto it = _replicas.begin(); it != _replicas.end(); ++it) { + if (it->first.get_app_id() == app_id) { + status[it->first] = it->second->get_compact_status(); + } + } +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 966d08127a..55e9bfeb00 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -217,6 +217,11 @@ class replica_stub : public serverlet, public ref_counter void on_query_disk_info(query_disk_info_rpc rpc); void on_disk_migrate(replica_disk_migrate_rpc rpc); + // query partitions compact status by app_id + void + query_app_compact_status(int32_t app_id, + /*out*/ std::unordered_map &status); + private: enum replica_node_state { diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index d4208c8dc1..e7d812b4a8 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -7,6 +7,7 @@ #include #include "replica_test_base.h" #include +#include "replica/replica_http_service.h" namespace dsn { namespace replication { @@ -21,6 +22,7 @@ class replica_test : public replica_test_base public: void SetUp() override { + FLAGS_enable_http_server = false; stub->install_perf_counters(); mock_app_info(); _mock_replica = stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1); @@ -81,5 +83,35 @@ TEST_F(replica_test, backup_request_qps) ASSERT_GT(get_table_level_backup_request_qps(), 0); } +TEST_F(replica_test, query_compaction_test) +{ + replica_http_service http_svc(stub.get()); + struct query_compaction_test + { + std::string app_id; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = {{"", http_status_code::bad_request, "app_id should not be empty"}, + {"xxx", http_status_code::bad_request, "invalid app_id=xxx"}, + {"2", + http_status_code::ok, + "{\"status\":{\"CompactionRunning\":\"0\",\"CompactionQueue\":" + "\"0\",\"CompactionFinish\":\"1\"}}\n"}, + {"4", + http_status_code::ok, + "{\"status\":{\"CompactionRunning\":\"0\",\"CompactionQueue\":" + "\"0\",\"CompactionFinish\":\"0\"}}\n"}}; + for (const auto &test : tests) { + http_request req; + http_response resp; + if (!test.app_id.empty()) { + req.query_args["app_id"] = test.app_id; + } + http_svc.query_compaction_handler(req, resp); + ASSERT_EQ(resp.status_code, test.expected_code); + ASSERT_EQ(resp.body, test.expected_response_json); + } +} + } // namespace replication } // namespace dsn From cfbe15a7405e5ed18d229ed9992478134a8f578d Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 23 Dec 2020 16:11:47 +0800 Subject: [PATCH 2/5] fix by code review --- src/replica/replica.cpp | 23 ++++++++++++++++------- src/replica/replica.h | 6 +++--- src/replica/replica_http_service.cpp | 12 ++++++------ src/replica/test/replica_test.cpp | 25 ++++++++++++++----------- 4 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 866a784f76..bd49b119ea 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -446,27 +446,36 @@ std::string replica::query_compact_state() const const char *manual_compaction_status_to_string(manual_compaction_status status) { switch (status) { - case CompactionFinish: + case FINISH: return "CompactionFinish"; - case CompactionRunning: + case RUNNING: return "CompactionRunning"; - case CompactionQueue: + case QUEUE: return "CompactionQueue"; default: dassert(false, ""); + __builtin_unreachable(); } - return "wrongCompactionStatus"; } manual_compaction_status replica::get_compact_status() const { std::string compact_state = query_compact_state(); + // query_compact_state will return a message like: + // Case1. last finish at [-] + // - partition is not manual compaction + // Case2. last finish at [timestamp], last used {time_used} ms + // - partition manual compaction finished + // Case3. last finish at [-], recent enqueue at [timestamp] + // - partition is in manual compaction queue + // Case4. last finish at [-], recent enqueue at [timestamp], recent start at [timestamp] + // - partition is running manual compaction if (compact_state.find("recent start at") != std::string::npos) { - return CompactionRunning; + return RUNNING; } else if (compact_state.find("recent enqueue at") != std::string::npos) { - return CompactionQueue; + return QUEUE; } else { - return CompactionFinish; + return FINISH; } } diff --git a/src/replica/replica.h b/src/replica/replica.h index 3a4cb60d27..77d2c082ac 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -80,9 +80,9 @@ class test_checker; enum manual_compaction_status { - CompactionFinish = 0, - CompactionRunning, - CompactionQueue + FINISH = 0, + RUNNING, + QUEUE }; const char *manual_compaction_status_to_string(manual_compaction_status status); diff --git a/src/replica/replica_http_service.cpp b/src/replica/replica_http_service.cpp index b0626b80ae..ebd31ac05f 100644 --- a/src/replica/replica_http_service.cpp +++ b/src/replica/replica_http_service.cpp @@ -79,18 +79,18 @@ void replica_http_service::query_compaction_handler(const http_request &req, htt int32_t queue_count = 0; int32_t finish_count = 0; for (const auto &kv : partition_compaction_status) { - if (kv.second == CompactionRunning) { + if (kv.second == RUNNING) { running_count++; - } else if (kv.second == CompactionQueue) { + } else if (kv.second == QUEUE) { queue_count++; - } else if (kv.second == CompactionFinish) { + } else if (kv.second == FINISH) { finish_count++; } } dsn::utils::table_printer tp("status"); - tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionRunning), running_count); - tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionQueue), queue_count); - tp.add_row_name_and_data(manual_compaction_status_to_string(CompactionFinish), finish_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(RUNNING), running_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(QUEUE), queue_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(FINISH), finish_count); std::ostringstream out; tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); resp.body = out.str(); diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index e7d812b4a8..ee15eb2776 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -91,16 +91,15 @@ TEST_F(replica_test, query_compaction_test) std::string app_id; http_status_code expected_code; std::string expected_response_json; - } tests[] = {{"", http_status_code::bad_request, "app_id should not be empty"}, - {"xxx", http_status_code::bad_request, "invalid app_id=xxx"}, - {"2", - http_status_code::ok, - "{\"status\":{\"CompactionRunning\":\"0\",\"CompactionQueue\":" - "\"0\",\"CompactionFinish\":\"1\"}}\n"}, - {"4", - http_status_code::ok, - "{\"status\":{\"CompactionRunning\":\"0\",\"CompactionQueue\":" - "\"0\",\"CompactionFinish\":\"0\"}}\n"}}; + } tests[] = { + {"", http_status_code::bad_request, "app_id should not be empty"}, + {"xxx", http_status_code::bad_request, "invalid app_id=xxx"}, + {"2", + http_status_code::ok, + R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"1"}})"}, + {"4", + http_status_code::ok, + R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"0"}})"}}; for (const auto &test : tests) { http_request req; http_response resp; @@ -109,7 +108,11 @@ TEST_F(replica_test, query_compaction_test) } http_svc.query_compaction_handler(req, resp); ASSERT_EQ(resp.status_code, test.expected_code); - ASSERT_EQ(resp.body, test.expected_response_json); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); } } From c571e0febcb7db802728274e9fe3b5b04bcc2a53 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 25 Dec 2020 11:58:08 +0800 Subject: [PATCH 3/5] small update --- src/replica/test/replica_test.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 62c63a5b09..9157100552 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -91,7 +91,6 @@ TEST_F(replica_test, query_data_version_test) std::string app_id; http_status_code expected_code; std::string expected_response_json; - } tests[] = {{"", http_status_code::bad_request, "app_id should not be empty"}, {"wrong", http_status_code::bad_request, "invalid app_id=wrong"}, {"2", @@ -100,7 +99,6 @@ TEST_F(replica_test, query_data_version_test) {"4", http_status_code::ok, R"({"error":"ERR_OBJECT_NOT_FOUND","data_version":"0"})"}}; - for (const auto &test : tests) { http_request req; http_response resp; From 046bb4a19a85ab0c88292381f3603b0e442a39ee Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Fri, 25 Dec 2020 14:20:49 +0800 Subject: [PATCH 4/5] Update src/replica/replica.h Co-authored-by: Wu Tao --- src/replica/replica.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replica/replica.h b/src/replica/replica.h index 2adf2457b4..7b8266fce5 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -401,6 +401,7 @@ class replica : public serverlet, public ref_counter, public replica_ba void update_restore_progress(uint64_t f_size); // Used for remote command + // TODO: remove this interface and only expose the http interface std::string query_compact_state() const; // Used for http interface From 794cec9dd91ec34611ed3d50f1fd0ba8f06da0a7 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 25 Dec 2020 15:24:47 +0800 Subject: [PATCH 5/5] fix by code review --- src/replica/replica.cpp | 2 +- src/replica/replica.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 3a4a9e9f18..dd41e1de71 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -453,7 +453,7 @@ const char *manual_compaction_status_to_string(manual_compaction_status status) case kQueue: return "CompactionQueue"; default: - dassert(false, ""); + dassert(false, "invalid status({})", status); __builtin_unreachable(); } } diff --git a/src/replica/replica.h b/src/replica/replica.h index 7b8266fce5..c86f4e68f0 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -402,6 +402,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // Used for remote command // TODO: remove this interface and only expose the http interface + // now this remote commend will be used by `scripts/pegasus_manual_compact.sh` std::string query_compact_state() const; // Used for http interface