Skip to content

Commit

Permalink
Use GetStrippedOffsets in constant-arrival-rate
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Feb 20, 2020
1 parent 3e0169b commit d093a4a
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 3 deletions.
22 changes: 19 additions & 3 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"fmt"
"math"
"math/big"
"sync/atomic"
"time"

Expand Down Expand Up @@ -196,7 +197,6 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
defer cancel()
ticker := time.NewTicker(tickerPeriod) // the rate can't be 0 because of the validation

// Make sure the log and the progress bar have accurate information
car.logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -260,9 +260,25 @@ func (car ConstantArrivalRate) Run(ctx context.Context, out chan<- stats.SampleC
}

remainingUnplannedVUs := maxVUs - preAllocatedVUs
for {
start, offsets, _, err := car.executionState.Options.ESS.GetStripedOffsets(segment)
if err != nil {
return err
}
startTime = time.Now()
timer := time.NewTimer(time.Hour * 24)
// here the we need the not scaled one
notScaledTickerPeriod := time.Duration(
getTickerPeriod(
big.NewRat(
car.config.Rate.Int64,
int64(time.Duration(car.config.TimeUnit.Duration)),
)).Duration)

for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] {
var t = notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
timer.Reset(t)
select {
case <-ticker.C:
case <-timer.C:
select {
case vu := <-vus:
// ideally, we get the VU from the buffer without any issues
Expand Down
117 changes: 117 additions & 0 deletions lib/executor/constant_arrival_rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ package executor

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
null "gopkg.in/guregu/null.v3"

Expand Down Expand Up @@ -102,6 +104,121 @@ func TestConstantArrivalRateRunCorrectRate(t *testing.T) {
require.Empty(t, logHook.Drain())
}

func TestConstantArrivalRateRunCorrectTiming(t *testing.T) {
newExecutionSegmentFromString := func(str string) *lib.ExecutionSegment {
r, err := lib.NewExecutionSegmentFromString(str)
require.NoError(t, err)
return r
}

newExecutionSegmentSequenceFromString := func(str string) *lib.ExecutionSegmentSequence {
r, err := lib.NewExecutionSegmentSequenceFromString(str)
require.NoError(t, err)
return &r
}

var tests = []struct {
segment *lib.ExecutionSegment
sequence *lib.ExecutionSegmentSequence
start time.Duration
step time.Duration
}{
{
segment: newExecutionSegmentFromString("0:1/3"),
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("1/3:2/3"),
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("2/3:1"),
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("1/6:3/6"),
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("1/6:3/6"),
sequence: &lib.ExecutionSegmentSequence{},
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
// sequences
{
segment: newExecutionSegmentFromString("0:1/3"),
sequence: newExecutionSegmentSequenceFromString("0,1/3,2/3,1"),
start: time.Millisecond * 00,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("1/3:2/3"),
sequence: newExecutionSegmentSequenceFromString("0,1/3,2/3,1"),
start: time.Millisecond * 20,
step: time.Millisecond * 60,
},
{
segment: newExecutionSegmentFromString("2/3:1"),
sequence: newExecutionSegmentSequenceFromString("0,1/3,2/3,1"),
start: time.Millisecond * 40,
step: time.Millisecond * 60,
},
}
for _, test := range tests {
test := test

t.Run(fmt.Sprintf("segment %s sequence %s", test.segment, test.sequence), func(t *testing.T) {
t.Parallel()
es := lib.NewExecutionState(lib.Options{
ExecutionSegment: test.segment,
ESS: test.sequence,
}, 10, 50)
var count int64
var startTime = time.Now()
var ctx, cancel, executor, logHook = setupExecutor(
t, getTestConstantArrivalRateConfig(), es,
simpleRunner(func(ctx context.Context) error {
current := atomic.AddInt64(&count, 1)
expectedTime := test.start + time.Duration(current-1)*test.step
assert.WithinDuration(t,
startTime.Add(expectedTime),
time.Now(),
time.Millisecond*10,
"%d expectedTime %s", current, expectedTime,
)

return nil
}),
)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// check that we got around the amount of VU iterations as we would expect
var currentCount int64

for i := 0; i < 5; i++ {
time.Sleep(time.Second)
currentCount = atomic.LoadInt64(&count)
assert.InDelta(t, (i+1)*17, currentCount, 2)
}
}()
startTime = time.Now()
var engineOut = make(chan stats.SampleContainer, 1000)
err := executor.Run(ctx, engineOut)
wg.Wait()
require.NoError(t, err)
require.Empty(t, logHook.Drain())
})
}
}

func TestArrivalRateCancel(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit d093a4a

Please sign in to comment.