Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AlterEdgeProcessor, GetEdgeProcessor, RemoveEdgeProcessor and UTs #346

Merged
merged 5 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ void AlterTagExecutor::execute() {
const auto& tagOpts = sentence_->tagOptList();
auto spaceId = ectx()->rctx()->session()->space();

std::vector<nebula::meta::cpp2::AlterTagItem> tagItems;
std::vector<nebula::meta::cpp2::AlterSchemaItem> tagItems;
for (auto& tagOpt : tagOpts) {
nebula::meta::cpp2::AlterTagItem tagItem;
nebula::meta::cpp2::AlterSchemaItem tagItem;
auto opType = getTagOpType(tagOpt->getOptType());
tagItem.set_op(std::move(opType));
const auto& specs = tagOpt->columnSpecs();
Expand Down Expand Up @@ -65,17 +65,17 @@ void AlterTagExecutor::execute() {
std::move(future).via(runner).thenValue(cb).thenError(error);
}

nebula::meta::cpp2::AlterTagOp
nebula::meta::cpp2::AlterSchemaOp
AlterTagExecutor::getTagOpType(const AlterTagOptItem::OptionType type) {
switch (type) {
case AlterTagOptItem::OptionType::ADD :
return nebula::meta::cpp2::AlterTagOp::ADD;
case AlterTagOptItem::OptionType::SET :
return nebula::meta::cpp2::AlterTagOp::SET;
return nebula::meta::cpp2::AlterSchemaOp::ADD;
case AlterTagOptItem::OptionType::CHANGE :
return nebula::meta::cpp2::AlterSchemaOp::CHANGE;
case AlterTagOptItem::OptionType::DROP :
return nebula::meta::cpp2::AlterTagOp::DROP;
return nebula::meta::cpp2::AlterSchemaOp::DROP;
default:
return nebula::meta::cpp2::AlterTagOp::UNKNOWN;
return nebula::meta::cpp2::AlterSchemaOp::UNKNOWN;
}
}
} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class AlterTagExecutor final : public Executor {

private:
AlterTagSentence *sentence_{nullptr};
nebula::meta::cpp2::AlterTagOp
nebula::meta::cpp2::AlterSchemaOp
getTagOpType(const AlterTagOptItem::OptionType type);
};

Expand Down
5 changes: 1 addition & 4 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "ALTER TAG account "
"ADD (col1 int TTL = 200, col2 string), "
"SET (balance string), "
"CHANGE (balance string), "
"DROP (id)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
Expand Down Expand Up @@ -208,13 +208,10 @@ TEST_F(SchemaTest, metaCommunication) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
/* test the same tag in diff space, but now meta server not supported,
* will add a issue(#292) to resolve it */
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE TAG person(name string, interest string)";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
Expand Down
43 changes: 25 additions & 18 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ enum ErrorCode {
} (cpp.enum_strict)


enum AlterTagOp {
enum AlterSchemaOp {
ADD = 0x01,
SET = 0x02,
CHANGE = 0x02,
DROP = 0x03,
UNKNOWN = 0x04,
} (cpp.enum_strict)
Expand Down Expand Up @@ -65,8 +65,8 @@ struct TagItem {
4: common.Schema schema,
}

struct AlterTagItem {
1: AlterTagOp op,
struct AlterSchemaItem {
1: AlterSchemaOp op,
2: common.Schema schema,
}

Expand Down Expand Up @@ -124,9 +124,9 @@ struct CreateTagReq {
}

struct AlterTagReq {
1: common.GraphSpaceID space_id,
2: string tag_name,
3: list<AlterTagItem> tag_items,
1: common.GraphSpaceID space_id,
2: string tag_name,
3: list<AlterSchemaItem> tag_items,
}

struct RemoveTagReq {
Expand Down Expand Up @@ -156,6 +156,23 @@ struct GetTagResp {
2: common.Schema schema,
}

struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
}

struct GetEdgeResp {
1: ErrorCode code,
2: common.Schema schema,
}

// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
Expand All @@ -165,7 +182,7 @@ struct CreateEdgeReq {

struct RemoveEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
2: string edge_name,
}

struct ListEdgesReq {
Expand All @@ -179,16 +196,6 @@ struct ListEdgesResp {
3: list<EdgeItem> edges,
}

struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

struct GetEdgeResp {
1: common.Schema schema,
}

// Host related operations.
struct AddHostsReq {
1: list<common.HostAddr> hosts;
Expand Down
7 changes: 5 additions & 2 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ add_library(
processors/DropSpaceProcessor.cpp
processors/GetPartsAllocProcessor.cpp
processors/CreateTagProcessor.cpp
processors/AlterTagProcessor.cpp
processors/CreateEdgeProcessor.cpp
processors/AlterTagProcessor.cpp
processors/AlterEdgeProcessor.cpp
processors/GetTagProcessor.cpp
processors/GetEdgeProcessor.cpp
processors/ListTagsProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/RemoveEdgeProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
processors/MultiPutProcessor.cpp
Expand Down
51 changes: 51 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ nebula::cpp2::HostAddr MetaServiceUtils::parseHostKey(folly::StringPiece key) {
return host;
}

std::string MetaServiceUtils::schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType) {
std::string key;
key.reserve(128);
key.append(kEdgesTable.data(), kEdgesTable.size());
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId));
key.append(reinterpret_cast<const char*>(&edgeType), sizeof(edgeType));
return key;
}

std::string MetaServiceUtils::schemaEdgesPrefix(GraphSpaceID spaceId) {
std::string key;
key.reserve(kEdgesTable.size() + sizeof(GraphSpaceID));
Expand Down Expand Up @@ -250,5 +259,47 @@ std::string MetaServiceUtils::assembleSegmentKey(const std::string& segment,
return segmentKey;
}

cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vector<nebula::cpp2::ColumnDef>& cols,
const nebula::cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op) {
switch (op) {
case cpp2::AlterSchemaOp::ADD :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (it->get_name() == col.get_name()) {
LOG(WARNING) << "Column existing : " << col.get_name();
return cpp2::ErrorCode::E_EXISTED;
}
}
cols.emplace_back(std::move(col));
return cpp2::ErrorCode::SUCCEEDED;
}
case cpp2::AlterSchemaOp::CHANGE :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (col.get_name() == it->get_name()) {
*it = col;
return cpp2::ErrorCode::SUCCEEDED;
}
}
break;
}
case cpp2::AlterSchemaOp::DROP :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (col.get_name() == it->get_name()) {
cols.erase(it);
return cpp2::ErrorCode::SUCCEEDED;
}
}
break;
}
default :
return cpp2::ErrorCode::E_UNKNOWN;
}
LOG(WARNING) << "Column not found : " << col.get_name();
return cpp2::ErrorCode::E_NOT_FOUND;
}

} // namespace meta
} // namespace nebula
8 changes: 7 additions & 1 deletion src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#define META_METAUTILS_H_

