Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

brpc加redis cluster支持 #2228

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/time.cpp
${PROJECT_SOURCE_DIR}/src/butil/zero_copy_stream_as_streambuf.cpp
${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc
${PROJECT_SOURCE_DIR}/src/butil/redis_cluster_slot.cc
${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp
${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp
${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp
Expand Down
2 changes: 1 addition & 1 deletion example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {

// Initialize the channel, NULL means using default options.
// options, see `brpc/channel.h'.
if (channel.Init(url, FLAGS_load_balancer.c_str(), &options) != 0) {
if (channel.Init(url, "", &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
Expand Down
27 changes: 26 additions & 1 deletion example/redis_c++/redis_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,38 @@
#include <readline/history.h>
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/redis_cluster_slot.h>
#include <brpc/channel.h>
#include <brpc/redis.h>
#include <brpc/policy/redis_authenticator.h>

DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "127.0.0.1:6379", "IP Address of server");
DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_string(auth, "", "auth...");
DEFINE_uint64(request_code, 0, "request code");

namespace brpc {
const char* logo();
}

static std::string extractSecondWord(const std::string &str) {
std::istringstream iss(str);
std::vector<std::string> words;
std::string word;

while (iss >> word) {
words.push_back(word);
}

if (words.size() >= 2) {
return words[1];
} else {
return "";
}
}

// Send `command' to redis-server via `channel'
static bool access_redis(brpc::Channel& channel, const char* command) {
brpc::RedisRequest request;
Expand All @@ -44,6 +64,7 @@ static bool access_redis(brpc::Channel& channel, const char* command) {
}
brpc::RedisResponse response;
brpc::Controller cntl;
cntl.set_request_code(butil::cal_slot_num(extractSecondWord(command)));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access redis, " << cntl.ErrorText();
Expand Down Expand Up @@ -89,7 +110,11 @@ int main(int argc, char* argv[]) {
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), &options) != 0) {
if (!FLAGS_auth.empty()) {
brpc::policy::RedisAuthenticator* auth = new brpc::policy::RedisAuthenticator(FLAGS_auth);
options.auth = auth;
}
if (channel.Init(FLAGS_server.c_str(), "c_redis_cluster", &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
Expand Down
1 change: 0 additions & 1 deletion src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "butil/strings/string_util.h"
#include "bthread/unstable.h" // bthread_timer_add
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/compress.h"
#include "brpc/global.h"
#include "brpc/span.h"
Expand Down
1 change: 1 addition & 0 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "brpc/adaptive_connection_type.h" // AdaptiveConnectionType
#include "brpc/socket_id.h" // SocketId
#include "brpc/controller.h" // brpc::Controller
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/details/profiler_linker.h"
#include "brpc/retry_policy.h"
#include "brpc/naming_service_filter.h"
Expand Down
21 changes: 18 additions & 3 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "bthread/unstable.h"
#include "bvar/bvar.h"
#include "brpc/socket.h"
#include "brpc/socket_map.h"
#include "brpc/channel.h"
#include "brpc/load_balancer.h"
#include "brpc/closure_guard.h"
Expand Down Expand Up @@ -265,6 +264,7 @@ void Controller::ResetPods() {
_sender = NULL;
_request_code = 0;
_single_server_id = INVALID_SOCKET_ID;
_tmp_single_server_id = INVALID_SOCKET_ID;
_unfinished_call = NULL;
_stream_creator = NULL;
_accessed = NULL;
Expand Down Expand Up @@ -587,6 +587,12 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
return;
}

if (saved_error == EMOVED) {
//todo set real serverid....
_current_call.OnComplete(this, _error_code, info.responded, false);
return IssueRPC(butil::gettimeofday_us());
}

if ((!_error_code && _retry_policy == NULL) ||
_current_call.nretry >= _max_retry) {
goto END_OF_RPC;
Expand Down Expand Up @@ -1020,14 +1026,23 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
int rc;
SocketId peer_id;
if (_single_server_id != INVALID_SOCKET_ID) {
rc = Socket::Address(_single_server_id, &tmp_sock);
peer_id = _single_server_id;
} else {
rc = Socket::Address(_tmp_single_server_id, &tmp_sock);
peer_id = _tmp_single_server_id;
_tmp_single_server_id = INVALID_SOCKET_ID;
}
if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
}
_current_call.peer_id = _single_server_id;
_current_call.peer_id = peer_id;
} else {
LoadBalancer::SelectIn sel_in =
{ start_realtime_us, true,
Expand Down
8 changes: 7 additions & 1 deletion src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "brpc/callback.h"
#include "brpc/progressive_attachment.h" // ProgressiveAttachment
#include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/socket_map.h"
#include "brpc/grpc.h"
#include "brpc/kvmap.h"

Expand Down Expand Up @@ -564,6 +565,10 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// -1 means no deadline.
int64_t deadline_us() const { return _deadline_us; }

void set_tmp_single_socket_id (const SocketId& id) {
_tmp_single_server_id = id;
}

private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
Expand Down Expand Up @@ -679,7 +684,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

void HandleStreamConnection(Socket *host_socket);

bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID; }
bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID || _tmp_single_server_id != INVALID_SOCKET_ID; }

void SubmitSpan();

Expand Down Expand Up @@ -774,6 +779,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
RPCSender* _sender;
uint64_t _request_code;
SocketId _single_server_id;
SocketId _tmp_single_server_id;
butil::intrusive_ptr<SharedLoadBalancer> _lb;

// for passing parameters to created bthread, don't modify it otherwhere.
Expand Down
1 change: 1 addition & 0 deletions src/brpc/errno.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum Errno {
ELIMIT = 2004; // Reached server's limit on resources
ECLOSE = 2005; // Close socket initiatively
EITP = 2006; // Failed Itp response
EMOVED = 2007; // Failed Itp response

// Errno related to RDMA (may happen at both sides)
ERDMA = 3001; // RDMA verbs error
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/discovery_naming_service.h"
#include "brpc/policy/nacos_naming_service.h"
#include "brpc/policy/redis_cluster_naming_service.h"

// Load Balancers
#include "brpc/policy/round_robin_load_balancer.h"
Expand Down Expand Up @@ -123,6 +124,7 @@ struct GlobalExtensions {
, ch_mh_lb(CONS_HASH_LB_MURMUR3)
, ch_md5_lb(CONS_HASH_LB_MD5)
, ch_ketama_lb(CONS_HASH_LB_KETAMA)
, ch_redis_cluster_lb(CONS_HASH_LB_REDIS_CLUSTER)
, constant_cl(0) {
}

Expand All @@ -138,6 +140,7 @@ struct GlobalExtensions {
ConsulNamingService cns;
DiscoveryNamingService dcns;
NacosNamingService nns;
RedisClusterNamingService rcns;

RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb;
Expand All @@ -147,6 +150,7 @@ struct GlobalExtensions {
ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb;
ConsistentHashingLoadBalancer ch_ketama_lb;
ConsistentHashingLoadBalancer ch_redis_cluster_lb;
DynPartLoadBalancer dynpart_lb;

AutoConcurrencyLimiter auto_cl;
Expand Down Expand Up @@ -363,6 +367,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns);
NamingServiceExtension()->RegisterOrDie("nacos", &g_ext->nns);
NamingServiceExtension()->RegisterOrDie("redis_cluster", &g_ext->rcns);

// Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
Expand All @@ -373,6 +378,7 @@ static void GlobalInitializeOrDieImpl() {
LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
LoadBalancerExtension()->RegisterOrDie("c_redis_cluster", &g_ext->ch_redis_cluster_lb);
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);

// Compress Handlers
Expand Down
47 changes: 40 additions & 7 deletions src/brpc/policy/consistent_hashing_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,38 @@ bool KetamaReplicaPolicy::Build(ServerId server,
return true;
}

class RedisClusterPolicy : public ReplicaPolicy {
public:

virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;

virtual const char* name() const { return "RedisCluster"; }
};

bool RedisClusterPolicy::Build(ServerId server,
size_t,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
std::stringstream ss(server.tag);
int slot_start, slot_end;
char delimiter;
ss >> slot_start >> delimiter >> slot_end;
for (size_t i = slot_start; i <= slot_end; ++i) {
ConsistentHashingLoadBalancer::Node node;
node.hash = i;
node.server_sock = server;
node.server_addr = ptr->remote_side();
replicas->push_back(node);
}
return true;
}

namespace {

pthread_once_t s_replica_policy_once = PTHREAD_ONCE_INIT;
Expand All @@ -130,7 +162,8 @@ void InitReplicaPolicy() {
g_replica_policy = new std::array<const ReplicaPolicy*, CONS_HASH_LB_LAST>({
new DefaultReplicaPolicy(MurmurHash32),
new DefaultReplicaPolicy(MD5Hash32),
new KetamaReplicaPolicy
new KetamaReplicaPolicy,
new RedisClusterPolicy
});
}

Expand Down Expand Up @@ -245,12 +278,12 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch(
std::sort(add_nodes.begin(), add_nodes.end());
bool executed = false;
const size_t ret = _db_hash_ring.ModifyWithForeground(AddBatch, add_nodes, &executed);
CHECK(ret % _num_replicas == 0);
const size_t n = ret / _num_replicas;
LOG_IF(ERROR, n != servers.size())
<< "Fail to AddServersInBatch, expected " << servers.size()
<< " actually " << n;
return n;
// CHECK(ret % _num_replicas == 0);
// const size_t n = ret / _num_replicas;
// LOG_IF(ERROR, n != servers.size())
// << "Fail to AddServersInBatch, expected " << servers.size()
// << " actually " << n;
return add_nodes.size();
}

bool ConsistentHashingLoadBalancer::RemoveServer(const ServerId& server) {
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/consistent_hashing_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ enum ConsistentHashingLoadBalancerType {
CONS_HASH_LB_MURMUR3 = 0,
CONS_HASH_LB_MD5 = 1,
CONS_HASH_LB_KETAMA = 2,
CONS_HASH_LB_REDIS_CLUSTER = 3,

// Identify the last one.
CONS_HASH_LB_LAST = 3
CONS_HASH_LB_LAST = 4
};

class ConsistentHashingLoadBalancer : public LoadBalancer {
Expand Down
Loading