Skip to content

Commit

Permalink
Support leader change in meta client && do some refactors (vesoft-inc…
Browse files Browse the repository at this point in the history
…#354)

* Support leader change in meta client && do some refactors

* Address @laura-ding's comments

* Fix remove tag schema bug
  • Loading branch information
dangleptr authored and dutor committed May 13, 2019
1 parent 36a0145 commit e67c35b
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 266 deletions.
37 changes: 15 additions & 22 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct EdgeItem {

struct ExecResp {
1: ErrorCode code,
// For custom kv operations, it is useless.
2: ID id,
// Valid if ret equals E_LEADER_CHANGED.
3: common.HostAddr leader,
Expand Down Expand Up @@ -154,7 +155,8 @@ struct GetTagReq {

struct GetTagResp {
1: ErrorCode code,
2: common.Schema schema,
2: common.HostAddr leader,
3: common.Schema schema,
}

struct GetEdgeReq {
Expand Down Expand Up @@ -235,18 +237,15 @@ struct MultiPutReq {
2: list<Pair> pairs,
}

struct MultiPutResp {
1: ErrorCode code,
}

struct GetReq {
1: string segment,
2: string key,
}

struct GetResp {
1: ErrorCode code,
2: string value,
2: common.HostAddr leader,
3: string value,
}

struct MultiGetReq {
Expand All @@ -256,28 +255,21 @@ struct MultiGetReq {

struct MultiGetResp {
1: ErrorCode code,
2: list<string> values,
2: common.HostAddr leader,
3: list<string> values,
}

struct RemoveReq {
1: string segment,
2: string key,
}

struct RemoveResp {
1: ErrorCode code,
}

struct RemoveRangeReq {
1: string segment,
2: string start,
3: string end,
}

struct RemoveRangeResp {
1: ErrorCode code,
}

struct ScanReq {
1: string segment,
2: string start,
Expand All @@ -286,7 +278,8 @@ struct ScanReq {

struct ScanResp {
1: ErrorCode code,
2: list<string> values,
2: common.HostAddr leader,
3: list<string> values,
}

struct HBResp {
Expand Down Expand Up @@ -321,12 +314,12 @@ service MetaService {

GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req);

MultiPutResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
MultiGetResp multiGet(1: MultiGetReq req);
RemoveResp remove(1: RemoveReq req);
RemoveRangeResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);
ExecResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
MultiGetResp multiGet(1: MultiGetReq req);
ExecResp remove(1: RemoveReq req);
ExecResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);

HBResp heartBeat(1: HBReq req);
}
Expand Down
28 changes: 20 additions & 8 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,35 @@ bool MetaServerBasedPartManager::spaceExist(const HostAddr& host,
}

void MetaServerBasedPartManager::onSpaceAdded(GraphSpaceID spaceId) {
CHECK_NOTNULL(handler_);
handler_->addSpace(spaceId);
if (handler_ != nullptr) {
handler_->addSpace(spaceId);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

void MetaServerBasedPartManager::onSpaceRemoved(GraphSpaceID spaceId) {
CHECK_NOTNULL(handler_);
handler_->removeSpace(spaceId);
if (handler_ != nullptr) {
handler_->removeSpace(spaceId);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) {
CHECK_NOTNULL(handler_);
handler_->addPart(partMeta.spaceId_, partMeta.partId_);
if (handler_ != nullptr) {
handler_->addPart(partMeta.spaceId_, partMeta.partId_);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

void MetaServerBasedPartManager::onPartRemoved(GraphSpaceID spaceId, PartitionID partId) {
CHECK_NOTNULL(handler_);
handler_->removePart(spaceId, partId);
if (handler_ != nullptr) {
handler_->removePart(spaceId, partId);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

void MetaServerBasedPartManager::onPartUpdated(const PartMeta& partMeta) {
Expand Down
6 changes: 3 additions & 3 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ MetaServiceHandler::future_getPartsAlloc(const cpp2::GetPartsAllocReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::MultiPutResp>
folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_multiPut(const cpp2::MultiPutReq& req) {
auto* processor = MultiPutProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
Expand All @@ -102,13 +102,13 @@ MetaServiceHandler::future_scan(const cpp2::ScanReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::RemoveResp>
folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_remove(const cpp2::RemoveReq& req) {
auto* processor = RemoveProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::RemoveRangeResp>
folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeRange(const cpp2::RemoveRangeReq& req) {
auto* processor = RemoveRangeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
Expand Down
6 changes: 3 additions & 3 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
/**
* Custom kv related operations.
* */
folly::Future<cpp2::MultiPutResp>
folly::Future<cpp2::ExecResp>
future_multiPut(const cpp2::MultiPutReq& req) override;

folly::Future<cpp2::GetResp>
Expand All @@ -56,10 +56,10 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::MultiGetResp>
future_multiGet(const cpp2::MultiGetReq& req) override;

folly::Future<cpp2::RemoveResp>
folly::Future<cpp2::ExecResp>
future_remove(const cpp2::RemoveReq& req) override;

folly::Future<cpp2::RemoveRangeResp>
folly::Future<cpp2::ExecResp>
future_removeRange(const cpp2::RemoveRangeReq& req) override;

folly::Future<cpp2::ScanResp>
Expand Down
4 changes: 2 additions & 2 deletions src/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ServerBasedSchemaManager::getTagSchema(GraphSpaceID space, TagID tag, SchemaVer
if (ver < 0) {
ver = getNewestTagSchemaVer(space, tag);
}
auto ret = metaClient_->getTagSchemeFromCache(space, tag, ver);
auto ret = metaClient_->getTagSchemaFromCache(space, tag, ver);
if (ret.ok()) {
return ret.value();
}
Expand Down Expand Up @@ -72,7 +72,7 @@ ServerBasedSchemaManager::getEdgeSchema(GraphSpaceID space, EdgeType edge, Schem
ver = getNewestEdgeSchemaVer(space, edge);
}

auto ret = metaClient_->getEdgeSchemeFromCache(space, edge, ver);
auto ret = metaClient_->getEdgeSchemaFromCache(space, edge, ver);
if (ret.ok()) {
return ret.value();
}
Expand Down
Loading

0 comments on commit e67c35b

Please sign in to comment.