Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk_load): add query compaction status http interface (#696)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Dec 28, 2020
1 parent 60831d6 commit 2dd3b75
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,42 @@ std::string replica::query_compact_state() const
return _app->query_compact_state();
}

const char *manual_compaction_status_to_string(manual_compaction_status status)
{
switch (status) {
case kFinish:
return "CompactionFinish";
case kRunning:
return "CompactionRunning";
case kQueue:
return "CompactionQueue";
default:
dassert(false, "invalid status({})", status);
__builtin_unreachable();
}
}

manual_compaction_status replica::get_compact_status() const
{
std::string compact_state = query_compact_state();
// query_compact_state will return a message like:
// Case1. last finish at [-]
// - partition is not manual compaction
// Case2. last finish at [timestamp], last used {time_used} ms
// - partition manual compaction finished
// Case3. last finish at [-], recent enqueue at [timestamp]
// - partition is in manual compaction queue
// Case4. last finish at [-], recent enqueue at [timestamp], recent start at [timestamp]
// - partition is running manual compaction
if (compact_state.find("recent start at") != std::string::npos) {
return kRunning;
} else if (compact_state.find("recent enqueue at") != std::string::npos) {
return kQueue;
} else {
return kFinish;
}
}

// Replicas on the server which serves for the same table will share the same perf-counter.
// For example counter `table.level.RPC_RRDB_RRDB_MULTI_PUT.latency(ns)@test_table` is shared by
// all the replicas for `test_table`.
Expand Down
14 changes: 14 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ namespace test {
class test_checker;
}

enum manual_compaction_status
{
kFinish = 0,
kRunning,
kQueue
};
const char *manual_compaction_status_to_string(manual_compaction_status status);

class replica : public serverlet<replica>, public ref_counter, public replica_base
{
public:
Expand Down Expand Up @@ -392,8 +400,14 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void update_restore_progress(uint64_t f_size);

// Used for remote command
// TODO: remove this interface and only expose the http interface
// now this remote commend will be used by `scripts/pegasus_manual_compact.sh`
std::string query_compact_state() const;

// Used for http interface
manual_compaction_status get_compact_status() const;

void init_table_level_latency_counters();

void on_detect_hotkey(const detect_hotkey_request &req, /*out*/ detect_hotkey_response &resp);
Expand Down
41 changes: 41 additions & 0 deletions src/replica/replica_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,46 @@ void replica_http_service::query_app_data_version_handler(const http_request &re
resp.status_code = http_status_code::ok;
}

void replica_http_service::query_compaction_handler(const http_request &req, http_response &resp)
{
auto it = req.query_args.find("app_id");
if (it == req.query_args.end()) {
resp.body = "app_id should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}

int32_t app_id = -1;
if (!buf2int32(it->second, app_id) || app_id < 0) {
resp.body = fmt::format("invalid app_id={}", it->second);
resp.status_code = http_status_code::bad_request;
return;
}

std::unordered_map<gpid, manual_compaction_status> partition_compaction_status;
_stub->query_app_compact_status(app_id, partition_compaction_status);

int32_t running_count = 0;
int32_t queue_count = 0;
int32_t finish_count = 0;
for (const auto &kv : partition_compaction_status) {
if (kv.second == kRunning) {
running_count++;
} else if (kv.second == kQueue) {
queue_count++;
} else if (kv.second == kFinish) {
finish_count++;
}
}
dsn::utils::table_printer tp("status");
tp.add_row_name_and_data(manual_compaction_status_to_string(kRunning), running_count);
tp.add_row_name_and_data(manual_compaction_status_to_string(kQueue), queue_count);
tp.add_row_name_and_data(manual_compaction_status_to_string(kFinish), finish_count);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

} // namespace replication
} // namespace dsn
7 changes: 7 additions & 0 deletions src/replica/replica_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ class replica_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/replica/data_version?app_id=<app_id>");
register_handler("compaction",
std::bind(&replica_http_service::query_compaction_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/replica/compaction?app_id=<app_id>");
}

std::string path() const override { return "replica"; }

void query_duplication_handler(const http_request &req, http_response &resp);
void query_app_data_version_handler(const http_request &req, http_response &resp);
void query_compaction_handler(const http_request &req, http_response &resp);

private:
replica_stub *_stub;
Expand Down
11 changes: 11 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2828,5 +2828,16 @@ error_code replica_stub::query_app_data_version(int32_t app_id, /*out*/ uint32_t
return ERR_OK;
}

void replica_stub::query_app_compact_status(
int32_t app_id, std::unordered_map<gpid, manual_compaction_status> &status)
{
zauto_read_lock l(_replicas_lock);
for (auto it = _replicas.begin(); it != _replicas.end(); ++it) {
if (it->first.get_app_id() == app_id) {
status[it->first] = it->second->get_compact_status();
}
}
}

} // namespace replication
} // namespace dsn
5 changes: 5 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
void on_query_disk_info(query_disk_info_rpc rpc);
void on_disk_migrate(replica_disk_migrate_rpc rpc);

// query partitions compact status by app_id
void
query_app_compact_status(int32_t app_id,
/*out*/ std::unordered_map<gpid, manual_compaction_status> &status);

private:
enum replica_node_state
{
Expand Down
33 changes: 33 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,38 @@ TEST_F(replica_test, query_data_version_test)
}
}

TEST_F(replica_test, query_compaction_test)
{
replica_http_service http_svc(stub.get());
struct query_compaction_test
{
std::string app_id;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{"", http_status_code::bad_request, "app_id should not be empty"},
{"xxx", http_status_code::bad_request, "invalid app_id=xxx"},
{"2",
http_status_code::ok,
R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"1"}})"},
{"4",
http_status_code::ok,
R"({"status":{"CompactionRunning":"0","CompactionQueue":"0","CompactionFinish":"0"}})"}};
for (const auto &test : tests) {
http_request req;
http_response resp;
if (!test.app_id.empty()) {
req.query_args["app_id"] = test.app_id;
}
http_svc.query_compaction_handler(req, resp);
ASSERT_EQ(resp.status_code, test.expected_code);
std::string expected_json = test.expected_response_json;
if (test.expected_code == http_status_code::ok) {
expected_json += "\n";
}
ASSERT_EQ(resp.body, expected_json);
}
}

} // namespace replication
} // namespace dsn

0 comments on commit 2dd3b75

Please sign in to comment.