Skip to content
This repository has been archived by the owner on Dec 12, 2022. It is now read-only.

Commit

Permalink
add actions for snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
cbs authored and bright-starry-sky committed Sep 3, 2020
1 parent bdfc9e8 commit 8604dbc
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 2 deletions.
67 changes: 67 additions & 0 deletions src/nebula/NebulaAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ ResultCode MetaAction::doRun() {
return ResultCode::ERR_FAILED;
}

ResultCode WriteVerticesAction::doRun() {
CHECK_NOTNULL(this->client_);
std::vector<std::string> batchCmds;
batchCmds.reserve(1024);
uint64_t vid = startVid_;
while (vid <= totalRows_ + startVid_) {
if (batchCmds.size() == batchNum_) {
auto res = sendBatch(batchCmds);
if (res != ResultCode::OK) {
LOG(ERROR) << "Send request failed!";
return res;
}
FB_LOG_EVERY_MS(INFO, 3000) << "Send requests successfully, row "
<< vid;
batchCmds.clear();
}
auto val = folly::stringPrintf("%ld:%s%ld", vid, strValPrefix_.c_str(), vid);
vid++;
}
auto res = sendBatch(batchCmds);
LOG(INFO) << "Send all requests successfully, rows count : " << totalRows_;
return res;
}

ResultCode WriteCircleAction::sendBatch(const std::vector<std::string>& batchCmds) {
auto joinStr = folly::join(",", batchCmds);
auto cmd = folly::stringPrintf("INSERT VERTEX %s (%s) VALUES %s",
Expand Down Expand Up @@ -875,5 +899,48 @@ ResultCode RestoreFromDataDirAction::doRun() {
}
return ResultCode::OK;
}

bool VerifyVerticesAction::verifyVertex(const std::string& cmd, int64_t vid) {
VLOG(1) << cmd;
ExecutionResponse resp;
uint32_t tryTimes = 0;
uint32_t retryInterval = retryIntervalMs_;
while (++tryTimes < try_) {
auto res = client_->execute(cmd, resp);
if (res == Code::SUCCEEDED) {
if (!resp.__isset.rows || resp.rows.empty() || resp.rows[0].columns.empty()) {
LOG(ERROR) << "Bad result, resp.rows size " << resp.rows.size();
return false;
}
VLOG(1) << resp.rows.size() << ", " << resp.rows[0].columns.size();
auto val = folly::stringPrintf("%s%ld", strValPrefix_.c_str(), vid);
return resp.rows[0].columns[1].get_str() == val;
} else {
usleep(retryInterval * 1000 * tryTimes);
LOG(WARNING) << "Failed to send request, tryTimes " << tryTimes
<< ", error code " << static_cast<int32_t>(res);
}
}
return false;
}

ResultCode VerifyVerticesAction::doRun() {
CHECK_NOTNULL(client_);
uint64_t id = startVid_;
while (id++ <= totalRows_ + startVid_) {
auto cmd = folly::stringPrintf("FETCH PROP ON %s %ld YIELD %s.%s",
tag_.c_str(),
id,
tag_.c_str(),
col_.c_str());
FB_LOG_EVERY_MS(INFO, 3000) << cmd;
auto res = verifyVertex(cmd, id);
if ((res && expectedFail_) || (!res && !expectedFail_)) {
LOG(ERROR) << "verify vertex fail : " << id;
return ResultCode::ERR_FAILED;
}
}
return id == totalRows_ + startVid_ ? ResultCode::OK : ResultCode::ERR_FAILED;
}
} // namespace nebula
} // namespace nebula_chaos
83 changes: 81 additions & 2 deletions src/nebula/NebulaAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ class WriteCircleAction : public core::Action {
return folly::stringPrintf("Write data to %s", client_->serverAddress().c_str());
}

private:
protected:
ResultCode sendBatch(const std::vector<std::string>& batchCmds);

private:
protected:
GraphClient* client_ = nullptr;
std::string tag_;
std::string col_;
Expand All @@ -142,6 +142,38 @@ class WriteCircleAction : public core::Action {
uint32_t retryIntervalMs_;
};

