Skip to content

Commit

Permalink
feat(manual_compaction): meta server support querying compaction stat…
Browse files Browse the repository at this point in the history
…us (#987)
  • Loading branch information
hycdong authored Dec 15, 2021
1 parent f845d8c commit fac96a3
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 0 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class replication_ddl_client
error_with<add_new_disk_response> add_new_disk(const rpc_address &target_node,
const std::string &disk_str);

error_with<query_app_manual_compact_response>
query_app_manual_compact(const std::string &app_name);

private:
bool static valid_app_char(int c);

Expand Down
9 changes: 9 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1684,5 +1684,14 @@ replication_ddl_client::add_new_disk(const rpc_address &target_node, const std::
return resps.begin()->second.get_value();
}

error_with<query_app_manual_compact_response>
replication_ddl_client::query_app_manual_compact(const std::string &app_name)
{
auto req = make_unique<query_app_manual_compact_request>();
req->app_name = app_name;
return call_rpc_sync(
query_manual_compact_rpc(std::move(req), RPC_CM_QUERY_MANUAL_COMPACT_STATUS));
}

} // namespace replication
} // namespace dsn
16 changes: 16 additions & 0 deletions src/common/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,22 @@ struct configuration_update_app_env_response
2:string hint_message;
}

struct query_app_manual_compact_request
{
1:string app_name;
}

struct query_app_manual_compact_response
{
// Possible error:
// - ERR_APP_NOT_EXIST: app not exist
// - ERR_APP_DROPPED: app has been dropped
// - ERR_INVALID_STATE: app is not executing manual compaction
1:dsn.error_code err;
2:string hint_msg;
3:optional i32 progress;
}

/////////////////// Nodes Management ////////////////////

struct node_info
Expand Down
2 changes: 2 additions & 0 deletions src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef rpc_holder<notify_stop_split_request, notify_stop_split_response> notify
typedef rpc_holder<query_child_state_request, query_child_state_response> query_child_state_rpc;

typedef rpc_holder<backup_request, backup_response> backup_rpc;
typedef rpc_holder<query_app_manual_compact_request, query_app_manual_compact_response>
query_manual_compact_rpc;

class replication_options
{
Expand Down
39 changes: 39 additions & 0 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
* xxxx-xx-xx, author, fix bug about xxx
*/
#include <boost/lexical_cast.hpp>

#include <dsn/dist/fmt_logging.h>
#include <dsn/service_api_cpp.h>

#include "meta_data.h"

namespace dsn {
Expand Down Expand Up @@ -479,6 +482,42 @@ void app_state_helper::on_init_partitions()
restore_states.resize(owner->partition_count);
}

void app_state_helper::reset_manual_compact_status()
{
for (auto &cc : contexts) {
for (auto &r : cc.serving) {
r.compact_status = manual_compaction_status::IDLE;
}
}
}

bool app_state_helper::get_manual_compact_progress(/*out*/ int32_t &progress) const
{
int32_t total_replica_count = owner->partition_count * owner->max_replica_count;
dassert_f(total_replica_count > 0,
"invalid app metadata, app({}), partition_count({}), max_replica_count({})",
owner->app_name,
owner->partition_count,
owner->max_replica_count);
int32_t finish_count = 0, idle_count = 0;
for (const auto &cc : contexts) {
for (const auto &r : cc.serving) {
if (r.compact_status == manual_compaction_status::IDLE) {
idle_count++;
} else if (r.compact_status == manual_compaction_status::FINISHED) {
finish_count++;
}
}
}
// all replicas of all partitions are idle
if (idle_count == total_replica_count) {
progress = 0;
return false;
}
progress = finish_count * 100 / total_replica_count;
return true;
}

app_state::app_state(const app_info &info) : app_info(info), helpers(new app_state_helper())
{
log_name = info.app_name + "(" + boost::lexical_cast<std::string>(info.app_id) + ")";
Expand Down
5 changes: 5 additions & 0 deletions src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ class app_state_helper
cc.lb_actions.clear();
}
}

void reset_manual_compact_status();
// get replica group manual compact progress
// return false if partition is not executing manual compaction
bool get_manual_compact_progress(/*out*/ int32_t &progress) const;
};

