Skip to content

Commit

Permalink
feat: add query_disk_info api for shell command (#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored Mar 19, 2020
1 parent 0c3fbcf commit 67012ed
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
41 changes: 41 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class replication_ddl_client

dsn::error_code ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &ddd_partitions);

void query_disk_info(
const std::vector<dsn::rpc_address> &targets,
const std::string &app_name,
/*out*/ std::map<dsn::rpc_address, error_with<query_disk_info_response>> &resps);

private:
bool static valid_app_char(int c);

Expand Down Expand Up @@ -222,9 +227,45 @@ class replication_ddl_client
return error_with<TResponse>(std::move(rpc.response()));
}

/// Send request to multi replica server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
void call_rpcs_async(std::map<dsn::rpc_address, TRpcHolder> &rpcs,
std::map<dsn::rpc_address, error_with<TResponse>> &resps,
int reply_thread_hash = 0,
bool enable_retry = true)
{
dsn::task_tracker tracker;
error_code err = ERR_UNKNOWN;
for (auto &rpc : rpcs) {
rpc.second.call(
rpc.first, &tracker, [&err, &resps, &rpcs, &rpc](error_code code) mutable {
err = code;
if (err == dsn::ERR_OK) {
resps.emplace(rpc.first, std::move(rpc.second.response()));
rpcs.erase(rpc.first);
} else {
resps.emplace(
rpc.first,
std::move(error_s::make(err, "unable to send rpc to server")));
}
});
}
tracker.wait_outstanding_tasks();

if (enable_retry && rpcs.size() > 0) {
std::map<dsn::rpc_address, dsn::error_with<TResponse>> retry_resps;
call_rpcs_async(rpcs, retry_resps, reply_thread_hash, false);
for (auto &resp : retry_resps) {
resps.emplace(resp.first, std::move(resp.second));
}
}
}

private:
dsn::rpc_address _meta_server;
dsn::task_tracker _tracker;

typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
};
} // namespace replication
} // namespace dsn
17 changes: 17 additions & 0 deletions src/dist/replication/ddl_lib/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1517,5 +1517,22 @@ replication_ddl_client::ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &

return dsn::ERR_OK;
}

void replication_ddl_client::query_disk_info(
const std::vector<dsn::rpc_address> &targets,
const std::string &app_name,
/*out*/ std::map<dsn::rpc_address, error_with<query_disk_info_response>> &resps)
{
std::map<dsn::rpc_address, query_disk_info_rpc> query_disk_info_rpcs;
for (const auto &target : targets) {
auto request = make_unique<query_disk_info_request>();
request->node = target;
request->app_name = app_name;
query_disk_info_rpcs.emplace(target,
query_disk_info_rpc(std::move(request), RPC_QUERY_DISK_INFO));
}
call_rpcs_async(query_disk_info_rpcs, resps);
}

} // namespace replication
} // namespace dsn

0 comments on commit 67012ed

Please sign in to comment.