Skip to content

Commit

Permalink
Add AlterEdgeProcessor, GetEdgeProcessor, RemoveEdgeProcessor and UTs (
Browse files Browse the repository at this point in the history
…#346)

* add AlterEdgeProcessor, GetEdgeProcessor, RemoveEdgeProcessor

* address dangleptr's comment

* rebase master
modify SET to CHANGE

* fix UT
  • Loading branch information
laura-ding authored and dutor committed May 10, 2019
1 parent c5ca970 commit 166e434
Show file tree
Hide file tree
Showing 25 changed files with 835 additions and 122 deletions.
16 changes: 8 additions & 8 deletions src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ void AlterTagExecutor::execute() {
const auto& tagOpts = sentence_->tagOptList();
auto spaceId = ectx()->rctx()->session()->space();

std::vector<nebula::meta::cpp2::AlterTagItem> tagItems;
std::vector<nebula::meta::cpp2::AlterSchemaItem> tagItems;
for (auto& tagOpt : tagOpts) {
nebula::meta::cpp2::AlterTagItem tagItem;
nebula::meta::cpp2::AlterSchemaItem tagItem;
auto opType = getTagOpType(tagOpt->getOptType());
tagItem.set_op(std::move(opType));
const auto& specs = tagOpt->columnSpecs();
Expand Down Expand Up @@ -65,17 +65,17 @@ void AlterTagExecutor::execute() {
std::move(future).via(runner).thenValue(cb).thenError(error);
}

nebula::meta::cpp2::AlterTagOp
nebula::meta::cpp2::AlterSchemaOp
AlterTagExecutor::getTagOpType(const AlterTagOptItem::OptionType type) {
switch (type) {
case AlterTagOptItem::OptionType::ADD :
return nebula::meta::cpp2::AlterTagOp::ADD;
case AlterTagOptItem::OptionType::SET :
return nebula::meta::cpp2::AlterTagOp::SET;
return nebula::meta::cpp2::AlterSchemaOp::ADD;
case AlterTagOptItem::OptionType::CHANGE :
return nebula::meta::cpp2::AlterSchemaOp::CHANGE;
case AlterTagOptItem::OptionType::DROP :
return nebula::meta::cpp2::AlterTagOp::DROP;
return nebula::meta::cpp2::AlterSchemaOp::DROP;
default:
return nebula::meta::cpp2::AlterTagOp::UNKNOWN;
return nebula::meta::cpp2::AlterSchemaOp::UNKNOWN;
}
}
} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/graph/AlterTagExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class AlterTagExecutor final : public Executor {

private:
AlterTagSentence *sentence_{nullptr};
nebula::meta::cpp2::AlterTagOp
nebula::meta::cpp2::AlterSchemaOp
getTagOpType(const AlterTagOptItem::OptionType type);
};

Expand Down
5 changes: 1 addition & 4 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "ALTER TAG account "
"ADD (col1 int TTL = 200, col2 string), "
"SET (balance string), "
"CHANGE (balance string), "
"DROP (id)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
Expand Down Expand Up @@ -208,13 +208,10 @@ TEST_F(SchemaTest, metaCommunication) {
};
ASSERT_TRUE(verifyResult(resp, expected));
}
/* test the same tag in diff space, but now meta server not supported,
* will add a issue(#292) to resolve it */
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE TAG person(name string, interest string)";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
Expand Down
43 changes: 25 additions & 18 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ enum ErrorCode {
} (cpp.enum_strict)


enum AlterTagOp {
enum AlterSchemaOp {
ADD = 0x01,
SET = 0x02,
CHANGE = 0x02,
DROP = 0x03,
UNKNOWN = 0x04,
} (cpp.enum_strict)
Expand Down Expand Up @@ -65,8 +65,8 @@ struct TagItem {
4: common.Schema schema,
}

struct AlterTagItem {
1: AlterTagOp op,
struct AlterSchemaItem {
1: AlterSchemaOp op,
2: common.Schema schema,
}

Expand Down Expand Up @@ -124,9 +124,9 @@ struct CreateTagReq {
}

struct AlterTagReq {
1: common.GraphSpaceID space_id,
2: string tag_name,
3: list<AlterTagItem> tag_items,
1: common.GraphSpaceID space_id,
2: string tag_name,
3: list<AlterSchemaItem> tag_items,
}

struct RemoveTagReq {
Expand Down Expand Up @@ -156,6 +156,23 @@ struct GetTagResp {
2: common.Schema schema,
}

struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

struct AlterEdgeReq {
1: common.GraphSpaceID space_id,
2: string edge_name,
3: list<AlterSchemaItem> edge_items,
}

struct GetEdgeResp {
1: ErrorCode code,
2: common.Schema schema,
}

// Edge related operations.
struct CreateEdgeReq {
1: common.GraphSpaceID space_id,
Expand All @@ -165,7 +182,7 @@ struct CreateEdgeReq {

struct RemoveEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
2: string edge_name,
}

struct ListEdgesReq {
Expand All @@ -179,16 +196,6 @@ struct ListEdgesResp {
3: list<EdgeItem> edges,
}

struct GetEdgeReq {
1: common.GraphSpaceID space_id,
2: common.EdgeType edge_type,
3: common.SchemaVer version,
}

struct GetEdgeResp {
1: common.Schema schema,
}

// Host related operations.
struct AddHostsReq {
1: list<common.HostAddr> hosts;
Expand Down
7 changes: 5 additions & 2 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ add_library(
processors/DropSpaceProcessor.cpp
processors/GetPartsAllocProcessor.cpp
processors/CreateTagProcessor.cpp
processors/AlterTagProcessor.cpp
processors/CreateEdgeProcessor.cpp
processors/AlterTagProcessor.cpp
processors/AlterEdgeProcessor.cpp
processors/GetTagProcessor.cpp
processors/GetEdgeProcessor.cpp
processors/ListTagsProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/ListEdgesProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/RemoveEdgeProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
processors/MultiPutProcessor.cpp
Expand Down
51 changes: 51 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ nebula::cpp2::HostAddr MetaServiceUtils::parseHostKey(folly::StringPiece key) {
return host;
}

std::string MetaServiceUtils::schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType) {
std::string key;
key.reserve(128);
key.append(kEdgesTable.data(), kEdgesTable.size());
key.append(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId));
key.append(reinterpret_cast<const char*>(&edgeType), sizeof(edgeType));
return key;
}

std::string MetaServiceUtils::schemaEdgesPrefix(GraphSpaceID spaceId) {
std::string key;
key.reserve(kEdgesTable.size() + sizeof(GraphSpaceID));
Expand Down Expand Up @@ -250,5 +259,47 @@ std::string MetaServiceUtils::assembleSegmentKey(const std::string& segment,
return segmentKey;
}

cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vector<nebula::cpp2::ColumnDef>& cols,
const nebula::cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op) {
switch (op) {
case cpp2::AlterSchemaOp::ADD :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (it->get_name() == col.get_name()) {
LOG(WARNING) << "Column existing : " << col.get_name();
return cpp2::ErrorCode::E_EXISTED;
}
}
cols.emplace_back(std::move(col));
return cpp2::ErrorCode::SUCCEEDED;
}
case cpp2::AlterSchemaOp::CHANGE :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (col.get_name() == it->get_name()) {
*it = col;
return cpp2::ErrorCode::SUCCEEDED;
}
}
break;
}
case cpp2::AlterSchemaOp::DROP :
{
for (auto it = cols.begin(); it != cols.end(); ++it) {
if (col.get_name() == it->get_name()) {
cols.erase(it);
return cpp2::ErrorCode::SUCCEEDED;
}
}
break;
}
default :
return cpp2::ErrorCode::E_UNKNOWN;
}
LOG(WARNING) << "Column not found : " << col.get_name();
return cpp2::ErrorCode::E_NOT_FOUND;
}

} // namespace meta
} // namespace nebula
8 changes: 7 additions & 1 deletion src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#define META_METAUTILS_H_

