diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 55f45eaa0ac..3c6442d415d 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -955,6 +955,12 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("Backup table failure!"); case nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND: return Status::Error("Session not existed!"); + case nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS: + return Status::Error("Schema with same name exists"); + case nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS: + return Status::Error("Related index exists, please drop index first"); + case nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS: + return Status::Error("There are still space on the host"); default: return Status::Error("Unknown error!"); } diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index fbec9898d69..4ad69ea5365 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -151,7 +151,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(tsk.get_result()), convertJobTimestampToDateTime(std::move(tsk).get_start_time()), convertJobTimestampToDateTime(std::move(tsk).get_stop_time()), - apache::thrift::util::enumNameSafe(jd.get_code())})); + jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN + ? "" + : apache::thrift::util::enumNameSafe(jd.get_code())})); } v.emplace_back(Row({folly::sformat("Total:{}", total), folly::sformat("Succeeded:{}", succeeded), @@ -195,7 +197,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( apache::thrift::util::enumNameSafe(taskDesc.get_status()), convertJobTimestampToDateTime(taskDesc.get_start_time()), convertJobTimestampToDateTime(taskDesc.get_stop_time()), - apache::thrift::util::enumNameSafe(taskDesc.get_code()), + jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN + ? "" + : apache::thrift::util::enumNameSafe(jd.get_code()), })); } v.emplace_back(Row({folly::sformat("Total:{}", total), diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 1e95be659b8..a703b015280 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -357,6 +357,9 @@ enum ErrorCode { E_WRONGCLUSTER = -2010, // Wrong cluster E_ZONE_NOT_ENOUGH = -2011, // Listener conflicts E_ZONE_IS_EMPTY = -2012, // Host not exist + E_SCHEMA_NAME_EXISTS = -2013, // Schema name alreay exists + E_RELATED_INDEX_EXISTS = -2014, // There are stil indexes related to tag or edge, cannot drop it + E_RELATED_SPACE_EXISTS = -2015, // There are still some spaec on the host, cannot drop it E_STORE_FAILURE = -2021, // Failed to store data E_STORE_SEGMENT_ILLEGAL = -2022, // Illegal storage segment diff --git a/src/meta/processors/schema/CreateEdgeProcessor.cpp b/src/meta/processors/schema/CreateEdgeProcessor.cpp index e758dd1da3c..c45a67b74ee 100644 --- a/src/meta/processors/schema/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schema/CreateEdgeProcessor.cpp @@ -21,7 +21,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { LOG(INFO) << "Failed to create edge `" << edgeName << "': some tag with the same name already exists."; resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE); - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + handleErrorCode(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS); onFinished(); return; } else { diff --git a/src/meta/processors/schema/DropEdgeProcessor.cpp b/src/meta/processors/schema/DropEdgeProcessor.cpp index 7c07f0a306e..9255223ec59 100644 --- a/src/meta/processors/schema/DropEdgeProcessor.cpp +++ b/src/meta/processors/schema/DropEdgeProcessor.cpp @@ -50,7 +50,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { } if (!nebula::value(indexes).empty()) { LOG(INFO) << "Drop edge error, index conflict, please delete index first."; - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS); onFinished(); return; } @@ -59,7 +59,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { if (nebula::ok(ftIdxRet)) { LOG(INFO) << "Drop edge error, fulltext index conflict, " << "please delete fulltext index first."; - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS); onFinished(); return; } diff --git a/src/meta/processors/schema/DropTagProcessor.cpp b/src/meta/processors/schema/DropTagProcessor.cpp index 900454fa3d5..2f3c5c27cb7 100644 --- a/src/meta/processors/schema/DropTagProcessor.cpp +++ b/src/meta/processors/schema/DropTagProcessor.cpp @@ -50,7 +50,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { } if (!nebula::value(indexes).empty()) { LOG(INFO) << "Drop tag error, index conflict, please delete index first."; - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS); onFinished(); return; } @@ -59,7 +59,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { if (nebula::ok(ftIdxRet)) { LOG(INFO) << "Drop tag error, fulltext index conflict, " << "please delete fulltext index first."; - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS); onFinished(); return; } diff --git a/src/meta/processors/zone/DropHostsProcessor.cpp b/src/meta/processors/zone/DropHostsProcessor.cpp index 51323d5ca14..c91aaec1349 100644 --- a/src/meta/processors/zone/DropHostsProcessor.cpp +++ b/src/meta/processors/zone/DropHostsProcessor.cpp @@ -54,7 +54,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { for (auto& h : partHosts) { if (std::find(hosts.begin(), hosts.end(), h) != hosts.end()) { LOG(INFO) << h << " is related with partition"; - code = nebula::cpp2::ErrorCode::E_CONFLICT; + code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS; break; } } diff --git a/src/meta/processors/zone/DropZoneProcessor.cpp b/src/meta/processors/zone/DropZoneProcessor.cpp index ac77d9ab518..22c6f4062ba 100644 --- a/src/meta/processors/zone/DropZoneProcessor.cpp +++ b/src/meta/processors/zone/DropZoneProcessor.cpp @@ -86,7 +86,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkSpaceReplicaZone() { size_t replicaFactor = properties.get_replica_factor(); if (replicaFactor == spaceZones.size()) { LOG(INFO) << "Space " << spaceId << " replica factor and zone size are the same"; - code = nebula::cpp2::ErrorCode::E_CONFLICT; + code = nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH; break; } iter->next(); @@ -121,7 +121,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkHostPartition(const HostAddr& ad for (auto& host : hosts) { if (host == address) { LOG(INFO) << "Host " << address << " have partition on it"; - code = nebula::cpp2::ErrorCode::E_CONFLICT; + code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS; break; } } diff --git a/src/meta/test/IndexProcessorTest.cpp b/src/meta/test/IndexProcessorTest.cpp index 4ee6a750077..9a2b199c93f 100644 --- a/src/meta/test/IndexProcessorTest.cpp +++ b/src/meta/test/IndexProcessorTest.cpp @@ -1179,7 +1179,7 @@ TEST(IndexProcessorTest, IndexCheckDropEdgeTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code()); } } @@ -1211,7 +1211,7 @@ TEST(IndexProcessorTest, IndexCheckDropTagTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code()); } } @@ -1934,7 +1934,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code()); } { cpp2::DropEdgeReq req; @@ -1944,7 +1944,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code()); } } diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 3afc4760b82..66299126eb2 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -640,7 +640,7 @@ TEST(ProcessorTest, CreateTagTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS, resp.get_code()); } { // Set schema ttl property @@ -3596,7 +3596,7 @@ TEST(ProcessorTest, DropHostsTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code()); } { // Drop hosts which hold partition. @@ -3607,7 +3607,7 @@ TEST(ProcessorTest, DropHostsTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code()); } { cpp2::DropSpaceReq req; @@ -4665,7 +4665,7 @@ TEST(ProcessorTest, DropZoneTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH, resp.get_code()); } { cpp2::DropSpaceReq req; @@ -4699,7 +4699,7 @@ TEST(ProcessorTest, DropZoneTest) { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code()); + ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code()); } } diff --git a/src/storage/admin/StatsTask.cpp b/src/storage/admin/StatsTask.cpp index 8008df3173d..9b9bb304394 100644 --- a/src/storage/admin/StatsTask.cpp +++ b/src/storage/admin/StatsTask.cpp @@ -11,6 +11,11 @@ #include "common/utils/NebulaKeyUtils.h" #include "kvstore/Common.h" +DEFINE_int32(stats_sleep_interval_ms, + 0, + "If interval is greater than 0, sleep a period of time when scanned 1000 records. " + "Default value 0 means won't sleep"); + namespace nebula { namespace storage { @@ -132,6 +137,9 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, std::unordered_map negativeRelevancy; int64_t spaceVertices = 0; int64_t spaceEdges = 0; + // Once 1000 records are scanned, check if we need to sleep for a while to prevent high pressure + // on io, the sleep time is stats_sleep_interval_ms + size_t countToSleep = 0; for (auto tag : tags) { tagsVertices[tag.first] = 0; @@ -167,6 +175,7 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, } tagsVertices[tagId] += 1; tagIter->next(); + sleepIfScannedSomeRecord(++countToSleep); } // Only stats valid edge data, no multi version @@ -216,10 +225,12 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, negativeRelevancy[sourceVid % partitionNum + 1]++; } edgeIter->next(); + sleepIfScannedSomeRecord(++countToSleep); } while (vertexIter && vertexIter->valid()) { spaceVertices++; vertexIter->next(); + sleepIfScannedSomeRecord(++countToSleep); } nebula::meta::cpp2::StatsItem statsItem; @@ -334,5 +345,12 @@ void StatsTask::finish(nebula::cpp2::ErrorCode rc) { } } +void StatsTask::sleepIfScannedSomeRecord(size_t& countToSleep) { + if (FLAGS_stats_sleep_interval_ms > 0 && countToSleep >= kRecordsToSleep) { + usleep(FLAGS_stats_sleep_interval_ms * 1000); + countToSleep = 0; + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/StatsTask.h b/src/storage/admin/StatsTask.h index 5b618fc0d63..37be1156d29 100644 --- a/src/storage/admin/StatsTask.h +++ b/src/storage/admin/StatsTask.h @@ -52,6 +52,8 @@ class StatsTask : public AdminTask { private: nebula::cpp2::ErrorCode getSchemas(GraphSpaceID spaceId); + void sleepIfScannedSomeRecord(size_t& countToSleep); + protected: GraphSpaceID spaceId_; @@ -65,6 +67,8 @@ class StatsTask : public AdminTask { // The number of subtasks equals to the number of parts in request size_t subTaskSize_{0}; + + static constexpr size_t kRecordsToSleep{1000}; }; } // namespace storage