From 2dd3b75c49590843a258711b7356ded408981097 Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Mon, 28 Dec 2020 11:01:16 +0800 Subject: [PATCH] feat(bulk_load): add query compaction status http interface (#696) --- src/replica/replica.cpp | 36 ++++++++++++++++++++++++ src/replica/replica.h | 14 ++++++++++ src/replica/replica_http_service.cpp | 41 ++++++++++++++++++++++++++++ src/replica/replica_http_service.h | 7 +++++ src/replica/replica_stub.cpp | 11 ++++++++ src/replica/replica_stub.h | 5 ++++ src/replica/test/replica_test.cpp | 33 ++++++++++++++++++++++ 7 files changed, 147 insertions(+) diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 1c6a060042..dd41e1de71 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -443,6 +443,42 @@ std::string replica::query_compact_state() const return _app->query_compact_state(); } +const char *manual_compaction_status_to_string(manual_compaction_status status) +{ + switch (status) { + case kFinish: + return "CompactionFinish"; + case kRunning: + return "CompactionRunning"; + case kQueue: + return "CompactionQueue"; + default: + dassert(false, "invalid status({})", status); + __builtin_unreachable(); + } +} + +manual_compaction_status replica::get_compact_status() const +{ + std::string compact_state = query_compact_state(); + // query_compact_state will return a message like: + // Case1. last finish at [-] + // - partition is not manual compaction + // Case2. last finish at [timestamp], last used {time_used} ms + // - partition manual compaction finished + // Case3. last finish at [-], recent enqueue at [timestamp] + // - partition is in manual compaction queue + // Case4. last finish at [-], recent enqueue at [timestamp], recent start at [timestamp] + // - partition is running manual compaction + if (compact_state.find("recent start at") != std::string::npos) { + return kRunning; + } else if (compact_state.find("recent enqueue at") != std::string::npos) { + return kQueue; + } else { + return kFinish; + } +} + // Replicas on the server which serves for the same table will share the same perf-counter. // For example counter `table.level.RPC_RRDB_RRDB_MULTI_PUT.latency(ns)@test_table` is shared by // all the replicas for `test_table`. diff --git a/src/replica/replica.h b/src/replica/replica.h index c501acce87..c86f4e68f0 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -78,6 +78,14 @@ namespace test { class test_checker; } +enum manual_compaction_status +{ + kFinish = 0, + kRunning, + kQueue +}; +const char *manual_compaction_status_to_string(manual_compaction_status status); + class replica : public serverlet, public ref_counter, public replica_base { public: @@ -392,8 +400,14 @@ class replica : public serverlet, public ref_counter, public replica_ba void update_restore_progress(uint64_t f_size); + // Used for remote command + // TODO: remove this interface and only expose the http interface + // now this remote commend will be used by `scripts/pegasus_manual_compact.sh` std::string query_compact_state() const; + // Used for http interface + manual_compaction_status get_compact_status() const; + void init_table_level_latency_counters(); void on_detect_hotkey(const detect_hotkey_request &req, /*out*/ detect_hotkey_response &resp); diff --git a/src/replica/replica_http_service.cpp b/src/replica/replica_http_service.cpp index 8ebb885f27..d4b985eb03 100644 --- a/src/replica/replica_http_service.cpp +++ b/src/replica/replica_http_service.cpp @@ -85,5 +85,46 @@ void replica_http_service::query_app_data_version_handler(const http_request &re resp.status_code = http_status_code::ok; } +void replica_http_service::query_compaction_handler(const http_request &req, http_response &resp) +{ + auto it = req.query_args.find("app_id"); + if (it == req.query_args.end()) { + resp.body = "app_id should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + + int32_t app_id = -1; + if (!buf2int32(it->second, app_id) || app_id < 0) { + resp.body = fmt::format("invalid app_id={}", it->second); + resp.status_code = http_status_code::bad_request; + return; + } + + std::unordered_map partition_compaction_status; + _stub->query_app_compact_status(app_id, partition_compaction_status); + + int32_t running_count = 0; + int32_t queue_count = 0; + int32_t finish_count = 0; + for (const auto &kv : partition_compaction_status) { + if (kv.second == kRunning) { + running_count++; + } else if (kv.second == kQueue) { + queue_count++; + } else if (kv.second == kFinish) { + finish_count++; + } + } + dsn::utils::table_printer tp("status"); + tp.add_row_name_and_data(manual_compaction_status_to_string(kRunning), running_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(kQueue), queue_count); + tp.add_row_name_and_data(manual_compaction_status_to_string(kFinish), finish_count); + std::ostringstream out; + tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact); + resp.body = out.str(); + resp.status_code = http_status_code::ok; +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_http_service.h b/src/replica/replica_http_service.h index d58daced5d..8f2c92fd1d 100644 --- a/src/replica/replica_http_service.h +++ b/src/replica/replica_http_service.h @@ -26,12 +26,19 @@ class replica_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/replica/data_version?app_id="); + register_handler("compaction", + std::bind(&replica_http_service::query_compaction_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/replica/compaction?app_id="); } std::string path() const override { return "replica"; } void query_duplication_handler(const http_request &req, http_response &resp); void query_app_data_version_handler(const http_request &req, http_response &resp); + void query_compaction_handler(const http_request &req, http_response &resp); private: replica_stub *_stub; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 89cd4e6c05..174320e188 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2828,5 +2828,16 @@ error_code replica_stub::query_app_data_version(int32_t app_id, /*out*/ uint32_t return ERR_OK; } +void replica_stub::query_app_compact_status( + int32_t app_id, std::unordered_map &status) +{ + zauto_read_lock l(_replicas_lock); + for (auto it = _replicas.begin(); it != _replicas.end(); ++it) { + if (it->first.get_app_id() == app_id) { + status[it->first] = it->second->get_compact_status(); + } + } +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index bb04355863..66417ff2ee 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -217,6 +217,11 @@ class replica_stub : public serverlet, public ref_counter void on_query_disk_info(query_disk_info_rpc rpc); void on_disk_migrate(replica_disk_migrate_rpc rpc); + // query partitions compact status by app_id + void + query_app_compact_status(int32_t app_id, + /*out*/ std::unordered_map &status); + private: enum replica_node_state { diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index c7f13ec9eb..9157100552 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -115,5 +115,38 @@ TEST_F(replica_test, query_data_version_test) } } +TEST_F(replica_test, query_compaction_test) +{ + replica_http_service http_svc(stub.get()); + struct query_compaction_test + { + std::string app_id; + http_status_code expected_code; + std::string expected_response_json; + } tests[] = { + {"", http_status_code::bad_request, "app_id should not be empty"}, + {"xxx", http_status_code::bad_request, "invalid app_id=xxx"}, + {"2", + http_status_code::ok, + R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"1"}})"}, + {"4", + http_status_code::ok, + R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"0"}})"}}; + for (const auto &test : tests) { + http_request req; + http_response resp; + if (!test.app_id.empty()) { + req.query_args["app_id"] = test.app_id; + } + http_svc.query_compaction_handler(req, resp); + ASSERT_EQ(resp.status_code, test.expected_code); + std::string expected_json = test.expected_response_json; + if (test.expected_code == http_status_code::ok) { + expected_json += "\n"; + } + ASSERT_EQ(resp.body, expected_json); + } +} + } // namespace replication } // namespace dsn