Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flash-195] update client to newest version to avoid get-learner hanging. #21

Merged
merged 2 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions contrib/client-c/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ build
.idea
*.iml
*.swp
*.swo
tags
.clang-format

Expand Down
2 changes: 2 additions & 0 deletions contrib/client-c/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 2.8)
project(kvClient)
set (CMAKE_CXX_STANDARD 11)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing")

if (NOT gRPC_FOUND)
include (cmake/find_grpc.cmake)
Expand Down
2 changes: 1 addition & 1 deletion contrib/client-c/cmake/find_kvproto.cmake
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Currently kvproto should always use bundled library.

if (NOT EXISTS "${kvClient_SOURCE_DIR}/third_party/kvproto/cpp/kvproto/errorpb.pb.h")
message (FATAL_ERROR "kvproto submodule in thrid_party/kvproto is missing.")
message (FATAL_ERROR "kvproto submodule in third_party/kvproto is missing.")
endif ()

message(STATUS "Using kvproto: ${kvClient_SOURCE_DIR}/third_party/kvproto/cpp")
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
#include <Poco/Exception.h>

namespace pingcap {
namespace kv {

const int MismatchClusterIDCode = 1;
const int GRPCErrorCode = 2;
const int InitClusterIDFailed = 3;
const int UpdatePDLeaderFailed = 4;
const int TimeoutError = 5;
const int RegionUnavailable = 6;

class Exception : public Poco::Exception
{
Expand All @@ -23,6 +27,4 @@ class Exception : public Poco::Exception

};


}
}
39 changes: 14 additions & 25 deletions contrib/client-c/include/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,44 @@
#include <limits>
#include <pd/IClient.h>

