Skip to content

Commit

Permalink
feat(FQDN): Replace rpc_address to host_prot for function & Add host_…
Browse files Browse the repository at this point in the history
…port on thrift struct
  • Loading branch information
GehaFearless committed Jan 11, 2024
1 parent a461fa5 commit 41dc5f7
Show file tree
Hide file tree
Showing 186 changed files with 3,987 additions and 2,658 deletions.
4 changes: 4 additions & 0 deletions idl/bulk_load.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct bulk_load_request
7:bulk_load_status meta_bulk_load_status;
8:bool query_bulk_load_metadata;
9:string remote_root_path;
10:optional dsn.host_port hp_primary_addr;
}

struct bulk_load_response
Expand All @@ -119,6 +120,7 @@ struct bulk_load_response
8:optional bool is_group_ingestion_finished;
9:optional bool is_group_bulk_load_context_cleaned_up;
10:optional bool is_group_bulk_load_paused;
11:optional map<dsn.host_port, partition_bulk_load_state> hp_group_bulk_load_state;
}

// primary -> secondary
Expand All @@ -131,6 +133,7 @@ struct group_bulk_load_request
5:string cluster_name;
6:bulk_load_status meta_bulk_load_status;
7:string remote_root_path;
8:optional dsn.host_port hp_target_address;
}

struct group_bulk_load_response
Expand Down Expand Up @@ -218,6 +221,7 @@ struct query_bulk_load_response
6:list<map<dsn.rpc_address, partition_bulk_load_state>> bulk_load_states;
7:optional string hint_msg;
8:optional bool is_bulk_loading;
9:optional list<map<dsn.host_port, partition_bulk_load_state>> hp_bulk_load_states;
}

struct clear_bulk_load_state_request
Expand Down
3 changes: 3 additions & 0 deletions idl/dsn.layer2.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ struct partition_configuration
6:list<dsn.rpc_address> last_drops;
7:i64 last_committed_decree;
8:i32 partition_flags;
9:optional dsn.host_port hp_primary;
10:optional list<dsn.host_port> hp_secondaries;
11:optional list<dsn.host_port> hp_last_drops;
}

struct query_cfg_request
Expand Down
1 change: 1 addition & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct duplication_sync_request
1:dsn.rpc_address node;

2:map<dsn.gpid, list<duplication_confirm_entry>> confirm_list;
3:dsn.host_port hp_node;
}

struct duplication_sync_response
Expand Down
8 changes: 8 additions & 0 deletions idl/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct configuration_update_request
// the `meta_split_status` will be set
// only used when on_config_sync
6:optional metadata.split_status meta_split_status;
7:optional dsn.host_port hp_node;
}

// meta server (config mgr) => primary | secondary (downgrade) (w/ new config)
Expand Down Expand Up @@ -103,6 +104,7 @@ struct configuration_query_by_node_request
1:dsn.rpc_address node;
2:optional list<metadata.replica_info> stored_replicas;
3:optional replica_server_info info;
4:optional dsn.host_port hp_node;
}

struct configuration_query_by_node_response
Expand All @@ -117,6 +119,7 @@ struct configuration_recovery_request
1:list<dsn.rpc_address> recovery_set;
2:bool skip_bad_nodes;
3:bool skip_lost_partitions;
4:optional list<dsn.host_port> hp_recovery_set;
}

struct configuration_recovery_response
Expand Down Expand Up @@ -205,6 +208,7 @@ struct configuration_list_apps_response
struct query_app_info_request
{
1:dsn.rpc_address meta_server;
2:optional dsn.host_port hp_meta_server;
}

struct query_app_info_response
Expand Down Expand Up @@ -280,6 +284,7 @@ struct node_info
{
1:node_status status = node_status.NS_INVALID;
2:dsn.rpc_address address;
3:optional dsn.host_port hp_address;
}

struct configuration_list_nodes_request
Expand Down Expand Up @@ -349,6 +354,8 @@ struct configuration_proposal_action
// depricated now
// new fields of this struct should start with 5
// 4:i64 period_ts;
5:optional dsn.host_port hp_target;
6:optional dsn.host_port hp_node;
}

struct configuration_balancer_request
Expand Down Expand Up @@ -381,6 +388,7 @@ struct ddd_node_info
5:i64 ballot; // collected && ballot == -1 means replica not exist on this node
6:i64 last_committed_decree;
7:i64 last_prepared_decree;
8:optional dsn.host_port hp_node;
}

struct ddd_partition_info
Expand Down
1 change: 1 addition & 0 deletions idl/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct replica_configuration
// 2. false - secondary copy mutation in this prepare message asynchronously
// NOTICE: it should always be false when update_local_configuration
7:optional bool split_sync_to_child = false;
8:optional dsn.host_port hp_primary;
}

struct replica_info
Expand Down
3 changes: 3 additions & 0 deletions idl/partition_split.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct notify_catch_up_request
2:dsn.gpid child_gpid;
3:i64 child_ballot;
4:dsn.rpc_address child_address;
5:optional dsn.host_port hp_child_address;
}

