Skip to content

Commit

Permalink
Support edge in meta service (#355)
Browse files Browse the repository at this point in the history
* support edge in meta service

* support leader
  • Loading branch information
darionyaphet authored May 13, 2019
1 parent b9020cd commit d1cdf84
Show file tree
Hide file tree
Showing 25 changed files with 147 additions and 90 deletions.
2 changes: 0 additions & 2 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
sentence_ = static_cast<AlterTagSentence*>(sentence);
}


Status AlterTagExecutor::prepare() {
return checkIfGraphSpaceChosen();
}


void AlterTagExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
Expand Down
1 change: 1 addition & 0 deletions src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AlterTagExecutor final : public Executor {

private:
AlterTagSentence *sentence_{nullptr};

nebula::meta::cpp2::AlterSchemaOp
getTagOpType(const AlterTagOptItem::OptionType type);
};
Expand Down
35 changes: 18 additions & 17 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@ enum ErrorCode {
// KV Failure
E_STORE_FAILURE = -31,
E_STORE_SEGMENT_ILLEGAL = -32,
E_KEY_NOT_FOUND = -33,

E_UNKNOWN = -99,
} (cpp.enum_strict)


enum AlterSchemaOp {
ADD = 0x01,
ADD = 0x01,
CHANGE = 0x02,
DROP = 0x03,
DROP = 0x03,
UNKNOWN = 0x04,
} (cpp.enum_strict)

Expand Down Expand Up @@ -159,28 +158,29 @@ struct GetTagResp {
3: common.Schema schema,
}

struct GetEdgeReq {
// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
2: string edge_name,
3: common.Schema schema,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
}

struct GetEdgeResp {
1: ErrorCode code,
2: common.Schema schema,
struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: common.Schema schema,
struct GetEdgeResp {
1: ErrorCode code,
2: common.HostAddr leader,
3: common.Schema schema,
}

struct RemoveEdgeReq {
Expand Down Expand Up @@ -304,6 +304,7 @@ service MetaService {
ListTagsResp listTags(1: ListTagsReq req);

ExecResp createEdge(1: CreateEdgeReq req);
ExecResp alterEdge(1: AlterEdgeReq req);
ExecResp removeEdge(1: RemoveEdgeReq req);
GetEdgeResp getEdge(1: GetEdgeReq req);
ListEdgesResp listEdges(1: ListEdgesReq req);
Expand Down
20 changes: 17 additions & 3 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "meta/processors/AlterTagProcessor.h"
#include "meta/processors/CreateEdgeProcessor.h"
#include "meta/processors/GetTagProcessor.h"
#include "meta/processors/GetEdgeProcessor.h"
#include "meta/processors/ListTagsProcessor.h"
#include "meta/processors/ListEdgesProcessor.h"
#include "meta/processors/RemoveEdgeProcessor.h"
#include "meta/processors/MultiPutProcessor.h"
#include "meta/processors/GetProcessor.h"
#include "meta/processors/MultiGetProcessor.h"
Expand Down Expand Up @@ -144,9 +146,9 @@ MetaServiceHandler::future_listTags(const cpp2::ListTagsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
folly::Future<cpp2::GetEdgeResp>
MetaServiceHandler::future_getEdge(const cpp2::GetEdgeReq& req) {
auto* processor = GetEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

Expand All @@ -156,6 +158,18 @@ MetaServiceHandler::future_listEdges(const cpp2::ListEdgesReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeEdge(const cpp2::RemoveEdgeReq& req) {
auto* processor = RemoveEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::HBResp>
MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
auto* processor = HBProcessor::instance(kvstore_);
Expand Down
10 changes: 8 additions & 2 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListTagsResp>
future_listTags(const cpp2::ListTagsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;
folly::Future<cpp2::GetEdgeResp>
future_getEdge(const cpp2::GetEdgeReq& req) override;

folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;

folly::Future<cpp2::ExecResp>
future_removeEdge(const cpp2::RemoveEdgeReq& req) override;

/**
* HeartBeat
* */
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT

std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(256);
key.reserve(128);
key.append(kSpacesTable.data(), kSpacesTable.size());
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId));
return key;
Expand Down
43 changes: 41 additions & 2 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,11 +656,11 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2
folly::Future<StatusOr<TagID>>
MetaClient::alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterSchemaItem> tagItems) {
std::vector<cpp2::AlterSchemaItem> items) {
cpp2::AlterTagReq req;
req.set_space_id(std::move(spaceId));
req.set_tag_name(std::move(name));
req.set_tag_items(std::move(tagItems));
req.set_tag_items(std::move(items));

return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterTag(request);
Expand Down Expand Up @@ -718,6 +718,20 @@ MetaClient::createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp
}, true);
}

folly::Future<StatusOr<bool>>
MetaClient::alterEdge(GraphSpaceID spaceId, std::string name,
std::vector<cpp2::AlterSchemaItem> items) {
cpp2::AlterEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
req.set_edge_items(std::move(items));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
cpp2::ListEdgesReq req;
Expand All @@ -729,6 +743,31 @@ MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
});
}

folly::Future<StatusOr<nebula::cpp2::Schema>>
MetaClient::getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version) {
cpp2::GetEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_type(edgeType);
req.set_version(version);
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_getEdge(request);
}, [] (cpp2::GetEdgeResp&& resp) -> nebula::cpp2::Schema {
return std::move(resp).get_schema();
});
}

