Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into fds
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Mar 24, 2020
2 parents 632d4f0 + b0e20fd commit 17ec8e2
Show file tree
Hide file tree
Showing 55 changed files with 2,932 additions and 1,486 deletions.
3 changes: 1 addition & 2 deletions include/dsn/dist/replication/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
namespace dsn {
namespace replication {

typedef rpc_holder<duplication_status_change_request, duplication_status_change_response>
duplication_status_change_rpc;
typedef rpc_holder<duplication_modify_request, duplication_modify_response> duplication_modify_rpc;
typedef rpc_holder<duplication_add_request, duplication_add_response> duplication_add_rpc;
typedef rpc_holder<duplication_query_request, duplication_query_response> duplication_query_rpc;
typedef rpc_holder<duplication_sync_request, duplication_sync_response> duplication_sync_rpc;
Expand Down
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class replica_envs
static const std::string ROCKSDB_USAGE_SCENARIO;
static const std::string ROCKSDB_CHECKPOINT_RESERVE_MIN_COUNT;
static const std::string ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS;
static const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
static const std::string MANUAL_COMPACT_DISABLED;
static const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT;
static const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME;
Expand Down
3 changes: 2 additions & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_SHARED, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_QUERY_DISK_INFO, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

// THREAD_POOL_META_SERVER
Expand Down Expand Up @@ -95,7 +96,7 @@ MAKE_EVENT_CODE(LPC_QUERY_PN_DECREE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_REPORT_RESTORE_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_RESTORE_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_ADD_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_CHANGE_DUPLICATION_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_MODIFY_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_DUPLICATION_SYNC, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_UPDATE_APP_ENV, TASK_PRIORITY_COMMON)
Expand Down
45 changes: 44 additions & 1 deletion include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ class replication_ddl_client

error_with<duplication_add_response>
add_dup(std::string app_name, std::string remote_address, bool freezed);
error_with<duplication_status_change_response>

error_with<duplication_modify_response>
change_dup_status(std::string app_name, int dupid, duplication_status::type status);

error_with<duplication_query_response> query_dup(std::string app_name);

dsn::error_code do_restore(const std::string &backup_provider_name,
Expand Down Expand Up @@ -169,6 +171,11 @@ class replication_ddl_client

dsn::error_code ddd_diagnose(gpid pid, std::vector<ddd_partition_info> &ddd_partitions);

void query_disk_info(
const std::vector<dsn::rpc_address> &targets,
const std::string &app_name,
/*out*/ std::map<dsn::rpc_address, error_with<query_disk_info_response>> &resps);

private:
bool static valid_app_char(int c);

Expand Down Expand Up @@ -222,9 +229,45 @@ class replication_ddl_client
return error_with<TResponse>(std::move(rpc.response()));
}

/// Send request to multi replica server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
void call_rpcs_async(std::map<dsn::rpc_address, TRpcHolder> &rpcs,
std::map<dsn::rpc_address, error_with<TResponse>> &resps,
int reply_thread_hash = 0,
bool enable_retry = true)
{
dsn::task_tracker tracker;
error_code err = ERR_UNKNOWN;
for (auto &rpc : rpcs) {
rpc.second.call(
rpc.first, &tracker, [&err, &resps, &rpcs, &rpc](error_code code) mutable {
err = code;
if (err == dsn::ERR_OK) {
resps.emplace(rpc.first, std::move(rpc.second.response()));
rpcs.erase(rpc.first);
} else {
resps.emplace(
rpc.first,
std::move(error_s::make(err, "unable to send rpc to server")));
}
});
}
tracker.wait_outstanding_tasks();

if (enable_retry && rpcs.size() > 0) {
std::map<dsn::rpc_address, dsn::error_with<TResponse>> retry_resps;
call_rpcs_async(rpcs, retry_resps, reply_thread_hash, false);
for (auto &resp : retry_resps) {
resps.emplace(resp.first, std::move(resp.second));
}
}
}

private:
dsn::rpc_address _meta_server;
dsn::task_tracker _tracker;

typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
};
} // namespace replication
} // namespace dsn
Loading

0 comments on commit 17ec8e2

Please sign in to comment.