Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob ratelimit #121

Merged
merged 7 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion disperser/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ run_encoder: build_encoder
--kzg.srs-order 3000 \
--kzg.num-workers 12 \
--disperser-encoder.log.level-std trace \
--disperser-encoder.log.level-file trace
--disperser-encoder.log.level-file trace
60 changes: 54 additions & 6 deletions disperser/apiserver/rate_config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
package apiserver

import (
"fmt"
"strconv"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/urfave/cli"
)

const (
RegisteredQuorumFlagName = "auth.registered-quorum"
TotalUnauthThroughputFlagName = "auth.total-unauth-throughput"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-throughput"
TotalUnauthThroughputFlagName = "auth.total-unauth-byte-rate"
PerUserUnauthThroughputFlagName = "auth.per-user-unauth-byte-rate"
TotalUnauthBlobRateFlagName = "auth.total-unauth-blob-rate"
PerUserUnauthBlobRateFlagName = "auth.per-user-unauth-blob-rate"
ClientIPHeaderFlagName = "auth.client-ip-header"

// We allow the user to specify the blob rate in blobs/sec, but internally we use blobs/sec * 1e6 (i.e. blobs/microsec).
// This is because the rate limiter takes an integer rate.
blobRateMultiplier = 1e6
mooselumph marked this conversation as resolved.
Show resolved Hide resolved
)

type QuorumRateInfo struct {
PerUserUnauthThroughput common.RateParam
TotalUnauthThroughput common.RateParam
PerUserUnauthBlobRate common.RateParam
TotalUnauthBlobRate common.RateParam
}

type RateConfig struct {
Expand All @@ -35,13 +46,25 @@ func CLIFlags(envPrefix string) []cli.Flag {
Name: TotalUnauthThroughputFlagName,
Usage: "Total encoded throughput for unauthenticated requests (Bytes/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_THROUGHPUT"),
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_BYTE_RATE"),
},
cli.IntSliceFlag{
Name: PerUserUnauthThroughputFlagName,
Usage: "Per-user encoded throughput for unauthenticated requests (Bytes/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_THROUGHPUT"),
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_BYTE_RATE"),
},
cli.StringSliceFlag{
Name: TotalUnauthBlobRateFlagName,
Usage: "Total blob rate for unauthenticated requests (Blobs/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "TOTAL_UNAUTH_BLOB_RATE"),
},
cli.StringSliceFlag{
Name: PerUserUnauthBlobRateFlagName,
Usage: "Per-user blob interval for unauthenticated requests (Blobs/sec)",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "PER_USER_UNAUTH_BLOB_RATE"),
},
cli.StringFlag{
Name: ClientIPHeaderFlagName,
Expand All @@ -53,19 +76,44 @@ func CLIFlags(envPrefix string) []cli.Flag {
}
}

