Skip to content

Commit

Permalink
fix disk manager thread conflict (vesoft-inc#512)
Browse files Browse the repository at this point in the history
* fix disk manager thread conflict

* add default value of minimum_reserved_bytes

* fix gcc 7.5 build warning

* fix wal_path
  • Loading branch information
critical27 authored Jul 6, 2021
1 parent c363760 commit 3842980
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 6 deletions.
3 changes: 3 additions & 0 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
# One path per Rocksdb instance.
--data_path=data/storage

# Minimum reserved bytes of each data path
--minimum_reserved_bytes=268435456

# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable.
Expand Down
3 changes: 3 additions & 0 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
# One path per Rocksdb instance.
--data_path=data/storage

# Minimum reserved bytes of each data path
--minimum_reserved_bytes=268435456

# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable. (MB)
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) {
return Status::Error("Space not found");
}
std::vector<std::string> paths;
for (const auto& [path, _] : spaceIt->second) {
paths.emplace_back(path);
for (const auto& partEntry : spaceIt->second) {
paths.emplace_back(partEntry.first);
}
return paths;
}
Expand All @@ -70,6 +70,7 @@ StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId
void DiskManager::addPartToPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
Expand All @@ -85,6 +86,7 @@ void DiskManager::addPartToPath(GraphSpaceID spaceId,
void DiskManager::removePartFromPath(GraphSpaceID spaceId,
PartitionID partId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class DiskManager {

// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;

// lock used to protect partPath_ and partIndex_
std::mutex lock_;
};

} // namespace kvstore
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
KVEngine* engine,
bool asLearner,
const std::vector<HostAddr>& defaultPeers) {
auto walPath = folly::stringPrintf("%s/wal/%d", engine->getDataRoot(), partId);
auto walPath = folly::stringPrintf("%s/wal/%d", engine->getWalRoot(), partId);
auto part = std::make_shared<Part>(spaceId,
partId,
raftAddr_,
Expand Down
8 changes: 5 additions & 3 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "Failed append logs";
LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to write wal";
return;
}
// Step 2: Replicate to followers
Expand Down Expand Up @@ -805,7 +805,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "Replicate logs failed";
LOG(WARNING) << idStr_ << "replicateLogs failed because of not leader or term changed";
return;
}

Expand Down Expand Up @@ -911,7 +911,9 @@ void RaftPart::processAppendLogResponses(
} while (false);

if (!checkAppendLogResult(res)) {
LOG(ERROR) << idStr_ << "processAppendLogResponses failed!";
LOG(WARNING)
<< idStr_
<< "processAppendLogResponses failed because of not leader or term changed";
return;
}

Expand Down

0 comments on commit 3842980

Please sign in to comment.