folly::Future<StatusOr<bool>>
MetaClient::removeEdgeSchema(GraphSpaceID spaceId, std::string name) {
cpp2::RemoveEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_removeEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

StatusOr<std::shared_ptr<const SchemaProviderIf>>
MetaClient::getTagSchemaFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver) {
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,18 @@ class MetaClient {
folly::Future<StatusOr<EdgeType>>
createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema);

folly::Future<StatusOr<bool>>
alterEdge(GraphSpaceID spaceId, std::string name, std::vector<cpp2::AlterSchemaItem> items);

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
listEdgeSchemas(GraphSpaceID spaceId);

folly::Future<StatusOr<nebula::cpp2::Schema>>
getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version);

folly::Future<StatusOr<bool>>
removeEdgeSchema(GraphSpaceID spaceId, std::string name);

// Operations for custom kv
folly::Future<StatusOr<bool>>
multiPut(std::string segment,
Expand Down
6 changes: 1 addition & 5 deletions src/meta/processors/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ namespace nebula {
namespace meta {

void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto ret = getEdgeType(req.get_space_id(), req.get_edge_name());
if (!ret.ok()) {
Expand Down
6 changes: 1 addition & 5 deletions src/meta/processors/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ namespace nebula {
namespace meta {

void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock());
auto ret = getTagId(req.get_space_id(), req.get_tag_name());
if (!ret.ok()) {
Expand Down
9 changes: 9 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ GENERATE_LOCK(edge);
#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
onFinished(); \
return; \
}

/**
* Check segemnt is consist of numbers and letters and should not empty.
* */
Expand Down Expand Up @@ -118,6 +125,8 @@ class BaseProcessor {
* */
void doPut(std::vector<kvstore::KV> data);

StatusOr<std::unique_ptr<kvstore::KVIterator>> doPrefix(const std::string& key);

/**
* General get function.
* */
Expand Down
21 changes: 16 additions & 5 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {
});
}

template<typename RESP>
StatusOr<std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter);
if (code != kvstore::ResultCode::SUCCEEDED) {
return Status::Error("Prefix Failed");
}
return iter;
}

template<typename RESP>
StatusOr<std::string> BaseProcessor<RESP>::doGet(const std::string& key) {
std::string value;
Expand Down Expand Up @@ -123,7 +134,7 @@ int32_t BaseProcessor<RESP>::autoIncrementId() {
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, kIdKey, &val);
if (ret != kvstore::ResultCode::SUCCEEDED) {
CHECK_EQ(ret, kvstore::ResultCode::ERR_KEY_NOT_FOUND);
CHECK_EQ(kvstore::ResultCode::ERR_KEY_NOT_FOUND, ret);
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
Expand Down Expand Up @@ -186,12 +197,12 @@ StatusOr<TagID> BaseProcessor<RESP>::getTagId(GraphSpaceID spaceId, const std::s
template<typename RESP>
StatusOr<EdgeType> BaseProcessor<RESP>::getEdgeType(GraphSpaceID spaceId, const std::string& name) {
auto indexKey = MetaServiceUtils::indexEdgeKey(spaceId, name);
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return *reinterpret_cast<const TagID*>(val.c_str());
auto ret = doGet(indexKey);
if (ret.ok()) {
return *reinterpret_cast<const EdgeType*>(ret.value().c_str());
}
return Status::EdgeNotFound(folly::stringPrintf("Edge %s not found", name.c_str()));
}

} // namespace meta
} // namespace nebula
6 changes: 1 addition & 5 deletions src/meta/processors/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ namespace nebula {
namespace meta {

void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto ret = getEdgeType(req.get_space_id(), req.get_edge_name());
if (ret.ok()) {
Expand Down
7 changes: 1 addition & 6 deletions src/meta/processors/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ namespace nebula {
namespace meta {

void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
LOG(ERROR) << "Create Tag Failed : Space " << req.get_space_id() << " not found";
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock());
auto ret = getTagId(req.get_space_id(), req.get_tag_name());
std::vector<kvstore::KV> data;
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/GetEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace nebula {
namespace meta {

void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) {
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
std::string val;
std::string edgeKey = MetaServiceUtils::schemaEdgeKey(req.get_space_id(),
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/GetEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ class GetEdgeProcessor : public BaseProcessor<cpp2::GetEdgeResp> {

} // namespace meta
} // namespace nebula

#endif // META_GETEDGEPROCESSOR_H_
2 changes: 1 addition & 1 deletion src/meta/processors/GetProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void GetProcessor::process(const cpp2::GetReq& req) {
auto result = doGet(key);
if (!result.ok()) {
LOG(ERROR) << "Get Failed :" << key << " not found!";
resp_.set_code(cpp2::ErrorCode::E_KEY_NOT_FOUND);
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
Expand Down
Loading

0 comments on commit d1cdf84

Please sign in to comment.