func ReadCLIConfig(c *cli.Context) RateConfig {
func ReadCLIConfig(c *cli.Context) (RateConfig, error) {

numQuorums := len(c.IntSlice(RegisteredQuorumFlagName))
if len(c.StringSlice(TotalUnauthBlobRateFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of total unauth blob rates does not match number of quorums")
}
if len(c.StringSlice(PerUserUnauthBlobRateFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of per user unauth blob intervals does not match number of quorums")
}
if len(c.IntSlice(TotalUnauthThroughputFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of total unauth throughput does not match number of quorums")
}
if len(c.IntSlice(PerUserUnauthThroughputFlagName)) != numQuorums {
return RateConfig{}, fmt.Errorf("number of per user unauth throughput does not match number of quorums")
}

quorumRateInfos := make(map[core.QuorumID]QuorumRateInfo)
for ind, quorumID := range c.IntSlice(RegisteredQuorumFlagName) {

totalBlobRate, err := strconv.ParseFloat(c.StringSlice(TotalUnauthBlobRateFlagName)[ind], 64)
if err != nil {
return RateConfig{}, err
}
accountBlobRate, err := strconv.ParseFloat(c.StringSlice(PerUserUnauthBlobRateFlagName)[ind], 64)
if err != nil {
return RateConfig{}, err
}

quorumRateInfos[core.QuorumID(quorumID)] = QuorumRateInfo{
TotalUnauthThroughput: common.RateParam(c.IntSlice(TotalUnauthThroughputFlagName)[ind]),
PerUserUnauthThroughput: common.RateParam(c.IntSlice(PerUserUnauthThroughputFlagName)[ind]),
TotalUnauthBlobRate: common.RateParam(totalBlobRate * blobRateMultiplier),
PerUserUnauthBlobRate: common.RateParam(accountBlobRate * blobRateMultiplier),
}
}

return RateConfig{
QuorumRateInfos: quorumRateInfos,
ClientIPHeader: c.String(ClientIPHeaderFlagName),
}
}, nil
}
26 changes: 24 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,22 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("system ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
s.logger.Warn("system byte ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
return errSystemRateLimit
}

systemQuorumKey = fmt.Sprintf("%s:%d-blobrate", systemAccountKey, param.QuorumID)
allowed, err = s.ratelimiter.AllowRequest(ctx, systemQuorumKey, blobRateMultiplier, rates.TotalUnauthBlobRate)
if err != nil {
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("system blob ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", float32(rates.TotalUnauthBlobRate)/blobRateMultiplier)
return errSystemRateLimit
}

// Check Account Ratelimit

blob.RequestHeader.AccountID = "ip:" + origin

userQuorumKey := fmt.Sprintf("%s:%d", blob.RequestHeader.AccountID, param.QuorumID)
Expand All @@ -213,7 +225,17 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("account ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", rates.PerUserUnauthThroughput)
s.logger.Warn("account byte ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", rates.PerUserUnauthThroughput)
return errAccountRateLimit
}

userQuorumKey = fmt.Sprintf("%s:%d-blobrate", blob.RequestHeader.AccountID, param.QuorumID)
allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, blobRateMultiplier, rates.PerUserUnauthBlobRate)
if err != nil {
return fmt.Errorf("ratelimiter error: %v", err)
}
if !allowed {
s.logger.Warn("account blob ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", float32(rates.PerUserUnauthBlobRate)/blobRateMultiplier)
return errAccountRateLimit
}

Expand Down
7 changes: 6 additions & 1 deletion disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func NewConfig(ctx *cli.Context) (Config, error) {
return Config{}, err
}

rateConfig, err := apiserver.ReadCLIConfig(ctx)
if err != nil {
return Config{}, err
}

config := Config{
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
ServerConfig: disperser.ServerConfig{
Expand All @@ -51,7 +56,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
RatelimiterConfig: ratelimiterConfig,
RateConfig: apiserver.ReadCLIConfig(ctx),
RateConfig: rateConfig,
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
BucketTableName: ctx.GlobalString(flags.BucketTableName.Name),
BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name),
Expand Down
10 changes: 6 additions & 4 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath,
DISPERSER_SERVER_PRIVATE_KEY: "123",
DISPERSER_SERVER_NUM_CONFIRMATIONS: "0",

DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0",
DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT: "10000000",
DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT: "32000",
DISPERSER_SERVER_ENABLE_RATELIMITER: "true",
DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0",
DISPERSER_SERVER_TOTAL_UNAUTH_BYTE_RATE: "10000000",
DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE: "32000",
DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE: "10",
DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE: "2",
DISPERSER_SERVER_ENABLE_RATELIMITER: "true",

DISPERSER_SERVER_BUCKET_SIZES: "5s",
DISPERSER_SERVER_BUCKET_MULTIPLIERS: "1",
Expand Down
14 changes: 12 additions & 2 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type DisperserVars struct {

DISPERSER_SERVER_BUCKET_STORE_SIZE string

DISPERSER_SERVER_ALLOWLIST string

DISPERSER_SERVER_AWS_REGION string

DISPERSER_SERVER_AWS_ACCESS_KEY_ID string
Expand All @@ -53,9 +55,13 @@ type DisperserVars struct {

DISPERSER_SERVER_REGISTERED_QUORUM_ID string

DISPERSER_SERVER_TOTAL_UNAUTH_THROUGHPUT string
DISPERSER_SERVER_TOTAL_UNAUTH_BYTE_RATE string

DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE string

DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE string

DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT string
DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE string

DISPERSER_SERVER_CLIENT_IP_HEADER string
}
Expand Down Expand Up @@ -303,6 +309,10 @@ type RetrieverVars struct {

RETRIEVER_METRICS_HTTP_PORT string

RETRIEVER_GRAPH_URL string

RETRIEVER_USE_GRAPH string

RETRIEVER_G1_PATH string

RETRIEVER_G2_PATH string
Expand Down
2 changes: 1 addition & 1 deletion inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func setupRetrievalClient(testConfig *deploy.Config) error {
NumWorker: 1,
SRSOrder: uint64(srsOrder),
Verbose: true,
PreloadEncoder: true,
PreloadEncoder: false,
},
})
if err != nil {
Expand Down
33 changes: 30 additions & 3 deletions inabox/tests/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type result struct {
err error
}

func dispserse(t *testing.T, ctx context.Context, client traffic.DisperserClient, resultChan chan result, data []byte, param core.SecurityParam) {
func disperse(t *testing.T, ctx context.Context, client traffic.DisperserClient, resultChan chan result, data []byte, param core.SecurityParam) {

blobStatus, key, err := client.DisperseBlob(ctx, data, param.QuorumID, param.QuorumThreshold, param.AdversaryThreshold)
if err != nil {
Expand Down Expand Up @@ -127,7 +127,7 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase)
go func() {
for i := 0; i < c.numDispersal; i++ {
<-dispersalTicker.C
go dispserse(t, ctx, disp, resultChan, data, c.param)
go disperse(t, ctx, disp, resultChan, data, c.param)
}
}()

Expand Down Expand Up @@ -181,7 +181,7 @@ func TestRatelimit(t *testing.T) {
}
testConfig := deploy.NewTestConfig(testname, rootPath)

if testConfig.Dispersers[0].DISPERSER_SERVER_PER_USER_UNAUTH_THROUGHPUT != fmt.Sprint(perUserThroughput) {
if testConfig.Dispersers[0].DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE != fmt.Sprint(perUserThroughput) {
t.Fatalf("per user throughput should be %v", perUserThroughput)
}
if testConfig.Dispersers[0].DISPERSER_SERVER_BUCKET_MULTIPLIERS != fmt.Sprint(dispersalMultiplier) {
Expand Down Expand Up @@ -281,4 +281,31 @@ func TestRatelimit(t *testing.T) {

})

t.Run("ratelimiting when dispersing greater than blob rate", func(t *testing.T) {

t.Skip("Manual test for now")

testCase := ratelimitTestCase{
numDispersal: 200,
numRetrieval: 0,
dispersalInterval: 450 * time.Millisecond,
retrievalInterval: 500 * time.Millisecond,
pause: 0,
blobSize: 5,
param: core.SecurityParam{
QuorumID: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}

dispersalErrors, retrievalErrors := testRatelimit(t, testConfig, testCase)

fmt.Println("Dispersal Ratelimited: ", dispersalErrors)

assert.Greater(t, dispersalErrors, 0)
assert.Equal(t, 0, retrievalErrors)

})

}
Loading