Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support edge in meta service #355

Merged
merged 13 commits into from
May 13, 2019
2 changes: 0 additions & 2 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
sentence_ = static_cast<AlterTagSentence*>(sentence);
}


Status AlterTagExecutor::prepare() {
return checkIfGraphSpaceChosen();
}


void AlterTagExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
Expand Down
1 change: 1 addition & 0 deletions src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AlterTagExecutor final : public Executor {

private:
AlterTagSentence *sentence_{nullptr};

nebula::meta::cpp2::AlterSchemaOp
getTagOpType(const AlterTagOptItem::OptionType type);
};
Expand Down
32 changes: 16 additions & 16 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ enum ErrorCode {
// KV Failure
E_STORE_FAILURE = -31,
E_STORE_SEGMENT_ILLEGAL = -32,
E_KEY_NOT_FOUND = -33,

E_UNKNOWN = -99,
} (cpp.enum_strict)


enum AlterSchemaOp {
ADD = 0x01,
ADD = 0x01,
CHANGE = 0x02,
DROP = 0x03,
DROP = 0x03,
UNKNOWN = 0x04,
} (cpp.enum_strict)

Expand Down Expand Up @@ -156,30 +155,30 @@ struct GetTagResp {
2: common.Schema schema,
}

struct GetEdgeReq {
// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
2: string edge_name,
3: common.Schema schema,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
}

struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

struct GetEdgeResp {
1: ErrorCode code,
2: common.Schema schema,
}

// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: common.Schema schema,
}

struct RemoveEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
Expand Down Expand Up @@ -301,6 +300,7 @@ service MetaService {
ListTagsResp listTags(1: ListTagsReq req);

ExecResp createEdge(1: CreateEdgeReq req);
ExecResp alterEdge(1: AlterEdgeReq req);
ExecResp removeEdge(1: RemoveEdgeReq req);
GetEdgeResp getEdge(1: GetEdgeReq req);
ListEdgesResp listEdges(1: ListEdgesReq req);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ add_library(
processors/GetTagProcessor.cpp
processors/GetEdgeProcessor.cpp
processors/ListTagsProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveEdgeProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
Expand Down
20 changes: 17 additions & 3 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "meta/processors/AlterTagProcessor.h"
#include "meta/processors/CreateEdgeProcessor.h"
#include "meta/processors/GetTagProcessor.h"
#include "meta/processors/GetEdgeProcessor.h"
#include "meta/processors/ListTagsProcessor.h"
#include "meta/processors/ListEdgesProcessor.h"
#include "meta/processors/RemoveEdgeProcessor.h"
#include "meta/processors/MultiPutProcessor.h"
#include "meta/processors/GetProcessor.h"
#include "meta/processors/MultiGetProcessor.h"
Expand Down Expand Up @@ -143,9 +145,9 @@ MetaServiceHandler::future_listTags(const cpp2::ListTagsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
folly::Future<cpp2::GetEdgeResp>
MetaServiceHandler::future_getEdge(const cpp2::GetEdgeReq& req) {
auto* processor = GetEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

Expand All @@ -155,5 +157,17 @@ MetaServiceHandler::future_listEdges(const cpp2::ListEdgesReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeEdge(const cpp2::RemoveEdgeReq& req) {
auto* processor = RemoveEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

} // namespace meta
} // namespace nebula
10 changes: 8 additions & 2 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListTagsResp>
future_listTags(const cpp2::ListTagsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;
folly::Future<cpp2::GetEdgeResp>
future_getEdge(const cpp2::GetEdgeReq& req) override;

folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;

folly::Future<cpp2::ExecResp>
future_removeEdge(const cpp2::RemoveEdgeReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT

std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(256);
key.reserve(128);
key.append(kSpacesTable.data(), kSpacesTable.size());
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId));
return key;
Expand Down
44 changes: 42 additions & 2 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,11 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2
folly::Future<StatusOr<TagID>>
MetaClient::alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterSchemaItem> tagItems) {
std::vector<cpp2::AlterSchemaItem> items) {
cpp2::AlterTagReq req;
req.set_space_id(std::move(spaceId));
req.set_tag_name(std::move(name));
req.set_tag_items(std::move(tagItems));
req.set_tag_items(std::move(items));

return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterTag(request);
Expand Down Expand Up @@ -705,6 +705,20 @@ MetaClient::createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp
});
}

folly::Future<StatusOr<bool>>
MetaClient::alterEdge(GraphSpaceID spaceId, std::string name,
std::vector<cpp2::AlterSchemaItem> items) {
cpp2::AlterEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
req.set_edge_items(std::move(items));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
cpp2::ListEdgesReq req;
Expand All @@ -716,6 +730,32 @@ MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
});
}

