Skip to content

Commit

Permalink
Improved snapshot creation logic. (vesoft-inc#2318)
Browse files: browse the repository at this point in the history
  • Loading branch information
bright-starry-sky authored and xuguruogu committed Sep 7, 2020
1 parent f7bdb66 commit 494f507
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 31 deletions.
32 changes: 15 additions & 17 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,23 +794,21 @@ ResultCode NebulaStore::setWriteBlocking(GraphSpaceID spaceId, bool sign) {
return error(partRet);
}
auto p = nebula::value(partRet);
if (p->isLeader()) {
auto ret = ResultCode::SUCCEEDED;
p->setBlocking(sign);
if (sign) {
folly::Baton<true, std::atomic> baton;
p->sync([&ret, &baton] (kvstore::ResultCode code) {
if (kvstore::ResultCode::SUCCEEDED != code) {
ret = code;
}
baton.post();
});
baton.wait();
}
if (ret != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Part sync failed. space : " << spaceId << " Part : " << part;
return ret;
}
auto ret = ResultCode::SUCCEEDED;
p->setBlocking(sign);
if (sign) {
folly::Baton<true, std::atomic> baton;
p->sync([&ret, &baton] (kvstore::ResultCode code) {
if (kvstore::ResultCode::SUCCEEDED != code) {
ret = code;
}
baton.post();
});
baton.wait();
}
if (ret != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Part sync failed. space : " << spaceId << " Part : " << part;
return ret;
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,10 @@ ResultCode RocksEngine::createCheckpoint(const std::string& name) {
auto checkpointPath = folly::stringPrintf("%s/checkpoints/%s/data",
dataPath_.c_str(), name.c_str());
LOG(INFO) << "Target checkpoint path : " << checkpointPath;
if (fs::FileUtils::exist(checkpointPath)) {
LOG(ERROR) << "The snapshot file already exists: " << checkpointPath;
return ResultCode::ERR_CHECKPOINT_ERROR;
if (fs::FileUtils::exist(checkpointPath) &&
!fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(ERROR) << "Remove exist dir failed of checkpoint : " << checkpointPath;
return ResultCode::ERR_IO_ERROR;
}

auto parent = checkpointPath.substr(0, checkpointPath.rfind('/'));
Expand Down
8 changes: 5 additions & 3 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,12 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
LogType logType,
std::string log,
AtomicOp op) {
if (blocking_ && (logType == LogType::NORMAL || logType == LogType::ATOMIC_OP)) {
return AppendLogResult::E_WRITE_BLOCKING;
if (blocking_) {
// No need to block heartbeats and empty log.
if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
return AppendLogResult::E_WRITE_BLOCKING;
}
}

LogCache swappedOutLogs;
auto retFuture = folly::Future<AppendLogResult>::makeEmpty();

Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,13 @@ bool FileBasedWal::linkCurrentWAL(const char* newPath) {
LOG(INFO) << idStr_ << "No wal files found, skip link";
return true;
}

if (fs::FileUtils::exist(newPath) &&
!fs::FileUtils::remove(newPath, true)) {
LOG(ERROR) << "Remove exist dir failed of wal : " << newPath;
return false;
}

if (!fs::FileUtils::makeDir(newPath)) {
LOG(INFO) << idStr_ << "Link file parent dir make failed : " << newPath;
return false;
Expand Down
10 changes: 3 additions & 7 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,9 @@ folly::Future<Status> AdminClient::createSnapshot(GraphSpaceID spaceId,
folly::Promise<Status> pro;
auto f = pro.getFuture();

/**
* Don't need retry.
* Because existing checkpoint directories leads to fail again.
**/
getResponse({host}, 0, std::move(req), [] (auto client, auto request) {
return client->future_createCheckpoint(request);
}, 0, std::move(pro), 0);
}, 0, std::move(pro), 3 /*The snapshot operation need to retry 3 times*/);
return f;
}

Expand All @@ -625,7 +621,7 @@ folly::Future<Status> AdminClient::dropSnapshot(GraphSpaceID spaceId,
auto f = pro.getFuture();
getResponse({host}, 0, std::move(req), [] (auto client, auto request) {
return client->future_dropCheckpoint(request);
}, 0, std::move(pro), 1 /*The snapshot operation only needs to be retried twice*/);
}, 0, std::move(pro), 3 /*The snapshot operation need to retry 3 times*/);
return f;
}

Expand All @@ -640,7 +636,7 @@ folly::Future<Status> AdminClient::blockingWrites(GraphSpaceID spaceId,
auto f = pro.getFuture();
getResponse({host}, 0, std::move(req), [] (auto client, auto request) {
return client->future_blockingWrites(request);
}, 0, std::move(pro), 1 /*The blocking needs to be retried twice*/);
}, 0, std::move(pro), 32 /*The blocking need to retry 32 times*/);
return f;
}

Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace nebula {
namespace meta {

void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
// check the index rebuild. not allowed to create snapshot when index rebuilding.
auto prefix = MetaServiceUtils::rebuildIndexStatusPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
Expand Down

0 comments on commit 494f507

Please sign in to comment.