From 9dd38697b97edd540f0f19e199d78f01a7afac49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 23 May 2017 12:47:29 +0100 Subject: [PATCH] add rate limiter --- main.go | 13 +++++++--- modes.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 13f3201..837b72a 100644 --- a/main.go +++ b/main.go @@ -70,7 +70,7 @@ func GetWorkload(name string, threadId int, partitionOffset int) WorkloadGenerat panic("unreachable") } -func GetMode(name string) func(session *gocql.Session, workload WorkloadGenerator) Result { +func GetMode(name string) func(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result { switch name { case "write": if rowsPerRequest == 1 { @@ -96,6 +96,7 @@ func main() { var nodes string var clientCompression bool var connectionCount int + var maximumRate int var pageSize int var partitionOffset int @@ -110,6 +111,7 @@ func main() { flag.BoolVar(&clientCompression, "client-compression", true, "use compression for client-coordinator communication") flag.IntVar(&concurrency, "concurrency", 16, "number of used goroutines") flag.IntVar(&connectionCount, "connection-count", 4, "number of connections") + flag.IntVar(&maximumRate, "max-rate", 0, "the maximum rate of outbound requests in op/s (0 for unlimited)") flag.IntVar(&pageSize, "page-size", 1000, "page size") flag.IntVar(&partitionCount, "partition-count", 10000, "number of partitions") @@ -208,8 +210,8 @@ func main() { }() } - result := RunConcurrently(func(i int) Result { - return GetMode(mode)(session, GetWorkload(workload, i, partitionOffset)) + result := RunConcurrently(maximumRate, func(i int, rateLimiter RateLimiter) Result { + return GetMode(mode)(session, GetWorkload(workload, i, partitionOffset), rateLimiter) }) fmt.Println("Configuration") @@ -231,6 +233,11 @@ func main() { fmt.Println("Page size:\t\t", pageSize) fmt.Println("Concurrency:\t\t", concurrency) fmt.Println("Connections:\t\t", connectionCount) + if maximumRate > 0 { + fmt.Println("Maximum rate:\t\t", maximumRate, "op/s") + } else { + fmt.Println("Maximum rate:\t\t unlimited") + } fmt.Println("Client compression:\t", clientCompression) fmt.Println("\nResults") diff --git a/modes.go b/modes.go index d34893e..04dc2b9 100644 --- a/modes.go +++ b/modes.go @@ -11,6 +11,46 @@ import ( "github.com/gocql/gocql" ) +type RateLimiter interface { + Wait() + ExpectedInterval() int64 +} + +type UnlimitedRateLimiter struct{} + +func (*UnlimitedRateLimiter) Wait() {} + +func (*UnlimitedRateLimiter) ExpectedInterval() int64 { + return 0 +} + +type MaximumRateLimiter struct { + Period time.Duration + LastRequest time.Time +} + +func (mxrl *MaximumRateLimiter) Wait() { + nextRequest := mxrl.LastRequest.Add(mxrl.Period) + now := time.Now() + if now.Before(nextRequest) { + time.Sleep(nextRequest.Sub(now)) + } + mxrl.LastRequest = time.Now() +} + +func (mxrl *MaximumRateLimiter) ExpectedInterval() int64 { + return mxrl.Period.Nanoseconds() +} + +func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter { + if maximumRate == 0 { + return &UnlimitedRateLimiter{} + } + period := time.Duration(int64(time.Second) / int64(maximumRate)) + lastRequest := time.Now().Add(timeOffset) + return &MaximumRateLimiter{period, lastRequest} +} + type Result struct { ElapsedTime time.Duration Operations int @@ -41,7 +81,15 @@ func HandleError(err error) { } } -func RunConcurrently(workload func(id int) Result) MergedResult { +func RunConcurrently(maximumRate int, workload func(id int, rateLimiter RateLimiter) Result) MergedResult { + var timeOffsetUnit int64 + if maximumRate != 0 { + timeOffsetUnit = int64(time.Second) / int64(maximumRate) + maximumRate /= concurrency + } else { + timeOffsetUnit = 0 + } + results := make([]chan Result, concurrency) for i := range results { results[i] = make(chan Result, 1) @@ -49,7 +97,8 @@ func RunConcurrently(workload func(id int) Result) MergedResult { for i := 0; i < concurrency; i++ { go func(i int) { - results[i] <- workload(i) + timeOffset := time.Duration(timeOffsetUnit * int64(i)) + results[i] <- workload(i, NewRateLimiter(maximumRate, timeOffset)) close(results[i]) }(i) } @@ -73,7 +122,7 @@ func RunConcurrently(workload func(id int) Result) MergedResult { return result } -func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result { +func DoWrites(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result { value := make([]byte, clusteringRowSize) query := session.Query("INSERT INTO " + keyspaceName + "." + tableName + " (pk, ck, v) VALUES (?, ?, ?)") @@ -82,6 +131,8 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result { start := time.Now() for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 { + rateLimiter.Wait() + operations++ pk := workload.NextPartitionKey() ck := workload.NextClusteringKey() @@ -96,7 +147,7 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result { } latency := requestEnd.Sub(requestStart) - err = latencyHistogram.RecordValue(latency.Nanoseconds()) + err = latencyHistogram.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval()) if err != nil { HandleError(err) break @@ -107,7 +158,7 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result { return Result{end.Sub(start), operations, operations, latencyHistogram} } -func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result { +func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result { value := make([]byte, clusteringRowSize) request := fmt.Sprintf("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", keyspaceName, tableName) @@ -116,6 +167,8 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result start := time.Now() for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 { + rateLimiter.Wait() + batch := gocql.NewBatch(gocql.UnloggedBatch) batchSize := 0 @@ -138,7 +191,7 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result } latency := requestEnd.Sub(requestStart) - err = result.Latency.RecordValue(latency.Nanoseconds()) + err = result.Latency.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval()) if err != nil { HandleError(err) break @@ -150,7 +203,7 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result return result } -func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result { +func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result { query := session.Query("UPDATE " + keyspaceName + "." + counterTableName + " SET c1 = c1 + 1, c2 = c2 + 1, c3 = c3 + 1, c4 = c4 + 1, c5 = c5 + 1 WHERE pk = ? AND ck = ?") var operations int @@ -158,6 +211,8 @@ func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result start := time.Now() for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 { + rateLimiter.Wait() + operations++ pk := workload.NextPartitionKey() ck := workload.NextClusteringKey() @@ -172,7 +227,7 @@ func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result } latency := requestEnd.Sub(requestStart) - err = latencyHistogram.RecordValue(latency.Nanoseconds()) + err = latencyHistogram.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval()) if err != nil { HandleError(err) break @@ -183,7 +238,7 @@ func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result return Result{end.Sub(start), operations, operations, latencyHistogram} } -func DoReads(session *gocql.Session, workload WorkloadGenerator) Result { +func DoReads(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result { var request string if inRestriction { arr := make([]string, rowsPerRequest) @@ -203,6 +258,8 @@ func DoReads(session *gocql.Session, workload WorkloadGenerator) Result { start := time.Now() for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 { + rateLimiter.Wait() + result.Operations++ pk := workload.NextPartitionKey() @@ -241,7 +298,7 @@ func DoReads(session *gocql.Session, workload WorkloadGenerator) Result { } latency := requestEnd.Sub(requestStart) - err = result.Latency.RecordValue(latency.Nanoseconds()) + err = result.Latency.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval()) if err != nil { HandleError(err) break