folly::Future<StatusOr<bool>>
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
MetaClient::getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version) {
cpp2::GetEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_type(edgeType);
req.set_version(version);
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_getEdge(request);
}, [] (cpp2::GetEdgeResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

folly::Future<StatusOr<bool>>
MetaClient::removeEdgeSchema(GraphSpaceID spaceId, std::string name) {
CHECK_PARAMETER_AND_RETURN_STATUS(name);
cpp2::RemoveEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_removeEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

StatusOr<std::shared_ptr<const SchemaProviderIf>>
MetaClient::getTagSchemeFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver) {
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,18 @@ class MetaClient {
folly::Future<StatusOr<EdgeType>>
createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema);

folly::Future<StatusOr<bool>>
alterEdge(GraphSpaceID spaceId, std::string name, std::vector<cpp2::AlterSchemaItem> items);

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
listEdgeSchemas(GraphSpaceID spaceId);

folly::Future<StatusOr<bool>>
getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version);

folly::Future<StatusOr<bool>>
removeEdgeSchema(GraphSpaceID spaceId, std::string name);

// These are the interfaces about cache opeartions.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

Expand Down
9 changes: 9 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ GENERATE_LOCK(edge);
#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
onFinished(); \
return; \
}

/**
* Check segemnt is consist of numbers and letters and should not empty.
* */
Expand Down Expand Up @@ -118,6 +125,8 @@ class BaseProcessor {
* */
void doPut(std::vector<kvstore::KV> data);

StatusOr<std::unique_ptr<kvstore::KVIterator>> doPrefix(const std::string& key);

/**
* General get function.
* */
Expand Down
21 changes: 16 additions & 5 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {
});
}

template<typename RESP>
StatusOr<std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter);
if (code != kvstore::ResultCode::SUCCEEDED) {
return Status::Error("Prefix Failed");
}
return iter;
}

template<typename RESP>
StatusOr<std::string> BaseProcessor<RESP>::doGet(const std::string& key) {
std::string value;
Expand Down Expand Up @@ -123,7 +134,7 @@ int32_t BaseProcessor<RESP>::autoIncrementId() {
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, kIdKey, &val);
if (ret != kvstore::ResultCode::SUCCEEDED) {
CHECK_EQ(ret, kvstore::ResultCode::ERR_KEY_NOT_FOUND);
CHECK_EQ(kvstore::ResultCode::ERR_KEY_NOT_FOUND, ret);
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
Expand Down Expand Up @@ -198,12 +209,12 @@ StatusOr<TagID> BaseProcessor<RESP>::getTagId(GraphSpaceID spaceId, const std::s
template<typename RESP>
StatusOr<EdgeType> BaseProcessor<RESP>::getEdgeType(GraphSpaceID spaceId, const std::string& name) {
auto indexKey = MetaServiceUtils::indexEdgeKey(spaceId, name);
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return *reinterpret_cast<const TagID*>(val.c_str());
auto ret = doGet(indexKey);
if (ret.ok()) {
return *reinterpret_cast<const EdgeType*>(ret.value().c_str());
}
return Status::EdgeNotFound(folly::stringPrintf("Edge %s not found", name.c_str()));
}

} // namespace meta
} // namespace nebula
1 change: 1 addition & 0 deletions src/meta/processors/GetEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ class GetEdgeProcessor : public BaseProcessor<cpp2::GetEdgeResp> {

} // namespace meta
} // namespace nebula

#endif // META_GETEDGEPROCESSOR_H_
2 changes: 1 addition & 1 deletion src/meta/processors/GetProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void GetProcessor::process(const cpp2::GetReq& req) {
auto result = doGet(key);
if (!result.ok()) {
LOG(ERROR) << "Get Failed :" << key << " not found!";
resp_.set_code(cpp2::ErrorCode::E_KEY_NOT_FOUND);
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/ListEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace nebula {
namespace meta {

void ListEdgesProcessor::process(const cpp2::ListEdgesReq& req) {
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());
folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
auto spaceId = req.get_space_id();
auto prefix = MetaServiceUtils::schemaEdgesPrefix(spaceId);
Expand All @@ -30,9 +31,8 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq& req) {
auto nameLen = *reinterpret_cast<const int32_t *>(val.data());
auto edgeName = val.subpiece(sizeof(int32_t), nameLen).str();
auto schema = MetaServiceUtils::parseSchema(val);
cpp2::EdgeItem edgeItem(apache::thrift::FragileConstructor::FRAGILE,
edgeType, edgeName, vers, schema);
edges.emplace_back(std::move(edgeItem));
edges.emplace_back(apache::thrift::FragileConstructor::FRAGILE,
edgeType, edgeName, vers, schema);
iter->next();
}
resp_.set_edges(std::move(edges));
Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ void ListHostsProcessor::process(const cpp2::ListHostsReq& req) {
onFinished();
return;
}
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_hosts(std::move(ret.value()));
onFinished();
}
Expand Down
5 changes: 2 additions & 3 deletions src/meta/processors/ListTagsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq& req) {
auto nameLen = *reinterpret_cast<const int32_t *>(val.data());
auto tagName = val.subpiece(sizeof(int32_t), nameLen).str();
auto schema = MetaServiceUtils::parseSchema(val);
cpp2::TagItem tagItem(apache::thrift::FragileConstructor::FRAGILE,
tagID, tagName, vers, schema);
tags.emplace_back(std::move(tagItem));
tags.emplace_back(apache::thrift::FragileConstructor::FRAGILE,
tagID, tagName, vers, schema);
iter->next();
}
resp_.set_tags(std::move(tags));
Expand Down
Loading