diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index fff8d58a7b..c6dc3d3efb 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -169,6 +169,11 @@ class replication_ddl_client dsn::error_code ddd_diagnose(gpid pid, std::vector &ddd_partitions); + void query_disk_info( + const std::vector &targets, + const std::string &app_name, + /*out*/ std::map> &resps); + private: bool static valid_app_char(int c); @@ -222,9 +227,45 @@ class replication_ddl_client return error_with(std::move(rpc.response())); } + /// Send request to multi replica server synchronously. + template + void call_rpcs_async(std::map &rpcs, + std::map> &resps, + int reply_thread_hash = 0, + bool enable_retry = true) + { + dsn::task_tracker tracker; + error_code err = ERR_UNKNOWN; + for (auto &rpc : rpcs) { + rpc.second.call( + rpc.first, &tracker, [&err, &resps, &rpcs, &rpc](error_code code) mutable { + err = code; + if (err == dsn::ERR_OK) { + resps.emplace(rpc.first, std::move(rpc.second.response())); + rpcs.erase(rpc.first); + } else { + resps.emplace( + rpc.first, + std::move(error_s::make(err, "unable to send rpc to server"))); + } + }); + } + tracker.wait_outstanding_tasks(); + + if (enable_retry && rpcs.size() > 0) { + std::map> retry_resps; + call_rpcs_async(rpcs, retry_resps, reply_thread_hash, false); + for (auto &resp : retry_resps) { + resps.emplace(resp.first, std::move(resp.second)); + } + } + } + private: dsn::rpc_address _meta_server; dsn::task_tracker _tracker; + + typedef rpc_holder query_disk_info_rpc; }; } // namespace replication } // namespace dsn diff --git a/src/dist/replication/ddl_lib/replication_ddl_client.cpp b/src/dist/replication/ddl_lib/replication_ddl_client.cpp index f3bcdac646..9b45e4b49e 100644 --- a/src/dist/replication/ddl_lib/replication_ddl_client.cpp +++ b/src/dist/replication/ddl_lib/replication_ddl_client.cpp @@ -1517,5 +1517,22 @@ replication_ddl_client::ddd_diagnose(gpid pid, std::vector & return dsn::ERR_OK; } + +void replication_ddl_client::query_disk_info( + const std::vector &targets, + const std::string &app_name, + /*out*/ std::map> &resps) +{ + std::map query_disk_info_rpcs; + for (const auto &target : targets) { + auto request = make_unique(); + request->node = target; + request->app_name = app_name; + query_disk_info_rpcs.emplace(target, + query_disk_info_rpc(std::move(request), RPC_QUERY_DISK_INFO)); + } + call_rpcs_async(query_disk_info_rpcs, resps); +} + } // namespace replication } // namespace dsn