diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp
index ce6514e29ac..4cbc6dadc32 100644
--- a/src/kvstore/NebulaSnapshotManager.cpp
+++ b/src/kvstore/NebulaSnapshotManager.cpp
@@ -23,13 +23,15 @@ const int32_t kReserveNum = 1024 * 4;
 NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
   // Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
   // So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
-  rateLimiter_.reset(
-      new RateLimiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit));
+  LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
+            << " for each part";
 }
 
 void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
                                                     PartitionID partId,
                                                     raftex::SnapshotCallback cb) {
+  auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_snapshot_part_rate_limit,
+                                                            FLAGS_snapshot_part_rate_limit);
   CHECK_NOTNULL(store_);
   auto tables = NebulaKeyUtils::snapshotPrefix(partId);
   std::vector<std::string> data;
@@ -41,12 +43,9 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
             partId,
             FLAGS_snapshot_part_rate_limit,
             FLAGS_snapshot_batch_size);
-  // raft will make sure that there won't be concurrent accessAllRowsInSnapshot of a given
-  // partition
-  rateLimiter_->add(spaceId, partId);
-  SCOPE_EXIT { rateLimiter_->remove(spaceId, partId); };
   for (const auto& prefix : tables) {
-    if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize)) {
+    if (!accessTable(
+            spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
       return;
     }
   }
@@ -61,7 +60,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
                                         raftex::SnapshotCallback& cb,
                                         std::vector<std::string>& data,
                                         int64_t& totalCount,
-                                        int64_t& totalSize) {
+                                        int64_t& totalSize,
+                                        kvstore::RateLimiter* rateLimiter) {
   std::unique_ptr<KVIterator> iter;
   auto ret = store_->prefix(spaceId, partId, prefix, &iter);
   if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -74,7 +74,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
   size_t batchSize = 0;
   while (iter && iter->valid()) {
     if (batchSize >= FLAGS_snapshot_batch_size) {
-      rateLimiter_->consume(spaceId, partId, batchSize);
+      rateLimiter->consume(batchSize);
       if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
         data.clear();
         batchSize = 0;
diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h
index 9b28c1fcbcf..5cc39ec1a4f 100644
--- a/src/kvstore/NebulaSnapshotManager.h
+++ b/src/kvstore/NebulaSnapshotManager.h
@@ -31,9 +31,9 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
                    raftex::SnapshotCallback& cb,
                    std::vector<std::string>& data,
                    int64_t& totalCount,
-                   int64_t& totalSize);
+                   int64_t& totalSize,
+                   kvstore::RateLimiter* rateLimiter);
 
-  std::unique_ptr<RateLimiter> rateLimiter_;
   NebulaStore* store_;
 };
diff --git a/src/kvstore/RateLimiter.h b/src/kvstore/RateLimiter.h
index eb87512a52f..1d0019579fc 100644
--- a/src/kvstore/RateLimiter.h
+++ b/src/kvstore/RateLimiter.h
@@ -22,50 +22,29 @@ namespace kvstore {
 class RateLimiter {
  public:
   RateLimiter(int32_t rate, int32_t burstSize)
-      : rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {}
-
-  void add(GraphSpaceID spaceId, PartitionID partId) {
-    std::lock_guard<std::mutex> guard(lock_);
-    DCHECK(buckets_.find({spaceId, partId}) == buckets_.end());
+      : rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {
     // token will be available after 1 second, to prevent speed spike at the beginning
     auto now = time::WallClock::fastNowInSec();
     int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1;
-    folly::TokenBucket bucket(rate_, burstSize_, static_cast<double>(now + waitInSec));
-    buckets_.emplace(std::make_pair(spaceId, partId), std::move(bucket));
-  }
-
-  void remove(GraphSpaceID spaceId, PartitionID partId) {
-    std::lock_guard<std::mutex> guard(lock_);
-    DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
-    buckets_.erase({spaceId, partId});
+    bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast<double>(now + waitInSec)));
   }
 
   // Caller must make sure the **the parition has been add, and won't be removed during consume.**
   // Snaphot and rebuild index follow this principle by design.
-  void consume(GraphSpaceID spaceId, PartitionID partId, size_t toConsume) {
-    // todo(doodle): enable this DCHECK later
-    // DCHECK(buckets_.find({spaceId, partId}) != buckets_.end());
-    auto iter = buckets_.find({spaceId, partId});
-    if (iter != buckets_.end()) {
-      if (toConsume > burstSize_) {
-        // consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
-        std::this_thread::sleep_for(std::chrono::seconds(1));
-      } else {
-        // If there are enouth tokens, consume and return immediately.
-        // If not, cosume anyway, but sleep enough time before return.
-        auto now = time::WallClock::fastNowInSec();
-        iter->second.consumeWithBorrowAndWait(static_cast<double>(toConsume),
-                                              static_cast<double>(now));
-      }
+  void consume(size_t toConsume) {
+    if (toConsume > burstSize_) {
+      // consumeWithBorrowAndWait do nothing when toConsume > burstSize_, we sleep 1s instead
+      std::this_thread::sleep_for(std::chrono::seconds(1));
     } else {
-      LOG_EVERY_N(WARNING, 100) << folly::format(
-          "Rate limiter is not available for [{},{}]", spaceId, partId);
+      // If there are enough tokens, consume and return immediately.
+      // If not, consume anyway, but sleep enough time before return.
+      auto now = time::WallClock::fastNowInSec();
+      bucket_->consumeWithBorrowAndWait(static_cast<double>(toConsume), static_cast<double>(now));
     }
   }
 
  private:
-  std::unordered_map<std::pair<GraphSpaceID, PartitionID>, folly::TokenBucket> buckets_;
-  std::mutex lock_;
+  std::unique_ptr<folly::TokenBucket> bucket_;
   double rate_;
   double burstSize_;
 };
diff --git a/src/kvstore/test/RateLimiterTest.cpp b/src/kvstore/test/RateLimiterTest.cpp
index 1902f2fa225..f01fc70d071 100644
--- a/src/kvstore/test/RateLimiterTest.cpp
+++ b/src/kvstore/test/RateLimiterTest.cpp
@@ -18,45 +18,33 @@ namespace kvstore {
 
 TEST(RateLimter, ConsumeLessEqualThanBurst) {
   RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit);
-  GraphSpaceID spaceId = 1;
-  PartitionID partId = 1;
-  limiter.add(spaceId, partId);
   auto now = time::WallClock::fastNowInSec();
   int64_t count = 0;
   while (count++ < 50) {
-    limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit / 10);
+    limiter.consume(FLAGS_snapshot_part_rate_limit / 10);
   }
   EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
-  limiter.remove(spaceId, partId);
 }
 
 TEST(RateLimter, ConsumeGreaterThanBurst) {
   RateLimiter limiter(FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_part_rate_limit / 10);
-  GraphSpaceID spaceId = 1;
-  PartitionID partId = 1;
-  limiter.add(spaceId, partId);
   auto now = time::WallClock::fastNowInSec();
   int64_t count = 0;
   while (count++ < 5) {
     // greater than burst size, will sleep 1 second instead
-    limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
+    limiter.consume(FLAGS_snapshot_part_rate_limit);
   }
   EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
-  limiter.remove(spaceId, partId);
 }
 
 TEST(RateLimter, RateLessThanBurst) {
   RateLimiter limiter(FLAGS_snapshot_part_rate_limit, 2 * FLAGS_snapshot_part_rate_limit);
-  GraphSpaceID spaceId = 1;
-  PartitionID partId = 1;
-  limiter.add(spaceId, partId);
   auto now = time::WallClock::fastNowInSec();
   int64_t count = 0;
   while (count++ < 5) {
-    limiter.consume(spaceId, partId, FLAGS_snapshot_part_rate_limit);
+    limiter.consume(FLAGS_snapshot_part_rate_limit);
   }
   EXPECT_GE(time::WallClock::fastNowInSec() - now, 5);
-  limiter.remove(spaceId, partId);
 }
 
 }  // namespace kvstore
diff --git a/src/storage/admin/RebuildEdgeIndexTask.cpp b/src/storage/admin/RebuildEdgeIndexTask.cpp
index 185c7d57e97..58c0bd5d0de 100644
--- a/src/storage/admin/RebuildEdgeIndexTask.cpp
+++ b/src/storage/admin/RebuildEdgeIndexTask.cpp
@@ -26,7 +26,8 @@ StatusOr<std::shared_ptr<meta::cpp2::IndexItem>> RebuildEdgeIndexTask::getIndex(
 
 nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID space,
                                                                PartitionID part,
-                                                               const IndexItems& items) {
+                                                               const IndexItems& items,
+                                                               kvstore::RateLimiter* rateLimiter) {
   if (canceled_) {
     LOG(ERROR) << "Rebuild Edge Index is Canceled";
     return nebula::cpp2::ErrorCode::SUCCEEDED;
@@ -66,7 +67,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpac
     }
 
     if (batchSize >= FLAGS_rebuild_index_batch_size) {
-      auto result = writeData(space, part, data, batchSize);
+      auto result = writeData(space, part, data, batchSize, rateLimiter);
       if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
         LOG(ERROR) << "Write Part " << part << " Index Failed";
         return result;
@@ -160,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpac
     iter->next();
   }
 
-  auto result = writeData(space, part, std::move(data), batchSize);
+  auto result = writeData(space, part, std::move(data), batchSize, rateLimiter);
   if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
     LOG(ERROR) << "Write Part " << part << " Index Failed";
     return nebula::cpp2::ErrorCode::E_STORE_FAILURE;
diff --git a/src/storage/admin/RebuildEdgeIndexTask.h b/src/storage/admin/RebuildEdgeIndexTask.h
index 4c03c548701..f42bac1fea7 100644
--- a/src/storage/admin/RebuildEdgeIndexTask.h
+++ b/src/storage/admin/RebuildEdgeIndexTask.h
@@ -25,7 +25,8 @@ class RebuildEdgeIndexTask : public RebuildIndexTask {
 
   nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
                                            PartitionID part,
-                                           const IndexItems& items) override;
+                                           const IndexItems& items,
+                                           kvstore::RateLimiter* rateLimiter) override;
 };
 
 }  // namespace storage
diff --git a/src/storage/admin/RebuildIndexTask.cpp b/src/storage/admin/RebuildIndexTask.cpp
index a55b795151b..aee855a8cad 100644
--- a/src/storage/admin/RebuildIndexTask.cpp
+++ b/src/storage/admin/RebuildIndexTask.cpp
@@ -24,8 +24,6 @@ RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx)
   // 10Mb, which is non-trival.
   LOG(INFO) << "Rebuild index task is rate limited to " << FLAGS_rebuild_index_part_rate_limit
             << " for each subtask";
-  rateLimiter_.reset(new kvstore::RateLimiter(FLAGS_rebuild_index_part_rate_limit,
-                                              FLAGS_rebuild_index_part_rate_limit));
 }
 
 ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::genSubTasks() {
@@ -80,9 +78,9 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::ge
 nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
                                                  PartitionID part,
                                                  const IndexItems& items) {
+  auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_rebuild_index_part_rate_limit,
+                                                            FLAGS_rebuild_index_part_rate_limit);
   // TaskMananger will make sure that there won't be cocurrent invoke of a given part
-  rateLimiter_->add(space, part);
-  SCOPE_EXIT { rateLimiter_->remove(space, part); };
   auto result = removeLegacyLogs(space, part);
   if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
     LOG(ERROR) << "Remove legacy logs at part: " << part << " failed";
@@ -97,7 +95,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
     env_->rebuildIndexGuard_->assign(std::make_tuple(space, part), IndexState::BUILDING);
 
     LOG(INFO) << "Start building index";
-    result = buildIndexGlobal(space, part, items);
+    result = buildIndexGlobal(space, part, items, rateLimiter.get());
     if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
       LOG(ERROR) << "Building index failed";
       return nebula::cpp2::ErrorCode::E_REBUILD_INDEX_FAILED;
@@ -106,7 +104,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
   }
 
   LOG(INFO) << folly::sformat("Processing operation logs, space={}, part={}", space, part);
-  result = buildIndexOnOperations(space, part);
+  result = buildIndexOnOperations(space, part, rateLimiter.get());
   if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
     LOG(ERROR) << folly::sformat(
         "Building index with operation logs failed, space={}, part={}", space, part);
@@ -118,8 +116,8 @@ nebula::cpp2::ErrorCode RebuildIndexTask::invoke(GraphSpaceID space,
   return result;
 }
 
-nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID space,
-                                                                 PartitionID part) {
+nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(
+    GraphSpaceID space, PartitionID part, kvstore::RateLimiter* rateLimiter) {
   if (canceled_) {
     LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part);
     return nebula::cpp2::ErrorCode::SUCCEEDED;
@@ -154,7 +152,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp
         batchHolder->remove(opKey.str());
 
         if (batchHolder->size() > FLAGS_rebuild_index_batch_size) {
-          auto ret = writeOperation(space, part, batchHolder.get());
+          auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
           if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
             LOG(ERROR) << "Write Operation Failed";
             return ret;
@@ -163,7 +161,7 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(GraphSpaceID sp
       operationIter->next();
     }
 
-    auto ret = writeOperation(space, part, batchHolder.get());
+    auto ret = writeOperation(space, part, batchHolder.get(), rateLimiter);
     if (nebula::cpp2::ErrorCode::SUCCEEDED != ret) {
       LOG(ERROR) << "Write Operation Failed";
       return ret;
@@ -219,10 +217,11 @@ nebula::cpp2::ErrorCode RebuildIndexTask::removeLegacyLogs(GraphSpaceID space, P
 nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,
                                                     PartitionID part,
                                                     std::vector<kvstore::KV> data,
-                                                    size_t batchSize) {
+                                                    size_t batchSize,
+                                                    kvstore::RateLimiter* rateLimiter) {
   folly::Baton<true, std::atomic> baton;
   auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
-  rateLimiter_->consume(space, part, batchSize);
+  rateLimiter->consume(batchSize);
   env_->kvstore_->asyncMultiPut(
       space, part, std::move(data), [&result, &baton](nebula::cpp2::ErrorCode code) {
         if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -236,11 +235,12 @@ nebula::cpp2::ErrorCode RebuildIndexTask::writeData(GraphSpaceID space,
 
 nebula::cpp2::ErrorCode RebuildIndexTask::writeOperation(GraphSpaceID space,
                                                          PartitionID part,
-                                                         kvstore::BatchHolder* batchHolder) {
+                                                         kvstore::BatchHolder* batchHolder,
+                                                         kvstore::RateLimiter* rateLimiter) {
   folly::Baton<true, std::atomic> baton;
   auto result = nebula::cpp2::ErrorCode::SUCCEEDED;
   auto encoded = encodeBatchValue(batchHolder->getBatch());
-  rateLimiter_->consume(space, part, batchHolder->size());
+  rateLimiter->consume(batchHolder->size());
   env_->kvstore_->asyncAppendBatch(
       space, part, std::move(encoded), [&result, &baton](nebula::cpp2::ErrorCode code) {
         if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
diff --git a/src/storage/admin/RebuildIndexTask.h b/src/storage/admin/RebuildIndexTask.h
index 7d357594983..772cc28e09e 100644
--- a/src/storage/admin/RebuildIndexTask.h
+++ b/src/storage/admin/RebuildIndexTask.h
@@ -34,11 +34,14 @@ class RebuildIndexTask : public AdminTask {
 
   virtual nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
                                                    PartitionID part,
-                                                   const IndexItems& items) = 0;
+                                                   const IndexItems& items,
+                                                   kvstore::RateLimiter* rateLimiter) = 0;
 
   void cancel() override { canceled_ = true; }
 
-  nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space, PartitionID part);
+  nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space,
+                                                 PartitionID part,
+                                                 kvstore::RateLimiter* rateLimiter);
 
   // Remove the legacy operation log to make sure the index is correct.
   nebula::cpp2::ErrorCode removeLegacyLogs(GraphSpaceID space, PartitionID part);
@@ -46,18 +49,19 @@ class RebuildIndexTask : public AdminTask {
   nebula::cpp2::ErrorCode writeData(GraphSpaceID space,
                                     PartitionID part,
                                     std::vector<kvstore::KV> data,
-                                    size_t batchSize);
+                                    size_t batchSize,
+                                    kvstore::RateLimiter* rateLimiter);
 
   nebula::cpp2::ErrorCode writeOperation(GraphSpaceID space,
                                          PartitionID part,
-                                         kvstore::BatchHolder* batchHolder);
+                                         kvstore::BatchHolder* batchHolder,
+                                         kvstore::RateLimiter* rateLimiter);
 
   nebula::cpp2::ErrorCode invoke(GraphSpaceID space, PartitionID part, const IndexItems& items);
 
  protected:
   std::atomic<bool> canceled_{false};
   GraphSpaceID space_;
-  std::unique_ptr<kvstore::RateLimiter> rateLimiter_;
 };
 
 }  // namespace storage
diff --git a/src/storage/admin/RebuildTagIndexTask.cpp b/src/storage/admin/RebuildTagIndexTask.cpp
index 359b21c0d8b..fa5cd7edbfb 100644
--- a/src/storage/admin/RebuildTagIndexTask.cpp
+++ b/src/storage/admin/RebuildTagIndexTask.cpp
@@ -26,7 +26,8 @@ StatusOr<std::shared_ptr<meta::cpp2::IndexItem>> RebuildTagIndexTask::getIndex(G
 
 nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space,
                                                               PartitionID part,
-                                                              const IndexItems& items) {
+                                                              const IndexItems& items,
+                                                              kvstore::RateLimiter* rateLimiter) {
   if (canceled_) {
     LOG(ERROR) << "Rebuild Tag Index is Canceled";
     return nebula::cpp2::ErrorCode::SUCCEEDED;
@@ -64,7 +65,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
     }
 
     if (batchSize >= FLAGS_rebuild_index_batch_size) {
-      auto result = writeData(space, part, data, batchSize);
+      auto result = writeData(space, part, data, batchSize, rateLimiter);
       if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
         LOG(ERROR) << "Write Part " << part << " Index Failed";
         return result;
@@ -141,7 +142,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
     iter->next();
   }
 
-  auto result = writeData(space, part, std::move(data), batchSize);
+  auto result = writeData(space, part, std::move(data), batchSize, rateLimiter);
   if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
     LOG(ERROR) << "Write Part " << part << " Index Failed";
     return nebula::cpp2::ErrorCode::E_STORE_FAILURE;
diff --git a/src/storage/admin/RebuildTagIndexTask.h b/src/storage/admin/RebuildTagIndexTask.h
index 8a57e7da460..07ecb26c60e 100644
--- a/src/storage/admin/RebuildTagIndexTask.h
+++ b/src/storage/admin/RebuildTagIndexTask.h
@@ -26,7 +26,8 @@ class RebuildTagIndexTask : public RebuildIndexTask {
 
   nebula::cpp2::ErrorCode buildIndexGlobal(GraphSpaceID space,
                                            PartitionID part,
-                                           const IndexItems& items) override;
+                                           const IndexItems& items,
+                                           kvstore::RateLimiter* rateLimiter) override;
 };
 
 }  // namespace storage
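
For reference, a minimal usage sketch of the refactored kvstore::RateLimiter (not part of the patch): each caller now constructs its own limiter and calls consume(batchSize) before every write, instead of registering a (space, part) bucket on a shared member with add()/remove(). The scanAndWriteBatches helper and the literal 2 MB rate below are illustrative stand-ins; only the RateLimiter constructor and the consume() signature come from the diff above.

// Usage sketch only; mirrors the per-invocation pattern used by
// NebulaSnapshotManager::accessAllRowsInSnapshot and RebuildIndexTask::invoke.
#include <cstddef>
#include <memory>
#include <vector>

#include "kvstore/RateLimiter.h"

void scanAndWriteBatches(const std::vector<size_t>& batchSizes) {
  // One limiter per invocation: no shared member, no per-partition map, and no
  // add()/remove() bookkeeping. 2 MB/s rate and burst stand in for the
  // FLAGS_snapshot_part_rate_limit / FLAGS_rebuild_index_part_rate_limit flags.
  auto rateLimiter = std::make_unique<nebula::kvstore::RateLimiter>(2 * 1024 * 1024,
                                                                    2 * 1024 * 1024);
  for (auto batchSize : batchSizes) {
    // Blocks just long enough to keep the average rate under the limit; a batch
    // larger than the burst size makes consume() sleep one second instead.
    rateLimiter->consume(batchSize);
    // ... write the batch here ...
  }
}

Because the limiter now lives for exactly one invocation, consume() can never observe a missing bucket, which is why the old LOG_EVERY_N "Rate limiter is not available" fallback disappears.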