struct notify_cacth_up_response
Expand All @@ -116,6 +117,7 @@ struct update_child_group_partition_count_request
2:i32 new_partition_count;
3:dsn.gpid child_pid;
4:i64 ballot;
5:optional dsn.host_port hp_target_address;
}

struct update_child_group_partition_count_response
Expand All @@ -133,6 +135,7 @@ struct register_child_request
2:dsn.layer2.partition_configuration parent_config;
3:dsn.layer2.partition_configuration child_config;
4:dsn.rpc_address primary_address;
5:optional dsn.host_port hp_primary_address;
}

struct register_child_response
Expand Down
1 change: 1 addition & 0 deletions idl/replica_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct query_disk_info_request
{
1:dsn.rpc_address node;
2:string app_name;
3:optional dsn.host_port hp_node;
}

// This response is from replica_server to client.
Expand Down
22 changes: 20 additions & 2 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export REPORT_DIR="$ROOT/test_report"
export THIRDPARTY_ROOT=$ROOT/thirdparty
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:${BUILD_LATEST_DIR}/output/lib:${THIRDPARTY_ROOT}/output/lib:${LD_LIBRARY_PATH}
# Disable AddressSanitizerOneDefinitionRuleViolation, see https://github.com/google/sanitizers/issues/1017 for details.
export ASAN_OPTIONS=detect_odr_violation=0
# Add parameters in order to be able to generate coredump file when run ASAN tests
export ASAN_OPTIONS=detect_odr_violation=0:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1
# See https://github.com/gperftools/gperftools/wiki/gperftools'-stacktrace-capturing-methods-and-their-issues.
# Now we choose libgcc, because of https://github.com/apache/incubator-pegasus/issues/1685.
export TCMALLOC_STACKTRACE_METHOD=libgcc # Can be generic_fp, generic_fp_unsafe, libunwind or libgcc
Expand Down Expand Up @@ -241,6 +242,7 @@ function run_build()

if [ ! -z "${SANITIZER}" ]; then
CMAKE_OPTIONS="${CMAKE_OPTIONS} -DSANITIZER=${SANITIZER}"
echo "ASAN_OPTIONS=$ASAN_OPTIONS"
fi

MAKE_OPTIONS="-j$JOB_NUM"
Expand Down Expand Up @@ -484,7 +486,9 @@ function run_test()
# Update options if needed, this should be done before starting onebox to make new options take effect.
if [ "${module}" == "recovery_test" ]; then
master_count=1
opts="meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"
# all test case in recovery_test just run one meta_server, so we should change it
fqdn=`hostname -f`
opts="server_list=$fqdn:34601,meta_state_service_type=meta_state_service_simple,distributed_lock_service_type=distributed_lock_service_simple"
fi
if [ "${module}" == "backup_restore_test" ]; then
opts="cold_backup_disabled=false,cold_backup_checkpoint_reserve_minutes=0,cold_backup_root=onebox"
Expand All @@ -509,6 +513,20 @@ function run_test()

# Run server test.
pushd ${BUILD_LATEST_DIR}/bin/${module}
local function_tests=(
backup_restore_test
recovery_test
restore_test
base_api_test
throttle_test
bulk_load_test
detect_hotspot_test
partition_split_test
)
# function_tests need client used meta_server_list to connect
if [[ "${function_tests[@]}" =~ "${module}" ]]; then
sed -i "s/@LOCAL_HOSTNAME@/${LOCAL_HOSTNAME}/g" ./config.ini
fi
REPORT_DIR=${REPORT_DIR} TEST_BIN=${module} TEST_OPTS=${test_opts} ./run.sh
if [ $? != 0 ]; then
echo "run test \"$module\" in `pwd` failed"
Expand Down
2 changes: 1 addition & 1 deletion scripts/recompile_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rm -rf $TMP_DIR
mkdir -p $TMP_DIR
$THIRDPARTY_ROOT/output/bin/thrift --gen cpp:moveable_types -out $TMP_DIR ../idl/rrdb.thrift

sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/rpc\/rpc_host_port.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "rrdb_types.h"/#include <rrdb\/rrdb_types.h>/' $TMP_DIR/rrdb_types.cpp > ../src/base/rrdb_types.cpp

rm -rf $TMP_DIR
Expand Down
16 changes: 10 additions & 6 deletions src/client/partition_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,22 @@
#include "partition_resolver_manager.h"
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/task/task_spec.h"
#include "utils/fmt_logging.h"
#include "utils/threadpool_code.h"

namespace dsn {
namespace replication {
/*static*/
partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name,
const std::vector<rpc_address> &meta_list,
const char *app_name)
partition_resolver_ptr
partition_resolver::get_resolver(const char *cluster_name,
const std::vector<host_port> &meta_list,
const char *app_name,
const std::shared_ptr<dns_resolver> &dns_resolver)
{
return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_name);
return partition_resolver_manager::instance().find_or_create(
cluster_name, meta_list, app_name, dns_resolver);
}

DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
Expand Down Expand Up @@ -107,7 +111,7 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t)
t->replace_callback(std::move(new_callback));

