Skip to content

Commit

Permalink
fix memory leak in some case
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Sep 7, 2021
1 parent d469631 commit 664c199
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 84 deletions.
16 changes: 8 additions & 8 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ const int32_t kReserveNum = 1024 * 4;
NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
// Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
// So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
rateLimiter_.reset(
new RateLimiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit));
LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
<< " for each part";
}

void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_part_rate_limit);
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
std::vector<std::string> data;
Expand All @@ -41,12 +43,9 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);
// raft will make sure that there won't be concurrent accessAllRowsInSnapshot of a given partition
rateLimiter_->add(spaceId, partId);
SCOPE_EXIT { rateLimiter_->remove(spaceId, partId); };

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize)) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
return;
}
}
Expand All @@ -61,7 +60,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize) {
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -74,7 +74,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
size_t batchSize = 0;
while (iter && iter->valid()) {
if (batchSize >= FLAGS_snapshot_batch_size) {
rateLimiter_->consume(spaceId, partId, batchSize);
rateLimiter->consume(batchSize);
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize);
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter);

std::unique_ptr<RateLimiter> rateLimiter_;
NebulaStore* store_;
};

Expand Down
43 changes: 11 additions & 32 deletions src/kvstore/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,29 @@ namespace kvstore {
class RateLimiter {
public:
RateLimiter(int32_t rate, int32_t burstSize)
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {}

void add(GraphSpaceID spaceId, PartitionID partId) {
std::lock_guard<std::mutex> guard(lock_);
DCHECK(buckets_.find({spaceId, partId}) == buckets_.end());
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {
// token will be available after 1 second, to prevent speed spike at the beginning
auto now = time::WallClock::fastNowInSec();
int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1;
folly::TokenBucket bucket(rate_, burstSize_, static_cast<double>(now + waitInSec));
buckets_.emplace(std::make_pair(spaceId, partId), std::move(bucket));
}

void remove(GraphSpaceID spaceId, PartitionID partId) {
std::lock_guard<std::mutex> guard(lock_);
DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
buckets_.erase({spaceId, partId});
bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast<double>(now + waitInSec)));
}

// Caller must make sure the **the parition has been add, and won't be removed during consume.**
// Snaphot and rebuild index follow this principle by design.
void consume(GraphSpaceID spaceId, PartitionID partId, size_t toConsume) {
// todo(doodle): enable this DCHECK later
// DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
auto iter = buckets_.find({spaceId, partId});
if (iter != buckets_.end()) {
if (toConsume > burstSize_) {
// consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
// If there are enouth tokens, consume and return immediately.
// If not, cosume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
iter->second.consumeWithBorrowAndWait(static_cast<double>(toConsume),
static_cast<double>(now));
}
void consume(size_t toConsume) {
if (toConsume > burstSize_) {
// consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
LOG_EVERY_N(WARNING, 100) << folly::format(
"Rate limiter is not available for [{},{}]", spaceId, partId);
// If there are enouth tokens, consume and return immediately.
// If not, cosume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
bucket_->consumeWithBorrowAndWait(static_cast<double>(toConsume), static_cast<double>(now));
}
}

private:
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, folly::TokenBucket> buckets_;
std::mutex lock_;
std::unique_ptr<folly::TokenBucket> bucket_;
double rate_;
double burstSize_;
};
Expand Down
18 changes: 3 additions & 15 deletions src/kvstore/test/RateLimiterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,33 @@ namespace kvstore {

TEST(RateLimter, ConsumeLessEqualThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 50) {
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit / 10);
limiter.consume(FLAGS_snapshot_part_rate_limit / 10);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

TEST(RateLimter, ConsumeGreaterThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit / 10);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 5) {
// greater than burst size, will sleep 1 second instead
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
limiter.consume(FLAGS_snapshot_part_rate_limit);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

TEST(RateLimter, RateLessThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, 2 * FLAGS_snapshot_part_rate_limit);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 5) {
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
limiter.consume(FLAGS_snapshot_part_rate_limit);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

} // namespace kvstore
Expand Down
7 changes: 4 additions & 3 deletions src/storage/admin/RebuildEdgeIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ StatusOr<std::shared_ptr<meta::cpp2::IndexItem>> RebuildEdgeIndexTask::getIndex(

nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) {
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(ERROR) << "Rebuild Edge Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -66,7 +67,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
}

if (batchSize >= FLAGS_rebuild_index_batch_size) {
auto result = writeData(space, part, data, batchSize);
auto result = writeData(space, part, data, batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return result;
Expand Down Expand Up @@ -160,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
iter->next();
}

auto result = writeData(space, part, std::move(data), batchSize);
auto result = writeData(space, part, std::move(data), batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return nebula::cpp2::ErrorCode::E_STORE_FAILURE;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/admin/RebuildEdgeIndexTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class RebuildEdgeIndexTask : public RebuildIndexTask {

nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) override;
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) override;
};

} // namespace storage
Expand Down
28 changes: 14 additions & 14 deletions src/storage/admin/RebuildIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx)
// 10Mb, which is non-trival.
LOG(INFO) << "Rebuild index task is rate limited to " << FLAGS_rebuild_index_part_rate_limit
<< " for each subtask";
rateLimiter_.reset(new kvstore::RateLimiter(FLAGS_rebuild_index_part_rate_limit,
FLAGS_rebuild_index_part_rate_limit));
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::genSubTasks() {
Expand Down Expand Up @@ -80,9 +78,9 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::ge
nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
PartitionID part,
const IndexItems& items) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_rebuild_index_part_rate_limit,
FLAGS_rebuild_index_part_rate_limit);
// TaskMananger will make sure that there won't be cocurrent invoke of a given part
rateLimiter_->add(space, part);
SCOPE_EXIT { rateLimiter_->remove(space, part); };
auto result = removeLegacyLogs(space, part);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Remove legacy logs at part: " << part << " failed";
Expand All @@ -97,7 +95,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
env_->rebuildIndexGuard_->assign(std::make_tuple(space, part), IndexState::BUILDING);

