diff --git a/etc/nebula-metad.conf.production b/etc/nebula-metad.conf.production index eafba1cf0f9..b8e4b473755 100644 --- a/etc/nebula-metad.conf.production +++ b/etc/nebula-metad.conf.production @@ -35,5 +35,4 @@ --data_path=data/meta ############## rocksdb Options ############## ---rocksdb_disable_wal=false --rocksdb_wal_sync=true diff --git a/etc/nebula-storaged.conf.default b/etc/nebula-storaged.conf.default index c864eb8546b..58e3f537d2e 100644 --- a/etc/nebula-storaged.conf.default +++ b/etc/nebula-storaged.conf.default @@ -65,11 +65,7 @@ # In order to disable compression for level 0/1, set it to "no:no" --rocksdb_compression_per_level= ---rocksdb_disable_wal=true - ############## rocksdb Options ############## ---rocksdb_disable_wal=true - # rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma --rocksdb_db_options={"max_subcompactions":"1","max_background_jobs":"1"} # rocksdb ColumnFamilyOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma diff --git a/etc/nebula-storaged.conf.production b/etc/nebula-storaged.conf.production index 2c5a5fcf0a8..eabd3f67095 100644 --- a/etc/nebula-storaged.conf.production +++ b/etc/nebula-storaged.conf.production @@ -66,8 +66,6 @@ # In order to disable compression for level 0/1, set it to "no:no" --rocksdb_compression_per_level= ---rocksdb_disable_wal=true - # rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma --rocksdb_db_options={"max_subcompactions":"4","max_background_jobs":"4"} # rocksdb ColumnFamilyOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma diff --git a/kubernetes/helm/templates/configmap.yaml b/kubernetes/helm/templates/configmap.yaml index 6ddce867e68..df73ec9b684 100644 --- a/kubernetes/helm/templates/configmap.yaml +++ b/kubernetes/helm/templates/configmap.yaml @@ -156,7 +156,6 @@ data: --engine_type=rocksdb ############## rocksdb Options ############## - --rocksdb_disable_wal=true # rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma --rocksdb_db_options={} # rocksdb ColumnFamilyOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma diff --git a/src/common/base/Status.h b/src/common/base/Status.h index aaacb8a21c1..f6f233d1ce2 100644 --- a/src/common/base/Status.h +++ b/src/common/base/Status.h @@ -29,7 +29,7 @@ class Status final { state_ = rhs.state_ == nullptr ? nullptr : copyState(rhs.state_.get()); } - Status& operator=(const Status &rhs) { + Status &operator=(const Status &rhs) { // `state_ == rhs.state_' means either `this == &rhs', // or both `*this' and `rhs' are OK if (state_ != rhs.state_) { @@ -42,7 +42,7 @@ class Status final { state_ = std::move(rhs.state_); } - Status& operator=(Status &&rhs) noexcept { + Status &operator=(Status &&rhs) noexcept { // `state_ == rhs.state_' means either `this == &rhs', // or both `*this' and `rhs' are OK if (state_ != rhs.state_) { @@ -72,26 +72,25 @@ class Status final { return Status(); } -#define STATUS_GENERATOR(ERROR) \ - static Status ERROR() { \ - return Status(k##ERROR, ""); \ - } \ - \ - static Status ERROR(folly::StringPiece msg) { \ - return Status(k##ERROR, msg); \ - } \ - \ - static Status ERROR(const char *fmt, ...) \ - __attribute__((format(printf, 1, 2))) { \ - va_list args; \ - va_start(args, fmt); \ - auto msg = format(fmt, args); \ - va_end(args); \ - return Status(k##ERROR, msg); \ - } \ - \ - bool is##ERROR() const { \ - return code() == k##ERROR; \ +#define STATUS_GENERATOR(ERROR) \ + static Status ERROR() { \ + return Status(k##ERROR, ""); \ + } \ + \ + static Status ERROR(folly::StringPiece msg) { \ + return Status(k##ERROR, msg); \ + } \ + \ + static Status ERROR(const char *fmt, ...) __attribute__((format(printf, 1, 2))) { \ + va_list args; \ + va_start(args, fmt); \ + auto msg = format(fmt, args); \ + va_end(args); \ + return Status(k##ERROR, msg); \ + } \ + \ + bool is##ERROR() const { \ + return code() == k##ERROR; \ } // Some succeeded codes STATUS_GENERATOR(Inserted); @@ -104,6 +103,7 @@ class Status final { // Graph engine errors STATUS_GENERATOR(SyntaxError); + STATUS_GENERATOR(MalformedRequest); // Nothing is executed When command is comment STATUS_GENERATOR(StatementEmpty); @@ -129,50 +129,53 @@ class Status final { std::string toString() const; - friend std::ostream& operator<<(std::ostream &os, const Status &status); + friend std::ostream &operator<<(std::ostream &os, const Status &status); // If some kind of error really needs to be distinguished with others using a specific // code, other than a general code and specific msg, you could add a new code below, // e.g. kSomeError, and add the cooresponding STATUS_GENERATOR(SomeError) enum Code : uint16_t { + // clang-format off // OK - kOk = 0, - kInserted = 1, + kOk = 0, + kInserted = 1, // 1xx, for general errors - kError = 101, - kNoSuchFile = 102, - kNotSupported = 103, + kError = 101, + kNoSuchFile = 102, + kNotSupported = 103, kInvalidParameter = 104, // 2xx, for graph engine errors - kSyntaxError = 201, - kStatementEmpty = 202, + kSyntaxError = 201, + kStatementEmpty = 202, + kMalformedRequest = 203, // 3xx, for storage engine errors - kKeyNotFound = 301, + kKeyNotFound = 301, // 4xx, for meta service errors - kSpaceNotFound = 404, - kHostNotFound = 405, - kTagNotFound = 406, - kEdgeNotFound = 407, - kUserNotFound = 408, - kLeaderChanged = 409, - kBalanced = 410, - kIndexNotFound = 411, - kPartNotFound = 412, + kSpaceNotFound = 404, + kHostNotFound = 405, + kTagNotFound = 406, + kEdgeNotFound = 407, + kUserNotFound = 408, + kLeaderChanged = 409, + kBalanced = 410, + kIndexNotFound = 411, + kPartNotFound = 412, // 5xx for user or permission error - kPermissionError = 501, + kPermissionError = 501, + // clang-format on }; Code code() const { if (state_ == nullptr) { return kOk; } - return reinterpret_cast(state_.get())->code_; + return reinterpret_cast(state_.get())->code_; } private: // REQUIRES: stat_ != nullptr uint16_t size() const { - return reinterpret_cast(state_.get())->size_; + return reinterpret_cast(state_.get())->size_; } Status(Code code, folly::StringPiece msg); @@ -183,8 +186,8 @@ class Status final { private: struct Header { - uint16_t size_; - Code code_; + uint16_t size_; + Code code_; }; static constexpr auto kHeaderSize = sizeof(Header); // state_ == nullptr indicates OK @@ -192,14 +195,13 @@ class Status final { // state_[0..1] length of the error msg, i.e. size() - kHeaderSize // state_[2..3] code // state_[4...] verbose error message - std::unique_ptr state_; + std::unique_ptr state_; }; - -inline std::ostream& operator<<(std::ostream &os, const Status &status) { +inline std::ostream &operator<<(std::ostream &os, const Status &status) { return os << status.toString(); } } // namespace nebula -#endif // COMMON_BASE_STATUS_H_ +#endif // COMMON_BASE_STATUS_H_ diff --git a/src/graph/CreateEdgeIndexExecutor.cpp b/src/graph/CreateEdgeIndexExecutor.cpp index fec9a46e49a..7f87ecbd725 100644 --- a/src/graph/CreateEdgeIndexExecutor.cpp +++ b/src/graph/CreateEdgeIndexExecutor.cpp @@ -9,9 +9,9 @@ namespace nebula { namespace graph { -CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence, - ExecutionContext *ectx) : Executor(ectx) { - sentence_ = static_cast(sentence); +CreateEdgeIndexExecutor::CreateEdgeIndexExecutor(Sentence *sentence, ExecutionContext *ectx) + : Executor(ectx) { + sentence_ = static_cast(sentence); } Status CreateEdgeIndexExecutor::prepare() { @@ -31,14 +31,24 @@ void CreateEdgeIndexExecutor::execute() { auto *edgeName = sentence_->edgeName(); auto columns = sentence_->names(); auto spaceId = ectx()->rctx()->session()->space(); + if (UNLIKELY(columns.empty())) { + // It's not allowed by parser in normal + LOG(WARNING) << "Impossible empty index fields."; + onError_(Status::MalformedRequest("Empty fields.")); + return; + } + std::unordered_set uniFields; + uniFields.reserve(columns.size()); + uniFields.insert(columns.begin(), columns.end()); + if (UNLIKELY(uniFields.size() != columns.size())) { + onError_(Status::MalformedRequest("Duplicate fields.")); + return; + } - auto future = mc->createEdgeIndex(spaceId, - *name, - *edgeName, - columns, - sentence_->isIfNotExist()); + auto future = mc->createEdgeIndex( + spaceId, *name, *edgeName, std::move(columns), sentence_->isIfNotExist()); auto *runner = ectx()->rctx()->runner(); - auto cb = [this] (auto &&resp) { + auto cb = [this](auto &&resp) { if (!resp.ok()) { DCHECK(onError_); onError_(resp.status()); @@ -49,7 +59,7 @@ void CreateEdgeIndexExecutor::execute() { onFinish_(Executor::ProcessControl::kNext); }; - auto error = [this] (auto &&e) { + auto error = [this](auto &&e) { LOG(ERROR) << "Exception caught: " << e.what(); onError_(Status::Error("Internal error")); }; @@ -59,4 +69,3 @@ void CreateEdgeIndexExecutor::execute() { } // namespace graph } // namespace nebula - diff --git a/src/graph/test/IndexTest.cpp b/src/graph/test/IndexTest.cpp index 4a2882bfc39..494c26b2dd1 100644 --- a/src/graph/test/IndexTest.cpp +++ b/src/graph/test/IndexTest.cpp @@ -55,6 +55,12 @@ TEST_F(IndexTest, TagIndex) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE TAG INDEX duplicate_person_index ON person(name)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } // Tag not exist { cpp2::ExecutionResponse resp; @@ -83,6 +89,24 @@ TEST_F(IndexTest, TagIndex) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE TAG INDEX duplicate_person_index ON person(name, email)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE TAG INDEX duplicate_index ON person(name, name)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE TAG INDEX disorder_person_index ON person(email, name)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + } { cpp2::ExecutionResponse resp; auto query = "INSERT VERTEX person(name, age, gender, email) VALUES " @@ -133,12 +157,6 @@ TEST_F(IndexTest, TagIndex) { ASSERT_NE("FAILED", columns[1].get_str()); } } - { - cpp2::ExecutionResponse resp; - std::string query = "CREATE TAG INDEX duplicate_index ON person(name, name)"; - auto code = client->execute(query, resp); - ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); - } // Describe Tag Index { cpp2::ExecutionResponse resp; @@ -179,6 +197,7 @@ TEST_F(IndexTest, TagIndex) { std::vector> expected{ {"single_person_index"}, {"multi_person_index"}, + {"disorder_person_index"}, }; ASSERT_TRUE(verifyResult(resp, expected, true, {0})); } @@ -242,6 +261,12 @@ TEST_F(IndexTest, EdgeIndex) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE EDGE INDEX duplicate_friend_index ON friend(degree)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } // Edge not exist { cpp2::ExecutionResponse resp; @@ -270,6 +295,25 @@ TEST_F(IndexTest, EdgeIndex) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE EDGE INDEX duplicate_friend_index " + "ON friend(degree, start_time)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE EDGE INDEX duplicate_index ON friend(degree, degree)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); + } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE EDGE INDEX disorder_friend_index ON friend(start_time, degree)"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + } { cpp2::ExecutionResponse resp; auto query = "INSERT EDGE friend(degree, start_time) VALUES " @@ -320,12 +364,6 @@ TEST_F(IndexTest, EdgeIndex) { ASSERT_NE("FAILED", columns[1].get_str()); } } - { - cpp2::ExecutionResponse resp; - std::string query = "CREATE EDGE INDEX duplicate_index ON friend(degree, degree)"; - auto code = client->execute(query, resp); - ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); - } // Describe Edge Index { cpp2::ExecutionResponse resp; @@ -366,6 +404,7 @@ TEST_F(IndexTest, EdgeIndex) { std::vector> expected{ {"single_friend_index"}, {"multi_friend_index"}, + {"disorder_friend_index"}, }; ASSERT_TRUE(verifyResult(resp, expected, true, {0})); } diff --git a/src/kvstore/RocksEngineConfig.cpp b/src/kvstore/RocksEngineConfig.cpp index c331b5bffea..7ad18a25866 100644 --- a/src/kvstore/RocksEngineConfig.cpp +++ b/src/kvstore/RocksEngineConfig.cpp @@ -17,7 +17,7 @@ // [WAL] DEFINE_bool(rocksdb_disable_wal, - true, + false, "Whether to disable the WAL in rocksdb"); DEFINE_bool(rocksdb_wal_sync, diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 981569555ee..972133ca57b 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -316,8 +316,8 @@ void RaftPart::start(std::vector&& peers, bool asLearner) { startTimeMs_ = time::WallClock::fastNowInMilliSec(); // Set up a leader election task size_t delayMS = 100 + folly::Random::rand32(900); - bgWorkers_->addDelayTask(delayMS, [self = shared_from_this()] { - self->statusPolling(); + bgWorkers_->addDelayTask(delayMS, [self = shared_from_this(), startTime = startTimeMs_] { + self->statusPolling(startTime); }); } @@ -1192,7 +1192,16 @@ bool RaftPart::leaderElection() { } -void RaftPart::statusPolling() { +void RaftPart::statusPolling(int64_t startTime) { + { + std::lock_guard g(raftLock_); + // If startTime is not same as the time when `statusPolling` is add to event loop, + // it means the part has been restarted (it only happens in ut for now), so don't + // add another `statusPolling`. + if (startTime != startTimeMs_) { + return; + } + } size_t delay = FLAGS_raft_heartbeat_interval_secs * 1000 / 3; if (needToStartElection()) { if (leaderElection()) { @@ -1217,8 +1226,8 @@ void RaftPart::statusPolling() { VLOG(3) << idStr_ << "Schedule new task"; bgWorkers_->addDelayTask( delay, - [self = shared_from_this()] { - self->statusPolling(); + [self = shared_from_this(), startTime] { + self->statusPolling(startTime); }); } } diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 3af52f49b98..2deb426ba95 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -328,7 +328,7 @@ class RaftPart : public std::enable_shared_from_this { bool needToStartElection(); - void statusPolling(); + void statusPolling(int64_t startTime); bool needToCleanupSnapshot(); diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index c0327e4e97d..c6590d741c4 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -237,7 +237,7 @@ class BaseProcessor { void doSyncMultiRemoveAndUpdate(std::vector keys); /** - * check if the edge or tag contains indexes when alter edge or tag. + * Check the edge or tag contains indexes when alter it. **/ cpp2::ErrorCode indexCheck(const std::vector& items, const std::vector& alterItems); @@ -245,6 +245,9 @@ class BaseProcessor { StatusOr> getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge); + bool checkIndexExist(const std::vector& fields, + const nebula::cpp2::IndexItem& item); + protected: kvstore::KVStore* kvstore_ = nullptr; RESP resp_; diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index eb1c66734e7..6443a6b76bb 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -469,5 +469,21 @@ BaseProcessor::indexCheck(const std::vector& item return cpp2::ErrorCode::SUCCEEDED; } +template +bool BaseProcessor::checkIndexExist(const std::vector& fields, + const nebula::cpp2::IndexItem& item) { + for (size_t i = 0; i < fields.size(); i++) { + if (fields[i] != item.get_fields()[i].get_name()) { + break; + } + + if (i == fields.size() - 1) { + LOG(ERROR) << "Index " << item.get_index_name() << " have existed"; + return true; + } + } + return false; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp b/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp index 334a4e0f7aa..b943abd11a6 100644 --- a/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp +++ b/src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp @@ -9,13 +9,11 @@ namespace nebula { namespace meta { -void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { - auto space = req.get_space_id(); - CHECK_SPACE_ID_AND_RETURN(space); +void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq &req) { const auto &indexName = req.get_index_name(); auto &edgeName = req.get_edge_name(); auto &fieldNames = req.get_fields(); - if (fieldNames.empty()) { + if (UNLIKELY(fieldNames.empty())) { LOG(ERROR) << "The index field of an edge type should not be empty."; handleErrorCode(cpp2::ErrorCode::E_INVALID_PARM); onFinished(); @@ -23,13 +21,16 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { } std::set columnSet(fieldNames.begin(), fieldNames.end()); - if (fieldNames.size() != columnSet.size()) { + if (UNLIKELY(fieldNames.size() != columnSet.size())) { LOG(ERROR) << "Conflict field in the edge index."; handleErrorCode(cpp2::ErrorCode::E_CONFLICT); onFinished(); return; } + auto space = req.get_space_id(); + CHECK_SPACE_ID_AND_RETURN(space); + folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock()); auto ret = getIndexID(space, indexName); if (ret.ok()) { @@ -53,26 +54,54 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { } auto edgeType = edgeTypeRet.value(); - auto retSchema = getLatestEdgeSchema(space, edgeType); - if (!retSchema.ok()) { + auto prefix = MetaServiceUtils::indexPrefix(space); + std::unique_ptr checkIter; + auto checkRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &checkIter); + if (checkRet != kvstore::ResultCode::SUCCEEDED) { + resp_.set_code(MetaCommon::to(checkRet)); + onFinished(); + return; + } + + while (checkIter->valid()) { + auto val = checkIter->val(); + auto item = MetaServiceUtils::parseIndex(val); + if (item.get_schema_id().getType() != nebula::cpp2::SchemaID::Type::edge_type || + fieldNames.size() > item.get_fields().size() || + edgeType != item.get_schema_id().get_edge_type()) { + checkIter->next(); + continue; + } + + if (checkIndexExist(fieldNames, item)) { + resp_.set_code(cpp2::ErrorCode::E_EXISTED); + onFinished(); + return; + } + checkIter->next(); + } + + auto schemaRet = getLatestEdgeSchema(space, edgeType); + if (!schemaRet.ok()) { handleErrorCode(cpp2::ErrorCode::E_NOT_FOUND); onFinished(); return; } - auto latestEdgeSchema = retSchema.value(); + auto latestEdgeSchema = schemaRet.value(); if (tagOrEdgeHasTTL(latestEdgeSchema)) { - LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index"; - handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL); - onFinished(); - return; + LOG(ERROR) << "Edge: " << edgeName << " has ttl, not create index"; + handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL); + onFinished(); + return; } auto fields = getLatestEdgeFields(latestEdgeSchema); std::vector columns; for (auto &field : fieldNames) { - auto iter = std::find_if(std::begin(fields), std::end(fields), - [field](const auto& pair) { return field == pair.first; }); + auto iter = std::find_if(std::begin(fields), std::end(fields), [field](const auto &pair) { + return field == pair.first; + }); if (iter == fields.end()) { LOG(ERROR) << "Field " << field << " not found in Edge " << edgeName; @@ -108,7 +137,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { item.set_fields(std::move(columns)); data.emplace_back(MetaServiceUtils::indexIndexKey(space, indexName), - std::string(reinterpret_cast(&edgeIndex), sizeof(IndexID))); + std::string(reinterpret_cast(&edgeIndex), sizeof(IndexID))); data.emplace_back(MetaServiceUtils::indexKey(space, edgeIndex), MetaServiceUtils::indexVal(item)); LOG(INFO) << "Create Edge Index " << indexName << ", edgeIndex " << edgeIndex; @@ -116,6 +145,5 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) { doSyncPutAndUpdate(std::move(data)); } -} // namespace meta -} // namespace nebula - +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp b/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp index 01f8d801c7a..953eab1572f 100644 --- a/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp +++ b/src/meta/processors/indexMan/CreateTagIndexProcessor.cpp @@ -51,14 +51,41 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) { } auto tagID = tagIDRet.value(); - auto retSchema = getLatestTagSchema(space, tagID); - if (!retSchema.ok()) { + auto prefix = MetaServiceUtils::indexPrefix(space); + std::unique_ptr checkIter; + auto checkRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &checkIter); + if (checkRet != kvstore::ResultCode::SUCCEEDED) { + resp_.set_code(MetaCommon::to(checkRet)); + onFinished(); + return; + } + + while (checkIter->valid()) { + auto val = checkIter->val(); + auto item = MetaServiceUtils::parseIndex(val); + if (item.get_schema_id().getType() != nebula::cpp2::SchemaID::Type::tag_id || + fieldNames.size() > item.get_fields().size() || + tagID != item.get_schema_id().get_tag_id()) { + checkIter->next(); + continue; + } + + if (checkIndexExist(fieldNames, item)) { + resp_.set_code(cpp2::ErrorCode::E_EXISTED); + onFinished(); + return; + } + checkIter->next(); + } + + auto schemaRet = getLatestTagSchema(space, tagID); + if (!schemaRet.ok()) { handleErrorCode(cpp2::ErrorCode::E_NOT_FOUND); onFinished(); return; } - auto latestTagSchema = retSchema.value(); + auto latestTagSchema = schemaRet.value(); if (tagOrEdgeHasTTL(latestTagSchema)) { LOG(ERROR) << "Tag: " << tagName << " has ttl, not create index"; handleErrorCode(cpp2::ErrorCode::E_INDEX_WITH_TTL); diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp index e8389516cfa..d51d106e175 100644 --- a/src/meta/test/BalanceIntegrationTest.cpp +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -337,11 +337,12 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) { LOG(INFO) << "Waiting for the leader balance"; sleep(FLAGS_raft_heartbeat_interval_secs + 1); + size_t leaderCount = 0; for (int i = 0; i < replica; i++) { std::unordered_map> leaderIds; - EXPECT_LE(2, serverContexts[i]->kvStore_->allLeader(leaderIds)); - EXPECT_GE(4, serverContexts[i]->kvStore_->allLeader(leaderIds)); + leaderCount += serverContexts[i]->kvStore_->allLeader(leaderIds); } + EXPECT_EQ(9, leaderCount); for (auto& c : metaClients) { c->stop(); } diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index 88882b894d8..1ee1220b1b4 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -638,6 +638,14 @@ TEST(MetaClientTest, TagIndexTest) { ASSERT_TRUE(result.ok()); singleFieldIndexID = result.value(); } + { + std::vector&& fields {"tag_0_col_0"}; + auto result = client->createTagIndex(space, + "tag_duplicate_field_index", + "tag_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } { std::vector&& fields {"tag_0_col_0", "tag_0_col_1"}; auto result = client->createTagIndex(space, @@ -647,6 +655,30 @@ TEST(MetaClientTest, TagIndexTest) { ASSERT_TRUE(result.ok()); multiFieldIndexID = result.value(); } + { + std::vector&& fields {"tag_0_col_1", "tag_0_col_0"}; + auto result = client->createTagIndex(space, + "tag_disorder_field_index", + "tag_0", + std::move(fields)).get(); + ASSERT_TRUE(result.ok()); + } + { + std::vector&& fields {"tag_0_col_0", "tag_0_col_1"}; + auto result = client->createTagIndex(space, + "tag_duplicate_field_index", + "tag_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } + { + std::vector&& fields {"tag_0_col_0"}; + auto result = client->createTagIndex(space, + "tag_duplicate_field_index", + "tag_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } { std::vector&& fields {"tag_0_col_0", "not_exist_field"}; auto result = client->createTagIndex(space, @@ -677,7 +709,7 @@ TEST(MetaClientTest, TagIndexTest) { { auto result = client->listTagIndexes(space).get(); auto values = result.value(); - ASSERT_EQ(2, values.size()); + ASSERT_EQ(3, values.size()); { nebula::cpp2::ColumnDef singleColumn; @@ -711,6 +743,26 @@ TEST(MetaClientTest, TagIndexTest) { auto multiFieldResult = values[1].get_fields(); ASSERT_TRUE(TestUtils::verifyResult(columns, multiFieldResult)); } + + { + std::vector columns; + nebula::cpp2::ColumnDef stringColumn; + stringColumn.set_name("tag_0_col_1"); + nebula::cpp2::ValueType stringType; + stringType.set_type(SupportedType::STRING); + stringColumn.set_type(std::move(stringType)); + columns.emplace_back(std::move(stringColumn)); + + nebula::cpp2::ColumnDef intColumn; + intColumn.set_name("tag_0_col_0"); + nebula::cpp2::ValueType intType; + intType.set_type(SupportedType::INT); + intColumn.set_type(std::move(intType)); + columns.emplace_back(std::move(intColumn)); + + auto disorderFieldResult = values[2].get_fields(); + ASSERT_TRUE(TestUtils::verifyResult(columns, disorderFieldResult)); + } } sleep(FLAGS_heartbeat_interval_secs * 5); // Test Tag Index Properties Cache @@ -735,7 +787,7 @@ TEST(MetaClientTest, TagIndexTest) { { auto tagIndexes = client->getTagIndexesFromCache(space); ASSERT_TRUE(tagIndexes.ok()); - ASSERT_EQ(2, tagIndexes.value().size()); + ASSERT_EQ(3, tagIndexes.value().size()); } { GraphSpaceID spaceNotExist = 99; @@ -814,6 +866,14 @@ TEST(MetaClientTest, EdgeIndexTest) { ASSERT_TRUE(result.ok()); singleFieldIndexID = result.value(); } + { + std::vector&& fields {"edge_0_col_0"}; + auto result = client->createEdgeIndex(space, + "edge_duplicate_field_index", + "edge_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } { std::vector&& fields {"edge_0_col_0", "edge_0_col_1"}; auto result = client->createEdgeIndex(space, @@ -823,6 +883,30 @@ TEST(MetaClientTest, EdgeIndexTest) { ASSERT_TRUE(result.ok()); multiFieldIndexID = result.value(); } + { + std::vector&& fields {"edge_0_col_1", "edge_0_col_0"}; + auto result = client->createEdgeIndex(space, + "edge_disorder_field_index", + "edge_0", + std::move(fields)).get(); + ASSERT_TRUE(result.ok()); + } + { + std::vector&& fields {"edge_0_col_0", "edge_0_col_1"}; + auto result = client->createEdgeIndex(space, + "edge_duplicate_field_index", + "edge_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } + { + std::vector&& fields {"edge_0_col_0"}; + auto result = client->createEdgeIndex(space, + "edge_duplicate_field_index", + "edge_0", + std::move(fields)).get(); + ASSERT_FALSE(result.ok()); + } { std::vector&& fields {"edge_0_col_0", "edge_0_col_1"}; auto result = client->createEdgeIndex(space, @@ -853,7 +937,7 @@ TEST(MetaClientTest, EdgeIndexTest) { { auto result = client->listEdgeIndexes(space).get(); auto values = result.value(); - ASSERT_EQ(2, values.size()); + ASSERT_EQ(3, values.size()); { nebula::cpp2::ColumnDef column; @@ -885,6 +969,23 @@ TEST(MetaClientTest, EdgeIndexTest) { auto multiFieldResult = values[1].get_fields(); ASSERT_TRUE(TestUtils::verifyResult(columns, multiFieldResult)); } + { + std::vector columns; + nebula::cpp2::ColumnDef stringColumn; + stringColumn.set_name("edge_0_col_1"); + nebula::cpp2::ValueType stringType; + stringType.set_type(SupportedType::STRING); + stringColumn.set_type(std::move(stringType)); + columns.emplace_back(std::move(stringColumn)); + nebula::cpp2::ColumnDef intColumn; + intColumn.set_name("edge_0_col_0"); + nebula::cpp2::ValueType intType; + intType.set_type(SupportedType::INT); + intColumn.set_type(std::move(intType)); + columns.emplace_back(std::move(intColumn)); + auto disorderFieldResult = values[2].get_fields(); + ASSERT_TRUE(TestUtils::verifyResult(columns, disorderFieldResult)); + } } sleep(FLAGS_heartbeat_interval_secs * 5); // Test Edge Index Properties Cache @@ -912,7 +1013,7 @@ TEST(MetaClientTest, EdgeIndexTest) { { auto edgeIndexes = client->getEdgeIndexesFromCache(space); ASSERT_TRUE(edgeIndexes.ok()); - ASSERT_EQ(2, edgeIndexes.value().size()); + ASSERT_EQ(3, edgeIndexes.value().size()); } { GraphSpaceID spaceNotExist = 99; diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 964b65ac998..3f8ef8e16aa 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -2024,6 +2024,19 @@ TEST(ProcessorTest, TagIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); } + { + cpp2::CreateTagIndexReq req; + req.set_space_id(1); + req.set_tag_name("tag_0"); + std::vector fields{"tag_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateTagIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } { cpp2::CreateTagIndexReq req; req.set_space_id(1); @@ -2050,6 +2063,45 @@ TEST(ProcessorTest, TagIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::E_CONFLICT, resp.get_code()); } + { + cpp2::CreateTagIndexReq req; + req.set_space_id(1); + req.set_tag_name("tag_0"); + std::vector fields{"tag_0_col_0", "tag_0_col_1"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateTagIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } + { + cpp2::CreateTagIndexReq req; + req.set_space_id(1); + req.set_tag_name("tag_0"); + std::vector fields{"tag_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateTagIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } + { + cpp2::CreateTagIndexReq req; + req.set_space_id(1); + req.set_tag_name("tag_0"); + std::vector fields{"tag_0_col_1", "tag_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("disorder_field_index"); + auto* processor = CreateTagIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } { cpp2::CreateTagIndexReq req; req.set_space_id(1); @@ -2100,7 +2152,7 @@ TEST(ProcessorTest, TagIndexTest) { ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); auto items = resp.get_items(); - ASSERT_EQ(2, items.size()); + ASSERT_EQ(3, items.size()); { nebula::cpp2::ColumnDef column; column.set_name("tag_0_col_0"); @@ -2136,6 +2188,26 @@ TEST(ProcessorTest, TagIndexTest) { auto multiFieldResult = multiItem.get_fields(); ASSERT_TRUE(TestUtils::verifyResult(columns, multiFieldResult)); } + { + std::vector columns; + nebula::cpp2::ColumnDef stringColumn; + stringColumn.set_name("tag_0_col_1"); + nebula::cpp2::ValueType stringType; + stringType.set_type(SupportedType::STRING); + stringColumn.set_type(std::move(stringType)); + columns.emplace_back(std::move(stringColumn)); + nebula::cpp2::ColumnDef intColumn; + intColumn.set_name("tag_0_col_0"); + nebula::cpp2::ValueType intType; + intType.set_type(SupportedType::INT); + intColumn.set_type(std::move(intType)); + columns.emplace_back(std::move(intColumn)); + + auto disorderItem = items[2]; + ASSERT_EQ(3, disorderItem.get_index_id()); + auto disorderFieldResult = disorderItem.get_fields(); + ASSERT_TRUE(TestUtils::verifyResult(columns, disorderFieldResult)); + } } { cpp2::GetTagIndexReq req; @@ -2215,6 +2287,19 @@ TEST(ProcessorTest, EdgeIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); } + { + cpp2::CreateEdgeIndexReq req; + req.set_space_id(1); + req.set_edge_name("edge_0"); + std::vector fields{"edge_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateEdgeIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } { cpp2::CreateEdgeIndexReq req; req.set_space_id(1); @@ -2241,6 +2326,45 @@ TEST(ProcessorTest, EdgeIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::E_CONFLICT, resp.get_code()); } + { + cpp2::CreateEdgeIndexReq req; + req.set_space_id(1); + req.set_edge_name("edge_0"); + std::vector fields{"edge_0_col_0", "edge_0_col_1"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateEdgeIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } + { + cpp2::CreateEdgeIndexReq req; + req.set_space_id(1); + req.set_edge_name("edge_0"); + std::vector fields{"edge_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("duplicate_field_index"); + auto* processor = CreateEdgeIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_EXISTED, resp.get_code()); + } + { + cpp2::CreateEdgeIndexReq req; + req.set_space_id(1); + req.set_edge_name("edge_0"); + std::vector fields{"edge_0_col_1", "edge_0_col_0"}; + req.set_fields(std::move(fields)); + req.set_index_name("disorder_field_index"); + auto* processor = CreateEdgeIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } { cpp2::CreateEdgeIndexReq req; req.set_space_id(1); @@ -2289,7 +2413,7 @@ TEST(ProcessorTest, EdgeIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.get_code()); auto items = resp.get_items(); - ASSERT_EQ(2, items.size()); + ASSERT_EQ(3, items.size()); { nebula::cpp2::ColumnDef column; @@ -2326,6 +2450,26 @@ TEST(ProcessorTest, EdgeIndexTest) { auto multiFieldResult = multiItem.get_fields(); ASSERT_TRUE(TestUtils::verifyResult(columns, multiFieldResult)); } + { + std::vector columns; + nebula::cpp2::ColumnDef stringColumn; + stringColumn.set_name("edge_0_col_1"); + nebula::cpp2::ValueType stringType; + stringType.set_type(SupportedType::STRING); + stringColumn.set_type(std::move(stringType)); + columns.emplace_back(std::move(stringColumn)); + nebula::cpp2::ColumnDef intColumn; + intColumn.set_name("edge_0_col_0"); + nebula::cpp2::ValueType intType; + intType.set_type(SupportedType::INT); + intColumn.set_type(std::move(intType)); + columns.emplace_back(std::move(intColumn)); + + auto disorderItem = items[2]; + ASSERT_EQ(3, disorderItem.get_index_id()); + auto disorderFieldResult = disorderItem.get_fields(); + ASSERT_TRUE(TestUtils::verifyResult(columns, disorderFieldResult)); + } } { cpp2::GetEdgeIndexReq req; diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/nebula/tools/generator/v2/SparkClientGenerator.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/nebula/tools/generator/v2/SparkClientGenerator.scala index 07119e4ba7e..fae402eed19 100644 --- a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/nebula/tools/generator/v2/SparkClientGenerator.scala +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/nebula/tools/generator/v2/SparkClientGenerator.scala @@ -310,7 +310,7 @@ object SparkClientGenerator { val values = (for { property <- valueProperties if property.trim.length != 0 } yield extraValue(row, property)).mkString(",") - (row.getString(vertexIndex), values) + (String.valueOf(extraValue(row, vertex)), values) }(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .foreachPartition { iterator: Iterator[(String, String)] => val service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp index e4ac7839f07..d0679eaac6e 100644 --- a/src/tools/storage-perf/StoragePerfTool.cpp +++ b/src/tools/storage-perf/StoragePerfTool.cpp @@ -191,6 +191,7 @@ class Perf { auto props = genData(FLAGS_size); edge.set_props(std::move(props)); edges.emplace_back(std::move(edge)); + vId++; return edges; }