Skip to content

Commit

Permalink
k6runner/http: retry requests
Browse files Browse the repository at this point in the history
  • Loading branch information
nadiamoe committed Jun 18, 2024
1 parent 41040ea commit db844ee
Show file tree
Hide file tree
Showing 2 changed files with 324 additions and 23 deletions.
132 changes: 112 additions & 20 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"os/exec"
"strings"
Expand Down Expand Up @@ -56,8 +57,9 @@ func New(opts RunnerOpts) Runner {

if strings.HasPrefix(opts.Uri, "http") {
r = &HttpRunner{
url: opts.Uri,
logger: &logger,
url: opts.Uri,
logger: &logger,
graceTime: 20 * time.Second,
}
} else {
r = &LocalRunner{
Expand Down Expand Up @@ -317,6 +319,8 @@ NEXT_RECORD:
type HttpRunner struct {
url string
logger *zerolog.Logger
// graceTime tells the HttpRunner how much time to add do the script timeout to form the request timeout.
graceTime time.Duration
}

type requestError struct {
Expand Down Expand Up @@ -347,23 +351,103 @@ func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner {
var ErrUnexpectedStatus = errors.New("unexpected status code")

func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
deadline, hasDeadline := ctx.Deadline()
var cancel context.CancelFunc
if !hasDeadline {
r.logger.Error().
Msg("k6 runner does not have a deadline for all retries. This is a bug. Defaulting to double frequency to avoid retrying forever")

defaultAllRetriesTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond * 2
ctx, cancel = context.WithTimeout(ctx, defaultAllRetriesTimeout)
defer cancel()
}

if tud := time.Until(deadline); tud < time.Duration(script.Settings.Timeout)*time.Millisecond*2 {
r.logger.Debug().
Str("timeUntilNext", tud.String()).
Str("timeout", (time.Duration(script.Settings.Timeout) * time.Millisecond).String()).
Msg("time until next execution is too close to script timeout, there might not be room for retries")
}

wait := time.Second
var response *RunResponse
for {
start := time.Now()

var err error
response, err = r.request(ctx, script)
if err == nil {
r.logger.Debug().Bytes("metrics", response.Metrics).Bytes("logs", response.Logs).Msg("script result")
return response, nil
}

if !errors.Is(err, errRetryable) {
return nil, err
}

// Wait, but subtract the amount of time we've already waited as part of the request timeout.
// We do this because these requests have huge timeouts, and we expect the case of them timing out to be fairly
// common, so we don't want to add more time on top of that.
waitRemaining := wait - time.Since(start)
r.logger.Debug().Err(err).Dur("after", waitRemaining).Msg("retrying retryable error")

waitTimer := time.NewTimer(waitRemaining)
select {
case <-ctx.Done():
waitTimer.Stop()
return nil, ctx.Err()
case <-waitTimer.C:
}

// Backoff linearly, adding some jitter.
wait += time.Second + time.Duration(rand.Intn(1000))*time.Millisecond
}
}

// errRetryable indicates that an error is retryable. It is typically joined with another error.
var errRetryable = errors.New("retryable")

func (r HttpRunner) request(ctx context.Context, script Script) (*RunResponse, error) {
checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
if checkTimeout == 0 {
return nil, ErrNoTimeout
}

reqBody, err := json.Marshal(script)
if err != nil {
return nil, fmt.Errorf("encoding script: %w", err)
}
// requestTimeout should be noticeably larger than [Script.Settings.Timeout], to account for added latencies in the
// system such as network, i/o, seralization, queue wait time, etc. that take place after and before the script is
// ran.
// t0 t1 t2 t3
// |--- Queue wait ---|-------------- k6 run -----------------|--- Response ---|
// checkTimeout = t2 - t1
// requestTimeout = t3 - t0
requestTimeout := checkTimeout + r.graceTime

// Hint runners when we're about to drop this request. If a runner determines the script is unlikely to finish
// before this time, it will drop the request.
// Runners sanitize this value by clamping it to a maximum and minimum value to protect themselves from misbehaving
// clients.
notAfter := time.Now().Add(requestTimeout)

// The context above carries the check timeout, which will be eventually passed to k6 by the runner at the other end
// of this request. To account for network overhead, we create a different context with an extra second of timeout,
// which adds some grace time to account for the network/system latency of the http request.
reqCtx, cancel := context.WithTimeout(context.Background(), checkTimeout+time.Second)
ctx, cancel := context.WithDeadline(ctx, notAfter)
defer cancel()

req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, r.url, bytes.NewReader(reqBody))
// Decorate the script request with the NotAfter hint.
// NotAfter is a scheduling hint for external runners, which indicates them when the client will cancel the request.
// This allows runners to not waste time running scripts which will not complete before the client gives up on the
// request.
runRequest := struct {
Script `json:",inline"`
NotAfter time.Time `json:"notAfter"`
}{
Script: script,
NotAfter: notAfter,
}

reqBody, err := json.Marshal(runRequest)
if err != nil {
return nil, fmt.Errorf("encoding script: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(reqBody))
if err != nil {
return nil, fmt.Errorf("building request: %w", err)
}
Expand All @@ -373,7 +457,9 @@ func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error
resp, err := http.DefaultClient.Do(req)
if err != nil {
r.logger.Error().Err(err).Msg("sending request")
return nil, fmt.Errorf("running script: %w", err)

// Any error making a request is retryable.
return nil, errors.Join(errRetryable, fmt.Errorf("making request: %w", err))
}

defer resp.Body.Close()
Expand All @@ -383,20 +469,20 @@ func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error
// These are status code that come with a machine-readable response. The response may contain an error, which is
// handled later.
// See: https://github.com/grafana/sm-k6-runner/blob/main/internal/mq/proxy.go#L215

default:
return nil, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode)
// Unexpected statuses are retryable.
return nil, errors.Join(errRetryable, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode))
}

var result RunResponse
err = json.NewDecoder(resp.Body).Decode(&result)
var response RunResponse
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
r.logger.Error().Err(err).Msg("decoding script result")
return nil, fmt.Errorf("decoding script result: %w", err)
}

r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("script result")

return &result, nil
return &response, nil
}

type LocalRunner struct {
Expand All @@ -417,6 +503,11 @@ func (r LocalRunner) WithLogger(logger *zerolog.Logger) Runner {
func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
afs := afero.Afero{Fs: r.fs}

checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond
if checkTimeout == 0 {
return nil, ErrNoTimeout
}

workdir, err := afs.TempDir("", "k6-runner")
if err != nil {
return nil, fmt.Errorf("cannot create temporary directory: %w", err)
Expand Down Expand Up @@ -452,7 +543,9 @@ func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, erro
return nil, fmt.Errorf("cannot find k6 executable: %w", err)
}

timeout := time.Duration(script.Settings.Timeout) * time.Millisecond
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, checkTimeout)
defer cancel()

// #nosec G204 -- the variables are not user-controlled
cmd := exec.CommandContext(
Expand All @@ -464,7 +557,6 @@ func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, erro
"--log-output", "file="+logsFn,
"--vus", "1",
"--iterations", "1",
"--duration", timeout.String(),
"--max-redirects", "10",
"--batch", "10",
"--batch-per-host", "4",
Expand Down
Loading

0 comments on commit db844ee

Please sign in to comment.