Skip to content
This repository has been archived by the owner on Dec 12, 2022. It is now read-only.

Commit

Permalink
Added some verify actions
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed Sep 3, 2020
1 parent 8604dbc commit d5790ef
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 30 deletions.
Binary file modified conf/checkpoint_create_restore_plan.gv.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
117 changes: 96 additions & 21 deletions conf/checkpoint_create_restore_plan.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "Nebula checkpoint plan",
"concurrency": 10,
"email": "",
"rolling_table": true,
"instances": [
{
Expand Down Expand Up @@ -39,192 +40,266 @@
"user": ""
}
],
"actions" : [
"actions" : [
{
"action_num" : 0,
"type": "StartAction",
"inst_index": 0,
"depends": [2, 3, 4]
},
{
"action_num" : 1,
"type": "StartAction",
"inst_index": 1,
"depends": []
},
{
"action_num" : 2,
"type": "StartAction",
"inst_index": 2,
"depends": [1]
},
{
"action_num" : 3,
"type": "StartAction",
"inst_index": 3,
"depends": [1]
},
{
"action_num" : 4,
"type": "StartAction",
"inst_index": 4,
"depends": [1]
},
{
"action_num" : 5,
"type": "WaitAction",
"wait_time_ms": 15000,
"depends": [0]
},
{
"action_num" : 6,
"type": "ClientConnectAction",
"depends": [5]
},
{
"action_num" : 7,
"type": "CreateSpaceAction",
"space_name": "checkpoint_space",
"replica": 3,
"parts": 10,
"depends": [6]
},
{
"action_num" : 8,
"type": "UseSpaceAction",
"space_name": "checkpoint_space",
"depends": [7]
},
{
"action_num" : 9,
"type": "CreateSchemaAction",
"name": "checkpoint_test",
"props": [
{"name": "nextId", "type": "int"}
{"name": "nextId", "type": "string"}
],
"edge_or_tag": false,
"depends": [8]
},
{
"action_num" : 10,
"type": "WaitAction",
"wait_time_ms": 5000,
"depends": [9]
},
{
"action_num" : 11,
"type": "BalanceLeaderAction",
"depends": [10]
},
{
"action_num" : 12,
"type": "CheckLeadersAction",
"expected_num": 10,
"space": "checkpoint_space",
"depends": [11]
},
{
"type": "WriteCircleAction",
"action_num" : 13,
"type": "WriteVerticesAction",
"tag": "checkpoint_test",
"col": "nextId",
"total_rows": 100000,
"total_rows": 50000,
"str_val_prefix": "row_",
"depends": [12]
},
{
"action_num" : 14,
"type": "CreateCheckpointAction",
"wait_time_ms": 3000,
"depends": [13]
},
{
"action_num" : 15,
"type": "WaitAction",
"wait_time_ms": 3000,
"depends": [14]
},
{
"type": "WriteCircleAction",
"action_num" : 16,
"type": "WriteVerticesAction",
"tag": "checkpoint_test",
"col": "nextId",
"total_rows": 200000,
"total_rows": 60000,
"str_val_prefix": "new_row_",
"depends": [15]
},
{
"action_num" : 17,
"type": "StopAction",
"inst_index": 1,
"depends": [16]
},
{
"action_num" : 18,
"type": "StopAction",
"inst_index": 2,
"depends": [16]
},
{
"action_num" : 19,
"type": "StopAction",
"inst_index": 3,
"depends": [16]
},
{
"action_num" : 20,
"type": "StopAction",
"inst_index": 4,
"depends": [16]
},
{
"action_num" : 21,
"type": "WaitAction",
"wait_time_ms": 3000,
"depends": [17, 18, 19]
"depends": [17, 18, 19, 20]
},
{
"action_num" : 22,
"type": "RestoreFromCheckpointAction",
"inst_index": 1,
"depends": [21]
},
{
"action_num" : 23,
"type": "RestoreFromCheckpointAction",
"inst_index": 2,
"depends": [20]
"depends": [21]
},
{
"action_num" : 24,
"type": "RestoreFromCheckpointAction",
"inst_index": 3,
"depends": [20]
},
{
"action_num" : 25,
"type": "RestoreFromCheckpointAction",
"inst_index": 4,
"depends": [20]
},
{
"action_num" : 26,
"type": "WaitAction",
"wait_time_ms": 3000,
"depends": [21, 22, 23]
"depends": [22, 23, 24, 25]
},
{
"action_num" : 27,
"type": "StartAction",
"inst_index": 1,
"depends": [26]
},
{
"action_num" : 28,
"type": "StartAction",
"inst_index": 2,
"depends": [24]
"depends": [26]
},
{
"action_num" : 29,
"type": "StartAction",
"inst_index": 3,
"depends": [24]
"depends": [26]
},
{
"action_num" : 30,
"type": "StartAction",
"inst_index": 4,
"depends": [24]
"depends": [26]
},
{
"action_num" : 31,
"type": "WaitAction",
"wait_time_ms": 15000,
"depends": [25, 26, 27]
"depends": [27, 28, 29, 30]
},
{
"action_num" : 32,
"type": "CheckLeadersAction",
"expected_num": 10,
"space": "checkpoint_space",
"depends": [31]
},
{
"action_num" : 33,
"type": "VerifyVerticesAction",
"tag": "checkpoint_test",
"col": "nextId",
"total_rows": 50000,
"str_val_prefix": "row_",
"depends": [32]
},
{
"type": "WalkThroughAction",
"action_num" : 34,
"type": "VerifyVerticesAction",
"tag": "checkpoint_test",
"col": "nextId",
"total_rows": 100000,
"depends": [28]
"total_rows": 10000,
"start_vid": 60001,
"str_val_prefix": "new_row_",
"expected_fail": true,
"depends": [33]
},
{
"action_num" : 35,
"type": "StopAction",
"inst_index": 0,
"depends": [29]
"depends": [34]
},
{
"action_num" : 36,
"type": "StopAction",
"inst_index": 1,
"depends": [29]
"depends": [34]
},
{
"action_num" : 37,
"type": "StopAction",
"inst_index": 2,
"depends": [29]
"depends": [34]
},
{
"action_num" : 38,
"type": "StopAction",
"inst_index": 3,
"depends": [29]
"depends": [34]
},
{
"action_num" : 39,
"type": "StopAction",
"inst_index": 4,
"depends": [29]
"depends": [34]
}
]
}
16 changes: 7 additions & 9 deletions src/nebula/NebulaAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ ResultCode WriteVerticesAction::doRun() {
std::vector<std::string> batchCmds;
batchCmds.reserve(1024);
uint64_t vid = startVid_;
while (vid <= totalRows_ + startVid_) {
while (vid < totalRows_ + startVid_) {
if (batchCmds.size() == batchNum_) {
auto res = sendBatch(batchCmds);
if (res != ResultCode::OK) {
Expand All @@ -191,7 +191,7 @@ ResultCode WriteVerticesAction::doRun() {
<< vid;
batchCmds.clear();
}
auto val = folly::stringPrintf("%ld:%s%ld", vid, strValPrefix_.c_str(), vid);
batchCmds.emplace_back(folly::stringPrintf("%ld:(\"%s%ld\")", vid, strValPrefix_.c_str(), vid));
vid++;
}
auto res = sendBatch(batchCmds);
Expand Down Expand Up @@ -908,13 +908,10 @@ bool VerifyVerticesAction::verifyVertex(const std::string& cmd, int64_t vid) {
while (++tryTimes < try_) {
auto res = client_->execute(cmd, resp);
if (res == Code::SUCCEEDED) {
if (!resp.__isset.rows || resp.rows.empty() || resp.rows[0].columns.empty()) {
LOG(ERROR) << "Bad result, resp.rows size " << resp.rows.size();
return false;
if (resp.__isset.rows && !resp.rows.empty() && resp.rows.size() > 0) {
auto val = folly::stringPrintf("%s%ld", strValPrefix_.c_str(), vid);
return resp.rows[0].columns[1].get_str() == val;
}
VLOG(1) << resp.rows.size() << ", " << resp.rows[0].columns.size();
auto val = folly::stringPrintf("%s%ld", strValPrefix_.c_str(), vid);
return resp.rows[0].columns[1].get_str() == val;
} else {
usleep(retryInterval * 1000 * tryTimes);
LOG(WARNING) << "Failed to send request, tryTimes " << tryTimes
Expand All @@ -927,7 +924,7 @@ bool VerifyVerticesAction::verifyVertex(const std::string& cmd, int64_t vid) {
ResultCode VerifyVerticesAction::doRun() {
CHECK_NOTNULL(client_);
uint64_t id = startVid_;
while (id++ <= totalRows_ + startVid_) {
while (id < totalRows_ + startVid_) {
auto cmd = folly::stringPrintf("FETCH PROP ON %s %ld YIELD %s.%s",
tag_.c_str(),
id,
Expand All @@ -939,6 +936,7 @@ ResultCode VerifyVerticesAction::doRun() {
LOG(ERROR) << "verify vertex fail : " << id;
return ResultCode::ERR_FAILED;
}
id++;
}
return id == totalRows_ + startVid_ ? ResultCode::OK : ResultCode::ERR_FAILED;
}
Expand Down
6 changes: 6 additions & 0 deletions src/nebula/NebulaUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class Utils {
retryInterval);
} else if (type == "WriteVerticesAction") {
auto tag = obj.at("tag").asString();
if (ctx.rolling) {
tag = Utils::getOperatingTable(tag);
}
auto col = obj.at("col").asString();
auto totalRows = obj.getDefault("total_rows", 100000).asInt();
auto batchNum = obj.getDefault("batch_num", 1).asInt();
Expand Down Expand Up @@ -338,6 +341,9 @@ class Utils {
valExpr);
} else if (type == "VerifyVerticesAction") {
auto tag = obj.at("tag").asString();
if (ctx.rolling) {
tag = Utils::getOperatingTable(tag);
}
auto col = obj.at("col").asString();
auto totalRows = obj.getDefault("total_rows", 100000).asInt();
auto tryNum = obj.getDefault("try_num", 32).asInt();
Expand Down

0 comments on commit d5790ef

Please sign in to comment.