Skip to content

Commit

Permalink
add rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
pdziepak committed May 23, 2017
1 parent f9e43f9 commit 9dd3869
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 13 deletions.
13 changes: 10 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func GetWorkload(name string, threadId int, partitionOffset int) WorkloadGenerat
panic("unreachable")
}

func GetMode(name string) func(session *gocql.Session, workload WorkloadGenerator) Result {
func GetMode(name string) func(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result {
switch name {
case "write":
if rowsPerRequest == 1 {
Expand All @@ -96,6 +96,7 @@ func main() {
var nodes string
var clientCompression bool
var connectionCount int
var maximumRate int
var pageSize int

var partitionOffset int
Expand All @@ -110,6 +111,7 @@ func main() {
flag.BoolVar(&clientCompression, "client-compression", true, "use compression for client-coordinator communication")
flag.IntVar(&concurrency, "concurrency", 16, "number of used goroutines")
flag.IntVar(&connectionCount, "connection-count", 4, "number of connections")
flag.IntVar(&maximumRate, "max-rate", 0, "the maximum rate of outbound requests in op/s (0 for unlimited)")
flag.IntVar(&pageSize, "page-size", 1000, "page size")

flag.IntVar(&partitionCount, "partition-count", 10000, "number of partitions")
Expand Down Expand Up @@ -208,8 +210,8 @@ func main() {
}()
}

result := RunConcurrently(func(i int) Result {
return GetMode(mode)(session, GetWorkload(workload, i, partitionOffset))
result := RunConcurrently(maximumRate, func(i int, rateLimiter RateLimiter) Result {
return GetMode(mode)(session, GetWorkload(workload, i, partitionOffset), rateLimiter)
})

fmt.Println("Configuration")
Expand All @@ -231,6 +233,11 @@ func main() {
fmt.Println("Page size:\t\t", pageSize)
fmt.Println("Concurrency:\t\t", concurrency)
fmt.Println("Connections:\t\t", connectionCount)
if maximumRate > 0 {
fmt.Println("Maximum rate:\t\t", maximumRate, "op/s")
} else {
fmt.Println("Maximum rate:\t\t unlimited")
}
fmt.Println("Client compression:\t", clientCompression)

fmt.Println("\nResults")
Expand Down
77 changes: 67 additions & 10 deletions modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,46 @@ import (
"github.com/gocql/gocql"
)

type RateLimiter interface {
Wait()
ExpectedInterval() int64
}

type UnlimitedRateLimiter struct{}

func (*UnlimitedRateLimiter) Wait() {}

func (*UnlimitedRateLimiter) ExpectedInterval() int64 {
return 0
}

type MaximumRateLimiter struct {
Period time.Duration
LastRequest time.Time
}

func (mxrl *MaximumRateLimiter) Wait() {
nextRequest := mxrl.LastRequest.Add(mxrl.Period)
now := time.Now()
if now.Before(nextRequest) {
time.Sleep(nextRequest.Sub(now))
}
mxrl.LastRequest = time.Now()
}

func (mxrl *MaximumRateLimiter) ExpectedInterval() int64 {
return mxrl.Period.Nanoseconds()
}

func NewRateLimiter(maximumRate int, timeOffset time.Duration) RateLimiter {
if maximumRate == 0 {
return &UnlimitedRateLimiter{}
}
period := time.Duration(int64(time.Second) / int64(maximumRate))
lastRequest := time.Now().Add(timeOffset)
return &MaximumRateLimiter{period, lastRequest}
}

type Result struct {
ElapsedTime time.Duration
Operations int
Expand Down Expand Up @@ -41,15 +81,24 @@ func HandleError(err error) {
}
}

func RunConcurrently(workload func(id int) Result) MergedResult {
func RunConcurrently(maximumRate int, workload func(id int, rateLimiter RateLimiter) Result) MergedResult {
var timeOffsetUnit int64
if maximumRate != 0 {
timeOffsetUnit = int64(time.Second) / int64(maximumRate)
maximumRate /= concurrency
} else {
timeOffsetUnit = 0
}

results := make([]chan Result, concurrency)
for i := range results {
results[i] = make(chan Result, 1)
}

for i := 0; i < concurrency; i++ {
go func(i int) {
results[i] <- workload(i)
timeOffset := time.Duration(timeOffsetUnit * int64(i))
results[i] <- workload(i, NewRateLimiter(maximumRate, timeOffset))
close(results[i])
}(i)
}
Expand All @@ -73,7 +122,7 @@ func RunConcurrently(workload func(id int) Result) MergedResult {
return result
}

func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result {
func DoWrites(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result {
value := make([]byte, clusteringRowSize)
query := session.Query("INSERT INTO " + keyspaceName + "." + tableName + " (pk, ck, v) VALUES (?, ?, ?)")

Expand All @@ -82,6 +131,8 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result {

start := time.Now()
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
rateLimiter.Wait()

operations++
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
Expand All @@ -96,7 +147,7 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result {
}

latency := requestEnd.Sub(requestStart)
err = latencyHistogram.RecordValue(latency.Nanoseconds())
err = latencyHistogram.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval())
if err != nil {
HandleError(err)
break
Expand All @@ -107,7 +158,7 @@ func DoWrites(session *gocql.Session, workload WorkloadGenerator) Result {
return Result{end.Sub(start), operations, operations, latencyHistogram}
}

func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result {
func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result {
value := make([]byte, clusteringRowSize)
request := fmt.Sprintf("INSERT INTO %s.%s (pk, ck, v) VALUES (?, ?, ?)", keyspaceName, tableName)

Expand All @@ -116,6 +167,8 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result

start := time.Now()
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
rateLimiter.Wait()

batch := gocql.NewBatch(gocql.UnloggedBatch)
batchSize := 0

Expand All @@ -138,7 +191,7 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result
}

latency := requestEnd.Sub(requestStart)
err = result.Latency.RecordValue(latency.Nanoseconds())
err = result.Latency.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval())
if err != nil {
HandleError(err)
break
Expand All @@ -150,14 +203,16 @@ func DoBatchedWrites(session *gocql.Session, workload WorkloadGenerator) Result
return result
}

func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result {
func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result {
query := session.Query("UPDATE " + keyspaceName + "." + counterTableName + " SET c1 = c1 + 1, c2 = c2 + 1, c3 = c3 + 1, c4 = c4 + 1, c5 = c5 + 1 WHERE pk = ? AND ck = ?")

var operations int
latencyHistogram := NewHistogram()

start := time.Now()
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
rateLimiter.Wait()

operations++
pk := workload.NextPartitionKey()
ck := workload.NextClusteringKey()
Expand All @@ -172,7 +227,7 @@ func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result
}

latency := requestEnd.Sub(requestStart)
err = latencyHistogram.RecordValue(latency.Nanoseconds())
err = latencyHistogram.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval())
if err != nil {
HandleError(err)
break
Expand All @@ -183,7 +238,7 @@ func DoCounterUpdates(session *gocql.Session, workload WorkloadGenerator) Result
return Result{end.Sub(start), operations, operations, latencyHistogram}
}

func DoReads(session *gocql.Session, workload WorkloadGenerator) Result {
func DoReads(session *gocql.Session, workload WorkloadGenerator, rateLimiter RateLimiter) Result {
var request string
if inRestriction {
arr := make([]string, rowsPerRequest)
Expand All @@ -203,6 +258,8 @@ func DoReads(session *gocql.Session, workload WorkloadGenerator) Result {

start := time.Now()
for !workload.IsDone() && atomic.LoadUint32(&stopAll) == 0 {
rateLimiter.Wait()

result.Operations++
pk := workload.NextPartitionKey()

Expand Down Expand Up @@ -241,7 +298,7 @@ func DoReads(session *gocql.Session, workload WorkloadGenerator) Result {
}

latency := requestEnd.Sub(requestStart)
err = result.Latency.RecordValue(latency.Nanoseconds())
err = result.Latency.RecordCorrectedValue(latency.Nanoseconds(), rateLimiter.ExpectedInterval())
if err != nil {
HandleError(err)
break
Expand Down

0 comments on commit 9dd3869

Please sign in to comment.