Skip to content

Commit

Permalink
Revert "Revert "Merge pull request #2251 from grafana/fix/1443_remove…
Browse files Browse the repository at this point in the history
…_the_js_runtime_from_threshold_calcultations""

This reverts commit 22a0db3.
  • Loading branch information
oleiade committed Feb 25, 2022
1 parent 737dfa7 commit 7a9d184
Show file tree
Hide file tree
Showing 5 changed files with 1,010 additions and 180 deletions.
29 changes: 18 additions & 11 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestEngine_processSamples(t *testing.T) {
})
t.Run("submetric", func(t *testing.T) {
t.Parallel()
ths, err := stats.NewThresholds([]string{`1+1==2`})
ths, err := stats.NewThresholds([]string{`value<2`})
assert.NoError(t, err)

e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{
Expand Down Expand Up @@ -291,7 +291,10 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The incoming samples for the metric set it to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand All @@ -310,7 +313,11 @@ func TestEngineAbortedByThresholds(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The MiniRunner sets the value of the metric to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
// **N.B**: a threshold returning an error, won't trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand Down Expand Up @@ -350,14 +357,14 @@ func TestEngine_processThresholds(t *testing.T) {
ths map[string][]string
abort bool
}{
"passing": {true, map[string][]string{"my_metric": {"1+1==2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"1+1==3"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"1+1==3"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"1+1==2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"1+1==3"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"1+1==2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"1+1==3"}}, false},
"passing": {true, map[string][]string{"my_metric": {"value<2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"value>1.25"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"value>1.25"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"value<2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"value>1.25"}}, false},
}

for name, data := range testdata {
Expand Down
181 changes: 112 additions & 69 deletions stats/thresholds.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package stats

import (
"bytes"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/dop251/goja"

"go.k6.io/k6/lib/types"
)

const jsEnvSrc = `
function p(pct) {
return __sink__.P(pct/100.0);
};
`

var jsEnv *goja.Program

func init() {
pgm, err := goja.Compile("__env__", jsEnvSrc, true)
if err != nil {
panic(err)
}
jsEnv = pgm
}

// Threshold is a representation of a single threshold for a single metric
type Threshold struct {
// Source is the text based source of the threshold
Source string
// LastFailed is a makrer if the last testing of this threshold failed
// LastFailed is a marker if the last testing of this threshold failed
LastFailed bool
// AbortOnFail marks if a given threshold fails that the whole test should be aborted
AbortOnFail bool
// AbortGracePeriod is a the minimum amount of time a test should be running before a failing
// this threshold will abort the test
AbortGracePeriod types.NullDuration

pgm *goja.Program
rt *goja.Runtime
// parsed is the threshold expression parsed from the Source
parsed *thresholdExpression
}

func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
pgm, err := goja.Compile("__threshold__", src, true)
func newThreshold(src string, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
parsedExpression, err := parseThresholdExpression(src)
if err != nil {
return nil, err
}
Expand All @@ -73,23 +54,57 @@ func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, grac
Source: src,
AbortOnFail: abortOnFail,
AbortGracePeriod: gracePeriod,
pgm: pgm,
rt: newThreshold,
parsed: parsedExpression,
}, nil
}

func (t Threshold) runNoTaint() (bool, error) {
v, err := t.rt.RunProgram(t.pgm)
if err != nil {
return false, err
func (t *Threshold) runNoTaint(sinks map[string]float64) (bool, error) {
// Extract the sink value for the aggregation method used in the threshold
// expression
lhs, ok := sinks[t.parsed.AggregationMethod]
if !ok {
return false, fmt.Errorf("unable to apply threshold %s over metrics; reason: "+
"no metric supporting the %s aggregation method found",
t.Source,
t.parsed.AggregationMethod)
}
return v.ToBoolean(), nil

// Apply the threshold expression operator to the left and
// right hand side values
var passes bool
switch t.parsed.Operator {
case ">":
passes = lhs > t.parsed.Value
case ">=":
passes = lhs >= t.parsed.Value
case "<=":
passes = lhs <= t.parsed.Value
case "<":
passes = lhs < t.parsed.Value
case "==", "===":
// Considering a sink always maps to float64 values,
// strictly equal is equivalent to loosely equal
passes = lhs == t.parsed.Value
case "!=":
passes = lhs != t.parsed.Value
default:
// The parseThresholdExpression function should ensure that no invalid
// operator gets through, but let's protect our future selves anyhow.
return false, fmt.Errorf("unable to apply threshold %s over metrics; "+
"reason: %s is an invalid operator",
t.Source,
t.parsed.Operator,
)
}

// Perform the actual threshold verification
return passes, nil
}

func (t *Threshold) run() (bool, error) {
b, err := t.runNoTaint()
t.LastFailed = !b
return b, err
func (t *Threshold) run(sinks map[string]float64) (bool, error) {
passes, err := t.runNoTaint(sinks)
t.LastFailed = !passes
return passes, err
}

type thresholdConfig struct {
Expand All @@ -98,11 +113,11 @@ type thresholdConfig struct {
AbortGracePeriod types.NullDuration `json:"delayAbortEval"`
}

//used internally for JSON marshalling
// used internally for JSON marshalling
type rawThresholdConfig thresholdConfig

func (tc *thresholdConfig) UnmarshalJSON(data []byte) error {
//shortcircuit unmarshalling for simple string format
// shortcircuit unmarshalling for simple string format
if err := json.Unmarshal(data, &tc.Threshold); err == nil {
return nil
}
Expand All @@ -122,9 +137,9 @@ func (tc thresholdConfig) MarshalJSON() ([]byte, error) {

// Thresholds is the combination of all Thresholds for a given metric
type Thresholds struct {
Runtime *goja.Runtime
Thresholds []*Threshold
Abort bool
sinked map[string]float64
}

// NewThresholds returns Thresholds objects representing the provided source strings
Expand All @@ -138,60 +153,88 @@ func NewThresholds(sources []string) (Thresholds, error) {
}

func newThresholdsWithConfig(configs []thresholdConfig) (Thresholds, error) {
rt := goja.New()
if _, err := rt.RunProgram(jsEnv); err != nil {
return Thresholds{}, fmt.Errorf("threshold builtin error: %w", err)
}
thresholds := make([]*Threshold, len(configs))
sinked := make(map[string]float64)

ts := make([]*Threshold, len(configs))
for i, config := range configs {
t, err := newThreshold(config.Threshold, rt, config.AbortOnFail, config.AbortGracePeriod)
t, err := newThreshold(config.Threshold, config.AbortOnFail, config.AbortGracePeriod)
if err != nil {
return Thresholds{}, fmt.Errorf("threshold %d error: %w", i, err)
}
ts[i] = t
thresholds[i] = t
}

return Thresholds{rt, ts, false}, nil
return Thresholds{thresholds, false, sinked}, nil
}

func (ts *Thresholds) updateVM(sink Sink, t time.Duration) error {
ts.Runtime.Set("__sink__", sink)
f := sink.Format(t)
for k, v := range f {
ts.Runtime.Set(k, v)
}
return nil
}

func (ts *Thresholds) runAll(t time.Duration) (bool, error) {
succ := true
for i, th := range ts.Thresholds {
b, err := th.run()
func (ts *Thresholds) runAll(timeSpentInTest time.Duration) (bool, error) {
succeeded := true
for i, threshold := range ts.Thresholds {
b, err := threshold.run(ts.sinked)
if err != nil {
return false, fmt.Errorf("threshold %d run error: %w", i, err)
}

if !b {
succ = false
succeeded = false

if ts.Abort || !th.AbortOnFail {
if ts.Abort || !threshold.AbortOnFail {
continue
}

ts.Abort = !th.AbortGracePeriod.Valid ||
th.AbortGracePeriod.Duration < types.Duration(t)
ts.Abort = !threshold.AbortGracePeriod.Valid ||
threshold.AbortGracePeriod.Duration < types.Duration(timeSpentInTest)
}
}
return succ, nil

return succeeded, nil
}

// Run processes all the thresholds with the provided Sink at the provided time and returns if any
// of them fails
func (ts *Thresholds) Run(sink Sink, t time.Duration) (bool, error) {
if err := ts.updateVM(sink, t); err != nil {
return false, err
func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) {
// Initialize the sinks store
ts.sinked = make(map[string]float64)

// FIXME: Remove this comment as soon as the stats.Sink does not expose Format anymore.
//
// As of December 2021, this block reproduces the behavior of the
// stats.Sink.Format behavior. As we intend to try to get away from it,
// we instead implement the behavior directly here.
//
// For more details, see https://github.com/grafana/k6/issues/2320
switch sinkImpl := sink.(type) {
case *CounterSink:
ts.sinked["count"] = sinkImpl.Value
ts.sinked["rate"] = sinkImpl.Value / (float64(duration) / float64(time.Second))
case *GaugeSink:
ts.sinked["value"] = sinkImpl.Value
case *TrendSink:
ts.sinked["min"] = sinkImpl.Min
ts.sinked["max"] = sinkImpl.Max
ts.sinked["avg"] = sinkImpl.Avg
ts.sinked["med"] = sinkImpl.Med

// Parse the percentile thresholds and insert them in
// the sinks mapping.
for _, threshold := range ts.Thresholds {
if !strings.HasPrefix(threshold.parsed.AggregationMethod, "p(") {
continue
}

ts.sinked[threshold.parsed.AggregationMethod] = sinkImpl.P(threshold.parsed.AggregationValue.Float64 / 100)
}
case *RateSink:
ts.sinked["rate"] = float64(sinkImpl.Trues) / float64(sinkImpl.Total)
case DummySink:
for k, v := range sinkImpl {
ts.sinked[k] = v
}
default:
return false, fmt.Errorf("unable to run Thresholds; reason: unknown sink type")
}
return ts.runAll(t)

return ts.runAll(duration)
}

// UnmarshalJSON is implementation of json.Unmarshaler
Expand Down
Loading

0 comments on commit 7a9d184

Please sign in to comment.