From f4ad72fb74e33b1755bc546a23e24865742ed357 Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Wed, 26 Aug 2020 13:57:49 +0800 Subject: [PATCH 1/8] unstable testcase snapshot_command_test --- src/kvstore/NebulaStore.cpp | 4 ++++ src/storage/admin/CreateCheckpointProcessor.cpp | 1 + src/storage/admin/DropCheckpointProcessor.cpp | 1 + 3 files changed, 6 insertions(+) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 8b016ef39fd..93046bd9302 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -795,6 +795,8 @@ ResultCode NebulaStore::setWriteBlocking(GraphSpaceID spaceId, bool sign) { } auto p = nebula::value(partRet); if (p->isLeader()) { + LOG(INFO) << "Begin write block for space : " << spaceId + << ", part : " << p->partitionId(); auto ret = ResultCode::SUCCEEDED; p->setBlocking(sign); if (sign) { @@ -811,6 +813,8 @@ ResultCode NebulaStore::setWriteBlocking(GraphSpaceID spaceId, bool sign) { LOG(ERROR) << "Part sync failed. space : " << spaceId << " Part : " << part; return ret; } + LOG(INFO) << "End write block for space : " << spaceId + << ", part : " << p->partitionId(); } } } diff --git a/src/storage/admin/CreateCheckpointProcessor.cpp b/src/storage/admin/CreateCheckpointProcessor.cpp index 6dadc431333..a9314f8e398 100644 --- a/src/storage/admin/CreateCheckpointProcessor.cpp +++ b/src/storage/admin/CreateCheckpointProcessor.cpp @@ -11,6 +11,7 @@ namespace storage { void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { CHECK_NOTNULL(kvstore_); + LOG(INFO) << "Begin create checkpoint for space " << req.get_space_id(); auto spaceId = req.get_space_id(); auto& name = req.get_name(); auto retCode = kvstore_->createCheckpoint(spaceId, std::move(name)); diff --git a/src/storage/admin/DropCheckpointProcessor.cpp b/src/storage/admin/DropCheckpointProcessor.cpp index 54633c38739..d096a2ca031 100644 --- a/src/storage/admin/DropCheckpointProcessor.cpp +++ b/src/storage/admin/DropCheckpointProcessor.cpp @@ -11,6 +11,7 @@ namespace storage { void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) { CHECK_NOTNULL(kvstore_); + LOG(INFO) << "Begin drop checkpoint for space " << req.get_space_id(); auto spaceId = req.get_space_id(); auto& name = req.get_name(); auto retCode = kvstore_->dropCheckpoint(spaceId, std::move(name)); From 1ee44656dc142b19c6c64ebe6e766c28ea906c1e Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Fri, 28 Aug 2020 12:06:32 +0800 Subject: [PATCH 2/8] add some debug info --- src/kvstore/NebulaStore.cpp | 2 ++ src/kvstore/RocksEngine.cpp | 1 + 2 files changed, 3 insertions(+) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 93046bd9302..e5ce1e38455 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -730,6 +730,7 @@ ResultCode NebulaStore::createCheckpoint(GraphSpaceID spaceId, const std::string auto space = nebula::value(spaceRet); for (auto& engine : space->engines_) { + LOG(INFO) << "Begin checkpoint for engine : " << engine->getDataRoot(); auto code = engine->createCheckpoint(name); if (code != ResultCode::SUCCEEDED) { return code; @@ -749,6 +750,7 @@ ResultCode NebulaStore::createCheckpoint(GraphSpaceID spaceId, const std::string return ResultCode::ERR_CHECKPOINT_ERROR; } } + LOG(INFO) << "End checkpoint for engine : " << engine->getDataRoot(); } return ResultCode::SUCCEEDED; } diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index e3396f5e64b..733371baa45 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -474,6 +474,7 @@ ResultCode RocksEngine::createCheckpoint(const std::string& name) { LOG(ERROR) << "Create checkpoint Failed: " << status.ToString(); return ResultCode::ERR_CHECKPOINT_ERROR; } + LOG(INFO) << "End checkpoint : " << dataPath_; return ResultCode::SUCCEEDED; } From 9b6b8de33cf857240d6660aa3d5fb9bc293df7dd Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Fri, 28 Aug 2020 12:38:25 +0800 Subject: [PATCH 3/8] waiting resp --- src/graph/test/SnapshotCommandTest.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/graph/test/SnapshotCommandTest.cpp b/src/graph/test/SnapshotCommandTest.cpp index cea4619c144..5cc647c5902 100644 --- a/src/graph/test/SnapshotCommandTest.cpp +++ b/src/graph/test/SnapshotCommandTest.cpp @@ -46,6 +46,7 @@ TEST_F(SnapshotCommandTest, TestSnapshot) { auto code = client->execute(cmd, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } + sleep(FLAGS_heartbeat_interval_secs + 1); std::string sname; ASSERT_NE(nullptr, client); { From 9fe247682fa39a21e0a0037651ba1b7736def61f Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Fri, 28 Aug 2020 13:07:55 +0800 Subject: [PATCH 4/8] retry --- src/graph/test/SnapshotCommandTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/graph/test/SnapshotCommandTest.cpp b/src/graph/test/SnapshotCommandTest.cpp index 5cc647c5902..b6a8349fdf6 100644 --- a/src/graph/test/SnapshotCommandTest.cpp +++ b/src/graph/test/SnapshotCommandTest.cpp @@ -84,7 +84,7 @@ TEST_F(SnapshotCommandTest, TestSnapshot) { auto code = client->execute(cmd, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - + sleep(FLAGS_heartbeat_interval_secs + 1); { std::vector checkpoints; checkpoints.emplace_back(folly::stringPrintf("%s/disk1/nebula/1/checkpoints/%s", From 52aeb6e2e858716c81091c8efc818401ac19c1d1 Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Fri, 28 Aug 2020 17:06:29 +0800 Subject: [PATCH 5/8] wait metaClient ready --- src/graph/test/TestEnv.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index 4a978294876..a87d895ce8c 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -60,7 +60,11 @@ void TestEnv::SetUp() { mClient_ = std::make_unique(threadPool, std::move(addrsRet.value()), options); - mClient_->waitForMetadReady(); + auto ready = mClient_->waitForMetadReady(3); + if (!ready) { + // Resort to retrying in the background + LOG(WARNING) << "Failed to synchronously wait for meta service ready"; + } gflagsManager_ = std::make_unique(mClient_.get()); IPv4 localIp; @@ -84,7 +88,6 @@ void TestEnv::TearDown() { storageServer_.reset(); mClient_.reset(); metaServer_.reset(); - mClient_.reset(); } uint16_t TestEnv::graphServerPort() const { From b86d9a1b1d87cb6f2b025817de312d21e99c413f Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Fri, 28 Aug 2020 17:25:26 +0800 Subject: [PATCH 6/8] add debug info --- src/meta/processors/admin/SnapShot.cpp | 28 +++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index d48ecf0af97..0afdaaeff67 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -21,10 +21,16 @@ cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) { auto spacesHosts = retSpacesHosts.value(); for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { + LOG(INFO) << "Begin create snapshot on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; auto status = client_->createSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { return cpp2::ErrorCode::E_RPC_FAILURE; } + LOG(INFO) << "End create snapshot on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; } } return cpp2::ErrorCode::SUCCEEDED; @@ -40,6 +46,9 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) { + LOG(INFO) << "Begin drop snapshot on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; auto status = client_->dropSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s"; @@ -49,6 +58,9 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, status.toString().c_str()); LOG(ERROR) << error; } + LOG(INFO) << "End drop snapshot on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; } } } @@ -63,11 +75,17 @@ cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) { auto spacesHosts = retSpacesHosts.value(); for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { - auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); - if (!status.ok()) { - LOG(ERROR) << " Send blocking sign error on host : " - << network::NetworkUtils::toHosts({host}); - } + LOG(INFO) << "Begin blocking write on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; + auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); + if (!status.ok()) { + LOG(ERROR) << " Send blocking sign error on host : " + << network::NetworkUtils::toHosts({host}); + } + LOG(INFO) << "End blocking write on " + << network::NetworkUtils::toHosts({host}).c_str() + << ", Space " << spaceHosts.first; } } return cpp2::ErrorCode::SUCCEEDED; From 680d3cda11cc6b7bff1fed417757b77b1e0e8b60 Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Thu, 3 Sep 2020 10:08:56 +0800 Subject: [PATCH 7/8] changed getResponse interface --- src/meta/processors/admin/AdminClient.cpp | 61 +++++++++++++++++++---- src/meta/processors/admin/AdminClient.h | 7 +++ 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 80ee94c4d72..d9a0c56bbd7 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -484,6 +484,48 @@ void AdminClient::getResponse( }); // via } + +template +folly::Future AdminClient::getResponseNoPart(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen) { + folly::Promise pro; + auto f = pro.getFuture(); + auto* evb = ioThreadPool_->getEventBase(); + folly::via(evb, [evb, pro = std::move(pro), host, req = std::move(req), + remoteFunc = std::move(remoteFunc), respGen = std::move(respGen), + this] () mutable { + auto client = clientsMan_->client(host, evb); + remoteFunc(client, std::move(req)).via(evb) + .then([p = std::move(pro), respGen = std::move(respGen), host]( + folly::Try&& t) mutable { + // exception occurred during RPC + auto hostStr = network::NetworkUtils::intToIPv4(host.first); + if (t.hasException()) { + p.setValue(Status::Error(folly::stringPrintf( + "[%s:%d] RPC failure in AdminClient: %s", + hostStr.c_str(), + host.second, + t.exception().what().c_str()))); + return; + } + auto&& result = std::move(t).value().get_result(); + if (result.get_failed_codes().empty()) { + storage::cpp2::ResultCode resultCode; + resultCode.set_code(storage::cpp2::ErrorCode::SUCCEEDED); + p.setValue(respGen(resultCode)); + } else { + auto resp = result.get_failed_codes().front(); + p.setValue(respGen(std::move(resp))); + } + }); + }); + return f; +} + nebula::cpp2::HostAddr AdminClient::toThriftHost(const HostAddr& addr) { nebula::cpp2::HostAddr thriftAddr; thriftAddr.set_ip(addr.first); @@ -597,17 +639,16 @@ folly::Future AdminClient::createSnapshot(GraphSpaceID spaceId, req.set_space_id(spaceId); req.set_name(name); - folly::Promise pro; - auto f = pro.getFuture(); - - /** - * Don't need retry. - * Because existing checkpoint directories leads to fail again. - **/ - getResponse({host}, 0, std::move(req), [] (auto client, auto request) { + return getResponseNoPart(host, std::move(req), [] (auto client, auto request) { return client->future_createCheckpoint(request); - }, 0, std::move(pro), 0); - return f; + }, [] (auto&& resp) -> Status { + if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) { + return Status::OK(); + } else { + return Status::Error("Create snapshot failed! code=%d", + static_cast(resp.get_code())); + } + }); } folly::Future AdminClient::dropSnapshot(GraphSpaceID spaceId, diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index 9ff5e387798..dbe847bafd0 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -137,6 +137,13 @@ class AdminClient { Request req, RemoteFunc remoteFunc, RespGenerator respGen); + template + folly::Future getResponseNoPart(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen); template void getResponse(std::vector hosts, From fb57437e2be3b96932a87c14333dec6ec31f106d Mon Sep 17 00:00:00 2001 From: bright-starry-sky Date: Thu, 3 Sep 2020 13:16:20 +0800 Subject: [PATCH 8/8] To improve the checkpoint logic --- src/graph/test/SnapshotCommandTest.cpp | 3 +- src/graph/test/TestEnv.cpp | 7 +-- src/kvstore/NebulaStore.cpp | 38 +++++------- src/kvstore/RocksEngine.cpp | 8 +-- src/kvstore/raftex/RaftPart.cpp | 8 ++- src/kvstore/wal/FileBasedWal.cpp | 7 +++ src/meta/processors/admin/AdminClient.cpp | 61 +++---------------- src/meta/processors/admin/AdminClient.h | 7 --- .../admin/CreateSnapshotProcessor.cpp | 1 - src/meta/processors/admin/SnapShot.cpp | 28 ++------- .../admin/CreateCheckpointProcessor.cpp | 1 - src/storage/admin/DropCheckpointProcessor.cpp | 1 - 12 files changed, 47 insertions(+), 123 deletions(-) diff --git a/src/graph/test/SnapshotCommandTest.cpp b/src/graph/test/SnapshotCommandTest.cpp index b6a8349fdf6..cea4619c144 100644 --- a/src/graph/test/SnapshotCommandTest.cpp +++ b/src/graph/test/SnapshotCommandTest.cpp @@ -46,7 +46,6 @@ TEST_F(SnapshotCommandTest, TestSnapshot) { auto code = client->execute(cmd, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_heartbeat_interval_secs + 1); std::string sname; ASSERT_NE(nullptr, client); { @@ -84,7 +83,7 @@ TEST_F(SnapshotCommandTest, TestSnapshot) { auto code = client->execute(cmd, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - sleep(FLAGS_heartbeat_interval_secs + 1); + { std::vector checkpoints; checkpoints.emplace_back(folly::stringPrintf("%s/disk1/nebula/1/checkpoints/%s", diff --git a/src/graph/test/TestEnv.cpp b/src/graph/test/TestEnv.cpp index a87d895ce8c..4a978294876 100644 --- a/src/graph/test/TestEnv.cpp +++ b/src/graph/test/TestEnv.cpp @@ -60,11 +60,7 @@ void TestEnv::SetUp() { mClient_ = std::make_unique(threadPool, std::move(addrsRet.value()), options); - auto ready = mClient_->waitForMetadReady(3); - if (!ready) { - // Resort to retrying in the background - LOG(WARNING) << "Failed to synchronously wait for meta service ready"; - } + mClient_->waitForMetadReady(); gflagsManager_ = std::make_unique(mClient_.get()); IPv4 localIp; @@ -88,6 +84,7 @@ void TestEnv::TearDown() { storageServer_.reset(); mClient_.reset(); metaServer_.reset(); + mClient_.reset(); } uint16_t TestEnv::graphServerPort() const { diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index e5ce1e38455..7f6c66266ab 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -730,7 +730,6 @@ ResultCode NebulaStore::createCheckpoint(GraphSpaceID spaceId, const std::string auto space = nebula::value(spaceRet); for (auto& engine : space->engines_) { - LOG(INFO) << "Begin checkpoint for engine : " << engine->getDataRoot(); auto code = engine->createCheckpoint(name); if (code != ResultCode::SUCCEEDED) { return code; @@ -750,7 +749,6 @@ ResultCode NebulaStore::createCheckpoint(GraphSpaceID spaceId, const std::string return ResultCode::ERR_CHECKPOINT_ERROR; } } - LOG(INFO) << "End checkpoint for engine : " << engine->getDataRoot(); } return ResultCode::SUCCEEDED; } @@ -796,27 +794,21 @@ ResultCode NebulaStore::setWriteBlocking(GraphSpaceID spaceId, bool sign) { return error(partRet); } auto p = nebula::value(partRet); - if (p->isLeader()) { - LOG(INFO) << "Begin write block for space : " << spaceId - << ", part : " << p->partitionId(); - auto ret = ResultCode::SUCCEEDED; - p->setBlocking(sign); - if (sign) { - folly::Baton baton; - p->sync([&ret, &baton] (kvstore::ResultCode code) { - if (kvstore::ResultCode::SUCCEEDED != code) { - ret = code; - } - baton.post(); - }); - baton.wait(); - } - if (ret != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Part sync failed. space : " << spaceId << " Part : " << part; - return ret; - } - LOG(INFO) << "End write block for space : " << spaceId - << ", part : " << p->partitionId(); + auto ret = ResultCode::SUCCEEDED; + p->setBlocking(sign); + if (sign) { + folly::Baton baton; + p->sync([&ret, &baton] (kvstore::ResultCode code) { + if (kvstore::ResultCode::SUCCEEDED != code) { + ret = code; + } + baton.post(); + }); + baton.wait(); + } + if (ret != ResultCode::SUCCEEDED) { + LOG(ERROR) << "Part sync failed. space : " << spaceId << " Part : " << part; + return ret; } } } diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 733371baa45..97fa69aa194 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -449,9 +449,10 @@ ResultCode RocksEngine::createCheckpoint(const std::string& name) { auto checkpointPath = folly::stringPrintf("%s/checkpoints/%s/data", dataPath_.c_str(), name.c_str()); LOG(INFO) << "Target checkpoint path : " << checkpointPath; - if (fs::FileUtils::exist(checkpointPath)) { - LOG(ERROR) << "The snapshot file already exists: " << checkpointPath; - return ResultCode::ERR_CHECKPOINT_ERROR; + if (fs::FileUtils::exist(checkpointPath) && + !fs::FileUtils::remove(checkpointPath.data(), true)) { + LOG(ERROR) << "Remove exist dir failed of checkpoint : " << checkpointPath; + return ResultCode::ERR_IO_ERROR; } auto parent = checkpointPath.substr(0, checkpointPath.rfind('/')); @@ -474,7 +475,6 @@ ResultCode RocksEngine::createCheckpoint(const std::string& name) { LOG(ERROR) << "Create checkpoint Failed: " << status.ToString(); return ResultCode::ERR_CHECKPOINT_ERROR; } - LOG(INFO) << "End checkpoint : " << dataPath_; return ResultCode::SUCCEEDED; } diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index aef427893f4..701d3c0b06c 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -554,10 +554,12 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, LogType logType, std::string log, AtomicOp op) { - if (blocking_ && (logType == LogType::NORMAL || logType == LogType::ATOMIC_OP)) { - return AppendLogResult::E_WRITE_BLOCKING; + if (blocking_) { + // No need to block heartbeats and empty log. + if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) { + return AppendLogResult::E_WRITE_BLOCKING; + } } - LogCache swappedOutLogs; auto retFuture = folly::Future::makeEmpty(); diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 72c5896ca00..71bc29da80c 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -620,6 +620,13 @@ bool FileBasedWal::linkCurrentWAL(const char* newPath) { LOG(INFO) << idStr_ << "No wal files found, skip link"; return true; } + + if (fs::FileUtils::exist(newPath) && + !fs::FileUtils::remove(newPath, true)) { + LOG(ERROR) << "Remove exist dir failed of wal : " << newPath; + return false; + } + if (!fs::FileUtils::makeDir(newPath)) { LOG(INFO) << idStr_ << "Link file parent dir make failed : " << newPath; return false; diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index d9a0c56bbd7..9d3ddbed1e3 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -484,48 +484,6 @@ void AdminClient::getResponse( }); // via } - -template -folly::Future AdminClient::getResponseNoPart(const HostAddr& host, - Request req, - RemoteFunc remoteFunc, - RespGenerator respGen) { - folly::Promise pro; - auto f = pro.getFuture(); - auto* evb = ioThreadPool_->getEventBase(); - folly::via(evb, [evb, pro = std::move(pro), host, req = std::move(req), - remoteFunc = std::move(remoteFunc), respGen = std::move(respGen), - this] () mutable { - auto client = clientsMan_->client(host, evb); - remoteFunc(client, std::move(req)).via(evb) - .then([p = std::move(pro), respGen = std::move(respGen), host]( - folly::Try&& t) mutable { - // exception occurred during RPC - auto hostStr = network::NetworkUtils::intToIPv4(host.first); - if (t.hasException()) { - p.setValue(Status::Error(folly::stringPrintf( - "[%s:%d] RPC failure in AdminClient: %s", - hostStr.c_str(), - host.second, - t.exception().what().c_str()))); - return; - } - auto&& result = std::move(t).value().get_result(); - if (result.get_failed_codes().empty()) { - storage::cpp2::ResultCode resultCode; - resultCode.set_code(storage::cpp2::ErrorCode::SUCCEEDED); - p.setValue(respGen(resultCode)); - } else { - auto resp = result.get_failed_codes().front(); - p.setValue(respGen(std::move(resp))); - } - }); - }); - return f; -} - nebula::cpp2::HostAddr AdminClient::toThriftHost(const HostAddr& addr) { nebula::cpp2::HostAddr thriftAddr; thriftAddr.set_ip(addr.first); @@ -639,16 +597,13 @@ folly::Future AdminClient::createSnapshot(GraphSpaceID spaceId, req.set_space_id(spaceId); req.set_name(name); - return getResponseNoPart(host, std::move(req), [] (auto client, auto request) { + folly::Promise pro; + auto f = pro.getFuture(); + + getResponse({host}, 0, std::move(req), [] (auto client, auto request) { return client->future_createCheckpoint(request); - }, [] (auto&& resp) -> Status { - if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) { - return Status::OK(); - } else { - return Status::Error("Create snapshot failed! code=%d", - static_cast(resp.get_code())); - } - }); + }, 0, std::move(pro), 3 /*The snapshot operation need to retry 3 times*/); + return f; } folly::Future AdminClient::dropSnapshot(GraphSpaceID spaceId, @@ -666,7 +621,7 @@ folly::Future AdminClient::dropSnapshot(GraphSpaceID spaceId, auto f = pro.getFuture(); getResponse({host}, 0, std::move(req), [] (auto client, auto request) { return client->future_dropCheckpoint(request); - }, 0, std::move(pro), 1 /*The snapshot operation only needs to be retried twice*/); + }, 0, std::move(pro), 3 /*The snapshot operation need to retry 3 times*/); return f; } @@ -681,7 +636,7 @@ folly::Future AdminClient::blockingWrites(GraphSpaceID spaceId, auto f = pro.getFuture(); getResponse({host}, 0, std::move(req), [] (auto client, auto request) { return client->future_blockingWrites(request); - }, 0, std::move(pro), 1 /*The blocking needs to be retried twice*/); + }, 0, std::move(pro), 32 /*The blocking need to retry 32 times*/); return f; } diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index dbe847bafd0..9ff5e387798 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -137,13 +137,6 @@ class AdminClient { Request req, RemoteFunc remoteFunc, RespGenerator respGen); - template - folly::Future getResponseNoPart(const HostAddr& host, - Request req, - RemoteFunc remoteFunc, - RespGenerator respGen); template void getResponse(std::vector hosts, diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.cpp b/src/meta/processors/admin/CreateSnapshotProcessor.cpp index 1093057db56..dd8b10222a7 100644 --- a/src/meta/processors/admin/CreateSnapshotProcessor.cpp +++ b/src/meta/processors/admin/CreateSnapshotProcessor.cpp @@ -13,7 +13,6 @@ namespace nebula { namespace meta { void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); // check the index rebuild. not allowed to create snapshot when index rebuilding. auto prefix = MetaServiceUtils::rebuildIndexStatusPrefix(); std::unique_ptr iter; diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index 0afdaaeff67..d48ecf0af97 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -21,16 +21,10 @@ cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) { auto spacesHosts = retSpacesHosts.value(); for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { - LOG(INFO) << "Begin create snapshot on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; auto status = client_->createSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { return cpp2::ErrorCode::E_RPC_FAILURE; } - LOG(INFO) << "End create snapshot on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; } } return cpp2::ErrorCode::SUCCEEDED; @@ -46,9 +40,6 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) { - LOG(INFO) << "Begin drop snapshot on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; auto status = client_->dropSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s"; @@ -58,9 +49,6 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, status.toString().c_str()); LOG(ERROR) << error; } - LOG(INFO) << "End drop snapshot on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; } } } @@ -75,17 +63,11 @@ cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) { auto spacesHosts = retSpacesHosts.value(); for (const auto& spaceHosts : spacesHosts) { for (const auto& host : spaceHosts.second) { - LOG(INFO) << "Begin blocking write on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; - auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); - if (!status.ok()) { - LOG(ERROR) << " Send blocking sign error on host : " - << network::NetworkUtils::toHosts({host}); - } - LOG(INFO) << "End blocking write on " - << network::NetworkUtils::toHosts({host}).c_str() - << ", Space " << spaceHosts.first; + auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); + if (!status.ok()) { + LOG(ERROR) << " Send blocking sign error on host : " + << network::NetworkUtils::toHosts({host}); + } } } return cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/admin/CreateCheckpointProcessor.cpp b/src/storage/admin/CreateCheckpointProcessor.cpp index a9314f8e398..6dadc431333 100644 --- a/src/storage/admin/CreateCheckpointProcessor.cpp +++ b/src/storage/admin/CreateCheckpointProcessor.cpp @@ -11,7 +11,6 @@ namespace storage { void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { CHECK_NOTNULL(kvstore_); - LOG(INFO) << "Begin create checkpoint for space " << req.get_space_id(); auto spaceId = req.get_space_id(); auto& name = req.get_name(); auto retCode = kvstore_->createCheckpoint(spaceId, std::move(name)); diff --git a/src/storage/admin/DropCheckpointProcessor.cpp b/src/storage/admin/DropCheckpointProcessor.cpp index d096a2ca031..54633c38739 100644 --- a/src/storage/admin/DropCheckpointProcessor.cpp +++ b/src/storage/admin/DropCheckpointProcessor.cpp @@ -11,7 +11,6 @@ namespace storage { void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) { CHECK_NOTNULL(kvstore_); - LOG(INFO) << "Begin drop checkpoint for space " << req.get_space_id(); auto spaceId = req.get_space_id(); auto& name = req.get_name(); auto retCode = kvstore_->dropCheckpoint(spaceId, std::move(name));