/*
Expand Down
13 changes: 13 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ void meta_service::register_rpc_handlers()
RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app);
register_rpc_handler_with_rpc_holder(
RPC_CM_QUERY_BACKUP_STATUS, "query_backup_status", &meta_service::on_query_backup_status);
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_MANUAL_COMPACT_STATUS,
"query_manual_compact_status",
&meta_service::on_query_manual_compact_status);
}

int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
Expand Down Expand Up @@ -1219,5 +1222,15 @@ size_t meta_service::get_alive_node_count() const
return _alive_set.size();
}

void meta_service::on_query_manual_compact_status(query_manual_compact_rpc rpc)
{
if (!check_status(rpc)) {
return;
}
tasking::enqueue(LPC_META_STATE_NORMAL,
nullptr,
std::bind(&server_state::on_query_manual_compact_status, _state.get(), rpc));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ class meta_service : public serverlet<meta_service>
void on_control_bulk_load(control_bulk_load_rpc rpc);
void on_query_bulk_load_status(query_bulk_load_rpc rpc);

// manual compaction
void on_query_manual_compact_status(query_manual_compact_rpc rpc);

// common routines
// ret:
// 1. the meta is leader
Expand Down
35 changes: 35 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2943,5 +2943,40 @@ bool server_state::validate_target_max_replica_count(int32_t max_replica_count)
return valid;
}

void server_state::on_query_manual_compact_status(query_manual_compact_rpc rpc)
{
const std::string &app_name = rpc.request().app_name;
auto &response = rpc.response();

std::shared_ptr<app_state> app;
{
zauto_read_lock l(_lock);
app = get_app(app_name);
}

if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.hint_msg =
fmt::format("app {} is {}",
app_name,
response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available");
derror_f("{}", response.hint_msg);
return;
}

int32_t total_progress = 0;
if (!app->helpers->get_manual_compact_progress(total_progress)) {
response.err = ERR_INVALID_STATE;
response.hint_msg = fmt::format("app {} is not manual compaction", app_name);
dwarn_f("{}", response.hint_msg);
return;
}

ddebug_f("query app {} manual compact succeed, total_progress = {}", app_name, total_progress);
response.err = ERR_OK;
response.hint_msg = "succeed";
response.__set_progress(total_progress);
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class server_state

void on_query_restore_status(configuration_query_restore_rpc rpc);

// manual compaction
void on_query_manual_compact_status(query_manual_compact_rpc rpc);

// return true if no need to do any actions
bool check_all_partitions();
void get_cluster_balance_score(double &primary_stddev /*out*/, double &total_stddev /*out*/);
Expand Down
108 changes: 108 additions & 0 deletions src/meta/test/meta_mauanl_compaction_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/dist/replication/replica_envs.h>

#include "meta_service_test_app.h"
#include "meta_test_base.h"

namespace dsn {
namespace replication {
class meta_app_compaction_test : public meta_test_base
{
public:
meta_app_compaction_test() {}

void SetUp() override
{
meta_test_base::SetUp();
prepare();
}

void prepare()
{
create_app(APP_NAME, PARTITION_COUNT);
auto app = find_app(APP_NAME);
app->partitions.resize(PARTITION_COUNT);
app->helpers->contexts.resize(PARTITION_COUNT);
for (auto i = 0; i < PARTITION_COUNT; ++i) {
serving_replica rep;
rep.compact_status = manual_compaction_status::IDLE;
std::vector<serving_replica> reps;
reps.emplace_back(rep);
reps.emplace_back(rep);
reps.emplace_back(rep);
app->helpers->contexts[i].serving = reps;
}
}

query_app_manual_compact_response query_manual_compaction(int32_t mock_progress)
{
manual_compaction_status::type status = manual_compaction_status::IDLE;
if (mock_progress == 0) {
status = manual_compaction_status::QUEUING;
} else if (mock_progress == 100) {
status = manual_compaction_status::FINISHED;
}
auto app = find_app(APP_NAME);
app->helpers->reset_manual_compact_status();
for (auto &cc : app->helpers->contexts) {
for (auto &r : cc.serving) {
r.compact_status = status;
}
}
if (mock_progress == 50) {
for (auto i = 0; i < PARTITION_COUNT / 2; i++) {
auto &cc = app->helpers->contexts[i];
for (auto &r : cc.serving) {
r.compact_status = manual_compaction_status::FINISHED;
}
}
}
auto request = dsn::make_unique<query_app_manual_compact_request>();
request->app_name = APP_NAME;

query_manual_compact_rpc rpc(std::move(request), RPC_CM_QUERY_MANUAL_COMPACT_STATUS);
_ss->on_query_manual_compact_status(rpc);
wait_all();
return rpc.response();
}

public:
std::string APP_NAME = "manual_compaction_test";
int32_t PARTITION_COUNT = 4;
};

TEST_F(meta_app_compaction_test, test_query_compaction)
{
struct test_case
{
int32_t mock_progress;
error_code expected_err;
} tests[] = {{-1, ERR_INVALID_STATE}, {0, ERR_OK}, {50, ERR_OK}, {100, ERR_OK}};

for (auto test : tests) {
auto resp = query_manual_compaction(test.mock_progress);
ASSERT_EQ(resp.err, test.expected_err);
if (resp.err == ERR_OK) {
ASSERT_EQ(resp.progress, test.mock_progress);
}
}
}

} // namespace replication
} // namespace dsn

0 comments on commit fac96a3

Please sign in to comment.