LOG(INFO) << "Start building index";
result = buildIndexGlobal(space, part, items);
result = buildIndexGlobal(space, part, items, rateLimiter.get());
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Building index failed";
return nebula::cpp2::ErrorCode::E_REBUILD_INDEX_FAILED;
Expand All @@ -106,7 +104,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
}

LOG(INFO) << folly::sformat("Processing operation logs, space={}, part={}", space, part);
result = buildIndexOnOperations(space, part);
result = buildIndexOnOperations(space, part, rateLimiter.get());
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << folly::sformat(
"Building index with operation logs failed, space={}, part={}", space, part);
Expand All @@ -118,8 +116,8 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
return result;
}

nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID space,
PartitionID part) {
nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(
GraphSpaceID space, PartitionID part, kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part);
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -154,7 +152,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp

batchHolder->remove(opKey.str());
if (batchHolder->size() > FLAGS_rebuild_index_batch_size) {
auto ret = writeOperation(space, part, batchHolder.get());
auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
LOG(ERROR) << "Write Operation Failed";
return ret;
Expand All @@ -163,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp
operationIter->next();
}

auto ret = writeOperation(space, part, batchHolder.get());
auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
LOG(ERROR) << "Write Operation Failed";
return ret;
Expand Down Expand Up @@ -219,10 +217,11 @@ nebula::cpp2::ErrorCode RebuildIndexTask::removeLegacyLogs(GraphSpaceID space, P
nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,
PartitionID part,
std::vector<kvstore::KV> data,
size_t batchSize) {
size_t batchSize,
kvstore::RateLimiter* rateLimiter) {
folly::Baton<true, std::atomic> baton;
auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
rateLimiter_->consume(space, part, batchSize);
rateLimiter->consume(batchSize);
env_->kvstore_->asyncMultiPut(
space, part, std::move(data), [&result, &baton](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -236,11 +235,12 @@ nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,

nebula::cpp2::ErrorCode RebuildIndexTask::writeOperation(GraphSpaceID space,
PartitionID part,
kvstore::BatchHolder* batchHolder) {
kvstore::BatchHolder* batchHolder,
kvstore::RateLimiter* rateLimiter) {
folly::Baton<true, std::atomic> baton;
auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
auto encoded = encodeBatchValue(batchHolder->getBatch());
rateLimiter_->consume(space, part, batchHolder->size());
rateLimiter->consume(batchHolder->size());
env_->kvstore_->asyncAppendBatch(
space, part, std::move(encoded), [&result, &baton](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
14 changes: 9 additions & 5 deletions src/storage/admin/RebuildIndexTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,34 @@ class RebuildIndexTask : public AdminTask {

virtual nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) = 0;
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) = 0;

void cancel() override { canceled_ = true; }

nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space, PartitionID part);
nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space,
PartitionID part,
kvstore::RateLimiter* rateLimiter);

// Remove the legacy operation log to make sure the index is correct.
nebula::cpp2::ErrorCode removeLegacyLogs(GraphSpaceID space, PartitionID part);

nebula::cpp2::ErrorCode writeData(GraphSpaceID space,
PartitionID part,
std::vector<kvstore::KV> data,
size_t batchSize);
size_t batchSize,
kvstore::RateLimiter* rateLimiter);

nebula::cpp2::ErrorCode writeOperation(GraphSpaceID space,
PartitionID part,
kvstore::BatchHolder* batchHolder);
kvstore::BatchHolder* batchHolder,
kvstore::RateLimiter* rateLimiter);

nebula::cpp2::ErrorCode invoke(GraphSpaceID space, PartitionID part, const IndexItems& items);

protected:
std::atomic<bool> canceled_{false};
GraphSpaceID space_;
std::unique_ptr<kvstore::RateLimiter> rateLimiter_;
};

} // namespace storage
Expand Down
7 changes: 4 additions & 3 deletions src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ StatusOr<std::shared_ptr<meta::cpp2::IndexItem>> RebuildTagIndexTask::getIndex(G

nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) {
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(ERROR) << "Rebuild Tag Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -64,7 +65,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
}

if (batchSize >= FLAGS_rebuild_index_batch_size) {
auto result = writeData(space, part, data, batchSize);
auto result = writeData(space, part, data, batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return result;
Expand Down Expand Up @@ -141,7 +142,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
iter->next();
}

auto result = writeData(space, part, std::move(data), batchSize);
auto result = writeData(space, part, std::move(data), batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return nebula::cpp2::ErrorCode::E_STORE_FAILURE;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/admin/RebuildTagIndexTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class RebuildTagIndexTask : public RebuildIndexTask {

nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) override;
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) override;
};

} // namespace storage
Expand Down

0 comments on commit 664c199

Please sign in to comment.