From 8604dbc5f15b8846e742028c0cfefed70bde6a0d Mon Sep 17 00:00:00 2001 From: cbs Date: Thu, 3 Sep 2020 19:28:33 +0800 Subject: [PATCH] add actions for snapshot --- src/nebula/NebulaAction.cpp | 67 ++++++++++++++++++++++++++++++ src/nebula/NebulaAction.h | 83 ++++++++++++++++++++++++++++++++++++- src/nebula/NebulaUtils.h | 36 ++++++++++++++++ 3 files changed, 184 insertions(+), 2 deletions(-) diff --git a/src/nebula/NebulaAction.cpp b/src/nebula/NebulaAction.cpp index 608c71c..fe72a41 100644 --- a/src/nebula/NebulaAction.cpp +++ b/src/nebula/NebulaAction.cpp @@ -175,6 +175,30 @@ ResultCode MetaAction::doRun() { return ResultCode::ERR_FAILED; } +ResultCode WriteVerticesAction::doRun() { + CHECK_NOTNULL(this->client_); + std::vector batchCmds; + batchCmds.reserve(1024); + uint64_t vid = startVid_; + while (vid <= totalRows_ + startVid_) { + if (batchCmds.size() == batchNum_) { + auto res = sendBatch(batchCmds); + if (res != ResultCode::OK) { + LOG(ERROR) << "Send request failed!"; + return res; + } + FB_LOG_EVERY_MS(INFO, 3000) << "Send requests successfully, row " + << vid; + batchCmds.clear(); + } + auto val = folly::stringPrintf("%ld:%s%ld", vid, strValPrefix_.c_str(), vid); + vid++; + } + auto res = sendBatch(batchCmds); + LOG(INFO) << "Send all requests successfully, rows count : " << totalRows_; + return res; +} + ResultCode WriteCircleAction::sendBatch(const std::vector& batchCmds) { auto joinStr = folly::join(",", batchCmds); auto cmd = folly::stringPrintf("INSERT VERTEX %s (%s) VALUES %s", @@ -875,5 +899,48 @@ ResultCode RestoreFromDataDirAction::doRun() { } return ResultCode::OK; } + +bool VerifyVerticesAction::verifyVertex(const std::string& cmd, int64_t vid) { + VLOG(1) << cmd; + ExecutionResponse resp; + uint32_t tryTimes = 0; + uint32_t retryInterval = retryIntervalMs_; + while (++tryTimes < try_) { + auto res = client_->execute(cmd, resp); + if (res == Code::SUCCEEDED) { + if (!resp.__isset.rows || resp.rows.empty() || resp.rows[0].columns.empty()) { + LOG(ERROR) << "Bad result, resp.rows size " << resp.rows.size(); + return false; + } + VLOG(1) << resp.rows.size() << ", " << resp.rows[0].columns.size(); + auto val = folly::stringPrintf("%s%ld", strValPrefix_.c_str(), vid); + return resp.rows[0].columns[1].get_str() == val; + } else { + usleep(retryInterval * 1000 * tryTimes); + LOG(WARNING) << "Failed to send request, tryTimes " << tryTimes + << ", error code " << static_cast(res); + } + } + return false; +} + +ResultCode VerifyVerticesAction::doRun() { + CHECK_NOTNULL(client_); + uint64_t id = startVid_; + while (id++ <= totalRows_ + startVid_) { + auto cmd = folly::stringPrintf("FETCH PROP ON %s %ld YIELD %s.%s", + tag_.c_str(), + id, + tag_.c_str(), + col_.c_str()); + FB_LOG_EVERY_MS(INFO, 3000) << cmd; + auto res = verifyVertex(cmd, id); + if ((res && expectedFail_) || (!res && !expectedFail_)) { + LOG(ERROR) << "verify vertex fail : " << id; + return ResultCode::ERR_FAILED; + } + } + return id == totalRows_ + startVid_ ? ResultCode::OK : ResultCode::ERR_FAILED; +} } // namespace nebula } // namespace nebula_chaos diff --git a/src/nebula/NebulaAction.h b/src/nebula/NebulaAction.h index 78dd520..3a3fc1d 100644 --- a/src/nebula/NebulaAction.h +++ b/src/nebula/NebulaAction.h @@ -129,10 +129,10 @@ class WriteCircleAction : public core::Action { return folly::stringPrintf("Write data to %s", client_->serverAddress().c_str()); } -private: +protected: ResultCode sendBatch(const std::vector& batchCmds); -private: +protected: GraphClient* client_ = nullptr; std::string tag_; std::string col_; @@ -142,6 +142,38 @@ class WriteCircleAction : public core::Action { uint32_t retryIntervalMs_; }; +class WriteVerticesAction : public WriteCircleAction { +public: + WriteVerticesAction(GraphClient* client, + const std::string& tag, + const std::string& col, + uint64_t totalRows = 10000, + uint32_t batchNum = 1, + uint32_t tryNum = 32, + uint32_t retryIntervalMs = 1, + uint64_t startVid = 1, + const std::string& strValPrefix = "row_") + : WriteCircleAction(client, + tag, + col, + totalRows, + batchNum, + tryNum, + retryIntervalMs) + , client_(client) + , startVid_(startVid) + , strValPrefix_(strValPrefix) {}; + + virtual ~WriteVerticesAction() = default; + + ResultCode doRun() override; + +private: + GraphClient* client_ = nullptr; + uint64_t startVid_; + std::string strValPrefix_; +}; + class WalkThroughAction : public core::Action { public: WalkThroughAction(GraphClient* client, @@ -660,6 +692,53 @@ class RestoreFromDataDirAction : public core::Action { NebulaInstance* inst_; std::string srcDataPaths_; }; + +class VerifyVerticesAction : public core::Action { +public: + VerifyVerticesAction(GraphClient* client, + const std::string& tag, + const std::string& col, + uint64_t totalRows, + uint32_t tryNum = 32, + uint32_t retryIntervalMs = 1, + uint64_t startVid = 1, + const std::string& strValPrefix = "row_", + bool expectedFail = false) + : client_(client) + , tag_(tag) + , col_(col) + , totalRows_(totalRows) + , try_(tryNum) + , retryIntervalMs_(retryIntervalMs) + , startVid_(startVid) + , strValPrefix_(strValPrefix) + , expectedFail_(expectedFail) {} + + ~VerifyVerticesAction() = default; + + ResultCode doRun() override; + + std::string toString() const override { + return folly::stringPrintf("Verify vertices, from %ld, total %ld", + startVid_, + totalRows_); + } + +private: + bool verifyVertex(const std::string& cmd, int64_t vid); + +private: + GraphClient* client_ = nullptr; + std::string tag_; + std::string col_; + uint64_t totalRows_; + uint32_t try_; + uint32_t retryIntervalMs_; + uint64_t startVid_ = 1; + std::string strValPrefix_; + bool expectedFail_; +}; + } // namespace nebula } // namespace nebula_chaos diff --git a/src/nebula/NebulaUtils.h b/src/nebula/NebulaUtils.h index 30cb192..d242375 100644 --- a/src/nebula/NebulaUtils.h +++ b/src/nebula/NebulaUtils.h @@ -85,6 +85,24 @@ class Utils { batchNum, tryNum, retryInterval); + } else if (type == "WriteVerticesAction") { + auto tag = obj.at("tag").asString(); + auto col = obj.at("col").asString(); + auto totalRows = obj.getDefault("total_rows", 100000).asInt(); + auto batchNum = obj.getDefault("batch_num", 1).asInt(); + auto tryNum = obj.getDefault("try_num", 32).asInt(); + auto retryInterval = obj.getDefault("retry_interval_ms", 100).asInt(); + auto startVid = obj.getDefault("start_vid", 1).asInt(); + auto strValPrefix = obj.getDefault("str_val_prefix", "row_").asString(); + return std::make_unique(ctx.gClient, + tag, + col, + totalRows, + batchNum, + tryNum, + retryInterval, + startVid, + strValPrefix); } else if (type == "WalkThroughAction") { auto tag = obj.at("tag").asString(); if (ctx.rolling) { @@ -318,6 +336,24 @@ class Utils { return std::make_unique(&ctx.planCtx->actionCtx, varName, valExpr); + } else if (type == "VerifyVerticesAction") { + auto tag = obj.at("tag").asString(); + auto col = obj.at("col").asString(); + auto totalRows = obj.getDefault("total_rows", 100000).asInt(); + auto tryNum = obj.getDefault("try_num", 32).asInt(); + auto retryInterval = obj.getDefault("retry_interval_ms", 100).asInt(); + auto startVid = obj.getDefault("start_vid", 1).asInt(); + auto strValPrefix = obj.getDefault("str_val_prefix", "row_").asString(); + auto expectedFail = obj.getDefault("expected_fail", false).asBool(); + return std::make_unique(ctx.gClient, + tag, + col, + totalRows, + tryNum, + retryInterval, + startVid, + strValPrefix, + expectedFail); } LOG(FATAL) << "Unknown type " << type; return nullptr;