From 2b962d356be9d65c290582dd140e5eba75f5bf73 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Mon, 25 Mar 2019 20:31:51 +0800 Subject: [PATCH 1/2] update client forcely --- contrib/client-c/.gitignore | 1 + contrib/client-c/CMakeLists.txt | 2 + contrib/client-c/cmake/find_kvproto.cmake | 2 +- contrib/client-c/include/common/Exception.h | 30 +++ contrib/client-c/include/pd/MockPDClient.h | 39 ++-- contrib/client-c/include/tikv/Backoff.h | 4 +- contrib/client-c/include/tikv/Region.h | 20 +- contrib/client-c/include/tikv/RegionClient.h | 73 +++++-- contrib/client-c/include/tikv/Rpc.h | 5 +- contrib/client-c/src/CMakeLists.txt | 4 +- contrib/client-c/src/pd/Client.cc | 29 ++- contrib/client-c/src/test/CMakeLists.txt | 43 ++++ contrib/client-c/src/test/MockPDServer.h | 191 ++++++++++++++++++ contrib/client-c/src/test/MockTiKV.h | 156 ++++++++++++++ .../src/test/get_region_id_wrong_test.cc | 50 +++++ .../src/test/get_store_id_wrong_2_test.cc | 51 +++++ .../src/test/get_store_id_wrong_test.cc | 51 +++++ contrib/client-c/src/test/grpc_error_test.cc | 52 +++++ contrib/client-c/src/test/pd_test.cc | 46 +++++ contrib/client-c/src/test/read_index_test.cc | 47 +++++ contrib/client-c/src/tikv/Backoff.cc | 26 ++- contrib/client-c/src/tikv/Region.cc | 55 ++++- contrib/client-c/src/tikv/Rpc.cc | 1 + 23 files changed, 921 insertions(+), 57 deletions(-) create mode 100644 contrib/client-c/include/common/Exception.h create mode 100644 contrib/client-c/src/test/CMakeLists.txt create mode 100644 contrib/client-c/src/test/MockPDServer.h create mode 100644 contrib/client-c/src/test/MockTiKV.h create mode 100644 contrib/client-c/src/test/get_region_id_wrong_test.cc create mode 100644 contrib/client-c/src/test/get_store_id_wrong_2_test.cc create mode 100644 contrib/client-c/src/test/get_store_id_wrong_test.cc create mode 100644 contrib/client-c/src/test/grpc_error_test.cc create mode 100644 contrib/client-c/src/test/pd_test.cc create mode 100644 contrib/client-c/src/test/read_index_test.cc diff --git a/contrib/client-c/.gitignore b/contrib/client-c/.gitignore index 18574a292b4..1edae9c39d3 100644 --- a/contrib/client-c/.gitignore +++ b/contrib/client-c/.gitignore @@ -38,6 +38,7 @@ build .idea *.iml *.swp +*.swo tags .clang-format diff --git a/contrib/client-c/CMakeLists.txt b/contrib/client-c/CMakeLists.txt index 052349d946f..bbf18c62266 100644 --- a/contrib/client-c/CMakeLists.txt +++ b/contrib/client-c/CMakeLists.txt @@ -1,5 +1,7 @@ cmake_minimum_required(VERSION 2.8) project(kvClient) +set (CMAKE_CXX_STANDARD 11) +set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing") if (NOT gRPC_FOUND) include (cmake/find_grpc.cmake) diff --git a/contrib/client-c/cmake/find_kvproto.cmake b/contrib/client-c/cmake/find_kvproto.cmake index 44a1fa1aba0..b130b72dee6 100644 --- a/contrib/client-c/cmake/find_kvproto.cmake +++ b/contrib/client-c/cmake/find_kvproto.cmake @@ -1,7 +1,7 @@ # Currently kvproto should always use bundled library. if (NOT EXISTS "${kvClient_SOURCE_DIR}/third_party/kvproto/cpp/kvproto/errorpb.pb.h") - message (FATAL_ERROR "kvproto submodule in thrid_party/kvproto is missing.") + message (FATAL_ERROR "kvproto submodule in third_party/kvproto is missing.") endif () message(STATUS "Using kvproto: ${kvClient_SOURCE_DIR}/third_party/kvproto/cpp") diff --git a/contrib/client-c/include/common/Exception.h b/contrib/client-c/include/common/Exception.h new file mode 100644 index 00000000000..5c21a909e88 --- /dev/null +++ b/contrib/client-c/include/common/Exception.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include + +namespace pingcap { + +const int MismatchClusterIDCode = 1; +const int GRPCErrorCode = 2; +const int InitClusterIDFailed = 3; +const int UpdatePDLeaderFailed = 4; +const int TimeoutError = 5; +const int RegionUnavailable = 6; + +class Exception : public Poco::Exception +{ +public: + Exception() {} /// For deferred initialization. + Exception(const std::string & msg, int code = 0) : Poco::Exception(msg, code) {} + Exception(const std::string & msg, const std::string & arg, int code = 0) : Poco::Exception(msg, arg, code) {} + Exception(const std::string & msg, const Exception & exc, int code = 0) : Poco::Exception(msg, exc, code) {} + explicit Exception(const Poco::Exception & exc) : Poco::Exception(exc.displayText()) {} + + Exception * clone() const override { return new Exception(*this); } + void rethrow() const override { throw *this; } + +}; + +} diff --git a/contrib/client-c/include/pd/MockPDClient.h b/contrib/client-c/include/pd/MockPDClient.h index 259949858aa..ffaed769f4d 100644 --- a/contrib/client-c/include/pd/MockPDClient.h +++ b/contrib/client-c/include/pd/MockPDClient.h @@ -3,55 +3,44 @@ #include #include -namespace pingcap::pd -{ +namespace pingcap { +namespace pd { -class MockPDClient : public IClient -{ +class MockPDClient : public IClient { public: - MockPDClient(bool enable_gc_ = true) : enable_gc(enable_gc_) {} + MockPDClient() = default; ~MockPDClient() override {} uint64_t getGCSafePoint() override { - return enable_gc ? (uint64_t)curr : 0; - //std::time_t t = std::time(nullptr); - //std::tm & tm = *std::localtime(&t); - //tm.tm_sec -= 2; - //return static_cast(std::mktime(&tm)); + std::time_t t = std::time(nullptr); + std::tm & tm = *std::localtime(&t); + tm.tm_sec -= 2; + return static_cast(std::mktime(&tm)); } uint64_t getTS() override { - return curr++; - //return static_cast(std::time(NULL)); + return static_cast(std::time(NULL)); } - std::tuple getRegion(std::string) override - { + std::tuple getRegion(std::string) override { throw "not implemented"; } - std::tuple getRegionByID(uint64_t) override - { + std::tuple getRegionByID(uint64_t) override { throw "not implemented"; } - metapb::Store getStore(uint64_t) override - { + metapb::Store getStore(uint64_t) override { throw "not implemented"; } - // TODO: remove this? - bool isMock() override - { + bool isMock() override { return true; } - -private: - bool enable_gc; - std::atomic curr = static_cast(std::time(NULL)); }; } +} diff --git a/contrib/client-c/include/tikv/Backoff.h b/contrib/client-c/include/tikv/Backoff.h index fa1db09798e..465d79b8711 100644 --- a/contrib/client-c/include/tikv/Backoff.h +++ b/contrib/client-c/include/tikv/Backoff.h @@ -6,7 +6,7 @@ #include #include -#include +#include namespace pingcap { namespace kv { @@ -71,6 +71,8 @@ struct Backoff { } }; +constexpr int readIndexMaxBackoff = 20000; + using BackoffPtr = std::shared_ptr; struct Backoffer { diff --git a/contrib/client-c/include/tikv/Region.h b/contrib/client-c/include/tikv/Region.h index add13823740..b0fee6c6ba1 100644 --- a/contrib/client-c/include/tikv/Region.h +++ b/contrib/client-c/include/tikv/Region.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include namespace pingcap { namespace kv { @@ -31,6 +33,8 @@ struct RegionVerID { uint64_t confVer; uint64_t ver; + RegionVerID(uint64_t id_, uint64_t conf_ver, uint64_t ver_): id(id_), confVer(conf_ver), ver(ver_){} + bool operator == (const RegionVerID & rhs) const { return id == rhs.id && confVer == rhs.confVer && ver == rhs.ver; } @@ -56,6 +60,8 @@ struct Region { metapb::Peer peer; metapb::Peer learner; + Region(const metapb::Region & meta_, const metapb::Peer & peer_) : meta(meta_), peer(peer_), learner(metapb::Peer::default_instance()) {} + Region(const metapb::Region & meta_, const metapb::Peer & peer_, const metapb::Peer & learner_) : meta(meta_), peer(peer_), learner(learner_) {} @@ -118,12 +124,22 @@ using RPCContextPtr = std::shared_ptr; class RegionCache { public: - RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_) { + RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_), log(&Logger::get("pingcap.tikv")) { } RPCContextPtr getRPCContext(Backoffer & bo, const RegionVerID & id, bool is_learner); + void updateLeader(Backoffer & bo, const RegionVerID & region_id, uint64_t leader_store_id); + //KeyLocation locateKey(Backoffer & bo, std::string key); + // + void dropRegion(const RegionVerID &); + + void dropStore(uint64_t failed_store_id); + + void dropStoreOnSendReqFail(RPCContextPtr & ctx, const Exception & exc); + + void onRegionStale(RPCContextPtr ctx, const errorpb::StaleEpoch & stale_epoch); private: RegionPtr getCachedRegion(Backoffer & bo, const RegionVerID & id); @@ -153,6 +169,8 @@ class RegionCache { std::mutex region_mutex; std::mutex store_mutex; + + Logger * log; }; using RegionCachePtr = std::shared_ptr; diff --git a/contrib/client-c/include/tikv/RegionClient.h b/contrib/client-c/include/tikv/RegionClient.h index 0165d01b795..fe185fecddf 100644 --- a/contrib/client-c/include/tikv/RegionClient.h +++ b/contrib/client-c/include/tikv/RegionClient.h @@ -17,28 +17,77 @@ struct RegionClient { int64_t getReadIndex() { auto request = new kvrpcpb::ReadIndexRequest(); - Backoffer bo(10000); + Backoffer bo(readIndexMaxBackoff); auto rpc_call = std::make_shared>(request); - auto ctx = cache -> getRPCContext(bo, region_id, true); - store_addr = ctx->addr; - sendReqToRegion(bo, rpc_call, ctx); + sendReqToRegion(bo, rpc_call, true); return rpc_call -> getResp() -> read_index(); } template - void sendReqToRegion(Backoffer & bo, RpcCallPtr rpc, RPCContextPtr rpc_ctx) { - try { - rpc -> setCtx(rpc_ctx); - client -> sendRequest(store_addr, rpc); + void sendReqToRegion(Backoffer & bo, RpcCallPtr rpc, bool learner) { + for (;;) { + auto ctx = cache -> getRPCContext(bo, region_id, learner); + store_addr = ctx->addr; + std::cout<<"store_addr "<< store_addr < setCtx(ctx); + try { + client -> sendRequest(store_addr, rpc); + } catch(const Exception & e) { + onSendFail(bo, e, ctx); + continue; + } + auto resp = rpc -> getResp(); + if (resp -> has_region_error()) { + onRegionError(bo, ctx, resp->region_error()); + } else { + return; + } } - catch(const Exception & e) { - onSendFail(bo, e); + } + + void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) { + if (err.has_not_leader()) { + auto not_leader = err.not_leader(); + if (not_leader.has_leader()) { + cache -> updateLeader(bo, rpc_ctx->region, not_leader.leader().store_id()); + bo.backoff(boUpdateLeader, Exception("not leader")); + } else { + cache -> dropRegion(rpc_ctx->region); + bo.backoff(boRegionMiss, Exception("not leader")); + } + return; + } + + if (err.has_store_not_match()) { + cache -> dropStore(rpc_ctx->peer.store_id()); + return; + } + + if (err.has_stale_epoch()) { + cache -> onRegionStale(rpc_ctx, err.stale_epoch()); + return; + } + + if (err.has_server_is_busy()) { + bo.backoff(boServerBusy, Exception("server busy")); + return; + } + + if (err.has_stale_command()) { + return; + } + + if (err.has_raft_entry_too_large()) { + throw Exception("entry too large"); } + cache -> dropRegion(rpc_ctx -> region); } - void onSendFail(Backoffer & ,const Exception & e) { - e.rethrow(); + void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) { + std::cout<<"send failed!!\n"; + cache->dropStoreOnSendReqFail(rpc_ctx, e); + bo.backoff(boTiKVRPC, e); } }; diff --git a/contrib/client-c/include/tikv/Rpc.h b/contrib/client-c/include/tikv/Rpc.h index dc1f9ad17cf..72362ac6b6b 100644 --- a/contrib/client-c/include/tikv/Rpc.h +++ b/contrib/client-c/include/tikv/Rpc.h @@ -78,9 +78,12 @@ class RpcCall { void call(std::unique_ptr stub) { if constexpr(std::is_same::value) { grpc::ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(1)); auto status = stub->ReadIndex(&context, *req, resp); if (!status.ok()) { - log -> error("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); + std::string err_msg = ("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } } } diff --git a/contrib/client-c/src/CMakeLists.txt b/contrib/client-c/src/CMakeLists.txt index 004fa5fd4a2..948558ae324 100644 --- a/contrib/client-c/src/CMakeLists.txt +++ b/contrib/client-c/src/CMakeLists.txt @@ -1,5 +1,3 @@ -add_subdirectory (pd) - set(kvClient_sources) list(APPEND kvClient_sources pd/Client.cc) @@ -13,3 +11,5 @@ set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include) add_library(kv_client ${kvClient_sources}) target_include_directories(kv_client PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) target_link_libraries(kv_client kvproto ${Poco_Foundation_LIBRARY}) + +add_subdirectory (test) diff --git a/contrib/client-c/src/pd/Client.cc b/contrib/client-c/src/pd/Client.cc index 4496c1fd4df..ad95bccd798 100644 --- a/contrib/client-c/src/pd/Client.cc +++ b/contrib/client-c/src/pd/Client.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -76,7 +77,9 @@ pdpb::GetMembersResponse Client::getMembers(std::string url) auto status = pdpb::PD::NewStub(cc)->GetMembers(&context, pdpb::GetMembersRequest{}, &resp); if (!status.ok()) { - log->error("get member failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); + std::string err_msg = "get member failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } return resp; } @@ -100,7 +103,7 @@ void Client::initClusterID() { }; std::this_thread::sleep_for(std::chrono::seconds(1)); } - throw "failed to init cluster id"; + throw Exception("failed to init cluster id", InitClusterIDFailed); } void Client::updateLeader() { @@ -115,7 +118,7 @@ void Client::updateLeader() { switchLeader(resp.leader().client_urls()); return; } - throw "failed to update leader"; + throw Exception("failed to update leader", UpdatePDLeaderFailed); } void Client::switchLeader(const ::google::protobuf::RepeatedPtrField& leader_urls) { @@ -162,8 +165,8 @@ void Client::leaderLoop() { try { check_leader = false; updateLeader(); - } catch (...) { - log->error("update leader failed."); + } catch (Exception & e) { + log->error(e.displayText()); } } } @@ -186,7 +189,9 @@ uint64_t Client::getGCSafePoint() { auto status = leaderStub()->GetGCSafePoint(&context, request, &response); if (!status.ok()) { - log->error("get safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); + std::string err_msg = "get safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } return response.safe_point(); } @@ -204,7 +209,9 @@ std::tuple Client::getRegion(std::st auto status = leaderStub()->GetRegion(&context, request, &response); if (!status.ok()) { - log->error("get region failed: " + std::to_string(status.error_code()) + " : " + status.error_message()); + std::string err_msg = ("get region failed: " + std::to_string(status.error_code()) + " : " + status.error_message()); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } if (response.slaves_size() == 0) { return std::make_tuple(response.region(), response.leader(), metapb::Peer::default_instance()); @@ -225,7 +232,9 @@ std::tuple Client::getRegionByID(uin auto status = leaderStub()->GetRegionByID(&context, request, &response); if (!status.ok()) { - log-> error("get region by id failed: " + std::to_string (status.error_code()) + ": " + status.error_message()); + std::string err_msg = ("get region by id failed: " + std::to_string (status.error_code()) + ": " + status.error_message()); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } if (response.slaves_size() == 0) { return std::make_tuple(response.region(), response.leader(), metapb::Peer::default_instance()); @@ -246,7 +255,9 @@ metapb::Store Client::getStore(uint64_t store_id) { auto status = leaderStub()->GetStore(&context, request, &response); if (!status.ok()) { - log-> error("get store failed: " + std::to_string (status.error_code()) + ": " + status.error_message()); + std::string err_msg = ("get store failed: " + std::to_string (status.error_code()) + ": " + status.error_message()); + log->error(err_msg); + throw Exception(err_msg, GRPCErrorCode); } return response.store(); } diff --git a/contrib/client-c/src/test/CMakeLists.txt b/contrib/client-c/src/test/CMakeLists.txt new file mode 100644 index 00000000000..4b0e8553b52 --- /dev/null +++ b/contrib/client-c/src/test/CMakeLists.txt @@ -0,0 +1,43 @@ +add_executable(pd_test pd_test.cc) + +target_include_directories(pd_test PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) + +target_link_libraries(pd_test kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) + +add_executable(read_index_test read_index_test.cc) +target_include_directories(read_index_test PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) +target_link_libraries(read_index_test kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) + +add_executable(get_store_id_wrong_test get_store_id_wrong_test.cc) +target_include_directories(get_store_id_wrong_test PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) +target_link_libraries(get_store_id_wrong_test kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) + +add_executable(get_store_id_wrong_test2 get_store_id_wrong_2_test.cc) +target_include_directories(get_store_id_wrong_test2 PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) +target_link_libraries(get_store_id_wrong_test2 kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) + +add_executable(get_region_id_wrong_test get_region_id_wrong_test.cc) +target_include_directories(get_region_id_wrong_test PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) +target_link_libraries(get_region_id_wrong_test kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) + +add_executable(grpc_error_test grpc_error_test.cc) +target_include_directories(grpc_error_test PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR}) +target_link_libraries(grpc_error_test kvproto + kv_client + ${Protobuf_LIBRARY} + gRPC::grpc++_unsecure) diff --git a/contrib/client-c/src/test/MockPDServer.h b/contrib/client-c/src/test/MockPDServer.h new file mode 100644 index 00000000000..22b1e2e002e --- /dev/null +++ b/contrib/client-c/src/test/MockPDServer.h @@ -0,0 +1,191 @@ +#include +#include +#include +#include + +#include "MockTiKV.h" + +namespace pingcap{ +namespace test { + +class PDService final : public ::pdpb::PD::Service { +public: + PDService(std::vector addrs_){ + addrs = addrsToUrls(addrs_); + leader = addrs[0]; + } + + ::grpc::Status GetMembers(::grpc::ServerContext* context, const ::pdpb::GetMembersRequest* request, ::pdpb::GetMembersResponse* response) override + { + pdpb::Member * leader_pb = new pdpb::Member(); + setMember(leader, leader_pb); + response->set_allocated_leader(leader_pb); + pdpb::Member * etcd_leader_pb = new pdpb::Member(); + setMember(leader, etcd_leader_pb); + response->set_allocated_etcd_leader(etcd_leader_pb); + for (size_t i = 0; i < addrs.size(); i++) { + pdpb::Member * member = response -> add_members(); + setMember(addrs[i], member); + } + pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); + setHeader(header); + response -> set_allocated_header(header); + return ::grpc::Status::OK; + } + + ::grpc::Status GetGCSafePoint(::grpc::ServerContext* context, const ::pdpb::GetGCSafePointRequest* request, ::pdpb::GetGCSafePointResponse* response) override + { + pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); + setHeader(header); + response -> set_allocated_header(header); + response -> set_safe_point(gc_point); + return ::grpc::Status::OK; + } + + ::grpc::Status GetStore(::grpc::ServerContext* context, const ::pdpb::GetStoreRequest* request, ::pdpb::GetStoreResponse* response) override + { + pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); + setHeader(header); + auto it = stores.find(request->store_id()); + response -> set_allocated_header(header); + if (it != stores.end()) { + auto store = new ::metapb::Store(); + store -> set_address(it -> second -> getStoreUrl()); + store -> set_id(it -> second -> store_id); + response -> set_allocated_store(store); + } + if (statuses.empty()) + return ::grpc::Status::OK; + auto ret = statuses.front(); + statuses.pop(); + return ret; + } + + void registerStoreAddr(uint64_t store_id, std::string addr) + { + stores[store_id] -> registerAddr(addr); + } + + ::grpc::Status GetRegionByID(::grpc::ServerContext* context, const ::pdpb::GetRegionByIDRequest* request, ::pdpb::GetRegionResponse* response) override + { + pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); + setHeader(header); + auto it = regions.find(request->region_id()); + response -> set_allocated_header(header); + if (it != regions.end()) { + auto region = new ::metapb::Region (it -> second->meta); + response -> set_allocated_region(region); + auto leader = new ::metapb::Peer (it -> second->peer); + response -> set_allocated_leader(leader); + auto slave = response -> add_slaves(); + *slave = it->second->learner; + } + if (statuses.empty()) + return ::grpc::Status::OK; + auto ret = statuses.front(); + statuses.pop(); + return ret; + } + + void setGCPoint(uint64_t gc_point_) { + gc_point = gc_point_; + } + + void addStore() { + std::string addr = ("127.0.0.1:" + std::to_string(6000+cur_store_id)); + Store* store = new Store(addr, cur_store_id); + stores[cur_store_id] = store; + store -> aynsc_run(); + cur_store_id ++; + } + + void addRegion(::metapb::Region region, uint64_t leader_id, uint64_t learner_id) { + ::metapb::Peer *learner, * leader; + int i = 0; + for (auto it = stores.begin(); it != stores.end(); it++, i ++) { + Store * store = it -> second; + ::metapb::Peer* peer = region.add_peers(); + peer -> set_id(i); + peer -> set_store_id(store->store_id); + if (store -> store_id == learner_id) { + learner = peer; + } + if (store -> store_id == leader_id) { + leader = peer; + } + } + kv::RegionPtr region_ptr = std::make_shared(region, *leader, *learner); + for (auto it: stores) { + it.second -> addRegion(region_ptr); + } + regions[region_ptr->meta.id()] = (region_ptr); + } + std::map stores; + + void registerGRPCStatus(::grpc::Status status_) { + statuses.push(status_); + } + +private: + std::vector addrsToUrls(std::vector addrs) { + std::vector urls; + for (const std::string & addr: addrs) { + if (addr.find("://") == std::string::npos) { + urls.push_back("http://" + addr); + } else { + urls.push_back(addr); + } + } + return urls; + } + + std::queue<::grpc::Status> statuses; + + std::string leader; + std::vector addrs; + std::map regions; + uint64_t gc_point; + + void setMember(const std::string & addr, pdpb::Member* member) { + member->set_name(addr); + member->add_peer_urls(addr); + member->add_client_urls(addr); + member->set_leader_priority(1); + } + + void setHeader(pdpb::ResponseHeader * header) { + header->set_cluster_id(0); + } + + int cur_store_id = 0; +}; + +struct PDServerHandler { + + std::vector addrs; + PDService * service; + + PDServerHandler(std::vector addrs_) : addrs(addrs_) {} + + void startServer() { + grpc::ServerBuilder builder; + for (auto addr : addrs) { + builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); + } + builder.RegisterService(service); + auto server = builder.BuildAndStart(); + server->Wait(); + } + + PDService * RunPDServer() + { + service = new PDService(addrs); + std::thread pd_server_thread(&PDServerHandler::startServer, this); + pd_server_thread.detach(); + return service; + } + +}; + +} +} diff --git a/contrib/client-c/src/test/MockTiKV.h b/contrib/client-c/src/test/MockTiKV.h new file mode 100644 index 00000000000..6b982a4353d --- /dev/null +++ b/contrib/client-c/src/test/MockTiKV.h @@ -0,0 +1,156 @@ +#include + +#include +#include +#include +#include +#include + +namespace pingcap { +namespace test { + +class Store final : public ::tikvpb::Tikv::Service { +public: + Store(std::string addr_, int store_id_) : store_addr (addr_), store_id(store_id_) { + } + + void addRegion(kv::RegionPtr region) { + regions[region->verID().id] = region; + } + + ::grpc::Status ReadIndex(::grpc::ServerContext* context, const ::kvrpcpb::ReadIndexRequest* request, ::kvrpcpb::ReadIndexResponse* response) override + { + ::errorpb::Error* error_pb = checkContext(request->context()); + if (error_pb != NULL) { + response->set_allocated_region_error(error_pb); + } else { + response->set_read_index(read_idx); + } + return ::grpc::Status::OK; + } + + void setReadIndex(uint64_t idx_) { + read_idx = idx_; + } + + void aynsc_run() { + std::thread server_thread(&Store::start_server, this); + server_thread.detach(); + } + + uint64_t store_id; + + uint64_t getStoreID() { + } + + std::string getStoreUrl() { + if (addrs.empty()) + return store_addr; + std::string addr = addrs.front(); + addrs.pop(); + return addr; + } + + void registerAddr(std::string addr) { + addrs.push(addr); + } + + void registerStoreId(uint64_t store_id_) { + store_ids.push(store_id_); + } + + uint64_t getStoreId() { + if (store_ids.empty()) { + return store_id; + } + uint64_t store_id_ = store_ids.front(); + store_ids.pop(); + return store_id_; + } + + bool inject_region_not_found; + +private: + std::string store_addr; + + std::queue addrs; + + std::queue store_ids; + + std::string addrToUrl(std::string addr) { + if (addr.find("://") == std::string::npos) { + return ("http://" + addr); + } else { + return addr; + } + } + + std::map regions; + + int read_idx; + + void start_server() { + grpc::ServerBuilder builder; + builder.AddListeningPort(store_addr, grpc::InsecureServerCredentials()); + builder.RegisterService(this); + auto server = builder.BuildAndStart(); + server->Wait(); + } + + + ::errorpb::Error* checkContext(const ::kvrpcpb::Context & ctx) { + uint64_t store_id_ = ctx.peer().id(); + ::errorpb::Error* err = new ::errorpb::Error(); + if (store_id_ != getStoreId()) { + ::errorpb::StoreNotMatch* store_not_match = new ::errorpb::StoreNotMatch(); + err -> set_allocated_store_not_match(store_not_match); + return err; + } + + uint64_t region_id = ctx.region_id(); + auto it = regions.find(region_id); + if (it == regions.end() || inject_region_not_found) { + inject_region_not_found = false; + ::errorpb::RegionNotFound * region_not_found = new ::errorpb::RegionNotFound(); + region_not_found -> set_region_id(region_id); + err -> set_allocated_region_not_found(region_not_found); + return err; + } + // + //bool found = false + //for (auto addr : it->addrs) { + // if (addr == store_addr) + // found = true; + //} + //if (!found) { + // RegionNotFound * region_not_found = new RegionNotFound(); + // region_not_found -> set_region_id(region_id); + // err -> set_allocated_region_not_found(store_not_match); + // return err; + //} + + if (it->second->verID().confVer != ctx.region_epoch().conf_ver() || it->second->verID().ver != ctx.region_epoch().version()) { + ::errorpb::StaleEpoch * stale_epoch = new ::errorpb::StaleEpoch(); + err -> set_allocated_stale_epoch(stale_epoch); + return err; + } + + return nullptr; + + } +}; + +inline ::metapb::Region generateRegion(const kv::RegionVerID & ver_id, std::string start, std::string end) { + ::metapb::Region region; + region.set_id(ver_id.id); + region.set_start_key(start); + region.set_end_key(end); + ::metapb::RegionEpoch * region_epoch = new ::metapb::RegionEpoch(); + region_epoch -> set_conf_ver(ver_id.confVer); + region_epoch -> set_version(ver_id.ver); + region.set_allocated_region_epoch(region_epoch); + return region; +} + +} +} diff --git a/contrib/client-c/src/test/get_region_id_wrong_test.cc b/contrib/client-c/src/test/get_region_id_wrong_test.cc new file mode 100644 index 00000000000..290463a1fdd --- /dev/null +++ b/contrib/client-c/src/test/get_region_id_wrong_test.cc @@ -0,0 +1,50 @@ +#include "MockPDServer.h" +#include +#include + +namespace pingcap { +namespace test { +bool testReadIndex () { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + pd_server -> addStore(); + pd_server -> addStore(); + pd_server -> addStore(); + kv::RegionVerID verID(1,3,0); + ::metapb::Region region = generateRegion(verID, "a", "b"); + pd_server -> addRegion(region, 0, 1); + pd_server -> stores[1] -> setReadIndex(5); + pd_server -> stores[1] -> inject_region_not_found = true; + + ::sleep(1); + + pd::ClientPtr clt = std::make_shared(addrs); + kv::RegionCachePtr cache = std::make_shared(clt); + kv::RpcClientPtr rpc = std::make_shared(); + kv::RegionClient client(cache, rpc, verID); + int idx = client.getReadIndex(); + if (idx != 5) { + std::cout<<"wrong idx: "< +#include +#include + +namespace pingcap { +namespace test { +bool testReadIndex () { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + pd_server -> addStore(); + pd_server -> addStore(); + pd_server -> addStore(); + kv::RegionVerID verID(1,3,0); + ::metapb::Region region = generateRegion(verID, "a", "b"); + pd_server -> addRegion(region, 0, 1); + pd_server -> stores[1] -> setReadIndex(5); + pd_server -> registerStoreAddr(1, "127.0.0.1:7000"); + pd_server -> stores[1] -> registerStoreId(101); + pd_server -> stores[1] -> registerStoreId(102); + + ::sleep(1); + + pd::ClientPtr clt = std::make_shared(addrs); + kv::RegionCachePtr cache = std::make_shared(clt); + kv::RpcClientPtr rpc = std::make_shared(); + kv::RegionClient client(cache, rpc, verID); + if( 5 != client.getReadIndex()) { + return false; + } + std::cout<<"success!!!!!!!!!!!\n"; + return true; +} +} +} + +int main(int argv, char ** args) +{ + if (!pingcap::test::testReadIndex()) { + throw "get gc point wrong !"; + } + return 0; +} diff --git a/contrib/client-c/src/test/get_store_id_wrong_test.cc b/contrib/client-c/src/test/get_store_id_wrong_test.cc new file mode 100644 index 00000000000..10dd8e4820d --- /dev/null +++ b/contrib/client-c/src/test/get_store_id_wrong_test.cc @@ -0,0 +1,51 @@ +#include "MockPDServer.h" +#include +#include + +namespace pingcap { +namespace test { +bool testReadIndex () { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + pd_server -> addStore(); + pd_server -> addStore(); + pd_server -> addStore(); + kv::RegionVerID verID(1,3,0); + ::metapb::Region region = generateRegion(verID, "a", "b"); + pd_server -> addRegion(region, 0, 1); + pd_server -> stores[1] -> setReadIndex(5); + pd_server -> registerStoreAddr(1, "127.0.0.1:7000"); + pd_server -> registerStoreAddr(1, "127.0.0.1:6500"); + + ::sleep(1); + + pd::ClientPtr clt = std::make_shared(addrs); + kv::RegionCachePtr cache = std::make_shared(clt); + kv::RpcClientPtr rpc = std::make_shared(); + kv::RegionClient client(cache, rpc, verID); + int idx = client.getReadIndex(); + if (idx != 5) { + std::cout<<"wrong idx: "< +#include + +namespace pingcap { +namespace test { +bool testReadIndex () { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + pd_server -> addStore(); + pd_server -> addStore(); + pd_server -> addStore(); + kv::RegionVerID verID(1,3,0); + ::metapb::Region region = generateRegion(verID, "a", "b"); + pd_server -> addRegion(region, 0, 1); + pd_server -> stores[1] -> setReadIndex(5); + pd_server -> registerGRPCStatus(grpc::Status::CANCELLED); + pd_server -> registerGRPCStatus(grpc::Status::CANCELLED); + pd_server -> registerGRPCStatus(grpc::Status::CANCELLED); + + ::sleep(1); + + pd::ClientPtr clt = std::make_shared(addrs); + kv::RegionCachePtr cache = std::make_shared(clt); + kv::RpcClientPtr rpc = std::make_shared(); + kv::RegionClient client(cache, rpc, verID); + int idx = client.getReadIndex(); + if (idx != 5) { + std::cout<<"wrong idx: "< + +#include +#include +#include "MockPDServer.h" + +namespace pingcap { +namespace test{ + +bool testPDGetGCSafePoint() { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + ::sleep(1); + + pd_server -> setGCPoint(233); + + pd::Client clt(addrs); + auto safe = clt.getGCSafePoint(); + + if (safe != 233) { + return false; + } + std::cout<<"success!!!!!\n"; + return true; +} + +} +} + +int main(int argv, char** args) +{ + if (!pingcap::test::testPDGetGCSafePoint()) { + throw "get gc point wrong !"; + } + + return 0; +} + diff --git a/contrib/client-c/src/test/read_index_test.cc b/contrib/client-c/src/test/read_index_test.cc new file mode 100644 index 00000000000..fdcb24c0ea3 --- /dev/null +++ b/contrib/client-c/src/test/read_index_test.cc @@ -0,0 +1,47 @@ +#include "MockPDServer.h" +#include +#include + +namespace pingcap { +namespace test { +bool testReadIndex () { + std::vector addrs; + for (int i = 1; i <= 3; i++) + { + addrs.push_back("127.0.0.1:" + std::to_string(5000+i)); + } + + PDServerHandler handler(addrs); + + PDService * pd_server = handler.RunPDServer(); + + pd_server -> addStore(); + pd_server -> addStore(); + pd_server -> addStore(); + kv::RegionVerID verID(1,3,0); + ::metapb::Region region = generateRegion(verID, "a", "b"); + pd_server -> addRegion(region, 0, 1); + pd_server -> stores[1] -> setReadIndex(5); + + ::sleep(1); + pd::ClientPtr clt = std::make_shared(addrs); + kv::RegionCachePtr cache = std::make_shared(clt); + kv::RpcClientPtr rpc = std::make_shared(); + kv::RegionClient client(cache, rpc, verID); + int idx = client.getReadIndex(); + if (idx != 5) { + return false; + } + std::cout<<"success!!!!!\n"; + return true; +} +} +} + +int main(int argv, char ** args) +{ + if (!pingcap::test::testReadIndex()) { + throw "get gc point wrong !"; + } + return 0; +} diff --git a/contrib/client-c/src/tikv/Backoff.cc b/contrib/client-c/src/tikv/Backoff.cc index 56dad04db9f..0076782745e 100644 --- a/contrib/client-c/src/tikv/Backoff.cc +++ b/contrib/client-c/src/tikv/Backoff.cc @@ -1,4 +1,5 @@ #include +#include namespace pingcap { namespace kv { @@ -23,6 +24,24 @@ BackoffPtr newBackoff(BackoffType tp) { return nullptr; } +Exception Type2Exception(BackoffType tp) { + switch(tp) { + case boTiKVRPC: + return Exception("TiKV Timeout", TimeoutError); + case boTxnLock: + case boTxnLockFast: + return Exception("Resolve lock Timeout", TimeoutError); + case boPDRPC: + return Exception("PD Timeout", TimeoutError); + case boRegionMiss: + case boUpdateLeader: + return Exception("Region Unavaliable", RegionUnavailable); + case boServerBusy: + return Exception("TiKV Server Busy", TimeoutError); + } + return Exception("Unknown Exception, tp is :" + std::to_string(tp)); +} + void Backoffer::backoff(BackoffType tp, const Exception & exc) { if (exc.code() == MismatchClusterIDCode) { exc.rethrow(); @@ -32,12 +51,13 @@ void Backoffer::backoff(BackoffType tp, const Exception & exc) { auto it = backoff_map.find(tp); if (it != backoff_map.end()) { bo = it -> second; + } else { + bo = newBackoff(tp); + backoff_map[tp] = bo; } - bo = newBackoff(tp); - backoff_map[tp] = bo; total_sleep += bo->sleep(); if (max_sleep > 0 && total_sleep > max_sleep) { - throw Exception("total sleep time exceeded\n"); + throw Type2Exception(tp); } } diff --git a/contrib/client-c/src/tikv/Region.cc b/contrib/client-c/src/tikv/Region.cc index 475dff3e44c..4766d9f863c 100644 --- a/contrib/client-c/src/tikv/Region.cc +++ b/contrib/client-c/src/tikv/Region.cc @@ -1,5 +1,5 @@ #include -#include +#include namespace pingcap { namespace kv { @@ -19,7 +19,7 @@ RPCContextPtr RegionCache::getRPCContext(Backoffer & bo, const RegionVerID & id, } std::string addr = getStoreAddr(bo, peer.store_id()); if (addr == "") { - //dropRegion(id); + dropRegion(id); return NULL; } return std::make_shared(id, meta, peer, addr); @@ -135,5 +135,56 @@ void RegionCache::insertRegionToCache(RegionPtr region) { regions[region->verID()] = region; } +void RegionCache::dropRegion(const RegionVerID & region_id) { + std::lock_guard lock(region_mutex); + if(regions.erase(region_id)) { + log->information("drop region because of send failure"); + } +} + +void RegionCache::dropStore(uint64_t failed_store_id) { + std::lock_guard lock(store_mutex); + std::cout<<"try to drop store: "<information("drop store " + std::to_string(failed_store_id) + " because of send failure"); + } +} + +void RegionCache::dropStoreOnSendReqFail(RPCContextPtr & ctx, const Exception & exc) { + const auto & failed_region_id = ctx->region; + uint64_t failed_store_id = ctx->peer.store_id(); + dropRegion(failed_region_id); + dropStore(failed_store_id); +} + +void RegionCache::updateLeader(Backoffer & bo, const RegionVerID & region_id, uint64_t leader_store_id) { + auto region = getCachedRegion(bo, region_id); + if (!region -> switchPeer(leader_store_id)) { + dropRegion(region_id); + } + +} + +void RegionCache::onRegionStale(RPCContextPtr ctx, const errorpb::StaleEpoch & stale_epoch) { + + dropRegion(ctx->region); + + std::lock_guard lock(region_mutex); + for (int i = 0; i < stale_epoch.new_regions_size(); i++) { + auto & meta = stale_epoch.new_regions(i); + RegionPtr region = std::make_shared(meta, meta.peers(0)); + region->switchPeer(ctx->peer.store_id()); + for (int i = 0; i < meta.peers_size(); i++) { + auto peer = meta.peers(i); + if (peer.is_learner()) { + region->learner = peer; + break; + } + } + insertRegionToCache(region); + } +} + } } diff --git a/contrib/client-c/src/tikv/Rpc.cc b/contrib/client-c/src/tikv/Rpc.cc index a380d658ebd..a8e06cd3fcf 100644 --- a/contrib/client-c/src/tikv/Rpc.cc +++ b/contrib/client-c/src/tikv/Rpc.cc @@ -5,6 +5,7 @@ namespace kv { ConnArray::ConnArray(size_t max_size, std::string addr) : index(0) { vec.resize(max_size); + std::cout<<"create addr: "< Date: Mon, 25 Mar 2019 21:17:39 +0800 Subject: [PATCH 2/2] remove useless logs --- contrib/client-c/include/tikv/Exception.h | 28 ------- contrib/client-c/include/tikv/RegionClient.h | 2 - contrib/client-c/include/tikv/Rpc.h | 2 +- contrib/client-c/src/pd/CMakeLists.txt | 1 - contrib/client-c/src/pd/mock/CMakeLists.txt | 14 ---- contrib/client-c/src/pd/mock/MockPDServer.h | 80 ------------------- contrib/client-c/src/pd/mock/mock_client.cc | 23 ------ contrib/client-c/src/pd/mock/mock_server.cc | 13 --- .../src/test/get_region_id_wrong_test.cc | 2 - .../src/test/get_store_id_wrong_2_test.cc | 1 - .../src/test/get_store_id_wrong_test.cc | 2 - contrib/client-c/src/test/grpc_error_test.cc | 2 - contrib/client-c/src/test/pd_test.cc | 1 - contrib/client-c/src/test/read_index_test.cc | 1 - contrib/client-c/src/tikv/Region.cc | 2 - contrib/client-c/src/tikv/Rpc.cc | 1 - 16 files changed, 1 insertion(+), 174 deletions(-) delete mode 100644 contrib/client-c/include/tikv/Exception.h delete mode 100644 contrib/client-c/src/pd/CMakeLists.txt delete mode 100644 contrib/client-c/src/pd/mock/CMakeLists.txt delete mode 100644 contrib/client-c/src/pd/mock/MockPDServer.h delete mode 100644 contrib/client-c/src/pd/mock/mock_client.cc delete mode 100644 contrib/client-c/src/pd/mock/mock_server.cc diff --git a/contrib/client-c/include/tikv/Exception.h b/contrib/client-c/include/tikv/Exception.h deleted file mode 100644 index 62ad530cb63..00000000000 --- a/contrib/client-c/include/tikv/Exception.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace pingcap { -namespace kv { - -const int MismatchClusterIDCode = 1; - -class Exception : public Poco::Exception -{ -public: - Exception() {} /// For deferred initialization. - Exception(const std::string & msg, int code = 0) : Poco::Exception(msg, code) {} - Exception(const std::string & msg, const std::string & arg, int code = 0) : Poco::Exception(msg, arg, code) {} - Exception(const std::string & msg, const Exception & exc, int code = 0) : Poco::Exception(msg, exc, code) {} - explicit Exception(const Poco::Exception & exc) : Poco::Exception(exc.displayText()) {} - - Exception * clone() const override { return new Exception(*this); } - void rethrow() const override { throw *this; } - -}; - - -} -} diff --git a/contrib/client-c/include/tikv/RegionClient.h b/contrib/client-c/include/tikv/RegionClient.h index fe185fecddf..d75f9d4b69a 100644 --- a/contrib/client-c/include/tikv/RegionClient.h +++ b/contrib/client-c/include/tikv/RegionClient.h @@ -28,7 +28,6 @@ struct RegionClient { for (;;) { auto ctx = cache -> getRPCContext(bo, region_id, learner); store_addr = ctx->addr; - std::cout<<"store_addr "<< store_addr < setCtx(ctx); try { client -> sendRequest(store_addr, rpc); @@ -85,7 +84,6 @@ struct RegionClient { } void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) { - std::cout<<"send failed!!\n"; cache->dropStoreOnSendReqFail(rpc_ctx, e); bo.backoff(boTiKVRPC, e); } diff --git a/contrib/client-c/include/tikv/Rpc.h b/contrib/client-c/include/tikv/Rpc.h index 72362ac6b6b..fadfca1ce46 100644 --- a/contrib/client-c/include/tikv/Rpc.h +++ b/contrib/client-c/include/tikv/Rpc.h @@ -78,7 +78,7 @@ class RpcCall { void call(std::unique_ptr stub) { if constexpr(std::is_same::value) { grpc::ClientContext context; - context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(1)); + context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(3)); auto status = stub->ReadIndex(&context, *req, resp); if (!status.ok()) { std::string err_msg = ("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); diff --git a/contrib/client-c/src/pd/CMakeLists.txt b/contrib/client-c/src/pd/CMakeLists.txt deleted file mode 100644 index f7841d32620..00000000000 --- a/contrib/client-c/src/pd/CMakeLists.txt +++ /dev/null @@ -1 +0,0 @@ -add_subdirectory(mock) diff --git a/contrib/client-c/src/pd/mock/CMakeLists.txt b/contrib/client-c/src/pd/mock/CMakeLists.txt deleted file mode 100644 index 8c3923f24c6..00000000000 --- a/contrib/client-c/src/pd/mock/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ -add_executable(mock_server mock_server.cc) - -target_include_directories(mock_server PUBLIC ${KVPROTO_INCLUDE_DIR}) -target_link_libraries(mock_server kvproto - ${Protobuf_LIBRARY} - gRPC::grpc++_unsecure) - -add_executable(mock_client mock_client.cc) -target_include_directories(mock_client PUBLIC ${KVPROTO_INCLUDE_DIR}) -target_link_libraries(mock_client kvproto - kv_client - ${Protobuf_LIBRARY} - gRPC::grpc++_unsecure) - diff --git a/contrib/client-c/src/pd/mock/MockPDServer.h b/contrib/client-c/src/pd/mock/MockPDServer.h deleted file mode 100644 index f79e2bd4844..00000000000 --- a/contrib/client-c/src/pd/mock/MockPDServer.h +++ /dev/null @@ -1,80 +0,0 @@ -#include -#include -#include -#include - -namespace pingcap{ -namespace pd { -namespace mock { - -class PDService final : public pdpb::PD::Service { -public: - // first one is leader ? - PDService(std::vector addrs_):addrs(addrs_), leader(addrs_[0]), gc_point(11) { - } - - ::grpc::Status GetMembers(::grpc::ServerContext* context, const ::pdpb::GetMembersRequest* request, ::pdpb::GetMembersResponse* response) override - { - pdpb::Member * leader_pb = new pdpb::Member(); - setMember(leader, leader_pb); - response->set_allocated_leader(leader_pb); - pdpb::Member * etcd_leader_pb = new pdpb::Member(); - setMember(leader, etcd_leader_pb); - response->set_allocated_etcd_leader(etcd_leader_pb); - for (size_t i = 0; i < addrs.size(); i++) { - pdpb::Member * member = response -> add_members(); - setMember(addrs[i], member); - } - pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); - setHeader(header); - response -> set_allocated_header(header); - return ::grpc::Status::OK; - } - - ::grpc::Status GetGCSafePoint(::grpc::ServerContext* context, const ::pdpb::GetGCSafePointRequest* request, ::pdpb::GetGCSafePointResponse* response) override - { - pdpb::ResponseHeader * header = new pdpb::ResponseHeader(); - setHeader(header); - response -> set_allocated_header(header); - response -> set_safe_point(gc_point); - return ::grpc::Status::OK; - } - - void setGCPoint(uint64_t gc_point_) { - gc_point = gc_point_; - } - - -private: - std::string leader; - std::vector addrs; - uint64_t gc_point; - - void setMember(const std::string & addr, pdpb::Member* member) { - member->set_name(addr); - member->add_peer_urls(addr); - member->add_client_urls(addr); - member->set_leader_priority(1); - } - - void setHeader(pdpb::ResponseHeader * header) { - header->set_cluster_id(0); - } -}; - -inline void RunPDServer(std::vector addrs) -{ - PDService service(addrs); - - grpc::ServerBuilder builder; - for (auto addr : addrs) { - builder.AddListeningPort(addr, grpc::InsecureServerCredentials()); - } - builder.RegisterService(&service); - auto server = builder.BuildAndStart(); - server->Wait(); -} - -} -} -} diff --git a/contrib/client-c/src/pd/mock/mock_client.cc b/contrib/client-c/src/pd/mock/mock_client.cc deleted file mode 100644 index 3e809e65886..00000000000 --- a/contrib/client-c/src/pd/mock/mock_client.cc +++ /dev/null @@ -1,23 +0,0 @@ -#include - -int main(int argv, char ** args) { - std::vector addrs; - for (int i = 1; i < argv; i++) - { - addrs.push_back(args[i]); - } - pingcap::pd::Client clt(addrs); - std::string cmd; - while(std::cin >> cmd) { - switch(cmd[0]) { - case 'g': - { - auto safe = clt.getGCSafePoint(); - break; - } - default: - throw "unknown cmd: " + cmd[0]; - } - } - return 0; -} diff --git a/contrib/client-c/src/pd/mock/mock_server.cc b/contrib/client-c/src/pd/mock/mock_server.cc deleted file mode 100644 index 6a5c0924c61..00000000000 --- a/contrib/client-c/src/pd/mock/mock_server.cc +++ /dev/null @@ -1,13 +0,0 @@ -#include -#include "MockPDServer.h" - -int main(int argv, char** args) -{ - std::vector addrs; - for (int i = 1; i < argv; i++) - { - addrs.push_back(args[i]); - } - pingcap::pd::mock::RunPDServer(addrs); - return 0; -} diff --git a/contrib/client-c/src/test/get_region_id_wrong_test.cc b/contrib/client-c/src/test/get_region_id_wrong_test.cc index 290463a1fdd..1603f543bc7 100644 --- a/contrib/client-c/src/test/get_region_id_wrong_test.cc +++ b/contrib/client-c/src/test/get_region_id_wrong_test.cc @@ -32,10 +32,8 @@ bool testReadIndex () { kv::RegionClient client(cache, rpc, verID); int idx = client.getReadIndex(); if (idx != 5) { - std::cout<<"wrong idx: "< lock(store_mutex); - std::cout<<"try to drop store: "<information("drop store " + std::to_string(failed_store_id) + " because of send failure"); } } diff --git a/contrib/client-c/src/tikv/Rpc.cc b/contrib/client-c/src/tikv/Rpc.cc index a8e06cd3fcf..a380d658ebd 100644 --- a/contrib/client-c/src/tikv/Rpc.cc +++ b/contrib/client-c/src/tikv/Rpc.cc @@ -5,7 +5,6 @@ namespace kv { ConnArray::ConnArray(size_t max_size, std::string addr) : index(0) { vec.resize(max_size); - std::cout<<"create addr: "<