Skip to content

Commit

Permalink
Check/create edge index (#2058)
Browse files Browse the repository at this point in the history
* Check the create edge index request early.

* Add some comments.

* Using std::move.
  • Loading branch information
Shylock-Hg authored Jul 17, 2020
1 parent 080c027 commit 9d1acc5
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 75 deletions.
100 changes: 51 additions & 49 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Status final {
state_ = rhs.state_ == nullptr ? nullptr : copyState(rhs.state_.get());
}

Status& operator=(const Status &rhs) {
Status &operator=(const Status &rhs) {
// `state_ == rhs.state_' means either `this == &rhs',
// or both `*this' and `rhs' are OK
if (state_ != rhs.state_) {
Expand All @@ -42,7 +42,7 @@ class Status final {
state_ = std::move(rhs.state_);
}

Status& operator=(Status &&rhs) noexcept {
Status &operator=(Status &&rhs) noexcept {
// `state_ == rhs.state_' means either `this == &rhs',
// or both `*this' and `rhs' are OK
if (state_ != rhs.state_) {
Expand Down Expand Up @@ -72,26 +72,25 @@ class Status final {
return Status();
}

#define STATUS_GENERATOR(ERROR) \
static Status ERROR() { \
return Status(k##ERROR, ""); \
} \
\
static Status ERROR(folly::StringPiece msg) { \
return Status(k##ERROR, msg); \
} \
\
static Status ERROR(const char *fmt, ...) \
__attribute__((format(printf, 1, 2))) { \
va_list args; \
va_start(args, fmt); \
auto msg = format(fmt, args); \
va_end(args); \
return Status(k##ERROR, msg); \
} \
\
bool is##ERROR() const { \
return code() == k##ERROR; \
#define STATUS_GENERATOR(ERROR) \
static Status ERROR() { \
return Status(k##ERROR, ""); \
} \
\
static Status ERROR(folly::StringPiece msg) { \
return Status(k##ERROR, msg); \
} \
\
static Status ERROR(const char *fmt, ...) __attribute__((format(printf, 1, 2))) { \
va_list args; \
va_start(args, fmt); \
auto msg = format(fmt, args); \
va_end(args); \
return Status(k##ERROR, msg); \
} \
\
bool is##ERROR() const { \
return code() == k##ERROR; \
}
// Some succeeded codes
STATUS_GENERATOR(Inserted);
Expand All @@ -104,6 +103,7 @@ class Status final {

// Graph engine errors
STATUS_GENERATOR(SyntaxError);
STATUS_GENERATOR(MalformedRequest);
// Nothing is executed When command is comment
STATUS_GENERATOR(StatementEmpty);

Expand All @@ -129,50 +129,53 @@ class Status final {

std::string toString() const;

friend std::ostream& operator<<(std::ostream &os, const Status &status);
friend std::ostream &operator<<(std::ostream &os, const Status &status);

// If some kind of error really needs to be distinguished with others using a specific
// code, other than a general code and specific msg, you could add a new code below,
// e.g. kSomeError, and add the cooresponding STATUS_GENERATOR(SomeError)
enum Code : uint16_t {
// clang-format off
// OK
kOk = 0,
kInserted = 1,
kOk = 0,
kInserted = 1,
// 1xx, for general errors
kError = 101,
kNoSuchFile = 102,
kNotSupported = 103,
kError = 101,
kNoSuchFile = 102,
kNotSupported = 103,
kInvalidParameter = 104,
// 2xx, for graph engine errors
kSyntaxError = 201,
kStatementEmpty = 202,
kSyntaxError = 201,
kStatementEmpty = 202,
kMalformedRequest = 203,
// 3xx, for storage engine errors
kKeyNotFound = 301,
kKeyNotFound = 301,
// 4xx, for meta service errors
kSpaceNotFound = 404,
kHostNotFound = 405,
kTagNotFound = 406,
kEdgeNotFound = 407,
kUserNotFound = 408,
kLeaderChanged = 409,
kBalanced = 410,
kIndexNotFound = 411,
kPartNotFound = 412,
kSpaceNotFound = 404,
kHostNotFound = 405,
kTagNotFound = 406,
kEdgeNotFound = 407,
kUserNotFound = 408,
kLeaderChanged = 409,
kBalanced = 410,
kIndexNotFound = 411,
kPartNotFound = 412,
// 5xx for user or permission error
kPermissionError = 501,
kPermissionError = 501,
// clang-format on
};

Code code() const {
if (state_ == nullptr) {
return kOk;
}
return reinterpret_cast<const Header*>(state_.get())->code_;
return reinterpret_cast<const Header *>(state_.get())->code_;
}

private:
// REQUIRES: stat_ != nullptr
uint16_t size() const {
return reinterpret_cast<const Header*>(state_.get())->size_;
return reinterpret_cast<const Header *>(state_.get())->size_;
}

Status(Code code, folly::StringPiece msg);
Expand All @@ -183,23 +186,22 @@ class Status final {

private:
struct Header {
uint16_t size_;
Code code_;
uint16_t size_;
Code code_;
};
static constexpr auto kHeaderSize = sizeof(Header);
// state_ == nullptr indicates OK
// otherwise, the buffer layout is:
// state_[0..1] length of the error msg, i.e. size() - kHeaderSize
// state_[2..3] code
// state_[4...] verbose error message
std::unique_ptr<const char[]> state_;
std::unique_ptr<const char[]> state_;
};


inline std::ostream& operator<<(std::ostream &os, const Status &status) {
inline std::ostream &operator<<(std::ostream &os, const Status &status) {
return os << status.toString();
}

} // namespace nebula

#endif // COMMON_BASE_STATUS_H_
#endif // COMMON_BASE_STATUS_H_
31 changes: 20 additions & 11 deletions src/graph/CreateEdgeIndexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace nebula {
namespace graph {

CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateEdgeIndexSentence*>(sentence);
CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence, ExecutionContext *ectx)
: Executor(ectx) {
sentence_ = static_cast<CreateEdgeIndexSentence *>(sentence);
}

Status CreateEdgeIndexExecutor::prepare() {
Expand All @@ -31,14 +31,24 @@ void CreateEdgeIndexExecutor::execute() {
auto *edgeName = sentence_->edgeName();
auto columns = sentence_->names();
auto spaceId = ectx()->rctx()->session()->space();
if (UNLIKELY(columns.empty())) {
// It's not allowed by parser in normal
LOG(WARNING) << "Impossible empty index fields.";
onError_(Status::MalformedRequest("Empty fields."));
return;
}
std::unordered_set<std::string> uniFields;
uniFields.reserve(columns.size());
uniFields.insert(columns.begin(), columns.end());
if (UNLIKELY(uniFields.size() != columns.size())) {
onError_(Status::MalformedRequest("Duplicate fields."));
return;
}

auto future = mc->createEdgeIndex(spaceId,
*name,
*edgeName,
columns,
sentence_->isIfNotExist());
auto future = mc->createEdgeIndex(
spaceId, *name, *edgeName, std::move(columns), sentence_->isIfNotExist());
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
auto cb = [this](auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
Expand All @@ -49,7 +59,7 @@ void CreateEdgeIndexExecutor::execute() {
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
auto error = [this](auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
Expand All @@ -59,4 +69,3 @@ void CreateEdgeIndexExecutor::execute() {

} // namespace graph
} // namespace nebula

31 changes: 16 additions & 15 deletions src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@
namespace nebula {
namespace meta {

void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
auto space = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(space);
void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq &req) {
const auto &indexName = req.get_index_name();
auto &edgeName = req.get_edge_name();
auto &fieldNames = req.get_fields();
if (fieldNames.empty()) {
if (UNLIKELY(fieldNames.empty())) {
LOG(ERROR) << "The index field of an edge type should not be empty.";
handleErrorCode(cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

std::set<std::string> columnSet(fieldNames.begin(), fieldNames.end());
if (fieldNames.size() != columnSet.size()) {
if (UNLIKELY(fieldNames.size() != columnSet.size())) {
LOG(ERROR) << "Conflict field in the edge index.";
handleErrorCode(cpp2::ErrorCode::E_CONFLICT);
onFinished();
return;
}

auto space = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(space);

folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock());
auto ret = getIndexID(space, indexName);
if (ret.ok()) {
Expand Down Expand Up @@ -62,17 +63,18 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {

auto latestEdgeSchema = retSchema.value();
if (tagOrEdgeHasTTL(latestEdgeSchema)) {
LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index";
handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL);
onFinished();
return;
LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index";
handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL);
onFinished();
return;
}

auto fields = getLatestEdgeFields(latestEdgeSchema);
std::vector<nebula::cpp2::ColumnDef> columns;
for (auto &field : fieldNames) {
auto iter = std::find_if(std::begin(fields), std::end(fields),
[field](const auto& pair) { return field == pair.first; });
auto iter = std::find_if(std::begin(fields), std::end(fields), [field](const auto &pair) {
return field == pair.first;
});

if (iter == fields.end()) {
LOG(ERROR) << "Field " << field << " not found in Edge " << edgeName;
Expand Down Expand Up @@ -108,14 +110,13 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
item.set_fields(std::move(columns));

data.emplace_back(MetaServiceUtils::indexIndexKey(space, indexName),
std::string(reinterpret_cast<const char*>(&edgeIndex), sizeof(IndexID)));
std::string(reinterpret_cast<const char *>(&edgeIndex), sizeof(IndexID)));
data.emplace_back(MetaServiceUtils::indexKey(space, edgeIndex),
MetaServiceUtils::indexVal(item));
LOG(INFO) << "Create Edge Index " << indexName << ", edgeIndex " << edgeIndex;
resp_.set_id(to(edgeIndex, EntryType::INDEX));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
} // namespace nebula

} // namespace meta
} // namespace nebula

0 comments on commit 9d1acc5

Please sign in to comment.