namespace pingcap::pd
{
namespace pingcap {
namespace pd {

class MockPDClient : public IClient
{
class MockPDClient : public IClient {
public:
MockPDClient(bool enable_gc_ = true) : enable_gc(enable_gc_) {}
MockPDClient() = default;

~MockPDClient() override {}

uint64_t getGCSafePoint() override
{
return enable_gc ? (uint64_t)curr : 0;
//std::time_t t = std::time(nullptr);
//std::tm & tm = *std::localtime(&t);
//tm.tm_sec -= 2;
//return static_cast<uint64_t>(std::mktime(&tm));
std::time_t t = std::time(nullptr);
std::tm & tm = *std::localtime(&t);
tm.tm_sec -= 2;
return static_cast<uint64_t>(std::mktime(&tm));
}

uint64_t getTS() override
{
return curr++;
//return static_cast<uint64_t>(std::time(NULL));
return static_cast<uint64_t>(std::time(NULL));
}

std::tuple<metapb::Region, metapb::Peer, metapb::Peer> getRegion(std::string) override
{
std::tuple<metapb::Region, metapb::Peer, metapb::Peer> getRegion(std::string) override {
throw "not implemented";
}

std::tuple<metapb::Region, metapb::Peer, metapb::Peer> getRegionByID(uint64_t) override
{
std::tuple<metapb::Region, metapb::Peer, metapb::Peer> getRegionByID(uint64_t) override {
throw "not implemented";
}

metapb::Store getStore(uint64_t) override
{
metapb::Store getStore(uint64_t) override {
throw "not implemented";
}

// TODO: remove this?
bool isMock() override
{
bool isMock() override {
return true;
}

private:
bool enable_gc;
std::atomic<uint64_t> curr = static_cast<uint64_t>(std::time(NULL));
};

}
}
4 changes: 3 additions & 1 deletion contrib/client-c/include/tikv/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <map>
#include <memory>

#include <tikv/Exception.h>
#include <common/Exception.h>

namespace pingcap {
namespace kv {
Expand Down Expand Up @@ -71,6 +71,8 @@ struct Backoff {
}
};

constexpr int readIndexMaxBackoff = 20000;

using BackoffPtr = std::shared_ptr<Backoff>;

struct Backoffer {
Expand Down
20 changes: 19 additions & 1 deletion contrib/client-c/include/tikv/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <pd/MockPDClient.h>
#include <kvproto/metapb.pb.h>
#include <tikv/Backoff.h>
#include <common/Log.h>
#include <kvproto/errorpb.pb.h>

namespace pingcap {
namespace kv {
Expand All @@ -31,6 +33,8 @@ struct RegionVerID {
uint64_t confVer;
uint64_t ver;

RegionVerID(uint64_t id_, uint64_t conf_ver, uint64_t ver_): id(id_), confVer(conf_ver), ver(ver_){}

bool operator == (const RegionVerID & rhs) const {
return id == rhs.id && confVer == rhs.confVer && ver == rhs.ver;
}
Expand All @@ -56,6 +60,8 @@ struct Region {
metapb::Peer peer;
metapb::Peer learner;

Region(const metapb::Region & meta_, const metapb::Peer & peer_) : meta(meta_), peer(peer_), learner(metapb::Peer::default_instance()) {}

Region(const metapb::Region & meta_, const metapb::Peer & peer_, const metapb::Peer & learner_)
: meta(meta_), peer(peer_), learner(learner_) {}

Expand Down Expand Up @@ -118,12 +124,22 @@ using RPCContextPtr = std::shared_ptr<RPCContext>;

class RegionCache {
public:
RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_) {
RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_), log(&Logger::get("pingcap.tikv")) {
}

RPCContextPtr getRPCContext(Backoffer & bo, const RegionVerID & id, bool is_learner);

void updateLeader(Backoffer & bo, const RegionVerID & region_id, uint64_t leader_store_id);

//KeyLocation locateKey(Backoffer & bo, std::string key);
//
void dropRegion(const RegionVerID &);

void dropStore(uint64_t failed_store_id);

void dropStoreOnSendReqFail(RPCContextPtr & ctx, const Exception & exc);

void onRegionStale(RPCContextPtr ctx, const errorpb::StaleEpoch & stale_epoch);

private:
RegionPtr getCachedRegion(Backoffer & bo, const RegionVerID & id);
Expand Down Expand Up @@ -153,6 +169,8 @@ class RegionCache {
std::mutex region_mutex;

std::mutex store_mutex;

Logger * log;
};

using RegionCachePtr = std::shared_ptr<RegionCache>;
Expand Down
71 changes: 59 additions & 12 deletions contrib/client-c/include/tikv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,75 @@ struct RegionClient {

int64_t getReadIndex() {
auto request = new kvrpcpb::ReadIndexRequest();
Backoffer bo(10000);
Backoffer bo(readIndexMaxBackoff);
auto rpc_call = std::make_shared<RpcCall<kvrpcpb::ReadIndexRequest>>(request);
auto ctx = cache -> getRPCContext(bo, region_id, true);
store_addr = ctx->addr;
sendReqToRegion(bo, rpc_call, ctx);
sendReqToRegion(bo, rpc_call, true);
return rpc_call -> getResp() -> read_index();
}

template<typename T>
void sendReqToRegion(Backoffer & bo, RpcCallPtr<T> rpc, RPCContextPtr rpc_ctx) {
try {
rpc -> setCtx(rpc_ctx);
client -> sendRequest(store_addr, rpc);
void sendReqToRegion(Backoffer & bo, RpcCallPtr<T> rpc, bool learner) {
for (;;) {
auto ctx = cache -> getRPCContext(bo, region_id, learner);
store_addr = ctx->addr;
rpc -> setCtx(ctx);
try {
client -> sendRequest(store_addr, rpc);
} catch(const Exception & e) {
onSendFail(bo, e, ctx);
continue;
}
auto resp = rpc -> getResp();
if (resp -> has_region_error()) {
onRegionError(bo, ctx, resp->region_error());
} else {
return;
}
}
catch(const Exception & e) {
onSendFail(bo, e);
}

void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) {
if (err.has_not_leader()) {
auto not_leader = err.not_leader();
if (not_leader.has_leader()) {
cache -> updateLeader(bo, rpc_ctx->region, not_leader.leader().store_id());
bo.backoff(boUpdateLeader, Exception("not leader"));
} else {
cache -> dropRegion(rpc_ctx->region);
bo.backoff(boRegionMiss, Exception("not leader"));
}
return;
}

if (err.has_store_not_match()) {
cache -> dropStore(rpc_ctx->peer.store_id());
return;
}

if (err.has_stale_epoch()) {
cache -> onRegionStale(rpc_ctx, err.stale_epoch());
return;
}

if (err.has_server_is_busy()) {
bo.backoff(boServerBusy, Exception("server busy"));
return;
}

if (err.has_stale_command()) {
return;
}

if (err.has_raft_entry_too_large()) {
throw Exception("entry too large");
}

cache -> dropRegion(rpc_ctx -> region);
}

void onSendFail(Backoffer & ,const Exception & e) {
e.rethrow();
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) {
cache->dropStoreOnSendReqFail(rpc_ctx, e);
bo.backoff(boTiKVRPC, e);
}
};

Expand Down
5 changes: 4 additions & 1 deletion contrib/client-c/include/tikv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ class RpcCall {
void call(std::unique_ptr<tikvpb::Tikv::Stub> stub) {
if constexpr(std::is_same<T, kvrpcpb::ReadIndexRequest>::value) {
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(3));
auto status = stub->ReadIndex(&context, *req, resp);
if (!status.ok()) {
log -> error("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
std::string err_msg = ("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions contrib/client-c/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
add_subdirectory (pd)

set(kvClient_sources)

list(APPEND kvClient_sources pd/Client.cc)
Expand All @@ -13,3 +11,5 @@ set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include)
add_library(kv_client ${kvClient_sources})
target_include_directories(kv_client PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR})
target_link_libraries(kv_client kvproto ${Poco_Foundation_LIBRARY})

add_subdirectory (test)
1 change: 0 additions & 1 deletion contrib/client-c/src/pd/CMakeLists.txt

This file was deleted.

Loading