Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaohaifei committed Mar 9, 2022
1 parent 0dd954e commit 60bede3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 51 deletions.
4 changes: 2 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) {
}

nebula::cpp2::ErrorCode NebulaStore::clearSpace(GraphSpaceID spaceId) {
folly::RWSpinLock::WriteHolder wh(&lock_);
folly::RWSpinLock::ReadHolder rh(&lock_);
auto spaceIt = this->spaces_.find(spaceId);
if (spaceIt != this->spaces_.end()) {
for (auto& part : spaceIt->second->parts_) {
auto ret = part.second->cleanup();
auto ret = part.second->cleanupSafely();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "partition clear failed. space: " << spaceId << ", part: " << part.first;
return ret;
Expand Down
14 changes: 12 additions & 2 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,14 @@ class Part : public raftex::RaftPart {
}

/**
* @brief clean up data in listener, called in RaftPart::reset
* @brief clean up data safely
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode cleanup() override;
nebula::cpp2::ErrorCode cleanupSafely() {
std::lock_guard<std::mutex> g(raftLock_);
return cleanup();
}

private:
/**
Expand Down Expand Up @@ -297,6 +300,13 @@ class Part : public raftex::RaftPart {
LogID committedLogId,
TermID committedLogTerm);

/**
* @brief clean up data in listener, called in RaftPart::reset
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode cleanup() override;

public:
struct CallbackOptions {
GraphSpaceID spaceId;
Expand Down
98 changes: 51 additions & 47 deletions src/meta/processors/admin/ClearSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,66 +9,70 @@ namespace nebula {
namespace meta {

void ClearSpaceProcessor::process(const cpp2::ClearSpaceReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
GraphSpaceID spaceId;
std::vector<HostAddr> hosts;
{
folly::SharedMutex::ReadHolder holder(LockUtils::lock());

// 1. Fetch spaceId
const auto& spaceName = req.get_space_name();
auto spaceRet = getSpaceId(spaceName);
if (!nebula::ok(spaceRet)) {
auto retCode = nebula::error(spaceRet);
if (retCode == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
if (req.get_if_exists()) {
retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
// 1. Fetch spaceId
const auto& spaceName = req.get_space_name();
auto spaceRet = getSpaceId(spaceName);
if (!nebula::ok(spaceRet)) {
auto retCode = nebula::error(spaceRet);
if (retCode == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
if (req.get_if_exists()) {
retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
LOG(WARNING) << "Clear space Failed, space " << spaceName << " not existed.";
}
} else {
LOG(WARNING) << "Clear space Failed, space " << spaceName << " not existed.";
LOG(ERROR) << "Clear space Failed, space " << spaceName
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
}
} else {
LOG(ERROR) << "Clear space Failed, space " << spaceName
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}
handleErrorCode(retCode);
onFinished();
return;
}
auto spaceId = nebula::value(spaceRet);
spaceId = nebula::value(spaceRet);

// 2. Fetch all parts info accroding the spaceId.
auto ret = getAllParts(spaceId);
if (!nebula::ok(ret)) {
handleErrorCode(nebula::error(ret));
onFinished();
return;
}
// 2. Fetch all parts info accroding the spaceId.
auto ret = getAllParts(spaceId);
if (!nebula::ok(ret)) {
handleErrorCode(nebula::error(ret));
onFinished();
return;
}

// 3. Determine which hosts the space is distributed on.
std::vector<HostAddr> distributedOnHosts;
for (auto& partEntry : nebula::value(ret)) {
for (auto& host : partEntry.second) {
if (std::find(distributedOnHosts.begin(), distributedOnHosts.end(), host) ==
distributedOnHosts.end()) {
distributedOnHosts.push_back(host);
// 3. Determine which hosts the space is distributed on.
std::unordered_set<HostAddr> distributedOnHosts;
for (auto& partEntry : nebula::value(ret)) {
for (auto& host : partEntry.second) {
distributedOnHosts.insert(host);
}
}
}

// 4. select the active hosts.
std::vector<HostAddr> selectedHosts;
auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
handleErrorCode(nebula::error(activeHostsRet));
onFinished();
return;
}
auto activeHosts = std::move(nebula::value(activeHostsRet));
for (auto& host : distributedOnHosts) {
if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) {
selectedHosts.push_back(host);
// 4. select the active hosts.
auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
handleErrorCode(nebula::error(activeHostsRet));
onFinished();
return;
}
auto activeHosts = std::move(nebula::value(activeHostsRet));
for (auto& host : distributedOnHosts) {
if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) {
hosts.push_back(host);
}
}
if (hosts.size() == 0) {
handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS);
onFinished();
return;
}
}

// 5. Delete the space data on the corresponding hosts.
auto clearRet = adminClient_->clearSpace(spaceId, selectedHosts).get();
auto clearRet = adminClient_->clearSpace(spaceId, hosts).get();
handleErrorCode(clearRet);
onFinished();
return;
Expand Down

0 comments on commit 60bede3

Please sign in to comment.