Skip to content

Commit

Permalink
create k6runner.Script and make probers aware of it
Browse files Browse the repository at this point in the history
  • Loading branch information
nadiamoe committed Jun 11, 2024
1 parent 149daae commit 70b2ca8
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 119 deletions.
2 changes: 1 addition & 1 deletion internal/checks/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (noopRunner) WithLogger(logger *zerolog.Logger) k6runner.Runner {
return r
}

func (noopRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) {
func (noopRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) {
return &k6runner.RunResponse{}, nil
}

Expand Down
61 changes: 27 additions & 34 deletions internal/k6runner/k6runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,24 @@ import (
"github.com/spf13/afero"
)

// Script is a k6 script that a runner is able to run, with some added instructions for that runner to act on.
type Script struct {
Script []byte `json:"script"`
Settings Settings `json:"settings"`
// TODO: Add Metadata and Features.
}

type Settings struct {
// Timeout for k6 run, in milliseconds. This value is a configuration value for remote runners, which will instruct
// them to return an error if the operation takes longer than this time to complete. Clients should expect that
// requests to remote runners may take longer than this value due to network and other latencies, and thus clients
// should wait additional time before aborting outgoing requests.
Timeout int64 `json:"timeout"`
}

type Runner interface {
WithLogger(logger *zerolog.Logger) Runner
Run(ctx context.Context, script []byte) (*RunResponse, error)
Run(ctx context.Context, script Script) (*RunResponse, error)
}

type RunnerOpts struct {
Expand Down Expand Up @@ -61,12 +76,13 @@ func (r *LocalRunner) withOpts(opts RunnerOpts) {
}
}

// Processor runs a script with a runner and parses the k6 output.
type Processor struct {
runner Runner
script []byte
script Script
}

func NewProcessor(script []byte, k6runner Runner) (*Processor, error) {
func NewProcessor(script Script, k6runner Runner) (*Processor, error) {
r := Processor{
runner: k6runner,
script: script,
Expand Down Expand Up @@ -311,15 +327,6 @@ func (r requestError) Error() string {
return fmt.Sprintf("%s: %s", r.Err, r.Message)
}

type Settings struct {
Timeout int64 `json:"timeout"`
}

type RunRequest struct {
Script []byte `json:"script"`
Settings Settings `json:"settings"`
}

type RunResponse struct {
Error string `json:"error,omitempty"`
ErrorCode string `json:"errorCode,omitempty"`
Expand All @@ -336,17 +343,12 @@ func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner {

var ErrUnexpectedStatus = errors.New("unexpected status code")

func (r HttpRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) {
k6Timeout := getTimeout(ctx)
func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
k6Timeout := time.Duration(script.Settings.Timeout) * time.Millisecond

reqBody, err := json.Marshal(&RunRequest{
Script: script,
Settings: Settings{
Timeout: k6Timeout.Milliseconds(),
},
})
reqBody, err := json.Marshal(script)
if err != nil {
return nil, fmt.Errorf("running script: %w", err)
return nil, fmt.Errorf("encoding script: %w", err)
}

// The context above carries the check timeout, which will be eventually passed to k6 by the runner at the other end
Expand Down Expand Up @@ -406,7 +408,7 @@ func (r LocalRunner) WithLogger(logger *zerolog.Logger) Runner {
}
}

func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) {
func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
afs := afero.Afero{Fs: r.fs}

workdir, err := afs.TempDir("", "k6-runner")
Expand Down Expand Up @@ -435,7 +437,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro
return nil, fmt.Errorf("cannot obtain temporary script filename: %w", err)
}

if err := afs.WriteFile(scriptFn, script, 0o644); err != nil {
if err := afs.WriteFile(scriptFn, script.Script, 0o644); err != nil {
return nil, fmt.Errorf("cannot write temporary script file: %w", err)
}

Expand All @@ -444,7 +446,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro
return nil, fmt.Errorf("cannot find k6 executable: %w", err)
}

timeout := getTimeout(ctx)
timeout := time.Duration(script.Settings.Timeout) * time.Millisecond

// #nosec G204 -- the variables are not user-controlled
cmd := exec.CommandContext(
Expand Down Expand Up @@ -483,7 +485,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro

start := time.Now()

r.logger.Info().Str("command", cmd.String()).Bytes("script", script).Msg("running k6 script")
r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script")

if err := cmd.Run(); err != nil {
r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error")
Expand Down Expand Up @@ -524,12 +526,3 @@ func mktemp(fs afero.Fs, dir, pattern string) (string, error) {
}
return f.Name(), nil
}

func getTimeout(ctx context.Context) time.Duration {
deadline, ok := ctx.Deadline()
if !ok {
return 10 * time.Second
}

return time.Until(deadline)
}
61 changes: 29 additions & 32 deletions internal/k6runner/k6runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ func TestNew(t *testing.T) {

func TestNewScript(t *testing.T) {
runner := New(RunnerOpts{Uri: "k6"})
src := []byte("test")
script, err := NewProcessor(src, runner)
script := Script{
Script: []byte("test"),
}

processor, err := NewProcessor(script, runner)
require.NoError(t, err)
require.NotNil(t, script)
require.Equal(t, src, script.script)
require.Equal(t, runner, script.runner)
require.NotNil(t, processor)
require.Equal(t, script, processor.script)
require.Equal(t, runner, processor.runner)
}

func TestScriptRun(t *testing.T) {
Expand All @@ -51,9 +54,9 @@ func TestScriptRun(t *testing.T) {
logs: testhelper.MustReadFile(t, "testdata/test.log"),
}

script, err := NewProcessor(testhelper.MustReadFile(t, "testdata/test.js"), &runner)
processor, err := NewProcessor(Script{Script: testhelper.MustReadFile(t, "testdata/test.js")}, &runner)
require.NoError(t, err)
require.NotNil(t, script)
require.NotNil(t, processor)

var (
registry = prometheus.NewRegistry()
Expand All @@ -68,33 +71,28 @@ func TestScriptRun(t *testing.T) {
// We already know tha parsing the metrics and the logs is working, so
// we are only interested in verifying that the script runs without
// errors.
success, err := script.Run(ctx, registry, &logger, zlogger)
success, err := processor.Run(ctx, registry, &logger, zlogger)
require.NoError(t, err)
require.True(t, success)
}

func TestHttpRunnerRun(t *testing.T) {
scriptSrc := testhelper.MustReadFile(t, "testdata/test.js")
timeout := 1 * time.Second
script := Script{
Script: testhelper.MustReadFile(t, "testdata/test.js"),
Settings: Settings{
Timeout: 1000,
},
}

mux := http.NewServeMux()
mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "application/json", r.Header.Get("Content-Type"))

var req RunRequest
var req Script
err := json.NewDecoder(r.Body).Decode(&req)
require.NoError(t, err)
require.Equal(t, scriptSrc, req.Script)
// The timeout in the request is not going to be exactly the
// original timeout because computers need some time to process
// data, and the timeout is set based on the remaining time
// until the deadline and the clock starts ticking as soon as
// the context is created. Check that the actual timeout is not
// greater than the expected value and that it's within 1% of
// the expected value.
require.LessOrEqual(t, req.Settings.Timeout, timeout.Milliseconds())
require.InEpsilon(t, timeout.Milliseconds(), req.Settings.Timeout, 0.01)
require.Equal(t, script, req)

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand All @@ -119,18 +117,17 @@ func TestHttpRunnerRun(t *testing.T) {
ctx, cancel := testhelper.Context(ctx, t)
t.Cleanup(cancel)

// By adding a timeout to the context passed to Run, the expectation is
// that the runner extracts the timeout from it and sets the
// corresponding field accordingly.
ctx, cancel = context.WithTimeout(ctx, timeout)
t.Cleanup(cancel)

_, err := runner.Run(ctx, scriptSrc)
_, err := runner.Run(ctx, script)
require.NoError(t, err)
}

func TestHttpRunnerRunError(t *testing.T) {
scriptSrc := testhelper.MustReadFile(t, "testdata/test.js")
script := Script{
Script: testhelper.MustReadFile(t, "testdata/test.js"),
Settings: Settings{
Timeout: 1000,
},
}

mux := http.NewServeMux()
mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -160,7 +157,7 @@ func TestHttpRunnerRunError(t *testing.T) {
ctx, cancel := testhelper.Context(context.Background(), t)
t.Cleanup(cancel)

_, err := runner.Run(ctx, scriptSrc)
_, err := runner.Run(ctx, script)
require.Error(t, err)
}

Expand Down Expand Up @@ -278,7 +275,7 @@ func TestScriptHTTPRun(t *testing.T) {
t.Cleanup(srv.Close)

runner := New(RunnerOpts{Uri: srv.URL + "/run"})
script, err := NewProcessor([]byte("tee-hee"), runner)
script, err := NewProcessor(Script{Script: []byte("tee-hee")}, runner)
require.NoError(t, err)

baseCtx, baseCancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -307,7 +304,7 @@ type testRunner struct {

var _ Runner = &testRunner{}

func (r *testRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) {
func (r *testRunner) Run(ctx context.Context, script Script) (*RunResponse, error) {
return &RunResponse{
Metrics: r.metrics,
Logs: r.logs,
Expand Down
40 changes: 19 additions & 21 deletions internal/prober/multihttp/multihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"net/http"
"strings"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
"github.com/grafana/synthetic-monitoring-agent/internal/prober/logger"
Expand All @@ -19,14 +18,14 @@ const proberName = "multihttp"
var errUnsupportedCheck = errors.New("unsupported check")

type Module struct {
Prober string
Timeout time.Duration
Prober string
Script k6runner.Script
}

type Prober struct {
logger zerolog.Logger
config Module
script *k6runner.Processor
logger zerolog.Logger
module Module
processor *k6runner.Processor
}

func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runner k6runner.Runner, reservedHeaders http.Header) (Prober, error) {
Expand All @@ -44,16 +43,23 @@ func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runne
augmentHttpHeaders(&check, reservedHeaders)
}

p.config = settingsToModule(check.Settings.Multihttp)
timeout := time.Duration(check.Timeout) * time.Millisecond
p.config.Timeout = timeout

script, err := settingsToScript(check.Settings.Multihttp)
if err != nil {
return p, err
}

k6Script, err := k6runner.NewProcessor(script, runner)
p.module = Module{
Prober: sm.CheckTypeMultiHttp.String(),
Script: k6runner.Script{
Script: script,
Settings: k6runner.Settings{
Timeout: check.Timeout,
},
// TODO: Add metadata & features here.
},
}

processor, err := k6runner.NewProcessor(p.module.Script, runner)
if err != nil {
return p, err
}
Expand All @@ -66,7 +72,7 @@ func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runne
Bytes("script", script).
Msg("created prober")

p.script = k6Script
p.processor = processor
p.logger = logger

return p, nil
Expand All @@ -77,7 +83,7 @@ func (p Prober) Name() string {
}

func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool {
success, err := p.script.Run(ctx, registry, logger, p.logger)
success, err := p.processor.Run(ctx, registry, logger, p.logger)
if err != nil {
p.logger.Warn().Err(err).Msg("running probe")
return false
Expand All @@ -86,14 +92,6 @@ func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.R
return success
}

func settingsToModule(settings *sm.MultiHttpSettings) Module {
var m Module

m.Prober = sm.CheckTypeMultiHttp.String()

return m
}

// Overrides any user-provided headers with our own augmented values
// for 'reserved' headers.
func augmentHttpHeaders(check *sm.Check, reservedHeaders http.Header) {
Expand Down
6 changes: 3 additions & 3 deletions internal/prober/multihttp/multihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func TestNewProber(t *testing.T) {
require.Equal(t, requestHeaders[0].Value, fmt.Sprintf("%d-%d", checkId, checkId))

require.NoError(t, err)
require.Equal(t, proberName, p.config.Prober)
require.Equal(t, 10*time.Second, p.config.Timeout)
require.Equal(t, proberName, p.module.Prober)
require.Equal(t, 10*time.Second, time.Duration(p.module.Script.Settings.Timeout)*time.Millisecond)
// TODO: check script
})
}
Expand All @@ -145,6 +145,6 @@ func (noopRunner) WithLogger(logger *zerolog.Logger) k6runner.Runner {
return r
}

func (noopRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) {
func (noopRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) {
return &k6runner.RunResponse{}, nil
}
Loading

0 comments on commit 70b2ca8

Please sign in to comment.