From 70b2ca88cb6113c504868005a721f148fffce67d Mon Sep 17 00:00:00 2001 From: Ro Santalla Date: Tue, 11 Jun 2024 08:25:44 +0200 Subject: [PATCH] create k6runner.Script and make probers aware of it --- internal/checks/checks_test.go | 2 +- internal/k6runner/k6runner.go | 61 +++++++++------------ internal/k6runner/k6runner_test.go | 61 ++++++++++----------- internal/prober/multihttp/multihttp.go | 40 +++++++------- internal/prober/multihttp/multihttp_test.go | 6 +- internal/prober/scripted/scripted.go | 41 ++++++-------- internal/prober/scripted/scripted_test.go | 8 +-- internal/scraper/scraper_test.go | 2 +- 8 files changed, 102 insertions(+), 119 deletions(-) diff --git a/internal/checks/checks_test.go b/internal/checks/checks_test.go index a208bd06..0a4a422a 100644 --- a/internal/checks/checks_test.go +++ b/internal/checks/checks_test.go @@ -419,7 +419,7 @@ func (noopRunner) WithLogger(logger *zerolog.Logger) k6runner.Runner { return r } -func (noopRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) { +func (noopRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) { return &k6runner.RunResponse{}, nil } diff --git a/internal/k6runner/k6runner.go b/internal/k6runner/k6runner.go index cb286836..02fe25bb 100644 --- a/internal/k6runner/k6runner.go +++ b/internal/k6runner/k6runner.go @@ -22,9 +22,24 @@ import ( "github.com/spf13/afero" ) +// Script is a k6 script that a runner is able to run, with some added instructions for that runner to act on. +type Script struct { + Script []byte `json:"script"` + Settings Settings `json:"settings"` + // TODO: Add Metadata and Features. +} + +type Settings struct { + // Timeout for k6 run, in milliseconds. This value is a configuration value for remote runners, which will instruct + // them to return an error if the operation takes longer than this time to complete. Clients should expect that + // requests to remote runners may take longer than this value due to network and other latencies, and thus clients + // should wait additional time before aborting outgoing requests. + Timeout int64 `json:"timeout"` +} + type Runner interface { WithLogger(logger *zerolog.Logger) Runner - Run(ctx context.Context, script []byte) (*RunResponse, error) + Run(ctx context.Context, script Script) (*RunResponse, error) } type RunnerOpts struct { @@ -61,12 +76,13 @@ func (r *LocalRunner) withOpts(opts RunnerOpts) { } } +// Processor runs a script with a runner and parses the k6 output. type Processor struct { runner Runner - script []byte + script Script } -func NewProcessor(script []byte, k6runner Runner) (*Processor, error) { +func NewProcessor(script Script, k6runner Runner) (*Processor, error) { r := Processor{ runner: k6runner, script: script, @@ -311,15 +327,6 @@ func (r requestError) Error() string { return fmt.Sprintf("%s: %s", r.Err, r.Message) } -type Settings struct { - Timeout int64 `json:"timeout"` -} - -type RunRequest struct { - Script []byte `json:"script"` - Settings Settings `json:"settings"` -} - type RunResponse struct { Error string `json:"error,omitempty"` ErrorCode string `json:"errorCode,omitempty"` @@ -336,17 +343,12 @@ func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner { var ErrUnexpectedStatus = errors.New("unexpected status code") -func (r HttpRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) { - k6Timeout := getTimeout(ctx) +func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { + k6Timeout := time.Duration(script.Settings.Timeout) * time.Millisecond - reqBody, err := json.Marshal(&RunRequest{ - Script: script, - Settings: Settings{ - Timeout: k6Timeout.Milliseconds(), - }, - }) + reqBody, err := json.Marshal(script) if err != nil { - return nil, fmt.Errorf("running script: %w", err) + return nil, fmt.Errorf("encoding script: %w", err) } // The context above carries the check timeout, which will be eventually passed to k6 by the runner at the other end @@ -406,7 +408,7 @@ func (r LocalRunner) WithLogger(logger *zerolog.Logger) Runner { } } -func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) { +func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { afs := afero.Afero{Fs: r.fs} workdir, err := afs.TempDir("", "k6-runner") @@ -435,7 +437,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro return nil, fmt.Errorf("cannot obtain temporary script filename: %w", err) } - if err := afs.WriteFile(scriptFn, script, 0o644); err != nil { + if err := afs.WriteFile(scriptFn, script.Script, 0o644); err != nil { return nil, fmt.Errorf("cannot write temporary script file: %w", err) } @@ -444,7 +446,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro return nil, fmt.Errorf("cannot find k6 executable: %w", err) } - timeout := getTimeout(ctx) + timeout := time.Duration(script.Settings.Timeout) * time.Millisecond // #nosec G204 -- the variables are not user-controlled cmd := exec.CommandContext( @@ -483,7 +485,7 @@ func (r LocalRunner) Run(ctx context.Context, script []byte) (*RunResponse, erro start := time.Now() - r.logger.Info().Str("command", cmd.String()).Bytes("script", script).Msg("running k6 script") + r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script") if err := cmd.Run(); err != nil { r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error") @@ -524,12 +526,3 @@ func mktemp(fs afero.Fs, dir, pattern string) (string, error) { } return f.Name(), nil } - -func getTimeout(ctx context.Context) time.Duration { - deadline, ok := ctx.Deadline() - if !ok { - return 10 * time.Second - } - - return time.Until(deadline) -} diff --git a/internal/k6runner/k6runner_test.go b/internal/k6runner/k6runner_test.go index eb925737..d8171b2a 100644 --- a/internal/k6runner/k6runner_test.go +++ b/internal/k6runner/k6runner_test.go @@ -37,12 +37,15 @@ func TestNew(t *testing.T) { func TestNewScript(t *testing.T) { runner := New(RunnerOpts{Uri: "k6"}) - src := []byte("test") - script, err := NewProcessor(src, runner) + script := Script{ + Script: []byte("test"), + } + + processor, err := NewProcessor(script, runner) require.NoError(t, err) - require.NotNil(t, script) - require.Equal(t, src, script.script) - require.Equal(t, runner, script.runner) + require.NotNil(t, processor) + require.Equal(t, script, processor.script) + require.Equal(t, runner, processor.runner) } func TestScriptRun(t *testing.T) { @@ -51,9 +54,9 @@ func TestScriptRun(t *testing.T) { logs: testhelper.MustReadFile(t, "testdata/test.log"), } - script, err := NewProcessor(testhelper.MustReadFile(t, "testdata/test.js"), &runner) + processor, err := NewProcessor(Script{Script: testhelper.MustReadFile(t, "testdata/test.js")}, &runner) require.NoError(t, err) - require.NotNil(t, script) + require.NotNil(t, processor) var ( registry = prometheus.NewRegistry() @@ -68,33 +71,28 @@ func TestScriptRun(t *testing.T) { // We already know tha parsing the metrics and the logs is working, so // we are only interested in verifying that the script runs without // errors. - success, err := script.Run(ctx, registry, &logger, zlogger) + success, err := processor.Run(ctx, registry, &logger, zlogger) require.NoError(t, err) require.True(t, success) } func TestHttpRunnerRun(t *testing.T) { - scriptSrc := testhelper.MustReadFile(t, "testdata/test.js") - timeout := 1 * time.Second + script := Script{ + Script: testhelper.MustReadFile(t, "testdata/test.js"), + Settings: Settings{ + Timeout: 1000, + }, + } mux := http.NewServeMux() mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { require.Equal(t, http.MethodPost, r.Method) require.Equal(t, "application/json", r.Header.Get("Content-Type")) - var req RunRequest + var req Script err := json.NewDecoder(r.Body).Decode(&req) require.NoError(t, err) - require.Equal(t, scriptSrc, req.Script) - // The timeout in the request is not going to be exactly the - // original timeout because computers need some time to process - // data, and the timeout is set based on the remaining time - // until the deadline and the clock starts ticking as soon as - // the context is created. Check that the actual timeout is not - // greater than the expected value and that it's within 1% of - // the expected value. - require.LessOrEqual(t, req.Settings.Timeout, timeout.Milliseconds()) - require.InEpsilon(t, timeout.Milliseconds(), req.Settings.Timeout, 0.01) + require.Equal(t, script, req) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -119,18 +117,17 @@ func TestHttpRunnerRun(t *testing.T) { ctx, cancel := testhelper.Context(ctx, t) t.Cleanup(cancel) - // By adding a timeout to the context passed to Run, the expectation is - // that the runner extracts the timeout from it and sets the - // corresponding field accordingly. - ctx, cancel = context.WithTimeout(ctx, timeout) - t.Cleanup(cancel) - - _, err := runner.Run(ctx, scriptSrc) + _, err := runner.Run(ctx, script) require.NoError(t, err) } func TestHttpRunnerRunError(t *testing.T) { - scriptSrc := testhelper.MustReadFile(t, "testdata/test.js") + script := Script{ + Script: testhelper.MustReadFile(t, "testdata/test.js"), + Settings: Settings{ + Timeout: 1000, + }, + } mux := http.NewServeMux() mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { @@ -160,7 +157,7 @@ func TestHttpRunnerRunError(t *testing.T) { ctx, cancel := testhelper.Context(context.Background(), t) t.Cleanup(cancel) - _, err := runner.Run(ctx, scriptSrc) + _, err := runner.Run(ctx, script) require.Error(t, err) } @@ -278,7 +275,7 @@ func TestScriptHTTPRun(t *testing.T) { t.Cleanup(srv.Close) runner := New(RunnerOpts{Uri: srv.URL + "/run"}) - script, err := NewProcessor([]byte("tee-hee"), runner) + script, err := NewProcessor(Script{Script: []byte("tee-hee")}, runner) require.NoError(t, err) baseCtx, baseCancel := context.WithTimeout(context.Background(), time.Second) @@ -307,7 +304,7 @@ type testRunner struct { var _ Runner = &testRunner{} -func (r *testRunner) Run(ctx context.Context, script []byte) (*RunResponse, error) { +func (r *testRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { return &RunResponse{ Metrics: r.metrics, Logs: r.logs, diff --git a/internal/prober/multihttp/multihttp.go b/internal/prober/multihttp/multihttp.go index e4375047..3a9c9094 100644 --- a/internal/prober/multihttp/multihttp.go +++ b/internal/prober/multihttp/multihttp.go @@ -5,7 +5,6 @@ import ( "errors" "net/http" "strings" - "time" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" @@ -19,14 +18,14 @@ const proberName = "multihttp" var errUnsupportedCheck = errors.New("unsupported check") type Module struct { - Prober string - Timeout time.Duration + Prober string + Script k6runner.Script } type Prober struct { - logger zerolog.Logger - config Module - script *k6runner.Processor + logger zerolog.Logger + module Module + processor *k6runner.Processor } func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runner k6runner.Runner, reservedHeaders http.Header) (Prober, error) { @@ -44,16 +43,23 @@ func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runne augmentHttpHeaders(&check, reservedHeaders) } - p.config = settingsToModule(check.Settings.Multihttp) - timeout := time.Duration(check.Timeout) * time.Millisecond - p.config.Timeout = timeout - script, err := settingsToScript(check.Settings.Multihttp) if err != nil { return p, err } - k6Script, err := k6runner.NewProcessor(script, runner) + p.module = Module{ + Prober: sm.CheckTypeMultiHttp.String(), + Script: k6runner.Script{ + Script: script, + Settings: k6runner.Settings{ + Timeout: check.Timeout, + }, + // TODO: Add metadata & features here. + }, + } + + processor, err := k6runner.NewProcessor(p.module.Script, runner) if err != nil { return p, err } @@ -66,7 +72,7 @@ func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runne Bytes("script", script). Msg("created prober") - p.script = k6Script + p.processor = processor p.logger = logger return p, nil @@ -77,7 +83,7 @@ func (p Prober) Name() string { } func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool { - success, err := p.script.Run(ctx, registry, logger, p.logger) + success, err := p.processor.Run(ctx, registry, logger, p.logger) if err != nil { p.logger.Warn().Err(err).Msg("running probe") return false @@ -86,14 +92,6 @@ func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.R return success } -func settingsToModule(settings *sm.MultiHttpSettings) Module { - var m Module - - m.Prober = sm.CheckTypeMultiHttp.String() - - return m -} - // Overrides any user-provided headers with our own augmented values // for 'reserved' headers. func augmentHttpHeaders(check *sm.Check, reservedHeaders http.Header) { diff --git a/internal/prober/multihttp/multihttp_test.go b/internal/prober/multihttp/multihttp_test.go index cb350b1c..a3cdaf2f 100644 --- a/internal/prober/multihttp/multihttp_test.go +++ b/internal/prober/multihttp/multihttp_test.go @@ -131,8 +131,8 @@ func TestNewProber(t *testing.T) { require.Equal(t, requestHeaders[0].Value, fmt.Sprintf("%d-%d", checkId, checkId)) require.NoError(t, err) - require.Equal(t, proberName, p.config.Prober) - require.Equal(t, 10*time.Second, p.config.Timeout) + require.Equal(t, proberName, p.module.Prober) + require.Equal(t, 10*time.Second, time.Duration(p.module.Script.Settings.Timeout)*time.Millisecond) // TODO: check script }) } @@ -145,6 +145,6 @@ func (noopRunner) WithLogger(logger *zerolog.Logger) k6runner.Runner { return r } -func (noopRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) { +func (noopRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) { return &k6runner.RunResponse{}, nil } diff --git a/internal/prober/scripted/scripted.go b/internal/prober/scripted/scripted.go index 7cdb96dc..4d6039b1 100644 --- a/internal/prober/scripted/scripted.go +++ b/internal/prober/scripted/scripted.go @@ -3,7 +3,6 @@ package scripted import ( "context" "errors" - "time" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" @@ -17,15 +16,14 @@ const proberName = "scripted" var errUnsupportedCheck = errors.New("unsupported check") type Module struct { - Prober string - Timeout time.Duration - Script []byte + Prober string + Script k6runner.Script } type Prober struct { - logger zerolog.Logger - config Module - script *k6runner.Processor + logger zerolog.Logger + module Module + processor *k6runner.Processor } func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runner k6runner.Runner) (Prober, error) { @@ -35,16 +33,23 @@ func NewProber(ctx context.Context, check sm.Check, logger zerolog.Logger, runne return p, errUnsupportedCheck } - p.config = settingsToModule(check.Settings.Scripted) - timeout := time.Duration(check.Timeout) * time.Millisecond - p.config.Timeout = timeout + p.module = Module{ + Prober: sm.CheckTypeScripted.String(), + Script: k6runner.Script{ + Script: check.Settings.Scripted.Script, + Settings: k6runner.Settings{ + Timeout: check.Timeout, + }, + // TODO: Add metadata & features here. + }, + } - script, err := k6runner.NewProcessor(check.Settings.Scripted.Script, runner) + processor, err := k6runner.NewProcessor(p.module.Script, runner) if err != nil { return p, err } - p.script = script + p.processor = processor p.logger = logger return p, nil @@ -55,7 +60,7 @@ func (p Prober) Name() string { } func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool { - success, err := p.script.Run(ctx, registry, logger, p.logger) + success, err := p.processor.Run(ctx, registry, logger, p.logger) if err != nil { p.logger.Warn().Err(err).Msg("running probe") return false @@ -63,13 +68,3 @@ func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.R return success } - -func settingsToModule(settings *sm.ScriptedSettings) Module { - var m Module - - m.Prober = sm.CheckTypeScripted.String() - - m.Script = settings.Script - - return m -} diff --git a/internal/prober/scripted/scripted_test.go b/internal/prober/scripted/scripted_test.go index ba20c70a..2c39df83 100644 --- a/internal/prober/scripted/scripted_test.go +++ b/internal/prober/scripted/scripted_test.go @@ -59,9 +59,9 @@ func TestNewProber(t *testing.T) { } require.NoError(t, err) - require.Equal(t, proberName, p.config.Prober) - require.Equal(t, 10*time.Second, p.config.Timeout) - require.Equal(t, tc.check.Settings.Scripted.Script, p.config.Script) + require.Equal(t, proberName, p.module.Prober) + require.Equal(t, 10*time.Second, time.Duration(p.module.Script.Settings.Timeout)*time.Millisecond) + require.Equal(t, tc.check.Settings.Scripted.Script, p.module.Script.Script) }) } } @@ -73,7 +73,7 @@ func (noopRunner) WithLogger(logger *zerolog.Logger) k6runner.Runner { return r } -func (noopRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) { +func (noopRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) { return &k6runner.RunResponse{}, nil } diff --git a/internal/scraper/scraper_test.go b/internal/scraper/scraper_test.go index 18166c9d..a71d278e 100644 --- a/internal/scraper/scraper_test.go +++ b/internal/scraper/scraper_test.go @@ -1685,7 +1685,7 @@ type testRunner struct { var _ k6runner.Runner = &testRunner{} -func (r *testRunner) Run(ctx context.Context, script []byte) (*k6runner.RunResponse, error) { +func (r *testRunner) Run(ctx context.Context, script k6runner.Script) (*k6runner.RunResponse, error) { return &k6runner.RunResponse{ Metrics: r.metrics, Logs: r.logs,