#include "base/Base.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -52,6 +52,8 @@ class MetaServiceUtils final {

static nebula::cpp2::HostAddr parseHostKey(folly::StringPiece key);

static std::string schemaEdgePrefix(GraphSpaceID spaceId, EdgeType edgeType);

static std::string schemaEdgesPrefix(GraphSpaceID spaceId);

static std::string schemaEdgeKey(GraphSpaceID spaceId, EdgeType edgeType, SchemaVer version);
Expand Down Expand Up @@ -79,6 +81,10 @@ class MetaServiceUtils final {
static std::string indexEdgeKey(GraphSpaceID spaceId, const std::string& name);

static std::string assembleSegmentKey(const std::string& segment, const std::string& key);

static cpp2::ErrorCode alterColumnDefs(std::vector<nebula::cpp2::ColumnDef>& cols,
const nebula::cpp2::ColumnDef col,
const cpp2::AlterSchemaOp op);
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ MetaClient::createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2
folly::Future<StatusOr<TagID>>
MetaClient::alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterTagItem> tagItems) {
std::vector<cpp2::AlterSchemaItem> tagItems) {
cpp2::AlterTagReq req;
req.set_space_id(std::move(spaceId));
req.set_tag_name(std::move(name));
Expand Down
5 changes: 3 additions & 2 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ class MetaClient {
createTagSchema(GraphSpaceID spaceId, std::string name, nebula::cpp2::Schema schema);

// TODO(boshengchen) need refresh tagNameIdMap and newestTagVerMap
folly::Future<StatusOr<TagID>>
alterTagSchema(GraphSpaceID spaceId, std::string name, std::vector<cpp2::AlterTagItem> items);
folly::Future<StatusOr<TagID>> alterTagSchema(GraphSpaceID spaceId,
std::string name,
std::vector<cpp2::AlterSchemaItem> items);

folly::Future<StatusOr<std::vector<cpp2::TagItem>>>
listTagSchemas(GraphSpaceID spaceId);
Expand Down
69 changes: 69 additions & 0 deletions src/meta/processors/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#include "meta/processors/AlterEdgeProcessor.h"
#include "time/TimeUtils.h"

namespace nebula {
namespace meta {

void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
if (spaceExist(req.get_space_id()) == Status::SpaceNotFound()) {
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto ret = getEdgeType(req.get_space_id(), req.get_edge_name());
if (!ret.ok()) {
resp_.set_code(to(ret.status()));
onFinished();
return;
}
auto edgeType = ret.value();

// Check the edge belongs to the space
std::unique_ptr<kvstore::KVIterator> iter;
auto edgePrefix = MetaServiceUtils::schemaEdgePrefix(req.get_space_id(), edgeType);
auto code = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, edgePrefix, &iter);
if (code != kvstore::ResultCode::SUCCEEDED || !iter->valid()) {
LOG(WARNING) << "Edge could not be found " << req.get_edge_name()
<< ", spaceId " << req.get_space_id() << ", edgeType " << edgeType;
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
onFinished();
return;
}

// Get lasted version of edge
auto version = MetaServiceUtils::parseEdgeVersion(iter->key()) + 1;
auto schema = MetaServiceUtils::parseSchema(iter->val());
auto columns = schema.get_columns();
auto& edgeItems = req.get_edge_items();
for (auto& edgeItem : edgeItems) {
auto& cols = edgeItem.get_schema().get_columns();
for (auto& col : cols) {
auto retCode = MetaServiceUtils::alterColumnDefs(columns, col, edgeItem.op);
if (retCode != cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << "Alter edge error " << static_cast<int32_t>(retCode);
resp_.set_code(retCode);
onFinished();
return;
}
}
}
schema.set_columns(std::move(columns));
std::vector<kvstore::KV> data;
LOG(INFO) << "Alter edge " << req.get_edge_name() << ", edgeTye " << edgeType;
data.emplace_back(MetaServiceUtils::schemaEdgeKey(req.get_space_id(), edgeType, version),
MetaServiceUtils::schemaEdgeVal(req.get_edge_name(), schema));
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(to(edgeType, EntryType::EDGE));
doPut(std::move(data));
}

} // namespace meta
} // namespace nebula

31 changes: 31 additions & 0 deletions src/meta/processors/AlterEdgeProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef META_ALTEREDGEPROCESSOR_H_
#define META_ALTEREDGEPROCESSOR_H_

#include "meta/processors/BaseProcessor.h"

namespace nebula {
namespace meta {

class AlterEdgeProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static AlterEdgeProcessor* instance(kvstore::KVStore* kvstore) {
return new AlterEdgeProcessor(kvstore);
}

void process(const cpp2::AlterEdgeReq& req);

private:
explicit AlterEdgeProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
};

} // namespace meta
} // namespace nebula
#endif // META_ALTEREDGEPROCESSOR_H_

Loading

0 comments on commit 166e434

Please sign in to comment.