class WriteVerticesAction : public WriteCircleAction {
public:
WriteVerticesAction(GraphClient* client,
const std::string& tag,
const std::string& col,
uint64_t totalRows = 10000,
uint32_t batchNum = 1,
uint32_t tryNum = 32,
uint32_t retryIntervalMs = 1,
uint64_t startVid = 1,
const std::string& strValPrefix = "row_")
: WriteCircleAction(client,
tag,
col,
totalRows,
batchNum,
tryNum,
retryIntervalMs)
, client_(client)
, startVid_(startVid)
, strValPrefix_(strValPrefix) {};

virtual ~WriteVerticesAction() = default;

ResultCode doRun() override;

private:
GraphClient* client_ = nullptr;
uint64_t startVid_;
std::string strValPrefix_;
};

class WalkThroughAction : public core::Action {
public:
WalkThroughAction(GraphClient* client,
Expand Down Expand Up @@ -660,6 +692,53 @@ class RestoreFromDataDirAction : public core::Action {
NebulaInstance* inst_;
std::string srcDataPaths_;
};

class VerifyVerticesAction : public core::Action {
public:
VerifyVerticesAction(GraphClient* client,
const std::string& tag,
const std::string& col,
uint64_t totalRows,
uint32_t tryNum = 32,
uint32_t retryIntervalMs = 1,
uint64_t startVid = 1,
const std::string& strValPrefix = "row_",
bool expectedFail = false)
: client_(client)
, tag_(tag)
, col_(col)
, totalRows_(totalRows)
, try_(tryNum)
, retryIntervalMs_(retryIntervalMs)
, startVid_(startVid)
, strValPrefix_(strValPrefix)
, expectedFail_(expectedFail) {}

~VerifyVerticesAction() = default;

ResultCode doRun() override;

std::string toString() const override {
return folly::stringPrintf("Verify vertices, from %ld, total %ld",
startVid_,
totalRows_);
}

private:
bool verifyVertex(const std::string& cmd, int64_t vid);

private:
GraphClient* client_ = nullptr;
std::string tag_;
std::string col_;
uint64_t totalRows_;
uint32_t try_;
uint32_t retryIntervalMs_;
uint64_t startVid_ = 1;
std::string strValPrefix_;
bool expectedFail_;
};

} // namespace nebula
} // namespace nebula_chaos

Expand Down
36 changes: 36 additions & 0 deletions src/nebula/NebulaUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ class Utils {
batchNum,
tryNum,
retryInterval);
} else if (type == "WriteVerticesAction") {
auto tag = obj.at("tag").asString();
auto col = obj.at("col").asString();
auto totalRows = obj.getDefault("total_rows", 100000).asInt();
auto batchNum = obj.getDefault("batch_num", 1).asInt();
auto tryNum = obj.getDefault("try_num", 32).asInt();
auto retryInterval = obj.getDefault("retry_interval_ms", 100).asInt();
auto startVid = obj.getDefault("start_vid", 1).asInt();
auto strValPrefix = obj.getDefault("str_val_prefix", "row_").asString();
return std::make_unique<WriteVerticesAction>(ctx.gClient,
tag,
col,
totalRows,
batchNum,
tryNum,
retryInterval,
startVid,
strValPrefix);
} else if (type == "WalkThroughAction") {
auto tag = obj.at("tag").asString();
if (ctx.rolling) {
Expand Down Expand Up @@ -318,6 +336,24 @@ class Utils {
return std::make_unique<core::AssignAction>(&ctx.planCtx->actionCtx,
varName,
valExpr);
} else if (type == "VerifyVerticesAction") {
auto tag = obj.at("tag").asString();
auto col = obj.at("col").asString();
auto totalRows = obj.getDefault("total_rows", 100000).asInt();
auto tryNum = obj.getDefault("try_num", 32).asInt();
auto retryInterval = obj.getDefault("retry_interval_ms", 100).asInt();
auto startVid = obj.getDefault("start_vid", 1).asInt();
auto strValPrefix = obj.getDefault("str_val_prefix", "row_").asString();
auto expectedFail = obj.getDefault("expected_fail", false).asBool();
return std::make_unique<VerifyVerticesAction>(ctx.gClient,
tag,
col,
totalRows,
tryNum,
retryInterval,
startVid,
strValPrefix,
expectedFail);
}
LOG(FATAL) << "Unknown type " << type;
return nullptr;
Expand Down

0 comments on commit 8604dbc

Please sign in to comment.