Relay rate limits #906

Merged · 15 commits · Nov 19, 2024
34 changes: 22 additions & 12 deletions relay/config.go
@@ -5,6 +5,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/relay/limiter"
"github.com/spf13/viper"
"os"
"strings"
@@ -76,23 +77,32 @@ type Config struct {
// ChunkMaxConcurrency is the size of the work pool for fetching chunks. Default is 32. Note that this does not
// impact concurrency utilized by the s3 client to upload/download fragmented files.
ChunkMaxConcurrency int

// MaxKeysPerGetChunksRequest is the maximum number of keys that can be requested in a single GetChunks request.
// Default is 1024. // TODO should this be the max batch size? What is that?
MaxKeysPerGetChunksRequest int
Contributor Author:

@ian-shim what is the maximum number of keys we should permit in a single request?

Contributor @ian-shim (Nov 18, 2024):

I think we can start with something like 512? It will probably hit the grpc limit anyways if it goes beyond that.
Actually let's leave this with 1024 and talk about this offline.


// RateLimits contains configuration for rate limiting.
RateLimits limiter.Config
}

// DefaultConfig returns the default configuration for the relay Server.
func DefaultConfig() *Config {
Contributor:
nit: can we move this to test file? actual default values for optional fields can be set in flags.go

Contributor Author:
moved

return &Config{
-	Log:                    common.DefaultLoggerConfig(),
-	AWS:                    *aws.DefaultClientConfig(),
-	GRPCPort:               50051,
-	MaxGRPCMessageSize:     1024 * 1024 * 300,
-	BucketName:             "relay",
-	MetadataTableName:      "metadata",
-	MetadataCacheSize:      1024 * 1024,
-	MetadataMaxConcurrency: 32,
-	BlobCacheSize:          32,
-	BlobMaxConcurrency:     32,
-	ChunkCacheSize:         32,
-	ChunkMaxConcurrency:    32,
+	Log:                        common.DefaultLoggerConfig(),
+	AWS:                        *aws.DefaultClientConfig(),
+	GRPCPort:                   50051,
+	MaxGRPCMessageSize:         1024 * 1024 * 300,
+	BucketName:                 "relay",
+	MetadataTableName:          "metadata",
+	MetadataCacheSize:          1024 * 1024,
+	MetadataMaxConcurrency:     32,
+	BlobCacheSize:              32,
+	BlobMaxConcurrency:         32,
+	ChunkCacheSize:             32,
+	ChunkMaxConcurrency:        32,
+	MaxKeysPerGetChunksRequest: 1024,
+	RateLimits:                 *limiter.DefaultConfig(),
}
}

Expand Down
93 changes: 93 additions & 0 deletions relay/limiter/blob_rate_limiter.go
@@ -0,0 +1,93 @@
package limiter

import (
"fmt"
"golang.org/x/time/rate"
"sync/atomic"
"time"
)

// BlobRateLimiter enforces rate limits on GetBlob operations.
type BlobRateLimiter struct {

// config is the rate limit configuration.
config *Config

// opLimiter enforces rate limits on the maximum rate of GetBlob operations
opLimiter *rate.Limiter

// bandwidthLimiter enforces rate limits on the maximum bandwidth consumed by GetBlob operations. Only the size
// of the blob data is considered, not the size of the entire response.
bandwidthLimiter *rate.Limiter

// operationsInFlight is the number of GetBlob operations currently in flight.
operationsInFlight atomic.Int64
}

// NewBlobRateLimiter creates a new BlobRateLimiter.
func NewBlobRateLimiter(config *Config) *BlobRateLimiter {
globalGetBlobOpLimiter := rate.NewLimiter(
rate.Limit(config.MaxGetBlobOpsPerSecond),
config.GetBlobOpsBurstiness)

globalGetBlobBandwidthLimiter := rate.NewLimiter(
rate.Limit(config.MaxGetBlobBytesPerSecond),
config.GetBlobBytesBurstiness)

return &BlobRateLimiter{
config: config,
opLimiter: globalGetBlobOpLimiter,
bandwidthLimiter: globalGetBlobBandwidthLimiter,
}
}

// BeginGetBlobOperation should be called when a GetBlob operation is about to begin. If it returns an error,
// the operation should not be performed. If it does not return an error, FinishGetBlobOperation should be
// called when the operation completes.
func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error {
if l == nil {
// If the rate limiter is nil, do not enforce rate limits.
return nil
}

countInFlight := l.operationsInFlight.Add(1)
if countInFlight > int64(l.config.MaxConcurrentGetBlobOps) {
l.operationsInFlight.Add(-1)
return fmt.Errorf("global concurrent request limit exceeded for getBlob operations, try again later")
}
Contributor:
should this whole sequence be atomic? otherwise, can operationsInFlight go over MaxConcurrentGetBlobOps and potentially reject valid operations?

Contributor Author:
The situation where this logic may fail is if the request rate is so high that there are always MaxConcurrentGetBlobOps or more goroutines that happen to be inside this one block of code, which could in theory prevent any actual operations from being served.

Although such a scenario is unlikely, probably better to err on the side of safety. This now uses a mutex to guarantee atomicity.

Contributor Author:
Playing around with this a little, the code gets really ugly when I try to do low-resolution locking. Methods in this class (as well as ChunkRateLimiter) are just doing basic arithmetic... let's just use a single mutex to protect the entire class for the sake of simplicity.
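The coarse-grained locking discussed above can be sketched as follows. This is a minimal stdlib-only illustration, not the actual EigenDA code; the `inFlightLimiter` type and its method names are hypothetical. One mutex guards the check-and-increment so the in-flight count can never transiently exceed the cap:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// inFlightLimiter caps concurrent operations. A single mutex protects the
// counter, trading lock granularity for simplicity and correctness: the
// check and the increment happen atomically as one critical section.
type inFlightLimiter struct {
	mu       sync.Mutex
	inFlight int
	max      int
}

// begin atomically checks the limit and increments the counter.
func (l *inFlightLimiter) begin() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inFlight >= l.max {
		return errors.New("concurrent request limit exceeded")
	}
	l.inFlight++
	return nil
}

// finish decrements the counter; call exactly once per successful begin.
func (l *inFlightLimiter) finish() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inFlight--
}

func main() {
	l := &inFlightLimiter{max: 2}
	fmt.Println(l.begin()) // <nil>
	fmt.Println(l.begin()) // <nil>
	fmt.Println(l.begin()) // non-nil: limit reached
	l.finish()
	fmt.Println(l.begin()) // <nil>: a slot was freed
}
```

Contrast this with the original atomic add-then-check, which could momentarily push the counter past the limit under contention and bounce requests that should have been admitted.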


allowed := l.opLimiter.AllowN(now, 1)
Contributor:
is this different from opLimiter.Allow()?

Contributor Author:
The only difference is that the time is supplied from the outside context. This allows me to write unit tests that aren't wall clock dependent.


if !allowed {
l.operationsInFlight.Add(-1)
return fmt.Errorf("global rate limit exceeded for getBlob operations, try again later")
}
return nil
}

// FinishGetBlobOperation should be called exactly once for each time BeginGetBlobOperation is called and
// returns nil.
func (l *BlobRateLimiter) FinishGetBlobOperation() {
if l == nil {
// If the rate limiter is nil, do not enforce rate limits.
return
}

l.operationsInFlight.Add(-1)
}

// RequestGetBlobBandwidth should be called when a GetBlob is about to start downloading blob data
// from S3. It returns an error if there is insufficient bandwidth available. If it returns nil, the
// operation should proceed.
func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) error {
if l == nil {
// If the rate limiter is nil, do not enforce rate limits.
return nil
}

allowed := l.bandwidthLimiter.AllowN(now, int(bytes))
if !allowed {
return fmt.Errorf("global rate limit exceeded for getBlob bandwidth, try again later")
}
return nil
}
143 changes: 143 additions & 0 deletions relay/limiter/blob_rate_limiter_test.go
@@ -0,0 +1,143 @@
package limiter

import (
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"testing"
"time"
)

func TestConcurrentBlobOperations(t *testing.T) {
tu.InitializeRandom()

concurrencyLimit := 1 + rand.Intn(10)

config := DefaultConfig()
config.MaxConcurrentGetBlobOps = concurrencyLimit
// Make the burstiness limit high enough that we won't be rate limited
config.GetBlobOpsBurstiness = concurrencyLimit * 100

limiter := NewBlobRateLimiter(config)

// time starts at current time, but advances manually afterward
now := time.Now()

// We should be able to start this many operations concurrently
for i := 0; i < concurrencyLimit; i++ {
err := limiter.BeginGetBlobOperation(now)
require.NoError(t, err)
}

// Starting one more operation should fail due to the concurrency limit
err := limiter.BeginGetBlobOperation(now)
require.Error(t, err)

// Finish an operation. This should permit exactly one more operation to start
limiter.FinishGetBlobOperation()
err = limiter.BeginGetBlobOperation(now)
require.NoError(t, err)
err = limiter.BeginGetBlobOperation(now)
require.Error(t, err)
}

func TestGetBlobOpRateLimit(t *testing.T) {
tu.InitializeRandom()

config := DefaultConfig()
config.MaxGetBlobOpsPerSecond = float64(2 + rand.Intn(10))
config.GetBlobOpsBurstiness = int(config.MaxGetBlobOpsPerSecond) + rand.Intn(10)
config.MaxConcurrentGetBlobOps = 1

limiter := NewBlobRateLimiter(config)

// time starts at current time, but advances manually afterward
now := time.Now()

// Without advancing time, we should be able to perform a number of operations equal to the burstiness limit.
for i := 0; i < config.GetBlobOpsBurstiness; i++ {
err := limiter.BeginGetBlobOperation(now)
require.NoError(t, err)
limiter.FinishGetBlobOperation()
}

// We are now at the rate limit, and should not be able to start another operation.
err := limiter.BeginGetBlobOperation(now)
require.Error(t, err)

// Advance time by one second. We should gain a number of tokens equal to the rate limit.
now = now.Add(time.Second)
for i := 0; i < int(config.MaxGetBlobOpsPerSecond); i++ {
err = limiter.BeginGetBlobOperation(now)
require.NoError(t, err)
limiter.FinishGetBlobOperation()
}

// We have once again hit the rate limit. We should not be able to start another operation.
err = limiter.BeginGetBlobOperation(now)
require.Error(t, err)

// Advance time by another second. We should gain another number of tokens equal to the rate limit.
// Intentionally do not finish the next operation. We are attempting to get a failure by exceeding
// the max concurrent operations limit.
now = now.Add(time.Second)
err = limiter.BeginGetBlobOperation(now)
require.NoError(t, err)

// This operation should fail since we have limited concurrent operations to 1. It should not count
// against the rate limit.
err = limiter.BeginGetBlobOperation(now)
require.Error(t, err)

// "finish" the prior operation. Verify that we have all expected tokens available.
limiter.FinishGetBlobOperation()
for i := 0; i < int(config.MaxGetBlobOpsPerSecond)-1; i++ {
err = limiter.BeginGetBlobOperation(now)
require.NoError(t, err)
limiter.FinishGetBlobOperation()
}

// We should now be at the rate limit. We should not be able to start another operation.
err = limiter.BeginGetBlobOperation(now)
require.Error(t, err)
}

func TestGetBlobBandwidthLimit(t *testing.T) {
tu.InitializeRandom()

config := DefaultConfig()
config.MaxGetBlobBytesPerSecond = float64(1024 + rand.Intn(1024*1024))
config.GetBlobBytesBurstiness = int(config.MaxGetBlobBytesPerSecond) + rand.Intn(1024*1024)

limiter := NewBlobRateLimiter(config)

// time starts at current time, but advances manually afterward
now := time.Now()

// Without advancing time, we should be able to utilize a number of bytes equal to the burstiness limit.
bytesRemaining := config.GetBlobBytesBurstiness
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err := limiter.RequestGetBlobBandwidth(now, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}

// Requesting one more byte should fail due to the bandwidth limit
err := limiter.RequestGetBlobBandwidth(now, 1)
require.Error(t, err)

// Advance time by one second. We should gain a number of tokens equal to the rate limit.
now = now.Add(time.Second)
bytesRemaining = int(config.MaxGetBlobBytesPerSecond)
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err = limiter.RequestGetBlobBandwidth(now, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}

// Requesting one more byte should fail due to the bandwidth limit
err = limiter.RequestGetBlobBandwidth(now, 1)
require.Error(t, err)
}