Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check/create edge index #2058

Merged
merged 7 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 51 additions & 49 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Status final {
state_ = rhs.state_ == nullptr ? nullptr : copyState(rhs.state_.get());
}

Status& operator=(const Status &rhs) {
Status &operator=(const Status &rhs) {
// `state_ == rhs.state_' means either `this == &rhs',
// or both `*this' and `rhs' are OK
if (state_ != rhs.state_) {
Expand All @@ -42,7 +42,7 @@ class Status final {
state_ = std::move(rhs.state_);
}

Status& operator=(Status &&rhs) noexcept {
Status &operator=(Status &&rhs) noexcept {
// `state_ == rhs.state_' means either `this == &rhs',
// or both `*this' and `rhs' are OK
if (state_ != rhs.state_) {
Expand Down Expand Up @@ -72,26 +72,25 @@ class Status final {
return Status();
}

#define STATUS_GENERATOR(ERROR) \
static Status ERROR() { \
return Status(k##ERROR, ""); \
} \
\
static Status ERROR(folly::StringPiece msg) { \
return Status(k##ERROR, msg); \
} \
\
static Status ERROR(const char *fmt, ...) \
__attribute__((format(printf, 1, 2))) { \
va_list args; \
va_start(args, fmt); \
auto msg = format(fmt, args); \
va_end(args); \
return Status(k##ERROR, msg); \
} \
\
bool is##ERROR() const { \
return code() == k##ERROR; \
#define STATUS_GENERATOR(ERROR) \
static Status ERROR() { \
return Status(k##ERROR, ""); \
} \
\
static Status ERROR(folly::StringPiece msg) { \
return Status(k##ERROR, msg); \
} \
\
static Status ERROR(const char *fmt, ...) __attribute__((format(printf, 1, 2))) { \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format.

va_list args; \
va_start(args, fmt); \
auto msg = format(fmt, args); \
va_end(args); \
return Status(k##ERROR, msg); \
} \
\
bool is##ERROR() const { \
return code() == k##ERROR; \
}
// Some succeeded codes
STATUS_GENERATOR(Inserted);
Expand All @@ -103,6 +102,7 @@ class Status final {

// Graph engine errors
STATUS_GENERATOR(SyntaxError);
STATUS_GENERATOR(MalformedRequest);
// Nothing is executed When command is comment
STATUS_GENERATOR(StatementEmpty);

Expand All @@ -128,49 +128,52 @@ class Status final {

std::string toString() const;

friend std::ostream& operator<<(std::ostream &os, const Status &status);
friend std::ostream &operator<<(std::ostream &os, const Status &status);

// If some kind of error really needs to be distinguished with others using a specific
// code, other than a general code and specific msg, you could add a new code below,
// e.g. kSomeError, and add the cooresponding STATUS_GENERATOR(SomeError)
enum Code : uint16_t {
// clang-format off
// OK
kOk = 0,
kInserted = 1,
kOk = 0,
kInserted = 1,
// 1xx, for general errors
kError = 101,
kNoSuchFile = 102,
kNotSupported = 103,
kError = 101,
kNoSuchFile = 102,
kNotSupported = 103,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format.

// 2xx, for graph engine errors
kSyntaxError = 201,
kStatementEmpty = 202,
kSyntaxError = 201,
kStatementEmpty = 202,
kMalformedRequest = 203,
// 3xx, for storage engine errors
kKeyNotFound = 301,
kKeyNotFound = 301,
// 4xx, for meta service errors
kSpaceNotFound = 404,
kHostNotFound = 405,
kTagNotFound = 406,
kEdgeNotFound = 407,
kUserNotFound = 408,
kLeaderChanged = 409,
kBalanced = 410,
kIndexNotFound = 411,
kPartNotFound = 412,
kSpaceNotFound = 404,
kHostNotFound = 405,
kTagNotFound = 406,
kEdgeNotFound = 407,
kUserNotFound = 408,
kLeaderChanged = 409,
kBalanced = 410,
kIndexNotFound = 411,
kPartNotFound = 412,
// 5xx for user or permission error
kPermissionError = 501,
kPermissionError = 501,
// clang-format on
};

Code code() const {
if (state_ == nullptr) {
return kOk;
}
return reinterpret_cast<const Header*>(state_.get())->code_;
return reinterpret_cast<const Header *>(state_.get())->code_;
}

private:
// REQUIRES: stat_ != nullptr
uint16_t size() const {
return reinterpret_cast<const Header*>(state_.get())->size_;
return reinterpret_cast<const Header *>(state_.get())->size_;
}

Status(Code code, folly::StringPiece msg);
Expand All @@ -181,23 +184,22 @@ class Status final {

private:
struct Header {
uint16_t size_;
Code code_;
uint16_t size_;
Code code_;
};
static constexpr auto kHeaderSize = sizeof(Header);
// state_ == nullptr indicates OK
// otherwise, the buffer layout is:
// state_[0..1] length of the error msg, i.e. size() - kHeaderSize
// state_[2..3] code
// state_[4...] verbose error message
std::unique_ptr<const char[]> state_;
std::unique_ptr<const char[]> state_;
};


inline std::ostream& operator<<(std::ostream &os, const Status &status) {
inline std::ostream &operator<<(std::ostream &os, const Status &status) {
return os << status.toString();
}

} // namespace nebula

#endif // COMMON_BASE_STATUS_H_
#endif // COMMON_BASE_STATUS_H_
31 changes: 20 additions & 11 deletions src/graph/CreateEdgeIndexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace nebula {
namespace graph {

CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence,
ExecutionContext *ectx) : Executor(ectx) {
sentence_ = static_cast<CreateEdgeIndexSentence*>(sentence);
CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence, ExecutionContext *ectx)
: Executor(ectx) {
sentence_ = static_cast<CreateEdgeIndexSentence *>(sentence);
}

Status CreateEdgeIndexExecutor::prepare() {
Expand All @@ -31,14 +31,24 @@ void CreateEdgeIndexExecutor::execute() {
auto *edgeName = sentence_->edgeName();
auto columns = sentence_->names();
auto spaceId = ectx()->rctx()->session()->space();
if (UNLIKELY(columns.empty())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check about column's empty and column duplicate is in meta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's my purpose.

// It's not allowed by parser in normal
LOG(WARNING) << "Impossible empty index fields.";
onError_(Status::MalformedRequest("Empty fields."));
return;
}
std::unordered_set<std::string> uniFields;
uniFields.reserve(columns.size());
uniFields.insert(columns.begin(), columns.end());
if (UNLIKELY(uniFields.size() != columns.size())) {
onError_(Status::MalformedRequest("Duplicate fields."));
return;
}

auto future = mc->createEdgeIndex(spaceId,
*name,
*edgeName,
columns,
sentence_->isIfNotExist());
auto future = mc->createEdgeIndex(
spaceId, *name, *edgeName, std::move(columns), sentence_->isIfNotExist());
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
auto cb = [this](auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(resp.status());
Expand All @@ -49,7 +59,7 @@ void CreateEdgeIndexExecutor::execute() {
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
auto error = [this](auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
onError_(Status::Error("Internal error"));
};
Expand All @@ -59,4 +69,3 @@ void CreateEdgeIndexExecutor::execute() {

} // namespace graph
} // namespace nebula

31 changes: 16 additions & 15 deletions src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@
namespace nebula {
namespace meta {

void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
auto space = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(space);
void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq &req) {
const auto &indexName = req.get_index_name();
auto &edgeName = req.get_edge_name();
auto &fieldNames = req.get_fields();
if (fieldNames.empty()) {
if (UNLIKELY(fieldNames.empty())) {
LOG(ERROR) << "The index field of an edge type should not be empty.";
handleErrorCode(cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

std::set<std::string> columnSet(fieldNames.begin(), fieldNames.end());
if (fieldNames.size() != columnSet.size()) {
if (UNLIKELY(fieldNames.size() != columnSet.size())) {
LOG(ERROR) << "Conflict field in the edge index.";
handleErrorCode(cpp2::ErrorCode::E_CONFLICT);
onFinished();
return;
}

auto space = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(space);

folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock());
auto ret = getIndexID(space, indexName);
if (ret.ok()) {
Expand Down Expand Up @@ -62,17 +63,18 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {

auto latestEdgeSchema = retSchema.value();
if (tagOrEdgeHasTTL(latestEdgeSchema)) {
LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index";
handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL);
onFinished();
return;
LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index";
handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL);
onFinished();
return;
}

auto fields = getLatestEdgeFields(latestEdgeSchema);
std::vector<nebula::cpp2::ColumnDef> columns;
for (auto &field : fieldNames) {
auto iter = std::find_if(std::begin(fields), std::end(fields),
[field](const auto& pair) { return field == pair.first; });
auto iter = std::find_if(std::begin(fields), std::end(fields), [field](const auto &pair) {
return field == pair.first;
});

if (iter == fields.end()) {
LOG(ERROR) << "Field " << field << " not found in Edge " << edgeName;
Expand Down Expand Up @@ -108,14 +110,13 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
item.set_fields(std::move(columns));

data.emplace_back(MetaServiceUtils::indexIndexKey(space, indexName),
std::string(reinterpret_cast<const char*>(&edgeIndex), sizeof(IndexID)));
std::string(reinterpret_cast<const char *>(&edgeIndex), sizeof(IndexID)));
data.emplace_back(MetaServiceUtils::indexKey(space, edgeIndex),
MetaServiceUtils::indexVal(item));
LOG(INFO) << "Create Edge Index " << indexName << ", edgeIndex " << edgeIndex;
resp_.set_id(to(edgeIndex, EntryType::INDEX));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
} // namespace nebula

} // namespace meta
} // namespace nebula