resolve(hdr.client.partition_hash,
[t](resolve_result &&result) mutable {
[t, this](resolve_result &&result) mutable {
if (result.err != ERR_OK) {
t->enqueue(result.err, nullptr);
return;
Expand All @@ -124,7 +128,7 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t)
}
hdr.gpid = result.pid;
}
dsn_rpc_call(result.address, t.get());
dsn_rpc_call(this->_dns_resolver->resolve_address(result.hp), t.get());
},
hdr.client.timeout_ms);
}
Expand Down
26 changes: 16 additions & 10 deletions src/client/partition_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
#include <stdint.h>
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "common/gpid.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "runtime/task/async_calls.h"
Expand All @@ -44,6 +45,7 @@
#include "utils/error_code.h"

namespace dsn {
class dns_resolver;
class task_tracker;

namespace replication {
Expand All @@ -53,8 +55,9 @@ class partition_resolver : public ref_counter
public:
static dsn::ref_ptr<partition_resolver>
get_resolver(const char *cluster_name,
const std::vector<dsn::rpc_address> &meta_list,
const char *app_name);
const std::vector<dsn::host_port> &meta_list,
const char *app_name,
const std::shared_ptr<dns_resolver> &dns_resolver);

template <typename TReq, typename TCallback>
dsn::rpc_response_task_ptr call_op(dsn::task_code code,
Expand Down Expand Up @@ -83,13 +86,15 @@ class partition_resolver : public ref_counter

std::string get_app_name() const { return _app_name; }

dsn::rpc_address get_meta_server() const { return _meta_server; }
dsn::host_port get_meta_server() const { return _meta_server; }

const char *log_prefix() const { return _app_name.c_str(); }

protected:
partition_resolver(rpc_address meta_server, const char *app_name)
: _app_name(app_name), _meta_server(meta_server)
partition_resolver(host_port meta_server,
const char *app_name,
const std::shared_ptr<dns_resolver> &dns_resolver)
: _app_name(app_name), _meta_server(meta_server), _dns_resolver(dns_resolver)
{
}

Expand All @@ -103,13 +108,13 @@ class partition_resolver : public ref_counter
///< should call resolve_async in this case
error_code err;
///< IPv4 of the target to send request to
rpc_address address;
host_port hp;
///< global partition indentity
dsn::gpid pid;
};

/**
* resolve partition_hash into IP or group addresses to know what to connect next
* resolve partition_hash into IP or group host_port to know what to connect next
*
* \param partition_hash the partition hash
* \param callback callback invoked on completion or timeout
Expand All @@ -127,7 +132,7 @@ class partition_resolver : public ref_counter
\param partition_index zero-based index of the partition.
\param err error code
this is usually to trigger new round of address resolve
this is usually to trigger new round of host_port resolve
*/
virtual void on_access_failure(int partition_index, error_code err) = 0;

Expand All @@ -144,7 +149,8 @@ class partition_resolver : public ref_counter

std::string _cluster_name;
std::string _app_name;
rpc_address _meta_server;
host_port _meta_server;
std::shared_ptr<dns_resolver> _dns_resolver;
};

typedef ref_ptr<partition_resolver> partition_resolver_ptr;
Expand Down
23 changes: 14 additions & 9 deletions src/client/partition_resolver_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
#include "client/partition_resolver.h"
#include "partition_resolver_manager.h"
#include "partition_resolver_simple.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"

namespace dsn {
class dns_resolver;

namespace replication {

template <typename T>
Expand All @@ -53,22 +55,25 @@ bool vector_equal(const std::vector<T> &a, const std::vector<T> &b)
return true;
}

partition_resolver_ptr partition_resolver_manager::find_or_create(
const char *cluster_name, const std::vector<rpc_address> &meta_list, const char *app_name)
partition_resolver_ptr
partition_resolver_manager::find_or_create(const char *cluster_name,
const std::vector<host_port> &meta_list,
const char *app_name,
const std::shared_ptr<dns_resolver> &dns_resolver)
{
dsn::zauto_lock l(_lock);
std::map<std::string, partition_resolver_ptr> &app_map = _resolvers[cluster_name];
partition_resolver_ptr &ptr = app_map[app_name];

if (ptr == nullptr) {
dsn::rpc_address meta_group;
dsn::host_port meta_group;
meta_group.assign_group(cluster_name);
meta_group.group_address()->add_list(meta_list);
ptr = new partition_resolver_simple(meta_group, app_name);
meta_group.group_host_port()->add_list(meta_list);
ptr = new partition_resolver_simple(meta_group, app_name, dns_resolver);
return ptr;
} else {
dsn::rpc_address meta_group = ptr->get_meta_server();
const std::vector<dsn::rpc_address> &existing_list = meta_group.group_address()->members();
dsn::host_port meta_group = ptr->get_meta_server();
const std::vector<dsn::host_port> &existing_list = meta_group.group_host_port()->members();
if (!vector_equal(meta_list, existing_list)) {
LOG_ERROR("meta list not match for cluster({})", cluster_name);
return nullptr;
Expand Down
Loading

0 comments on commit 41dc5f7

Please sign in to comment.