Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some minor enhancement #4500

Merged
merged 6 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Backup table failure!");
case nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND:
return Status::Error("Session not existed!");
case nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS:
return Status::Error("Schema with same name exists");
case nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS:
return Status::Error("Related index exists, please drop index first");
case nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS:
return Status::Error("There are still space on the host");
default:
return Status::Error("Unknown error!");
}
Expand Down
8 changes: 6 additions & 2 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(tsk.get_result()),
convertJobTimestampToDateTime(std::move(tsk).get_start_time()),
convertJobTimestampToDateTime(std::move(tsk).get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code())}));
jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN
? ""
: apache::thrift::util::enumNameSafe(jd.get_code())}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
folly::sformat("Succeeded:{}", succeeded),
Expand Down Expand Up @@ -195,7 +197,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
convertJobTimestampToDateTime(taskDesc.get_start_time()),
convertJobTimestampToDateTime(taskDesc.get_stop_time()),
apache::thrift::util::enumNameSafe(taskDesc.get_code()),
jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN
? ""
: apache::thrift::util::enumNameSafe(jd.get_code()),
}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
Expand Down
3 changes: 3 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ enum ErrorCode {
E_WRONGCLUSTER = -2010, // Wrong cluster
E_ZONE_NOT_ENOUGH = -2011, // Listener conflicts
E_ZONE_IS_EMPTY = -2012, // Host not exist
E_SCHEMA_NAME_EXISTS = -2013, // Schema name alreay exists
E_RELATED_INDEX_EXISTS = -2014, // There are stil indexes related to tag or edge, cannot drop it
E_RELATED_SPACE_EXISTS = -2015, // There are still some spaec on the host, cannot drop it

E_STORE_FAILURE = -2021, // Failed to store data
E_STORE_SEGMENT_ILLEGAL = -2022, // Illegal storage segment
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
LOG(INFO) << "Failed to create edge `" << edgeName
<< "': some tag with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS);
onFinished();
return;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
}
if (!nebula::value(indexes).empty()) {
LOG(INFO) << "Drop edge error, index conflict, please delete index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand All @@ -59,7 +59,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
}
if (!nebula::value(indexes).empty()) {
LOG(INFO) << "Drop tag error, index conflict, please delete index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand All @@ -59,7 +59,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
for (auto& h : partHosts) {
if (std::find(hosts.begin(), hosts.end(), h) != hosts.end()) {
LOG(INFO) << h << " is related with partition";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS;
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/zone/DropZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkSpaceReplicaZone() {
size_t replicaFactor = properties.get_replica_factor();
if (replicaFactor == spaceZones.size()) {
LOG(INFO) << "Space " << spaceId << " replica factor and zone size are the same";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH;
break;
}
iter->next();
Expand Down Expand Up @@ -121,7 +121,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkHostPartition(const HostAddr& ad
for (auto& host : hosts) {
if (host == address) {
LOG(INFO) << "Host " << address << " have partition on it";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS;
break;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/meta/test/IndexProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ TEST(IndexProcessorTest, IndexCheckDropEdgeTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down Expand Up @@ -1211,7 +1211,7 @@ TEST(IndexProcessorTest, IndexCheckDropTagTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down Expand Up @@ -1934,7 +1934,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
{
cpp2::DropEdgeReq req;
Expand All @@ -1944,7 +1944,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ TEST(ProcessorTest, CreateTagTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS, resp.get_code());
}
{
// Set schema ttl property
Expand Down Expand Up @@ -3596,7 +3596,7 @@ TEST(ProcessorTest, DropHostsTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
{
// Drop hosts which hold partition.
Expand All @@ -3607,7 +3607,7 @@ TEST(ProcessorTest, DropHostsTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
{
cpp2::DropSpaceReq req;
Expand Down Expand Up @@ -4665,7 +4665,7 @@ TEST(ProcessorTest, DropZoneTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH, resp.get_code());
}
{
cpp2::DropSpaceReq req;
Expand Down Expand Up @@ -4699,7 +4699,7 @@ TEST(ProcessorTest, DropZoneTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/storage/admin/StatsTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/Common.h"

DEFINE_int32(stats_sleep_interval_ms,
0,
"If interval is greater than 0, sleep a period of time when scanned 1000 records. "
"Default value 0 means won't sleep");

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -132,6 +137,9 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
std::unordered_map<PartitionID, int64_t> negativeRelevancy;
int64_t spaceVertices = 0;
int64_t spaceEdges = 0;
// Once 1000 records are scanned, check if we need to sleep for a while to prevent high pressure
// on io, the sleep time is stats_sleep_interval_ms
size_t countToSleep = 0;

for (auto tag : tags) {
tagsVertices[tag.first] = 0;
Expand Down Expand Up @@ -167,6 +175,7 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
}
tagsVertices[tagId] += 1;
tagIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}

// Only stats valid edge data, no multi version
Expand Down Expand Up @@ -216,10 +225,12 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
negativeRelevancy[sourceVid % partitionNum + 1]++;
}
edgeIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}
while (vertexIter && vertexIter->valid()) {
spaceVertices++;
vertexIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}
nebula::meta::cpp2::StatsItem statsItem;

Expand Down Expand Up @@ -334,5 +345,12 @@ void StatsTask::finish(nebula::cpp2::ErrorCode rc) {
}
}

void StatsTask::sleepIfScannedSomeRecord(size_t& countToSleep) {
if (FLAGS_stats_sleep_interval_ms > 0 && countToSleep >= kRecordsToSleep) {
usleep(FLAGS_stats_sleep_interval_ms * 1000);
countToSleep = 0;
}
}

} // namespace storage
} // namespace nebula
4 changes: 4 additions & 0 deletions src/storage/admin/StatsTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class StatsTask : public AdminTask {
private:
nebula::cpp2::ErrorCode getSchemas(GraphSpaceID spaceId);

void sleepIfScannedSomeRecord(size_t& countToSleep);

protected:
GraphSpaceID spaceId_;

Expand All @@ -65,6 +67,8 @@ class StatsTask : public AdminTask {

// The number of subtasks equals to the number of parts in request
size_t subTaskSize_{0};

static constexpr size_t kRecordsToSleep{1000};
};

} // namespace storage
Expand Down