skip the wait in rate limiter in CI #2795

Merged
merged 3 commits on Sep 7, 2021
1 change: 1 addition & 0 deletions src/kvstore/CMakeLists.txt
@@ -8,6 +8,7 @@ nebula_add_library(
RocksEngineConfig.cpp
LogEncoder.cpp
NebulaSnapshotManager.cpp
RateLimiter.cpp
plugins/elasticsearch/ESListener.cpp
)

16 changes: 8 additions & 8 deletions src/kvstore/NebulaSnapshotManager.cpp
@@ -23,13 +23,15 @@ const int32_t kReserveNum = 1024 * 4;
NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
// Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
// So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
rateLimiter_.reset(
new RateLimiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit));
LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
<< " for each part";
}

void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_part_rate_limit);
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
std::vector<std::string> data;
@@ -41,12 +43,9 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);
// raft will make sure that there won't be concurrent accessAllRowsInSnapshot of a given partition
rateLimiter_->add(spaceId, partId);
SCOPE_EXIT { rateLimiter_->remove(spaceId, partId); };

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize)) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
return;
}
}
@@ -61,7 +60,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize) {
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -74,7 +74,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
size_t batchSize = 0;
while (iter && iter->valid()) {
if (batchSize >= FLAGS_snapshot_batch_size) {
rateLimiter_->consume(spaceId, partId, batchSize);
rateLimiter->consume(batchSize);
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
4 changes: 2 additions & 2 deletions src/kvstore/NebulaSnapshotManager.h
@@ -31,9 +31,9 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize);
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter);

std::unique_ptr<RateLimiter> rateLimiter_;
NebulaStore* store_;
};

11 changes: 11 additions & 0 deletions src/kvstore/RateLimiter.cpp
@@ -0,0 +1,11 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "kvstore/RateLimiter.h"

DEFINE_bool(skip_wait_in_rate_limiter,
false,
"skip the waiting of first second in rate limiter in CI");
49 changes: 16 additions & 33 deletions src/kvstore/RateLimiter.h
@@ -12,6 +12,8 @@
#include "common/thrift/ThriftTypes.h"
#include "common/time/WallClock.h"

DECLARE_bool(skip_wait_in_rate_limiter);

namespace nebula {
namespace kvstore {

@@ -20,50 +22,31 @@ namespace kvstore {
class RateLimiter {
public:
RateLimiter(int32_t rate, int32_t burstSize)
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {}

void add(GraphSpaceID spaceId, PartitionID partId) {
std::lock_guard<std::mutex> guard(lock_);
DCHECK(buckets_.find({spaceId, partId}) == buckets_.end());
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {
// token will be available after 1 second, to prevent speed spike at the beginning
auto now = time::WallClock::fastNowInSec();
folly::TokenBucket bucket(rate_, burstSize_, static_cast<double>(now + 1));
buckets_.emplace(std::make_pair(spaceId, partId), std::move(bucket));
}

void remove(GraphSpaceID spaceId, PartitionID partId) {
std::lock_guard<std::mutex> guard(lock_);
DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
buckets_.erase({spaceId, partId});
int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1;
bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast<double>(now + waitInSec)));
}

// Caller must make sure the **the parition has been add, and won't be removed during consume.**
// Snaphot and rebuild index follow this principle by design.
void consume(GraphSpaceID spaceId, PartitionID partId, size_t toConsume) {
DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
auto iter = buckets_.find({spaceId, partId});
if (iter != buckets_.end()) {
if (toConsume > burstSize_) {
// consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
// If there are enouth tokens, consume and return immediately.
// If not, cosume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
iter->second.consumeWithBorrowAndWait(static_cast<double>(toConsume),
static_cast<double>(now));
}
void consume(size_t toConsume) {
if (toConsume > burstSize_) {
// consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
LOG_EVERY_N(WARNING, 100) << folly::format(
"Rate limiter is not available for [{},{}]", spaceId, partId);
// If there are enough tokens, consume and return immediately.
// If not, consume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
bucket_->consumeWithBorrowAndWait(static_cast<double>(toConsume), static_cast<double>(now));
}
}

private:
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, folly::TokenBucket> buckets_;
std::mutex lock_;
double rate_;
double burstSize_;
std::unique_ptr<folly::TokenBucket> bucket_;
double rate_{1 << 20};
double burstSize_{1 << 20};
};

} // namespace kvstore
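With the per-partition bucket map, mutex, and add()/remove() bookkeeping gone, each RateLimiter now wraps a single folly::TokenBucket, and callers create one limiter per snapshot or rebuild-index invocation. A minimal usage sketch (illustrative values only, assuming the caller serializes access as raft and the TaskManager already guarantee):

```cpp
#include <memory>

#include "kvstore/RateLimiter.h"

// Sketch only: one limiter per invocation, consumed from a single thread.
void sendBatchesSketch() {
  auto limiter = std::make_unique<nebula::kvstore::RateLimiter>(
      2 * 1024 * 1024 /* rate, bytes per second */, 2 * 1024 * 1024 /* burst size */);
  for (int i = 0; i < 16; ++i) {
    limiter->consume(512 * 1024);  // blocks just long enough to stay under the rate
    // ... send one 512 KiB batch here ...
  }
}
```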
18 changes: 3 additions & 15 deletions src/kvstore/test/RateLimiterTest.cpp
@@ -18,45 +18,33 @@ namespace kvstore {

TEST(RateLimter, ConsumeLessEqualThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 50) {
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit / 10);
limiter.consume(FLAGS_snapshot_part_rate_limit / 10);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

TEST(RateLimter, ConsumeGreaterThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit / 10);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 5) {
// greater than burst size, will sleep 1 second instead
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
limiter.consume(FLAGS_snapshot_part_rate_limit);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

TEST(RateLimter, RateLessThanBurst) {
RateLimiter limiter(FLAGS_snapshot_part_rate_limit, 2 * FLAGS_snapshot_part_rate_limit);
GraphSpaceID spaceId = 1;
PartitionID partId = 1;
limiter.add(spaceId, partId);
auto now = time::WallClock::fastNowInSec();
int64_t count = 0;
while (count++ < 5) {
limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
limiter.consume(FLAGS_snapshot_part_rate_limit);
}
EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
limiter.remove(spaceId, partId);
}

} // namespace kvstore
7 changes: 4 additions & 3 deletions src/storage/admin/RebuildEdgeIndexTask.cpp
@@ -26,7 +26,8 @@ StatusOr<std::shared_ptr<meta::cpp2::IndexItem>> RebuildEdgeIndexTask::getIndex(

nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) {
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(ERROR) << "Rebuild Edge Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
@@ -66,7 +67,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
}

if (batchSize >= FLAGS_rebuild_index_batch_size) {
auto result = writeData(space, part, data, batchSize);
auto result = writeData(space, part, data, batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return result;
@@ -160,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
iter->next();
}

auto result = writeData(space, part, std::move(data), batchSize);
auto result = writeData(space, part, std::move(data), batchSize, rateLimiter);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Write Part " << part << " Index Failed";
return nebula::cpp2::ErrorCode::E_STORE_FAILURE;
3 changes: 2 additions & 1 deletion src/storage/admin/RebuildEdgeIndexTask.h
@@ -25,7 +25,8 @@ class RebuildEdgeIndexTask : public RebuildIndexTask {

nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) override;
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) override;
};

} // namespace storage
28 changes: 14 additions & 14 deletions src/storage/admin/RebuildIndexTask.cpp
@@ -24,8 +24,6 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx)
// 10Mb, which is non-trivial.
LOG(INFO) << "Rebuild index task is rate limited to " << FLAGS_rebuild_index_part_rate_limit
<< " for each subtask";
rateLimiter_.reset(new kvstore::RateLimiter(FLAGS_rebuild_index_part_rate_limit,
FLAGS_rebuild_index_part_rate_limit));
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::genSubTasks() {
@@ -80,9 +78,9 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::ge
nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
PartitionID part,
const IndexItems& items) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_rebuild_index_part_rate_limit,
FLAGS_rebuild_index_part_rate_limit);
// TaskManager will make sure that there won't be concurrent invoke of a given part
rateLimiter_->add(space, part);
SCOPE_EXIT { rateLimiter_->remove(space, part); };
auto result = removeLegacyLogs(space, part);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Remove legacy logs at part: " << part << " failed";
@@ -97,7 +95,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
env_->rebuildIndexGuard_->assign(std::make_tuple(space, part), IndexState::BUILDING);

LOG(INFO) << "Start building index";
result = buildIndexGlobal(space, part, items);
result = buildIndexGlobal(space, part, items, rateLimiter.get());
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Building index failed";
return nebula::cpp2::ErrorCode::E_REBUILD_INDEX_FAILED;
@@ -106,7 +104,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
}

LOG(INFO) << folly::sformat("Processing operation logs, space={}, part={}", space, part);
result = buildIndexOnOperations(space, part);
result = buildIndexOnOperations(space, part, rateLimiter.get());
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << folly::sformat(
"Building index with operation logs failed, space={}, part={}", space, part);
@@ -118,8 +116,8 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
return result;
}

nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID space,
PartitionID part) {
nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(
GraphSpaceID space, PartitionID part, kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part);
return nebula::cpp2::ErrorCode::SUCCEEDED;
@@ -154,7 +152,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp

batchHolder->remove(opKey.str());
if (batchHolder->size() > FLAGS_rebuild_index_batch_size) {
auto ret = writeOperation(space, part, batchHolder.get());
auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
LOG(ERROR) << "Write Operation Failed";
return ret;
@@ -163,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp
operationIter->next();
}

auto ret = writeOperation(space, part, batchHolder.get());
auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
LOG(ERROR) << "Write Operation Failed";
return ret;
@@ -219,10 +217,11 @@ nebula::cpp2::ErrorCode RebuildIndexTask::removeLegacyLogs(GraphSpaceID space, P
nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,
PartitionID part,
std::vector<kvstore::KV> data,
size_t batchSize) {
size_t batchSize,
kvstore::RateLimiter* rateLimiter) {
folly::Baton<true, std::atomic> baton;
auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
rateLimiter_->consume(space, part, batchSize);
rateLimiter->consume(batchSize);
env_->kvstore_->asyncMultiPut(
space, part, std::move(data), [&result, &baton](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -236,11 +235,12 @@ nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,

nebula::cpp2::ErrorCode RebuildIndexTask::writeOperation(GraphSpaceID space,
PartitionID part,
kvstore::BatchHolder* batchHolder) {
kvstore::BatchHolder* batchHolder,
kvstore::RateLimiter* rateLimiter) {
folly::Baton<true, std::atomic> baton;
auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
auto encoded = encodeBatchValue(batchHolder->getBatch());
rateLimiter_->consume(space, part, batchHolder->size());
rateLimiter->consume(batchHolder->size());
env_->kvstore_->asyncAppendBatch(
space, part, std::move(encoded), [&result, &baton](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
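To summarize the ownership change in this file: the rateLimiter_ member is gone, invoke() now builds a limiter that lives for the duration of one (space, part) run, and the helpers only borrow a raw pointer to it. A condensed sketch (helper calls elided; the rate literal stands in for FLAGS_rebuild_index_part_rate_limit, so this is not the real code):

```cpp
#include <cstdint>
#include <memory>

#include "common/thrift/ThriftTypes.h"  // GraphSpaceID, PartitionID
#include "kvstore/RateLimiter.h"

// Condensed sketch, not the real implementation.
void invokeSketch(nebula::GraphSpaceID space, nebula::PartitionID part) {
  // One limiter per invocation; TaskManager ensures no concurrent invoke of the same part.
  const int32_t rate = 4 * 1024 * 1024;  // stands in for FLAGS_rebuild_index_part_rate_limit
  auto rateLimiter = std::make_unique<nebula::kvstore::RateLimiter>(rate, rate);
  // The helpers borrow the pointer only while invoke() is on the stack:
  //   buildIndexGlobal(space, part, items, rateLimiter.get());
  //   buildIndexOnOperations(space, part, rateLimiter.get());
  (void)space;
  (void)part;
}
```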
14 changes: 9 additions & 5 deletions src/storage/admin/RebuildIndexTask.h
@@ -34,30 +34,34 @@ class RebuildIndexTask : public AdminTask {

virtual nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
PartitionID part,
const IndexItems& items) = 0;
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) = 0;

void cancel() override { canceled_ = true; }

nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space, PartitionID part);
nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space,
PartitionID part,
kvstore::RateLimiter* rateLimiter);

// Remove the legacy operation log to make sure the index is correct.
nebula::cpp2::ErrorCode removeLegacyLogs(GraphSpaceID space, PartitionID part);

nebula::cpp2::ErrorCode writeData(GraphSpaceID space,
PartitionID part,
std::vector<kvstore::KV> data,
size_t batchSize);
size_t batchSize,
kvstore::RateLimiter* rateLimiter);

nebula::cpp2::ErrorCode writeOperation(GraphSpaceID space,
PartitionID part,
kvstore::BatchHolder* batchHolder);
kvstore::BatchHolder* batchHolder,
kvstore::RateLimiter* rateLimiter);

nebula::cpp2::ErrorCode invoke(GraphSpaceID space, PartitionID part, const IndexItems& items);

protected:
std::atomic<bool> canceled_{false};
GraphSpaceID space_;
std::unique_ptr<kvstore::RateLimiter> rateLimiter_;
};

} // namespace storage