Skip to content

Commit

Permalink
Some minor enhancement (#4500)
Browse files Browse the repository at this point in the history
* fix #4155

* fix #4489

* add stats speed limit by sleep a while for every 1000 records

* fix gcc build

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
critical27 and Sophie-Xie authored Aug 16, 2022
1 parent 02d6380 commit fc44449
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 19 deletions.
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Backup table failure!");
case nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND:
return Status::Error("Session not existed!");
case nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS:
return Status::Error("Schema with same name exists");
case nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS:
return Status::Error("Related index exists, please drop index first");
case nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS:
return Status::Error("There are still space on the host");
default:
return Status::Error("Unknown error!");
}
Expand Down
8 changes: 6 additions & 2 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(tsk.get_result()),
convertJobTimestampToDateTime(std::move(tsk).get_start_time()),
convertJobTimestampToDateTime(std::move(tsk).get_stop_time()),
apache::thrift::util::enumNameSafe(jd.get_code())}));
jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN
? ""
: apache::thrift::util::enumNameSafe(jd.get_code())}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
folly::sformat("Succeeded:{}", succeeded),
Expand Down Expand Up @@ -195,7 +197,9 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
convertJobTimestampToDateTime(taskDesc.get_start_time()),
convertJobTimestampToDateTime(taskDesc.get_stop_time()),
apache::thrift::util::enumNameSafe(taskDesc.get_code()),
jd.get_code() == nebula::cpp2::ErrorCode::E_UNKNOWN
? ""
: apache::thrift::util::enumNameSafe(jd.get_code()),
}));
}
v.emplace_back(Row({folly::sformat("Total:{}", total),
Expand Down
3 changes: 3 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ enum ErrorCode {
E_WRONGCLUSTER = -2010, // Wrong cluster
E_ZONE_NOT_ENOUGH = -2011, // Listener conflicts
E_ZONE_IS_EMPTY = -2012, // Host not exist
E_SCHEMA_NAME_EXISTS = -2013, // Schema name alreay exists
E_RELATED_INDEX_EXISTS = -2014, // There are stil indexes related to tag or edge, cannot drop it
E_RELATED_SPACE_EXISTS = -2015, // There are still some spaec on the host, cannot drop it

E_STORE_FAILURE = -2021, // Failed to store data
E_STORE_SEGMENT_ILLEGAL = -2022, // Illegal storage segment
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
LOG(INFO) << "Failed to create edge `" << edgeName
<< "': some tag with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS);
onFinished();
return;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
}
if (!nebula::value(indexes).empty()) {
LOG(INFO) << "Drop edge error, index conflict, please delete index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand All @@ -59,7 +59,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
}
if (!nebula::value(indexes).empty()) {
LOG(INFO) << "Drop tag error, index conflict, please delete index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand All @@ -59,7 +59,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop tag error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
for (auto& h : partHosts) {
if (std::find(hosts.begin(), hosts.end(), h) != hosts.end()) {
LOG(INFO) << h << " is related with partition";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS;
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/zone/DropZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkSpaceReplicaZone() {
size_t replicaFactor = properties.get_replica_factor();
if (replicaFactor == spaceZones.size()) {
LOG(INFO) << "Space " << spaceId << " replica factor and zone size are the same";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH;
break;
}
iter->next();
Expand Down Expand Up @@ -121,7 +121,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkHostPartition(const HostAddr& ad
for (auto& host : hosts) {
if (host == address) {
LOG(INFO) << "Host " << address << " have partition on it";
code = nebula::cpp2::ErrorCode::E_CONFLICT;
code = nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS;
break;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/meta/test/IndexProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ TEST(IndexProcessorTest, IndexCheckDropEdgeTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down Expand Up @@ -1211,7 +1211,7 @@ TEST(IndexProcessorTest, IndexCheckDropTagTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down Expand Up @@ -1934,7 +1934,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
{
cpp2::DropEdgeReq req;
Expand All @@ -1944,7 +1944,7 @@ TEST(IndexProcessorTest, DropWithFTIndexTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS, resp.get_code());
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/meta/test/ProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ TEST(ProcessorTest, CreateTagTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_SCHEMA_NAME_EXISTS, resp.get_code());
}
{
// Set schema ttl property
Expand Down Expand Up @@ -3596,7 +3596,7 @@ TEST(ProcessorTest, DropHostsTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
{
// Drop hosts which hold partition.
Expand All @@ -3607,7 +3607,7 @@ TEST(ProcessorTest, DropHostsTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
{
cpp2::DropSpaceReq req;
Expand Down Expand Up @@ -4665,7 +4665,7 @@ TEST(ProcessorTest, DropZoneTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH, resp.get_code());
}
{
cpp2::DropSpaceReq req;
Expand Down Expand Up @@ -4699,7 +4699,7 @@ TEST(ProcessorTest, DropZoneTest) {
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CONFLICT, resp.get_code());
ASSERT_EQ(nebula::cpp2::ErrorCode::E_RELATED_SPACE_EXISTS, resp.get_code());
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/storage/admin/StatsTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/Common.h"

DEFINE_int32(stats_sleep_interval_ms,
0,
"If interval is greater than 0, sleep a period of time when scanned 1000 records. "
"Default value 0 means won't sleep");

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -132,6 +137,9 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
std::unordered_map<PartitionID, int64_t> negativeRelevancy;
int64_t spaceVertices = 0;
int64_t spaceEdges = 0;
// Once 1000 records are scanned, check if we need to sleep for a while to prevent high pressure
// on io, the sleep time is stats_sleep_interval_ms
size_t countToSleep = 0;

for (auto tag : tags) {
tagsVertices[tag.first] = 0;
Expand Down Expand Up @@ -167,6 +175,7 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
}
tagsVertices[tagId] += 1;
tagIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}

// Only stats valid edge data, no multi version
Expand Down Expand Up @@ -216,10 +225,12 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
negativeRelevancy[sourceVid % partitionNum + 1]++;
}
edgeIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}
while (vertexIter && vertexIter->valid()) {
spaceVertices++;
vertexIter->next();
sleepIfScannedSomeRecord(++countToSleep);
}
nebula::meta::cpp2::StatsItem statsItem;

Expand Down Expand Up @@ -334,5 +345,12 @@ void StatsTask::finish(nebula::cpp2::ErrorCode rc) {
}
}

void StatsTask::sleepIfScannedSomeRecord(size_t& countToSleep) {
if (FLAGS_stats_sleep_interval_ms > 0 && countToSleep >= kRecordsToSleep) {
usleep(FLAGS_stats_sleep_interval_ms * 1000);
countToSleep = 0;
}
}

} // namespace storage
} // namespace nebula
4 changes: 4 additions & 0 deletions src/storage/admin/StatsTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class StatsTask : public AdminTask {
private:
nebula::cpp2::ErrorCode getSchemas(GraphSpaceID spaceId);

void sleepIfScannedSomeRecord(size_t& countToSleep);

protected:
GraphSpaceID spaceId_;

Expand All @@ -65,6 +67,8 @@ class StatsTask : public AdminTask {

// The number of subtasks equals to the number of parts in request
size_t subTaskSize_{0};

static constexpr size_t kRecordsToSleep{1000};
};

} // namespace storage
Expand Down

0 comments on commit fc44449

Please sign in to comment.