#include "base/Base.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -52,6 +52,8 @@ class MetaServiceUtils final {

static nebula::cpp2::HostAddr parseHostKey(folly::StringPiece key);

static std::string schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType);

static std::string schemaEdgesPrefix(GraphSpaceID spaceId);

static std::string schemaEdgeKey(GraphSpaceID spaceId, EdgeType edgeType, SchemaVer version);
Expand Down Expand Up @@ -79,6 +81,10 @@ class MetaServiceUtils final {
static std::string indexEdgeKey(GraphSpaceID spaceId, const std::string& name);

static std::string assembleSegmentKey(const std::string& segment, const std::string& key);

static cpp2::ErrorCode alterColumnDefs(std::vector<nebula::cpp2::ColumnDef>& cols,
const nebula::cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op);
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2
folly::Future<StatusOr<TagID>>
MetaClient::alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterTagItem> tagItems) {
std::vector<cpp2::AlterSchemaItem> tagItems) {
cpp2::AlterTagReq req;
req.set_space_id(std::move(spaceId));
req.set_tag_name(std::move(name));
Expand Down
5 changes: 3 additions & 2 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ class MetaClient {
createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema);

// TODO(boshengchen) need refresh tagNameIdMap and newestTagVerMap
folly::Future<StatusOr<TagID>>
alterTagSchema(GraphSpaceID spaceId, std::string name, std::vector<cpp2::AlterTagItem> items);
folly::Future<StatusOr<TagID>> alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterSchemaItem> items);

folly::Future<StatusOr<std::vector<cpp2::TagItem>>>
listTagSchemas(GraphSpaceID spaceId);
Expand Down
69 changes: 69 additions & 0 deletions src/meta/processors/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#include "meta/processors/AlterEdgeProcessor.h"
#include "time/TimeUtils.h"

namespace nebula {
namespace meta {

void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto ret = getEdgeType(req.get_space_id(), req.get_edge_name());
if (!ret.ok()) {
resp_.set_code(to(ret.status()));
onFinished();
return;
}
auto edgeType = ret.value();

// Check the edge belongs to the space
std::unique_ptr<kvstore::KVIterator> iter;
auto edgePrefix = MetaServiceUtils::schemaEdgePrefix(req.get_space_id(), edgeType);
auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, edgePrefix, &iter);
if (code != kvstore::ResultCode::SUCCEEDED || !iter->valid()) {
LOG(WARNING) << "Edge could not be found " << req.get_edge_name()
<< ", spaceId " << req.get_space_id() << ", edgeType " << edgeType;
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}

// Get lasted version of edge
auto version = MetaServiceUtils::parseEdgeVersion(iter->key()) + 1;
auto schema = MetaServiceUtils::parseSchema(iter->val());
auto columns = schema.get_columns();
auto& edgeItems = req.get_edge_items();
for (auto& edgeItem : edgeItems) {
auto& cols = edgeItem.get_schema().get_columns();
for (auto& col : cols) {
auto retCode = MetaServiceUtils::alterColumnDefs(columns, col, edgeItem.op);
if (retCode != cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << "Alter edge error " << static_cast<int32_t>(retCode);
resp_.set_code(retCode);
onFinished();
return;
}
}
}
schema.set_columns(std::move(columns));
std::vector<kvstore::KV> data;
LOG(INFO) << "Alter edge " << req.get_edge_name() << ", edgeTye " << edgeType;
data.emplace_back(MetaServiceUtils::schemaEdgeKey(req.get_space_id(), edgeType, version),
MetaServiceUtils::schemaEdgeVal(req.get_edge_name(), schema));
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(to(edgeType, EntryType::EDGE));
doPut(std::move(data));
}

} // namespace meta
} // namespace nebula

31 changes: 31 additions & 0 deletions src/meta/processors/AlterEdgeProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef META_ALTEREDGEPROCESSOR_H_
#define META_ALTEREDGEPROCESSOR_H_

#include "meta/processors/BaseProcessor.h"

namespace nebula {
namespace meta {

class AlterEdgeProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static AlterEdgeProcessor* instance(kvstore::KVStore* kvstore) {
return new AlterEdgeProcessor(kvstore);
}

void process(const cpp2::AlterEdgeReq& req);

private:
explicit AlterEdgeProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
};

} // namespace meta
} // namespace nebula
#endif // META_ALTEREDGEPROCESSOR_H_

Loading