Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support edge in meta service #355

Merged
merged 13 commits into from
May 13, 2019
2 changes: 0 additions & 2 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ AlterTagExecutor::AlterTagExecutor(Sentence *sentence,
sentence_ = static_cast<AlterTagSentence*>(sentence);
}


Status AlterTagExecutor::prepare() {
return checkIfGraphSpaceChosen();
}


void AlterTagExecutor::execute() {
auto *mc = ectx()->getMetaClient();
auto *name = sentence_->name();
Expand Down
1 change: 1 addition & 0 deletions src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AlterTagExecutor final : public Executor {

private:
AlterTagSentence *sentence_{nullptr};

nebula::meta::cpp2::AlterSchemaOp
getTagOpType(const AlterTagOptItem::OptionType type);
};
Expand Down
23 changes: 14 additions & 9 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ enum ErrorCode {
// KV Failure
E_STORE_FAILURE = -31,
E_STORE_SEGMENT_ILLEGAL = -32,
E_KEY_NOT_FOUND = -33,

E_UNKNOWN = -99,
} (cpp.enum_strict)


enum AlterSchemaOp {
ADD = 0x01,
ADD = 0x01,
CHANGE = 0x02,
DROP = 0x03,
DROP = 0x03,
UNKNOWN = 0x04,
} (cpp.enum_strict)

Expand Down Expand Up @@ -77,6 +76,11 @@ struct EdgeItem {
4: common.Schema schema,
}

struct AlterEdgeItem {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we have unified AlterTagItem and AlterEdgeItem to AlterSchemaItem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

1: AlterSchemaOp op,
2: common.Schema schema,
}

struct ExecResp {
1: ErrorCode code,
2: ID id,
Expand Down Expand Up @@ -162,12 +166,6 @@ struct GetEdgeReq {
3: common.SchemaVer version,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
}

struct GetEdgeResp {
1: ErrorCode code,
2: common.Schema schema,
Expand All @@ -180,6 +178,12 @@ struct CreateEdgeReq {
3: common.Schema schema,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterEdgeItem> edge_items,
}

struct RemoveEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
Expand Down Expand Up @@ -301,6 +305,7 @@ service MetaService {
ListTagsResp listTags(1: ListTagsReq req);

ExecResp createEdge(1: CreateEdgeReq req);
ExecResp alterEdge(1: AlterEdgeReq req);
ExecResp removeEdge(1: RemoveEdgeReq req);
GetEdgeResp getEdge(1: GetEdgeReq req);
ListEdgesResp listEdges(1: ListEdgesReq req);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ add_library(
processors/GetTagProcessor.cpp
processors/GetEdgeProcessor.cpp
processors/ListTagsProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveEdgeProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
Expand Down
20 changes: 17 additions & 3 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "meta/processors/AlterTagProcessor.h"
#include "meta/processors/CreateEdgeProcessor.h"
#include "meta/processors/GetTagProcessor.h"
#include "meta/processors/GetEdgeProcessor.h"
#include "meta/processors/ListTagsProcessor.h"
#include "meta/processors/ListEdgesProcessor.h"
#include "meta/processors/RemoveEdgeProcessor.h"
#include "meta/processors/MultiPutProcessor.h"
#include "meta/processors/GetProcessor.h"
#include "meta/processors/MultiGetProcessor.h"
Expand Down Expand Up @@ -143,9 +145,9 @@ MetaServiceHandler::future_listTags(const cpp2::ListTagsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
folly::Future<cpp2::GetEdgeResp>
MetaServiceHandler::future_getEdge(const cpp2::GetEdgeReq& req) {
auto* processor = GetEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

Expand All @@ -155,5 +157,17 @@ MetaServiceHandler::future_listEdges(const cpp2::ListEdgesReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createEdge(const cpp2::CreateEdgeReq& req) {
auto* processor = CreateEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeEdge(const cpp2::RemoveEdgeReq& req) {
auto* processor = RemoveEdgeProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

} // namespace meta
} // namespace nebula
10 changes: 8 additions & 2 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListTagsResp>
future_listTags(const cpp2::ListTagsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;
folly::Future<cpp2::GetEdgeResp>
future_getEdge(const cpp2::GetEdgeReq& req) override;

folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

folly::Future<cpp2::ExecResp>
future_createEdge(const cpp2::CreateEdgeReq& req) override;

folly::Future<cpp2::ExecResp>
future_removeEdge(const cpp2::RemoveEdgeReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT

std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(256);
key.reserve(128);
key.append(kSpacesTable.data(), kSpacesTable.size());
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId));
return key;
Expand Down
44 changes: 42 additions & 2 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,11 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2
folly::Future<StatusOr<TagID>>
MetaClient::alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterSchemaItem> tagItems) {
std::vector<cpp2::AlterSchemaItem> items) {
cpp2::AlterTagReq req;
req.set_space_id(std::move(spaceId));
req.set_tag_name(std::move(name));
req.set_tag_items(std::move(tagItems));
req.set_tag_items(std::move(items));

return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterTag(request);
Expand Down Expand Up @@ -705,6 +705,20 @@ MetaClient::createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp
});
}

folly::Future<StatusOr<bool>>
MetaClient::alterEdge(GraphSpaceID spaceId, std::string name,
std::vector<cpp2::AlterEdgeItem> items) {
cpp2::AlterEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
req.set_edge_items(std::move(items));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_alterEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
cpp2::ListEdgesReq req;
Expand All @@ -716,6 +730,32 @@ MetaClient::listEdgeSchemas(GraphSpaceID spaceId) {
});
}

folly::Future<StatusOr<bool>>
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
MetaClient::getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version) {
cpp2::GetEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_type(edgeType);
req.set_version(version);
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_getEdge(request);
}, [] (cpp2::GetEdgeResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

folly::Future<StatusOr<bool>>
MetaClient::removeEdgeSchema(GraphSpaceID spaceId, std::string name) {
CHECK_PARAMETER_AND_RETURN_STATUS(name);
cpp2::RemoveEdgeReq req;
req.set_space_id(std::move(spaceId));
req.set_edge_name(std::move(name));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_removeEdge(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
}

StatusOr<std::shared_ptr<const SchemaProviderIf>>
MetaClient::getTagSchemeFromCache(GraphSpaceID spaceId, TagID tagID, SchemaVer ver) {
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,18 @@ class MetaClient {
folly::Future<StatusOr<EdgeType>>
createEdgeSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema);

folly::Future<StatusOr<bool>>
alterEdge(GraphSpaceID spaceId, std::string name, std::vector<cpp2::AlterEdgeItem> items);

folly::Future<StatusOr<std::vector<cpp2::EdgeItem>>>
listEdgeSchemas(GraphSpaceID spaceId);

folly::Future<StatusOr<bool>>
getEdgeSchema(GraphSpaceID spaceId, int32_t edgeType, SchemaVer version);

folly::Future<StatusOr<bool>>
removeEdgeSchema(GraphSpaceID spaceId, std::string name);

// These are the interfaces about cache opeartions.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

Expand Down
45 changes: 42 additions & 3 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ GENERATE_LOCK(edge);
#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
onFinished(); \
return; \
}

/**
* Check segemnt is consist of numbers and letters and should not empty.
* */
Expand All @@ -50,6 +57,36 @@ GENERATE_LOCK(edge);
return; \
}

/**
* Retrieval space ID by space name and will return when it's not found.
* */
#define GET_SPACE_ID_AND_RETURN(spaceName, spaceId) \
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
auto spaceIdResult = getSpaceId(spaceName); \
if (!spaceIdResult.ok()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
onFinished(); \
return; \
} else { \
spaceId = spaceIdResult.value(); \
}

/**
*
* */
#define GET_EDGE_TYPE_AND_RETURN(spaceId, edgeName, edgeType) \
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
auto edgeTypeResult = getEdgeType(spaceId, edgeName); \
if (!edgeTypeResult.ok()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
onFinished(); \
return; \
} else { \
edgeType = edgeTypeResult.value(); \
}


#define MAX_VERSION_HEX 0x7FFFFFFFFFFFFFFF
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
#define MIN_VERSION_HEX 0x0000000000000000

template<typename RESP>
class BaseProcessor {
public:
Expand Down Expand Up @@ -118,6 +155,8 @@ class BaseProcessor {
* */
void doPut(std::vector<kvstore::KV> data);

StatusOr<std::unique_ptr<kvstore::KVIterator>> doPrefix(const std::string& key);

/**
* General get function.
* */
Expand Down Expand Up @@ -170,17 +209,17 @@ class BaseProcessor {
Status hostsExist(const std::vector<std::string>& name);

/**
* Return the spaceId for name.
* Return the spaceId from name.
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
* */
StatusOr<GraphSpaceID> getSpaceId(const std::string& name);

/**
* Return the tagId for name.
* Return the tagId from name.
*/
StatusOr<TagID> getTagId(GraphSpaceID spaceId, const std::string& name);

/**
* Return the edgeType for name.
* Return the edgeType from name.
*/
StatusOr<EdgeType> getEdgeType(GraphSpaceID spaceId, const std::string& name);

Expand Down
21 changes: 16 additions & 5 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {
});
}

template<typename RESP>
StatusOr<std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, key, &iter);
if (code != kvstore::ResultCode::SUCCEEDED) {
return Status::Error("Prefix Failed");
}
return iter;
}

template<typename RESP>
StatusOr<std::string> BaseProcessor<RESP>::doGet(const std::string& key) {
std::string value;
Expand Down Expand Up @@ -123,7 +134,7 @@ int32_t BaseProcessor<RESP>::autoIncrementId() {
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, kIdKey, &val);
if (ret != kvstore::ResultCode::SUCCEEDED) {
CHECK_EQ(ret, kvstore::ResultCode::ERR_KEY_NOT_FOUND);
CHECK_EQ(kvstore::ResultCode::ERR_KEY_NOT_FOUND, ret);
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
Expand Down Expand Up @@ -198,12 +209,12 @@ StatusOr<TagID> BaseProcessor<RESP>::getTagId(GraphSpaceID spaceId, const std::s
template<typename RESP>
StatusOr<EdgeType> BaseProcessor<RESP>::getEdgeType(GraphSpaceID spaceId, const std::string& name) {
auto indexKey = MetaServiceUtils::indexEdgeKey(spaceId, name);
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val);
if (ret == kvstore::ResultCode::SUCCEEDED) {
return *reinterpret_cast<const TagID*>(val.c_str());
auto ret = doGet(indexKey);
if (ret.ok()) {
return *reinterpret_cast<const EdgeType*>(ret.value().c_str());
}
return Status::EdgeNotFound(folly::stringPrintf("Edge %s not found", name.c_str()));
}

} // namespace meta
} // namespace nebula
1 change: 1 addition & 0 deletions src/meta/processors/GetEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ class GetEdgeProcessor : public BaseProcessor<cpp2::GetEdgeResp> {

} // namespace meta
} // namespace nebula

#endif // META_GETEDGEPROCESSOR_H_
2 changes: 1 addition & 1 deletion src/meta/processors/GetProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void GetProcessor::process(const cpp2::GetReq& req) {
auto result = doGet(key);
if (!result.ok()) {
LOG(ERROR) << "Get Failed :" << key << " not found!";
resp_.set_code(cpp2::ErrorCode::E_KEY_NOT_FOUND);
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
Expand Down
Loading