From eb62b37d3ca0afadcba6bab43d9ef2933349ddd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Tue, 30 May 2023 16:42:19 +0200 Subject: [PATCH 01/13] Add event system --- event/doc.go | 3 + event/event.go | 10 ++ event/system.go | 178 +++++++++++++++++++++++++++++++ event/system_test.go | 246 +++++++++++++++++++++++++++++++++++++++++++ event/type.go | 34 ++++++ event/type_gen.go | 54 ++++++++++ 6 files changed, 525 insertions(+) create mode 100644 event/doc.go create mode 100644 event/event.go create mode 100644 event/system.go create mode 100644 event/system_test.go create mode 100644 event/type.go create mode 100644 event/type_gen.go diff --git a/event/doc.go b/event/doc.go new file mode 100644 index 000000000000..7a5c3c045058 --- /dev/null +++ b/event/doc.go @@ -0,0 +1,3 @@ +// Package event contains the event system used to notify external components of +// various internal events during test execution. +package event diff --git a/event/event.go b/event/event.go new file mode 100644 index 000000000000..0fe6a21ab4a0 --- /dev/null +++ b/event/event.go @@ -0,0 +1,10 @@ +package event + +// Event is the emitted object sent to all subscribers of its type. +// The subscriber should call its Done method when finished processing +// to notify the emitter, though this is not required for all events. +type Event struct { + Type Type + Data any + Done func() +} diff --git a/event/system.go b/event/system.go new file mode 100644 index 000000000000..64c60db22128 --- /dev/null +++ b/event/system.go @@ -0,0 +1,178 @@ +package event + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// Subscriber is a limited interface of System that only allows subscribing and +// unsubscribing. +type Subscriber interface { + Subscribe(events ...Type) (subID uint64, eventsCh <-chan *Event) + Unsubscribe(subID uint64) +} + +// System keeps track of subscribers, and allows subscribing to and emitting +// events. +type System struct { + ctx context.Context + subMx sync.RWMutex + subIDCount uint64 + subscribers map[Type]map[uint64]chan *Event + eventBuffer int + logger logrus.FieldLogger +} + +// NewEventSystem returns a new System. +// eventBuffer determines the size of the Event channel buffer. Events might be +// dropped if this buffer is full and there are no event listeners, or if events +// are emitted very quickly and the event handler goroutine is busy. It is +// recommended to handle events in a separate goroutine to not block the +// listener goroutine. +func NewEventSystem(ctx context.Context, eventBuffer int, logger logrus.FieldLogger) *System { + return &System{ + ctx: ctx, + subscribers: make(map[Type]map[uint64]chan *Event), + eventBuffer: eventBuffer, + logger: logger, + } +} + +// Subscribe to one or more events. It returns a subscriber ID that can be +// used to unsubscribe, and an Event channel to receive events. +// It panics if events is empty. +func (s *System) Subscribe(events ...Type) (subID uint64, eventsCh <-chan *Event) { + if len(events) == 0 { + panic("must subscribe to at least 1 event type") + } + + s.subMx.Lock() + defer s.subMx.Unlock() + s.subIDCount++ + subID = s.subIDCount + + evtCh := make(chan *Event, s.eventBuffer) + for _, evt := range events { + if s.subscribers[evt] == nil { + s.subscribers[evt] = make(map[uint64]chan *Event) + } + s.subscribers[evt][subID] = evtCh + } + + s.logger.WithFields(logrus.Fields{ + "subscriptionID": subID, + "events": events, + }).Debug("Created event subscription") + + return subID, evtCh +} + +// Emit the event to all subscribers of its type. +// It returns a function that can be optionally used to wait for all subscribers +// to process the event (by signalling via the Done method). +func (s *System) Emit(event *Event) (wait func(time.Duration) error) { + s.subMx.RLock() + defer s.subMx.RUnlock() + totalSubs := len(s.subscribers[event.Type]) + if totalSubs == 0 { + return func(time.Duration) error { return nil } + } + + if event.Done == nil { + event.Done = func() {} + } + origDoneFn := event.Done + doneCh := make(chan struct{}, s.eventBuffer) + doneFn := func() { + origDoneFn() + select { + case doneCh <- struct{}{}: + default: + } + } + event.Done = doneFn + + for _, evtCh := range s.subscribers[event.Type] { + select { + case evtCh <- event: + default: + } + } + + s.logger.WithFields(logrus.Fields{ + "subscribers": totalSubs, + "event": event.Type, + }).Trace("Emitted event") + + return func(timeout time.Duration) error { + var ( + doneCount int + tout = time.After(timeout) + ) + for { + if doneCount == totalSubs { + close(doneCh) + return nil + } + select { + case <-doneCh: + doneCount++ + case <-tout: + return fmt.Errorf("timed out after waiting %s for all '%s' events to be processed", timeout, event.Type) + case <-s.ctx.Done(): + return nil + } + } + } +} + +// Unsubscribe closes the Event channel and removes the subscription with ID +// subID. +func (s *System) Unsubscribe(subID uint64) { + s.subMx.Lock() + defer s.subMx.Unlock() + var seen bool + for _, sub := range s.subscribers { + if evtCh, ok := sub[subID]; ok { + if !seen { + close(evtCh) + } + delete(sub, subID) + seen = true + } + } + + if seen { + s.logger.WithFields(logrus.Fields{ + "subscriptionID": subID, + }).Debug("Removed event subscription") + } +} + +// UnsubscribeAll closes all event channels and removes all subscriptions. +func (s *System) UnsubscribeAll() { + s.subMx.Lock() + defer s.subMx.Unlock() + + seenSubs := make(map[uint64]struct{}) + for _, sub := range s.subscribers { + for subID, evtCh := range sub { + if _, ok := seenSubs[subID]; !ok { + close(evtCh) + seenSubs[subID] = struct{}{} + } + } + } + + if len(seenSubs) > 0 { + s.logger.WithFields(logrus.Fields{ + "subscriptions": len(seenSubs), + }).Debug("Removed all event subscriptions") + } + + s.subscribers = make(map[Type]map[uint64]chan *Event) +} diff --git a/event/system_test.go b/event/system_test.go new file mode 100644 index 000000000000..359aec480295 --- /dev/null +++ b/event/system_test.go @@ -0,0 +1,246 @@ +package event + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventSystem(t *testing.T) { + t.Parallel() + t.Run("subscribe", func(t *testing.T) { + t.Parallel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(context.Background(), 10, logger) + + require.Len(t, es.subscribers, 0) + + s1id, s1ch := es.Subscribe(Init) + + assert.Equal(t, uint64(1), s1id) + assert.NotNil(t, s1ch) + assert.Len(t, es.subscribers, 1) + assert.Len(t, es.subscribers[Init], 1) + assert.Equal(t, (<-chan *Event)(es.subscribers[Init][s1id]), s1ch) + + s2id, s2ch := es.Subscribe(Init, TestStart) + + assert.Equal(t, uint64(2), s2id) + assert.NotNil(t, s2ch) + assert.Len(t, es.subscribers, 2) + assert.Len(t, es.subscribers[Init], 2) + assert.Len(t, es.subscribers[TestStart], 1) + assert.Equal(t, (<-chan *Event)(es.subscribers[Init][s2id]), s2ch) + assert.Equal(t, (<-chan *Event)(es.subscribers[TestStart][s2id]), s2ch) + }) + + t.Run("subscribe/panic", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(ctx, 10, logger) + assert.PanicsWithValue(t, "must subscribe to at least 1 event type", func() { + es.Subscribe() + }) + }) + + t.Run("emit_and_process", func(t *testing.T) { + t.Parallel() + testTimeout := 5 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(ctx, 10, logger) + + s1id, s1ch := es.Subscribe(Init, Exit) + s2id, s2ch := es.Subscribe(Init, TestStart, TestEnd, Exit) + + type result struct { + sid uint64 + events []*Event + err error + } + resultCh := make(chan result, 2) + go func() { + s1result, err := processEvents(ctx, es, s1id, s1ch) + resultCh <- result{s1id, s1result, err} + }() + + go func() { + s2result, err := processEvents(ctx, es, s2id, s2ch) + resultCh <- result{s2id, s2result, err} + }() + + var ( + doneMx sync.RWMutex + processed = make(map[Type]int) + emitEvents = []Type{Init, TestStart, IterStart, IterEnd, TestEnd, Exit} + data int + ) + for _, et := range emitEvents { + et := et + evt := &Event{Type: et, Data: data, Done: func() { + doneMx.Lock() + processed[et]++ + doneMx.Unlock() + }} + es.Emit(evt) + data++ + } + + for i := 0; i < 2; i++ { + select { + case result := <-resultCh: + require.NoError(t, result.err) + switch result.sid { + case s1id: + require.Len(t, result.events, 2) + assert.Equal(t, Init, result.events[0].Type) + assert.Equal(t, 0, result.events[0].Data) + assert.Equal(t, Exit, result.events[1].Type) + assert.Equal(t, 5, result.events[1].Data) + case s2id: + require.Len(t, result.events, 4) + assert.Equal(t, Init, result.events[0].Type) + assert.Equal(t, 0, result.events[0].Data) + assert.Equal(t, TestStart, result.events[1].Type) + assert.Equal(t, 1, result.events[1].Data) + assert.Equal(t, TestEnd, result.events[2].Type) + assert.Equal(t, 4, result.events[2].Data) + assert.Equal(t, Exit, result.events[3].Type) + assert.Equal(t, 5, result.events[3].Data) + } + case <-ctx.Done(): + t.Fatalf("test timed out after %s", testTimeout) + } + } + + expProcessed := map[Type]int{ + Init: 2, + TestStart: 1, + TestEnd: 1, + Exit: 2, + } + assert.Equal(t, expProcessed, processed) + }) + + t.Run("emit_and_wait/ok", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(ctx, 100, logger) + + var ( + wg sync.WaitGroup + numSubs = 100 + ) + for i := 0; i < numSubs; i++ { + sid, evtCh := es.Subscribe(Exit) + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + require.NoError(t, err) + }() + } + + var done uint32 + wait := es.Emit(&Event{Type: Exit, Done: func() { + atomic.AddUint32(&done, 1) + }}) + err := wait(time.Second) + require.NoError(t, err) + assert.Equal(t, uint32(numSubs), done) + + wg.Wait() + }) + + t.Run("emit_and_wait/error", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(ctx, 10, logger) + + sid, evtCh := es.Subscribe(Exit) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := processEvents(ctx, es, sid, evtCh) + assert.NoError(t, err) + }() + + wait := es.Emit(&Event{Type: Exit, Done: func() { + time.Sleep(200 * time.Millisecond) + }}) + err := wait(100 * time.Millisecond) + assert.EqualError(t, err, "timed out after waiting 100ms for all 'Exit' events to be processed") + + wg.Wait() + }) + + t.Run("unsubscribe", func(t *testing.T) { + t.Parallel() + logger := logrus.New() + logger.SetOutput(io.Discard) + es := NewEventSystem(context.Background(), 10, logger) + + require.Len(t, es.subscribers, 0) + + var ( + numSubs = 5 + subs = make([]uint64, numSubs) + ) + for i := 0; i < numSubs; i++ { + sid, _ := es.Subscribe(Init) + subs[i] = sid + } + + require.Len(t, es.subscribers[Init], numSubs) + + es.Unsubscribe(subs[0]) + assert.Len(t, es.subscribers[Init], numSubs-1) + es.Unsubscribe(subs[0]) // second unsubscribe does nothing + assert.Len(t, es.subscribers[Init], numSubs-1) + + es.UnsubscribeAll() + assert.Len(t, es.subscribers[Init], 0) + }) +} + +func processEvents(ctx context.Context, es *System, sid uint64, evtCh <-chan *Event) ([]*Event, error) { + result := make([]*Event, 0) + + for { + select { + case evt, ok := <-evtCh: + if !ok { + return result, nil + } + result = append(result, evt) + evt.Done() + if evt.Type == Exit { + es.Unsubscribe(sid) + } + case <-ctx.Done(): + return nil, errors.New("test timed out") + } + } +} diff --git a/event/type.go b/event/type.go new file mode 100644 index 000000000000..1a5a91c17f06 --- /dev/null +++ b/event/type.go @@ -0,0 +1,34 @@ +package event + +// Type represents the different event types emitted by k6. +// +//go:generate enumer -type=Type -trimprefix Type -output type_gen.go +type Type uint8 + +const ( + // Init is emitted when k6 starts initializing outputs, VUs and executors. + Init Type = iota + 1 + // TestStart is emitted when the execution scheduler starts running the test. + TestStart + // TestEnd is emitted when the test execution ends. + TestEnd + // IterStart is emitted when a VU starts an iteration. + IterStart + // IterEnd is emitted when a VU ends an iteration. + IterEnd + // Exit is emitted when the k6 process is about to exit. + Exit +) + +// ExitData is the data sent in the Exit event. Error is the error returned by +// the run command. +type ExitData struct { + Error error +} + +// IterData is the data sent in the IterStart and IterEnd events. +type IterData struct { + Iteration uint64 + VUID uint64 + ScenarioName string +} diff --git a/event/type_gen.go b/event/type_gen.go new file mode 100644 index 000000000000..21b2662ba3bc --- /dev/null +++ b/event/type_gen.go @@ -0,0 +1,54 @@ +// Code generated by "enumer -type=Type -trimprefix Type -output type_gen.go"; DO NOT EDIT. + +package event + +import ( + "fmt" +) + +const _TypeName = "InitTestStartTestEndIterStartIterEndExit" + +var _TypeIndex = [...]uint8{0, 4, 13, 20, 29, 36, 40} + +func (i Type) String() string { + i -= 1 + if i >= Type(len(_TypeIndex)-1) { + return fmt.Sprintf("Type(%d)", i+1) + } + return _TypeName[_TypeIndex[i]:_TypeIndex[i+1]] +} + +var _TypeValues = []Type{1, 2, 3, 4, 5, 6} + +var _TypeNameToValueMap = map[string]Type{ + _TypeName[0:4]: 1, + _TypeName[4:13]: 2, + _TypeName[13:20]: 3, + _TypeName[20:29]: 4, + _TypeName[29:36]: 5, + _TypeName[36:40]: 6, +} + +// TypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func TypeString(s string) (Type, error) { + if val, ok := _TypeNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to Type values", s) +} + +// TypeValues returns all values of the enum +func TypeValues() []Type { + return _TypeValues +} + +// IsAType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i Type) IsAType() bool { + for _, v := range _TypeValues { + if i == v { + return true + } + } + return false +} From 84e6db69f1cacfe69761725e8274ce5d945ca520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 31 May 2023 17:14:46 +0200 Subject: [PATCH 02/13] Expose event system to JS modules --- js/bundle.go | 4 ++-- js/modules/modules.go | 3 +++ js/modules_vu.go | 6 ++++++ js/modulestest/modulestest.go | 7 +++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/js/bundle.go b/js/bundle.go index 3b7c9a46542b..f081ba67ea28 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -112,7 +112,7 @@ func newBundle( // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. // TODO use a real context - vuImpl := &moduleVUImpl{ctx: context.Background(), runtime: goja.New()} + vuImpl := &moduleVUImpl{ctx: context.Background(), runtime: goja.New(), events: piState.Events} vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := bundle.instantiate(vuImpl, 0) if err != nil { @@ -220,7 +220,7 @@ func (b *Bundle) populateExports(updateOptions bool, exports *goja.Object) error func (b *Bundle) Instantiate(ctx context.Context, vuID uint64) (*BundleInstance, error) { // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. - vuImpl := &moduleVUImpl{ctx: ctx, runtime: goja.New()} + vuImpl := &moduleVUImpl{ctx: ctx, runtime: goja.New(), events: b.preInitState.Events} vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := b.instantiate(vuImpl, vuID) if err != nil { diff --git a/js/modules/modules.go b/js/modules/modules.go index 148d899b0f5f..2f2e3945dee7 100644 --- a/js/modules/modules.go +++ b/js/modules/modules.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/dop251/goja" + "go.k6.io/k6/event" "go.k6.io/k6/ext" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" @@ -42,6 +43,8 @@ type VU interface { // Context return the context.Context about the current VU Context() context.Context + Events() event.Subscriber + // InitEnv returns common.InitEnvironment instance if present InitEnv() *common.InitEnvironment diff --git a/js/modules_vu.go b/js/modules_vu.go index aa2e3858fd04..8335d0dded36 100644 --- a/js/modules_vu.go +++ b/js/modules_vu.go @@ -4,6 +4,7 @@ import ( "context" "github.com/dop251/goja" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/eventloop" "go.k6.io/k6/lib" @@ -15,12 +16,17 @@ type moduleVUImpl struct { state *lib.State runtime *goja.Runtime eventLoop *eventloop.EventLoop + events *event.System } func (m *moduleVUImpl) Context() context.Context { return m.ctx } +func (m *moduleVUImpl) Events() event.Subscriber { + return m.events +} + func (m *moduleVUImpl) InitEnv() *common.InitEnvironment { return m.initEnv } diff --git a/js/modulestest/modulestest.go b/js/modulestest/modulestest.go index 0b7cd96462cc..3b5f1169746a 100644 --- a/js/modulestest/modulestest.go +++ b/js/modulestest/modulestest.go @@ -4,6 +4,7 @@ import ( "context" "github.com/dop251/goja" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" "go.k6.io/k6/lib" @@ -15,6 +16,7 @@ var _ modules.VU = &VU{} type VU struct { CtxField context.Context InitEnvField *common.InitEnvironment + EventsField *event.System StateField *lib.State RuntimeField *goja.Runtime RegisterCallbackField func() func(f func() error) @@ -25,6 +27,11 @@ func (m *VU) Context() context.Context { return m.CtxField } +// Events returns internally set field to conform to modules.VU interface +func (m *VU) Events() event.Subscriber { + return m.EventsField +} + // InitEnv returns internally set field to conform to modules.VU interface func (m *VU) InitEnv() *common.InitEnvironment { m.checkIntegrity() From 477f770c90576e9fe1bb1af00f36508b1c6afeaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 31 May 2023 17:12:58 +0200 Subject: [PATCH 03/13] Emit events in run command and js.VU.RunOnce() --- cmd/run.go | 31 ++++++++++++ cmd/test_load.go | 2 + cmd/tests/cmd_run_test.go | 99 +++++++++++++++++++++++++++++++++++++- cmd/tests/events/events.go | 74 ++++++++++++++++++++++++++++ event/type.go | 1 + js/runner.go | 15 ++++++ lib/test_state.go | 2 + 7 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 cmd/tests/events/events.go diff --git a/cmd/run.go b/cmd/run.go index fda928a55310..402c295d28f8 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -23,6 +23,7 @@ import ( "go.k6.io/k6/cmd/state" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" @@ -78,6 +79,18 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + defer func() { + waitExitDone := test.preInitState.Events.Emit(&event.Event{ + Type: event.Exit, + Data: &event.ExitData{Error: err}, + }) + // TODO: Make the timeout configurable? + if werr := waitExitDone(5 * time.Second); werr != nil { + logger.WithError(werr).Warn() + } + test.preInitState.Events.UnsubscribeAll() + }() + // Write the full consolidated *and derived* options back to the Runner. conf := test.derivedConfig testRunState, err := test.buildTestRunState(conf.Options) @@ -153,6 +166,12 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + // TODO: Subscribe all initialization processes (outputs, VUs and executors) + // to the Init event. This would allow running them concurrently, and they + // could be synchronized by waiting for the event processing to complete. + // This could later be expanded to also initialize browser processes. + waitInitDone := test.preInitState.Events.Emit(&event.Event{Type: event.Init}) + // Create and start the outputs. We do it quite early to get any output URLs // or other details below. It also allows us to ensure when they have // flushed their samples and when they have stopped in the defer statements. @@ -300,9 +319,21 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + // TODO: Make the timeout configurable? + if werr := waitInitDone(10 * time.Second); werr != nil { + logger.WithError(werr).Warn() + } + test.preInitState.Events.Emit(&event.Event{Type: event.TestStart}) // Start the test! However, we won't immediately return if there was an // error, we still have things to do. err = execScheduler.Run(globalCtx, runCtx, samples) + waitTestEndDone := test.preInitState.Events.Emit(&event.Event{Type: event.TestEnd}) + defer func() { + // TODO: Make the timeout configurable? + if werr := waitTestEndDone(5 * time.Second); werr != nil { + logger.WithError(werr).Warn() + } + }() // Init has passed successfully, so unless disabled, make sure we send a // usage report after the context is done. diff --git a/cmd/test_load.go b/cmd/test_load.go index bf7378dda749..d11d504efc14 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -16,6 +16,7 @@ import ( "go.k6.io/k6/cmd/state" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/fsext" @@ -70,6 +71,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded RuntimeOptions: runtimeOptions, Registry: registry, BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + Events: event.NewEventSystem(gs.Ctx, 100, gs.Logger), LookupEnv: func(key string) (string, bool) { val, ok := gs.Env[key] return val, ok diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index ae11a58311ee..25ade413fb7d 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -13,6 +13,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -23,7 +24,10 @@ import ( "github.com/tidwall/gjson" "go.k6.io/k6/cloudapi" "go.k6.io/k6/cmd" + "go.k6.io/k6/cmd/tests/events" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" + "go.k6.io/k6/js/modules" "go.k6.io/k6/lib/consts" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/lib/testutils" @@ -42,7 +46,6 @@ func TestVersion(t *testing.T) { assert.Contains(t, stdout, runtime.Version()) assert.Contains(t, stdout, runtime.GOOS) assert.Contains(t, stdout, runtime.GOARCH) - assert.NotContains(t, stdout[:len(stdout)-1], "\n") assert.Empty(t, ts.Stderr.Bytes()) assert.Empty(t, ts.LoggerHook.Drain()) @@ -1998,3 +2001,97 @@ func TestBadLogOutput(t *testing.T) { }) } } + +// HACK: We need this so multiple tests can register differently named modules. +var uniqueModuleNumber uint64 //nolint:gochecknoglobals + +// Tests that the appropriate events are emitted at the appropriate times. +func TestEventSystemOK(t *testing.T) { + t.Parallel() + + ts := NewGlobalTestState(t) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + modules.Register(moduleName, events.New( + ts.GlobalState.DefaultFlags.Address, []event.Type{ + event.Init, event.TestStart, event.IterStart, event.IterEnd, + event.TestEnd, event.Exit, + })) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` + import events from '%s'; + import { sleep } from 'k6'; + + export let options = { + vus: 1, + iterations: 5, + } + + export default function () { sleep(1); } + `, moduleName))) + + cmd.ExecuteWithGlobalState(ts.GlobalState) + + expLog := []string{ + `got event Init with data '', test status: InitVUs`, + `got event TestStart with data '', test status: Running`, + `got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:}', test status: Running`, + `got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:}', test status: Ended`, + `got event TestEnd with data '', test status: Ended`, + `got event Exit with data '&{Error:}'`, + } + log := ts.LoggerHook.Lines() + assert.Equal(t, expLog, log) +} + +// Tests that the exit event is emitted with the test exit error. +func TestEventSystemAborted(t *testing.T) { + t.Parallel() + + ts := NewGlobalTestState(t) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + modules.Register(moduleName, events.New( + ts.GlobalState.DefaultFlags.Address, []event.Type{ + event.Init, event.TestStart, event.TestEnd, event.Exit, + })) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.ExpectedExitCode = int(exitcodes.ScriptAborted) + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` + import events from '%s'; + import { test } from 'k6/execution'; + import { sleep } from 'k6'; + + export let options = { + vus: 5, + duration: '5s', + } + + export default function () { + sleep(1); + test.abort('oops!'); + } + `, moduleName))) + + cmd.ExecuteWithGlobalState(ts.GlobalState) + + expLog := []string{ + `got event Init with data '', test status: InitVUs`, + `got event TestStart with data '', test status: Running`, + `got event TestEnd with data '', test status: Interrupted`, + `got event Exit with data '&{Error:test aborted: oops! at file:///-:13:14(12)}'`, + `test aborted: oops! at file:///-:13:14(12)`, + } + log := ts.LoggerHook.Lines() + assert.Equal(t, expLog, log) +} diff --git a/cmd/tests/events/events.go b/cmd/tests/events/events.go new file mode 100644 index 000000000000..e9c590d354cf --- /dev/null +++ b/cmd/tests/events/events.go @@ -0,0 +1,74 @@ +// Package events is used for testing the event functionality. +package events + +import ( + "fmt" + "sync" + + "go.k6.io/k6/api/v1/client" + "go.k6.io/k6/event" + "go.k6.io/k6/js/modules" +) + +// RootModule is the global module instance that will create module +// instances for each VU. +type RootModule struct { + initOnce sync.Once + apiAddress string + subscribeEvents []event.Type +} + +// Events represents an instance of the events module. +type Events struct{} + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &Events{} +) + +// New returns a pointer to a new RootModule instance. +func New(apiAddress string, subscribeEvents []event.Type) *RootModule { + return &RootModule{ + initOnce: sync.Once{}, + apiAddress: apiAddress, + subscribeEvents: subscribeEvents, + } +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + rm.initOnce.Do(func() { + sid, evtCh := vu.Events().Subscribe(rm.subscribeEvents...) + logger := vu.InitEnv().Logger + go func() { + api, _ := client.New(rm.apiAddress) + for { + select { + case evt, ok := <-evtCh: + if !ok { + return + } + var testStatus string + if evt.Type != event.Exit { + status, _ := api.Status(vu.Context()) + testStatus = fmt.Sprintf(", test status: %s", status.Status.String()) + } + logger.Infof("got event %s with data '%+v'%s", evt.Type, evt.Data, testStatus) + evt.Done() + if evt.Type == event.Exit { + vu.Events().Unsubscribe(sid) + } + case <-vu.Context().Done(): + return + } + } + }() + }) + return &Events{} +} + +// Exports returns the exports of the k6 module. +func (e *Events) Exports() modules.Exports { + return modules.Exports{Default: e} +} diff --git a/event/type.go b/event/type.go index 1a5a91c17f06..624a0a9525a4 100644 --- a/event/type.go +++ b/event/type.go @@ -31,4 +31,5 @@ type IterData struct { Iteration uint64 VUID uint64 ScenarioName string + Error error } diff --git a/js/runner.go b/js/runner.go index f604c4e4f891..0918a57c5333 100644 --- a/js/runner.go +++ b/js/runner.go @@ -23,6 +23,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/eventloop" "go.k6.io/k6/lib" @@ -765,6 +766,16 @@ func (u *ActiveVU) RunOnce() error { ctx, cancel := context.WithCancel(u.RunContext) defer cancel() u.moduleVUImpl.ctx = ctx + + eventIterData := event.IterData{ + Iteration: uint64(u.iteration), + VUID: u.ID, + ScenarioName: u.scenarioName, + } + if u.Runner.preInitState.Events != nil { + u.Runner.preInitState.Events.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) + } + // Call the exported function. _, isFullIteration, totalTime, err := u.runFn(ctx, true, fn, cancel, u.setupData) if err != nil { @@ -775,6 +786,10 @@ func (u *ActiveVU) RunOnce() error { err = v } } + eventIterData.Error = err + } + if u.Runner.preInitState.Events != nil { + u.Runner.preInitState.Events.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) } // If MinIterationDuration is specified and the iteration wasn't canceled diff --git a/lib/test_state.go b/lib/test_state.go index 9fd5552839de..370b12103ccc 100644 --- a/lib/test_state.go +++ b/lib/test_state.go @@ -4,6 +4,7 @@ import ( "io" "github.com/sirupsen/logrus" + "go.k6.io/k6/event" "go.k6.io/k6/metrics" ) @@ -13,6 +14,7 @@ type TestPreInitState struct { RuntimeOptions RuntimeOptions Registry *metrics.Registry BuiltinMetrics *metrics.BuiltinMetrics + Events *event.System KeyLogger io.Writer LookupEnv func(key string) (val string, ok bool) Logger logrus.FieldLogger From c8b17b83be1f9c5312b1e79dd8bdfdfedb9d83aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Tue, 6 Jun 2023 16:07:15 +0200 Subject: [PATCH 04/13] Remove embedded context from event.System --- cmd/run.go | 13 ++++++++++--- cmd/test_load.go | 2 +- event/system.go | 22 +++++++--------------- event/system_test.go | 27 ++++++++++++++------------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 402c295d28f8..071827f4d73d 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -85,7 +85,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { Data: &event.ExitData{Error: err}, }) // TODO: Make the timeout configurable? - if werr := waitExitDone(5 * time.Second); werr != nil { + waitExitCtx, waitExitCancel := context.WithTimeout(globalCtx, 5*time.Second) + defer waitExitCancel() + if werr := waitExitDone(waitExitCtx); werr != nil { logger.WithError(werr).Warn() } test.preInitState.Events.UnsubscribeAll() @@ -320,7 +322,10 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } // TODO: Make the timeout configurable? - if werr := waitInitDone(10 * time.Second); werr != nil { + waitInitCtx, waitInitCancel := context.WithTimeout(globalCtx, 10*time.Second) + defer waitInitCancel() + + if werr := waitInitDone(waitInitCtx); werr != nil { logger.WithError(werr).Warn() } test.preInitState.Events.Emit(&event.Event{Type: event.TestStart}) @@ -330,7 +335,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { waitTestEndDone := test.preInitState.Events.Emit(&event.Event{Type: event.TestEnd}) defer func() { // TODO: Make the timeout configurable? - if werr := waitTestEndDone(5 * time.Second); werr != nil { + waitEndCtx, waitEndCancel := context.WithTimeout(globalCtx, 5*time.Second) + defer waitEndCancel() + if werr := waitTestEndDone(waitEndCtx); werr != nil { logger.WithError(werr).Warn() } }() diff --git a/cmd/test_load.go b/cmd/test_load.go index d11d504efc14..ca412fe641b2 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -71,7 +71,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded RuntimeOptions: runtimeOptions, Registry: registry, BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - Events: event.NewEventSystem(gs.Ctx, 100, gs.Logger), + Events: event.NewEventSystem(100, gs.Logger), LookupEnv: func(key string) (string, bool) { val, ok := gs.Env[key] return val, ok diff --git a/event/system.go b/event/system.go index 64c60db22128..416f8ac7742c 100644 --- a/event/system.go +++ b/event/system.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "time" "github.com/sirupsen/logrus" ) @@ -19,7 +18,6 @@ type Subscriber interface { // System keeps track of subscribers, and allows subscribing to and emitting // events. type System struct { - ctx context.Context subMx sync.RWMutex subIDCount uint64 subscribers map[Type]map[uint64]chan *Event @@ -33,9 +31,8 @@ type System struct { // are emitted very quickly and the event handler goroutine is busy. It is // recommended to handle events in a separate goroutine to not block the // listener goroutine. -func NewEventSystem(ctx context.Context, eventBuffer int, logger logrus.FieldLogger) *System { +func NewEventSystem(eventBuffer int, logger logrus.FieldLogger) *System { return &System{ - ctx: ctx, subscribers: make(map[Type]map[uint64]chan *Event), eventBuffer: eventBuffer, logger: logger, @@ -74,12 +71,12 @@ func (s *System) Subscribe(events ...Type) (subID uint64, eventsCh <-chan *Event // Emit the event to all subscribers of its type. // It returns a function that can be optionally used to wait for all subscribers // to process the event (by signalling via the Done method). -func (s *System) Emit(event *Event) (wait func(time.Duration) error) { +func (s *System) Emit(event *Event) (wait func(context.Context) error) { s.subMx.RLock() defer s.subMx.RUnlock() totalSubs := len(s.subscribers[event.Type]) if totalSubs == 0 { - return func(time.Duration) error { return nil } + return func(context.Context) error { return nil } } if event.Done == nil { @@ -108,11 +105,8 @@ func (s *System) Emit(event *Event) (wait func(time.Duration) error) { "event": event.Type, }).Trace("Emitted event") - return func(timeout time.Duration) error { - var ( - doneCount int - tout = time.After(timeout) - ) + return func(ctx context.Context) error { + var doneCount int for { if doneCount == totalSubs { close(doneCh) @@ -121,10 +115,8 @@ func (s *System) Emit(event *Event) (wait func(time.Duration) error) { select { case <-doneCh: doneCount++ - case <-tout: - return fmt.Errorf("timed out after waiting %s for all '%s' events to be processed", timeout, event.Type) - case <-s.ctx.Done(): - return nil + case <-ctx.Done(): + return fmt.Errorf("context is done before all '%s' events were processed", event.Type) } } } diff --git a/event/system_test.go b/event/system_test.go index 359aec480295..33aed5de0089 100644 --- a/event/system_test.go +++ b/event/system_test.go @@ -20,7 +20,7 @@ func TestEventSystem(t *testing.T) { t.Parallel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(context.Background(), 10, logger) + es := NewEventSystem(10, logger) require.Len(t, es.subscribers, 0) @@ -45,11 +45,9 @@ func TestEventSystem(t *testing.T) { t.Run("subscribe/panic", func(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(ctx, 10, logger) + es := NewEventSystem(10, logger) assert.PanicsWithValue(t, "must subscribe to at least 1 event type", func() { es.Subscribe() }) @@ -59,11 +57,10 @@ func TestEventSystem(t *testing.T) { t.Parallel() testTimeout := 5 * time.Second ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(ctx, 10, logger) + es := NewEventSystem(10, logger) s1id, s1ch := es.Subscribe(Init, Exit) s2id, s2ch := es.Subscribe(Init, TestStart, TestEnd, Exit) @@ -143,7 +140,7 @@ func TestEventSystem(t *testing.T) { defer cancel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(ctx, 100, logger) + es := NewEventSystem(100, logger) var ( wg sync.WaitGroup @@ -163,7 +160,9 @@ func TestEventSystem(t *testing.T) { wait := es.Emit(&Event{Type: Exit, Done: func() { atomic.AddUint32(&done, 1) }}) - err := wait(time.Second) + waitCtx, waitCancel := context.WithTimeout(ctx, time.Second) + defer waitCancel() + err := wait(waitCtx) require.NoError(t, err) assert.Equal(t, uint32(numSubs), done) @@ -172,11 +171,11 @@ func TestEventSystem(t *testing.T) { t.Run("emit_and_wait/error", func(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(ctx, 10, logger) + es := NewEventSystem(10, logger) sid, evtCh := es.Subscribe(Exit) var wg sync.WaitGroup @@ -190,8 +189,10 @@ func TestEventSystem(t *testing.T) { wait := es.Emit(&Event{Type: Exit, Done: func() { time.Sleep(200 * time.Millisecond) }}) - err := wait(100 * time.Millisecond) - assert.EqualError(t, err, "timed out after waiting 100ms for all 'Exit' events to be processed") + waitCtx, waitCancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer waitCancel() + err := wait(waitCtx) + assert.EqualError(t, err, "context is done before all 'Exit' events were processed") wg.Wait() }) @@ -200,7 +201,7 @@ func TestEventSystem(t *testing.T) { t.Parallel() logger := logrus.New() logger.SetOutput(io.Discard) - es := NewEventSystem(context.Background(), 10, logger) + es := NewEventSystem(10, logger) require.Len(t, es.subscribers, 0) From be0f53074b7c1349bef1df43821fcb7ace9e8d1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Tue, 6 Jun 2023 16:58:51 +0200 Subject: [PATCH 05/13] Remove checking execution status via HTTP API in E2E event tests Resolves https://github.com/grafana/k6/pull/3112#discussion_r1214442355 --- cmd/tests/cmd_run_test.go | 48 ++++++++++++++++++-------------------- cmd/tests/events/events.go | 14 ++--------- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 25ade413fb7d..4692b00b8046 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -2012,11 +2012,10 @@ func TestEventSystemOK(t *testing.T) { ts := NewGlobalTestState(t) moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) - modules.Register(moduleName, events.New( - ts.GlobalState.DefaultFlags.Address, []event.Type{ - event.Init, event.TestStart, event.IterStart, event.IterEnd, - event.TestEnd, event.Exit, - })) + modules.Register(moduleName, events.New([]event.Type{ + event.Init, event.TestStart, event.IterStart, event.IterEnd, + event.TestEnd, event.Exit, + })) ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` @@ -2034,19 +2033,19 @@ func TestEventSystemOK(t *testing.T) { cmd.ExecuteWithGlobalState(ts.GlobalState) expLog := []string{ - `got event Init with data '', test status: InitVUs`, - `got event TestStart with data '', test status: Running`, - `got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:}', test status: Running`, - `got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:}', test status: Ended`, - `got event TestEnd with data '', test status: Ended`, + `got event Init with data ''`, + `got event TestStart with data ''`, + `got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:2 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:2 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:3 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:3 VUID:1 ScenarioName:default Error:}'`, + `got event IterStart with data '{Iteration:4 VUID:1 ScenarioName:default Error:}'`, + `got event IterEnd with data '{Iteration:4 VUID:1 ScenarioName:default Error:}'`, + `got event TestEnd with data ''`, `got event Exit with data '&{Error:}'`, } log := ts.LoggerHook.Lines() @@ -2060,10 +2059,9 @@ func TestEventSystemAborted(t *testing.T) { ts := NewGlobalTestState(t) moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) - modules.Register(moduleName, events.New( - ts.GlobalState.DefaultFlags.Address, []event.Type{ - event.Init, event.TestStart, event.TestEnd, event.Exit, - })) + modules.Register(moduleName, events.New([]event.Type{ + event.Init, event.TestStart, event.TestEnd, event.Exit, + })) ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} ts.ExpectedExitCode = int(exitcodes.ScriptAborted) @@ -2086,9 +2084,9 @@ func TestEventSystemAborted(t *testing.T) { cmd.ExecuteWithGlobalState(ts.GlobalState) expLog := []string{ - `got event Init with data '', test status: InitVUs`, - `got event TestStart with data '', test status: Running`, - `got event TestEnd with data '', test status: Interrupted`, + `got event Init with data ''`, + `got event TestStart with data ''`, + `got event TestEnd with data ''`, `got event Exit with data '&{Error:test aborted: oops! at file:///-:13:14(12)}'`, `test aborted: oops! at file:///-:13:14(12)`, } diff --git a/cmd/tests/events/events.go b/cmd/tests/events/events.go index e9c590d354cf..73277c34889b 100644 --- a/cmd/tests/events/events.go +++ b/cmd/tests/events/events.go @@ -2,10 +2,8 @@ package events import ( - "fmt" "sync" - "go.k6.io/k6/api/v1/client" "go.k6.io/k6/event" "go.k6.io/k6/js/modules" ) @@ -14,7 +12,6 @@ import ( // instances for each VU. type RootModule struct { initOnce sync.Once - apiAddress string subscribeEvents []event.Type } @@ -27,10 +24,9 @@ var ( ) // New returns a pointer to a new RootModule instance. -func New(apiAddress string, subscribeEvents []event.Type) *RootModule { +func New(subscribeEvents []event.Type) *RootModule { return &RootModule{ initOnce: sync.Once{}, - apiAddress: apiAddress, subscribeEvents: subscribeEvents, } } @@ -42,19 +38,13 @@ func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { sid, evtCh := vu.Events().Subscribe(rm.subscribeEvents...) logger := vu.InitEnv().Logger go func() { - api, _ := client.New(rm.apiAddress) for { select { case evt, ok := <-evtCh: if !ok { return } - var testStatus string - if evt.Type != event.Exit { - status, _ := api.Status(vu.Context()) - testStatus = fmt.Sprintf(", test status: %s", status.Status.String()) - } - logger.Infof("got event %s with data '%+v'%s", evt.Type, evt.Data, testStatus) + logger.Infof("got event %s with data '%+v'", evt.Type, evt.Data) evt.Done() if evt.Type == event.Exit { vu.Events().Unsubscribe(sid) From 4110273095ab972fe93d3e15c6f04a9cdd78412b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 14 Jun 2023 10:39:10 +0200 Subject: [PATCH 06/13] fixup! Add event system --- event/type.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event/type.go b/event/type.go index 624a0a9525a4..9d3b5ee549e8 100644 --- a/event/type.go +++ b/event/type.go @@ -28,7 +28,7 @@ type ExitData struct { // IterData is the data sent in the IterStart and IterEnd events. type IterData struct { - Iteration uint64 + Iteration int64 VUID uint64 ScenarioName string Error error From 36449a9ceb5ed7cab4060b39e836aff5ede90351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 14 Jun 2023 10:39:36 +0200 Subject: [PATCH 07/13] fixup! Emit events in run command and js.VU.RunOnce() --- js/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/runner.go b/js/runner.go index 0918a57c5333..dd4a9539aad6 100644 --- a/js/runner.go +++ b/js/runner.go @@ -768,7 +768,7 @@ func (u *ActiveVU) RunOnce() error { u.moduleVUImpl.ctx = ctx eventIterData := event.IterData{ - Iteration: uint64(u.iteration), + Iteration: u.iteration, VUID: u.ID, ScenarioName: u.scenarioName, } From f7588d616935b0c338d1fa985a659b9c9de2b52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Mon, 19 Jun 2023 18:23:14 +0200 Subject: [PATCH 08/13] Emit VU events in a VU-local event system This addresses the issue that made it difficult to keep track of events for a specific VU, since the browser module needs to initialize browser processes for each VU[1]. This splits the event system into a global and local (per-VU) one, and the JS module can decide to which they want to subscribe to. --- cmd/tests/cmd_run_test.go | 37 ++++++++++++++++++----- cmd/tests/events/events.go | 56 ++++++++++++++++++++++++++++++----- event/type.go | 8 +++++ js/bundle.go | 19 ++++++++++-- js/common/event.go | 9 ++++++ js/modules/modules.go | 5 ++-- js/modules_vu.go | 10 +++++-- js/modulestest/modulestest.go | 5 ++-- js/runner.go | 8 ++--- 9 files changed, 126 insertions(+), 31 deletions(-) create mode 100644 js/common/event.go diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 4692b00b8046..42409760843a 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -2005,17 +2005,15 @@ func TestBadLogOutput(t *testing.T) { // HACK: We need this so multiple tests can register differently named modules. var uniqueModuleNumber uint64 //nolint:gochecknoglobals -// Tests that the appropriate events are emitted at the appropriate times. +// Tests that the appropriate events are emitted in the correct order. func TestEventSystemOK(t *testing.T) { t.Parallel() ts := NewGlobalTestState(t) moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) - modules.Register(moduleName, events.New([]event.Type{ - event.Init, event.TestStart, event.IterStart, event.IterEnd, - event.TestEnd, event.Exit, - })) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` @@ -2032,6 +2030,18 @@ func TestEventSystemOK(t *testing.T) { cmd.ExecuteWithGlobalState(ts.GlobalState) + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatal("timed out") + } + expLog := []string{ `got event Init with data ''`, `got event TestStart with data ''`, @@ -2059,9 +2069,8 @@ func TestEventSystemAborted(t *testing.T) { ts := NewGlobalTestState(t) moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) - modules.Register(moduleName, events.New([]event.Type{ - event.Init, event.TestStart, event.TestEnd, event.Exit, - })) + mod := events.New(event.GlobalEvents, nil) + modules.Register(moduleName, mod) ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} ts.ExpectedExitCode = int(exitcodes.ScriptAborted) @@ -2083,6 +2092,18 @@ func TestEventSystemAborted(t *testing.T) { cmd.ExecuteWithGlobalState(ts.GlobalState) + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatal("timed out") + } + expLog := []string{ `got event Init with data ''`, `got event TestStart with data ''`, diff --git a/cmd/tests/events/events.go b/cmd/tests/events/events.go index 73277c34889b..8db59e1ca8fa 100644 --- a/cmd/tests/events/events.go +++ b/cmd/tests/events/events.go @@ -11,8 +11,14 @@ import ( // RootModule is the global module instance that will create module // instances for each VU. type RootModule struct { - initOnce sync.Once - subscribeEvents []event.Type + initOnce sync.Once + globalEvents, vuEvents []event.Type + // Used by the test function to wait for all event handler goroutines to exit, + // to avoid dangling goroutines. + WG sync.WaitGroup + // Closed by the global event handler once the Exit event is received, and + // used as a signal for VU event handlers to also exit. + exit chan struct{} } // Events represents an instance of the events module. @@ -24,10 +30,12 @@ var ( ) // New returns a pointer to a new RootModule instance. -func New(subscribeEvents []event.Type) *RootModule { +func New(globalEvents, vuEvents []event.Type) *RootModule { return &RootModule{ - initOnce: sync.Once{}, - subscribeEvents: subscribeEvents, + initOnce: sync.Once{}, + exit: make(chan struct{}), + globalEvents: globalEvents, + vuEvents: vuEvents, } } @@ -35,9 +43,14 @@ func New(subscribeEvents []event.Type) *RootModule { // a new instance for each VU. func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { rm.initOnce.Do(func() { - sid, evtCh := vu.Events().Subscribe(rm.subscribeEvents...) + sid, evtCh := vu.Events().Global.Subscribe(rm.globalEvents...) logger := vu.InitEnv().Logger + rm.WG.Add(1) go func() { + defer func() { + close(rm.exit) + rm.WG.Done() + }() for { select { case evt, ok := <-evtCh: @@ -47,7 +60,7 @@ func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { logger.Infof("got event %s with data '%+v'", evt.Type, evt.Data) evt.Done() if evt.Type == event.Exit { - vu.Events().Unsubscribe(sid) + vu.Events().Global.Unsubscribe(sid) } case <-vu.Context().Done(): return @@ -55,6 +68,35 @@ func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { } }() }) + + if len(rm.vuEvents) > 0 { + // NOTE: It would be an improvement to only subscribe to events in VUs + // that will actually run the VU function (VU IDs > 0), and not in the + // throwaway VUs used for setup/teardown. But since there's no direct + // access to the VU ID at this point (it would involve getting it from + // vu.Runtime()), we subscribe in all VUs, and all event handler + // goroutines would exit normally once rm.exit is closed. + sid, evtCh := vu.Events().Local.Subscribe(rm.vuEvents...) + logger := vu.InitEnv().Logger + rm.WG.Add(1) + go func() { + defer rm.WG.Done() + for { + select { + case evt, ok := <-evtCh: + if !ok { + return + } + logger.Infof("got event %s with data '%+v'", evt.Type, evt.Data) + evt.Done() + case <-rm.exit: + vu.Events().Local.Unsubscribe(sid) + return + } + } + }() + } + return &Events{} } diff --git a/event/type.go b/event/type.go index 9d3b5ee549e8..7e02f3e04d38 100644 --- a/event/type.go +++ b/event/type.go @@ -20,6 +20,14 @@ const ( Exit ) +//nolint:gochecknoglobals +var ( + // GlobalEvents are emitted once per test run. + GlobalEvents = []Type{Init, TestStart, TestEnd, Exit} + // VUEvents are emitted multiple times per each VU. + VUEvents = []Type{IterStart, IterEnd} +) + // ExitData is the data sent in the Exit event. Error is the error returned by // the run command. type ExitData struct { diff --git a/js/bundle.go b/js/bundle.go index f081ba67ea28..0e384dc1161c 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -15,6 +15,7 @@ import ( "github.com/sirupsen/logrus" "gopkg.in/guregu/null.v3" + "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/compiler" "go.k6.io/k6/js/eventloop" @@ -112,7 +113,14 @@ func newBundle( // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. // TODO use a real context - vuImpl := &moduleVUImpl{ctx: context.Background(), runtime: goja.New(), events: piState.Events} + vuImpl := &moduleVUImpl{ + ctx: context.Background(), + runtime: goja.New(), + events: events{ + global: piState.Events, + local: event.NewEventSystem(100, piState.Logger), + }, + } vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := bundle.instantiate(vuImpl, 0) if err != nil { @@ -220,7 +228,14 @@ func (b *Bundle) populateExports(updateOptions bool, exports *goja.Object) error func (b *Bundle) Instantiate(ctx context.Context, vuID uint64) (*BundleInstance, error) { // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. - vuImpl := &moduleVUImpl{ctx: ctx, runtime: goja.New(), events: b.preInitState.Events} + vuImpl := &moduleVUImpl{ + ctx: ctx, + runtime: goja.New(), + events: events{ + global: b.preInitState.Events, + local: event.NewEventSystem(100, b.preInitState.Logger), + }, + } vuImpl.eventLoop = eventloop.New(vuImpl) exports, err := b.instantiate(vuImpl, vuID) if err != nil { diff --git a/js/common/event.go b/js/common/event.go new file mode 100644 index 000000000000..d561c09b812f --- /dev/null +++ b/js/common/event.go @@ -0,0 +1,9 @@ +package common + +import "go.k6.io/k6/event" + +// Events are the event subscriber interfaces for the global event system, and +// the local (per-VU) event system. +type Events struct { + Global, Local event.Subscriber +} diff --git a/js/modules/modules.go b/js/modules/modules.go index 2f2e3945dee7..7c3ca7705772 100644 --- a/js/modules/modules.go +++ b/js/modules/modules.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/dop251/goja" - "go.k6.io/k6/event" "go.k6.io/k6/ext" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" @@ -43,7 +42,9 @@ type VU interface { // Context return the context.Context about the current VU Context() context.Context - Events() event.Subscriber + // Events allows subscribing to global k6 execution events, such as Init and + // Exit, and to local (per-VU) events, such as IterStart and IterEnd. + Events() common.Events // InitEnv returns common.InitEnvironment instance if present InitEnv() *common.InitEnvironment diff --git a/js/modules_vu.go b/js/modules_vu.go index 8335d0dded36..ff0a42768dc0 100644 --- a/js/modules_vu.go +++ b/js/modules_vu.go @@ -10,21 +10,25 @@ import ( "go.k6.io/k6/lib" ) +type events struct { + global, local *event.System +} + type moduleVUImpl struct { ctx context.Context initEnv *common.InitEnvironment state *lib.State runtime *goja.Runtime eventLoop *eventloop.EventLoop - events *event.System + events events } func (m *moduleVUImpl) Context() context.Context { return m.ctx } -func (m *moduleVUImpl) Events() event.Subscriber { - return m.events +func (m *moduleVUImpl) Events() common.Events { + return common.Events{Global: m.events.global, Local: m.events.local} } func (m *moduleVUImpl) InitEnv() *common.InitEnvironment { diff --git a/js/modulestest/modulestest.go b/js/modulestest/modulestest.go index 3b5f1169746a..6106da9878f0 100644 --- a/js/modulestest/modulestest.go +++ b/js/modulestest/modulestest.go @@ -4,7 +4,6 @@ import ( "context" "github.com/dop251/goja" - "go.k6.io/k6/event" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" "go.k6.io/k6/lib" @@ -16,7 +15,7 @@ var _ modules.VU = &VU{} type VU struct { CtxField context.Context InitEnvField *common.InitEnvironment - EventsField *event.System + EventsField common.Events StateField *lib.State RuntimeField *goja.Runtime RegisterCallbackField func() func(f func() error) @@ -28,7 +27,7 @@ func (m *VU) Context() context.Context { } // Events returns internally set field to conform to modules.VU interface -func (m *VU) Events() event.Subscriber { +func (m *VU) Events() common.Events { return m.EventsField } diff --git a/js/runner.go b/js/runner.go index dd4a9539aad6..382e3b7f4d97 100644 --- a/js/runner.go +++ b/js/runner.go @@ -772,9 +772,7 @@ func (u *ActiveVU) RunOnce() error { VUID: u.ID, ScenarioName: u.scenarioName, } - if u.Runner.preInitState.Events != nil { - u.Runner.preInitState.Events.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) - } + u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) // Call the exported function. _, isFullIteration, totalTime, err := u.runFn(ctx, true, fn, cancel, u.setupData) @@ -788,9 +786,7 @@ func (u *ActiveVU) RunOnce() error { } eventIterData.Error = err } - if u.Runner.preInitState.Events != nil { - u.Runner.preInitState.Events.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) - } + u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) // If MinIterationDuration is specified and the iteration wasn't canceled // and was less than it, sleep for the remainder From d275b22f3d6dff658b4871de761aafe8782db3a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 21 Jun 2023 12:29:18 +0200 Subject: [PATCH 09/13] Set a very high event wait timeout --- cmd/run.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 071827f4d73d..5e1c5e85c659 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -40,6 +40,12 @@ type cmdRun struct { gs *state.GlobalState } +// We use an excessively high timeout to wait for event processing to complete, +// since prematurely proceeding before it is done could create bigger problems. +// In practice, this effectively acts as no timeout, and the user will have to +// kill k6 if a hang happens, which is the behavior without events anyway. +const waitEventDoneTimeout = 30 * time.Minute + // TODO: split apart some more // //nolint:funlen,gocognit,gocyclo,cyclop @@ -84,8 +90,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { Type: event.Exit, Data: &event.ExitData{Error: err}, }) - // TODO: Make the timeout configurable? - waitExitCtx, waitExitCancel := context.WithTimeout(globalCtx, 5*time.Second) + waitExitCtx, waitExitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) defer waitExitCancel() if werr := waitExitDone(waitExitCtx); werr != nil { logger.WithError(werr).Warn() @@ -321,8 +326,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } - // TODO: Make the timeout configurable? - waitInitCtx, waitInitCancel := context.WithTimeout(globalCtx, 10*time.Second) + waitInitCtx, waitInitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) defer waitInitCancel() if werr := waitInitDone(waitInitCtx); werr != nil { @@ -334,8 +338,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { err = execScheduler.Run(globalCtx, runCtx, samples) waitTestEndDone := test.preInitState.Events.Emit(&event.Event{Type: event.TestEnd}) defer func() { - // TODO: Make the timeout configurable? - waitEndCtx, waitEndCancel := context.WithTimeout(globalCtx, 5*time.Second) + waitEndCtx, waitEndCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) defer waitEndCancel() if werr := waitTestEndDone(waitEndCtx); werr != nil { logger.WithError(werr).Warn() From a97bdd057ec743b93e200e5773ec8bb0d1cb4de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 21 Jun 2023 12:40:44 +0200 Subject: [PATCH 10/13] Wait for IterStart and IterEnd events to be processed The browser module uses the IterStart and IterEnd events for browser initialization and shutdown, so we need to wait for them to complete. There is some concern that this might add some delay to the overall iteration duration, and not just to the iterations where browsers processes are managed, but it should be negligible overall. We should run benchmarks after this change to confirm this. --- js/runner.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/js/runner.go b/js/runner.go index 382e3b7f4d97..9f727d3d0c50 100644 --- a/js/runner.go +++ b/js/runner.go @@ -772,7 +772,13 @@ func (u *ActiveVU) RunOnce() error { VUID: u.ID, ScenarioName: u.scenarioName, } - u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) + + waitEventCtx, waitEventCancel := context.WithTimeout(u.RunContext, 30*time.Minute) + defer waitEventCancel() + waitEventDone := u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) + if err := waitEventDone(waitEventCtx); err != nil { + panic(fmt.Sprintf("error waiting for '%s' event processing to complete: %s", event.IterStart, err)) + } // Call the exported function. _, isFullIteration, totalTime, err := u.runFn(ctx, true, fn, cancel, u.setupData) @@ -786,7 +792,12 @@ func (u *ActiveVU) RunOnce() error { } eventIterData.Error = err } - u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) + waitEventCtx, waitEventCancel = context.WithTimeout(u.RunContext, 30*time.Minute) + defer waitEventCancel() + waitEventDone = u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) + if err = waitEventDone(waitEventCtx); err != nil { + panic(fmt.Sprintf("error waiting for '%s' event processing to complete: %s", event.IterEnd, err)) + } // If MinIterationDuration is specified and the iteration wasn't canceled // and was less than it, sleep for the remainder From 846f12fa8dfb627dec2fdb733776bf749013b816 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Wed, 21 Jun 2023 14:59:16 +0200 Subject: [PATCH 11/13] DRY event emission, fix issue passing aborted error --- cmd/run.go | 41 ++++++++++++++++++--------------------- cmd/tests/cmd_run_test.go | 2 +- js/runner.go | 24 ++++++++++++----------- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 5e1c5e85c659..4e328cb6e585 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -85,16 +85,22 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } + emitEvent := func(evt *event.Event) func() { + waitDone := test.preInitState.Events.Emit(evt) + return func() { + waitCtx, waitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) + defer waitCancel() + if werr := waitDone(waitCtx); werr != nil { + logger.WithError(werr).Warn() + } + } + } + defer func() { - waitExitDone := test.preInitState.Events.Emit(&event.Event{ + emitEvent(&event.Event{ Type: event.Exit, Data: &event.ExitData{Error: err}, - }) - waitExitCtx, waitExitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) - defer waitExitCancel() - if werr := waitExitDone(waitExitCtx); werr != nil { - logger.WithError(werr).Warn() - } + })() test.preInitState.Events.UnsubscribeAll() }() @@ -177,7 +183,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // to the Init event. This would allow running them concurrently, and they // could be synchronized by waiting for the event processing to complete. // This could later be expanded to also initialize browser processes. - waitInitDone := test.preInitState.Events.Emit(&event.Event{Type: event.Init}) + waitInitDone := emitEvent(&event.Event{Type: event.Init}) // Create and start the outputs. We do it quite early to get any output URLs // or other details below. It also allows us to ensure when they have @@ -326,24 +332,15 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } - waitInitCtx, waitInitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) - defer waitInitCancel() + waitInitDone() + + emitEvent(&event.Event{Type: event.TestStart})() - if werr := waitInitDone(waitInitCtx); werr != nil { - logger.WithError(werr).Warn() - } - test.preInitState.Events.Emit(&event.Event{Type: event.TestStart}) // Start the test! However, we won't immediately return if there was an // error, we still have things to do. err = execScheduler.Run(globalCtx, runCtx, samples) - waitTestEndDone := test.preInitState.Events.Emit(&event.Event{Type: event.TestEnd}) - defer func() { - waitEndCtx, waitEndCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) - defer waitEndCancel() - if werr := waitTestEndDone(waitEndCtx); werr != nil { - logger.WithError(werr).Warn() - } - }() + + defer emitEvent(&event.Event{Type: event.TestEnd})() // Init has passed successfully, so unless disabled, make sure we send a // usage report after the context is done. diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 42409760843a..108906202da6 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -1537,7 +1537,7 @@ func TestMinIterationDuration(t *testing.T) { elapsed := time.Since(start) assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration") assert.Less( - t, elapsed, 14*time.Second, + t, elapsed, 15*time.Second, "expected less time to have passed because minIterationDuration should not affect setup() and teardown() ", ) diff --git a/js/runner.go b/js/runner.go index 9f727d3d0c50..37e1162e893d 100644 --- a/js/runner.go +++ b/js/runner.go @@ -773,13 +773,19 @@ func (u *ActiveVU) RunOnce() error { ScenarioName: u.scenarioName, } - waitEventCtx, waitEventCancel := context.WithTimeout(u.RunContext, 30*time.Minute) - defer waitEventCancel() - waitEventDone := u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterStart, Data: eventIterData}) - if err := waitEventDone(waitEventCtx); err != nil { - panic(fmt.Sprintf("error waiting for '%s' event processing to complete: %s", event.IterStart, err)) + emitEvent := func(evt *event.Event) func() { + waitDone := u.moduleVUImpl.events.local.Emit(evt) + return func() { + waitCtx, waitCancel := context.WithTimeout(u.RunContext, 30*time.Minute) + defer waitCancel() + if werr := waitDone(waitCtx); werr != nil { + u.state.Logger.WithError(werr).Warn() + } + } } + emitEvent(&event.Event{Type: event.IterStart, Data: eventIterData})() + // Call the exported function. _, isFullIteration, totalTime, err := u.runFn(ctx, true, fn, cancel, u.setupData) if err != nil { @@ -792,12 +798,8 @@ func (u *ActiveVU) RunOnce() error { } eventIterData.Error = err } - waitEventCtx, waitEventCancel = context.WithTimeout(u.RunContext, 30*time.Minute) - defer waitEventCancel() - waitEventDone = u.moduleVUImpl.events.local.Emit(&event.Event{Type: event.IterEnd, Data: eventIterData}) - if err = waitEventDone(waitEventCtx); err != nil { - panic(fmt.Sprintf("error waiting for '%s' event processing to complete: %s", event.IterEnd, err)) - } + + emitEvent(&event.Event{Type: event.IterEnd, Data: eventIterData})() // If MinIterationDuration is specified and the iteration wasn't canceled // and was less than it, sleep for the remainder From d5897faac46004965bdc2420728fed3bb1cbea50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Thu, 22 Jun 2023 12:19:28 +0200 Subject: [PATCH 12/13] Initialize global event system in GlobalState, defer Exit emission earlier This ensures that the Exit event is sent even in the case of an early error, such as a script exception. Resolves https://github.com/grafana/k6/pull/3112#issuecomment-1600982890 --- cmd/run.go | 28 ++++---- cmd/state/state.go | 3 + cmd/test_load.go | 3 +- cmd/tests/cmd_run_test.go | 131 ++++++++++++++++++++++++++------------ cmd/tests/test_state.go | 2 + 5 files changed, 111 insertions(+), 56 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 4e328cb6e585..75c5ede4024a 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -73,20 +73,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // from sub-contexts while also attaching a reason for the abort. runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger) - test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) - if err != nil { - return err - } - if test.keyLogger != nil { - defer func() { - if klErr := test.keyLogger.Close(); klErr != nil { - logger.WithError(klErr).Warn("Error while closing the SSLKEYLOGFILE") - } - }() - } - emitEvent := func(evt *event.Event) func() { - waitDone := test.preInitState.Events.Emit(evt) + waitDone := c.gs.Events.Emit(evt) return func() { waitCtx, waitCancel := context.WithTimeout(globalCtx, waitEventDoneTimeout) defer waitCancel() @@ -101,9 +89,21 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { Type: event.Exit, Data: &event.ExitData{Error: err}, })() - test.preInitState.Events.UnsubscribeAll() + c.gs.Events.UnsubscribeAll() }() + test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) + if err != nil { + return err + } + if test.keyLogger != nil { + defer func() { + if klErr := test.keyLogger.Close(); klErr != nil { + logger.WithError(klErr).Warn("Error while closing the SSLKEYLOGFILE") + } + }() + } + // Write the full consolidated *and derived* options back to the Runner. conf := test.derivedConfig testRunState, err := test.buildTestRunState(conf.Options) diff --git a/cmd/state/state.go b/cmd/state/state.go index f0595d7770e2..3facd99d0198 100644 --- a/cmd/state/state.go +++ b/cmd/state/state.go @@ -12,6 +12,7 @@ import ( "github.com/mattn/go-isatty" "github.com/sirupsen/logrus" + "go.k6.io/k6/event" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/ui/console" ) @@ -39,6 +40,7 @@ type GlobalState struct { BinaryName string CmdArgs []string Env map[string]string + Events *event.System DefaultFlags, Flags GlobalFlags @@ -110,6 +112,7 @@ func NewGlobalState(ctx context.Context) *GlobalState { BinaryName: filepath.Base(binary), CmdArgs: os.Args, Env: env, + Events: event.NewEventSystem(100, logger), DefaultFlags: defaultFlags, Flags: getFlags(defaultFlags, env), OutMutex: outMutex, diff --git a/cmd/test_load.go b/cmd/test_load.go index ca412fe641b2..6dd1d6b66651 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -16,7 +16,6 @@ import ( "go.k6.io/k6/cmd/state" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" - "go.k6.io/k6/event" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/fsext" @@ -71,7 +70,7 @@ func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loaded RuntimeOptions: runtimeOptions, Registry: registry, BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - Events: event.NewEventSystem(100, gs.Logger), + Events: gs.Events, LookupEnv: func(key string) (string, bool) { val, ok := gs.Env[key] return val, ok diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 108906202da6..2bc44725ca78 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -2062,55 +2062,106 @@ func TestEventSystemOK(t *testing.T) { assert.Equal(t, expLog, log) } -// Tests that the exit event is emitted with the test exit error. -func TestEventSystemAborted(t *testing.T) { +// Check emitted events in the case of a script error. +func TestEventSystemError(t *testing.T) { t.Parallel() - ts := NewGlobalTestState(t) + testCases := []struct { + name, script string + expLog []string + expExitCode exitcodes.ExitCode + }{ + { + name: "abort", + script: ` + import { test } from 'k6/execution'; - moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) - mod := events.New(event.GlobalEvents, nil) - modules.Register(moduleName, mod) + export let options = { + vus: 1, + iterations: 5, + } + + export default function () { + test.abort('oops!'); + } + `, expLog: []string{ + "got event Init with data ''", + "got event TestStart with data ''", + "got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:test aborted: oops! at file:///-:11:16(6)}'", + "got event TestEnd with data ''", + "got event Exit with data '&{Error:test aborted: oops! at file:///-:11:16(6)}'", + "test aborted: oops! at file:///-:11:16(6)", + }, + expExitCode: exitcodes.ScriptAborted, + }, + { + name: "init", + script: "undefinedVar", + expLog: []string{ + "got event Exit with data '&{Error:could not initialize '-': could not load JS test " + + "'file:///-': ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n}'", + "ReferenceError: undefinedVar is not defined\n\tat file:///-:2:0(12)\n", + }, + expExitCode: exitcodes.ScriptException, + }, + { + name: "throw", + script: ` + export let options = { + vus: 1, + iterations: 2, + } + + export default function () { + throw new Error('oops!'); + } + `, expLog: []string{ + "got event Init with data ''", + "got event TestStart with data ''", + "got event IterStart with data '{Iteration:0 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:0 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'", + "Error: oops!\n\tat file:///-:9:11(3)\n", + "got event IterStart with data '{Iteration:1 VUID:1 ScenarioName:default Error:}'", + "got event IterEnd with data '{Iteration:1 VUID:1 ScenarioName:default Error:Error: oops!\n\tat file:///-:9:11(3)\n}'", + "Error: oops!\n\tat file:///-:9:11(3)\n", + "got event TestEnd with data ''", + "got event Exit with data '&{Error:}'", + }, + expExitCode: 0, + }, + } - ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} - ts.ExpectedExitCode = int(exitcodes.ScriptAborted) - ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` - import events from '%s'; - import { test } from 'k6/execution'; - import { sleep } from 'k6'; + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ts := NewGlobalTestState(t) - export let options = { - vus: 5, - duration: '5s', - } + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) - export default function () { - sleep(1); - test.abort('oops!'); - } - `, moduleName))) + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.ExpectedExitCode = int(tc.expExitCode) + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf("import events from '%s';\n%s", moduleName, tc.script))) - cmd.ExecuteWithGlobalState(ts.GlobalState) + cmd.ExecuteWithGlobalState(ts.GlobalState) - doneCh := make(chan struct{}) - go func() { - mod.WG.Wait() - close(doneCh) - }() + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() - select { - case <-doneCh: - case <-time.After(time.Second): - t.Fatal("timed out") - } + select { + case <-doneCh: + case <-time.After(time.Second): + t.Fatal("timed out") + } - expLog := []string{ - `got event Init with data ''`, - `got event TestStart with data ''`, - `got event TestEnd with data ''`, - `got event Exit with data '&{Error:test aborted: oops! at file:///-:13:14(12)}'`, - `test aborted: oops! at file:///-:13:14(12)`, + log := ts.LoggerHook.Lines() + assert.Equal(t, tc.expLog, log) + }) } - log := ts.LoggerHook.Lines() - assert.Equal(t, expLog, log) } diff --git a/cmd/tests/test_state.go b/cmd/tests/test_state.go index 54e5be13a0e3..1459ed97b7ae 100644 --- a/cmd/tests/test_state.go +++ b/cmd/tests/test_state.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.k6.io/k6/cmd/state" + "go.k6.io/k6/event" "go.k6.io/k6/lib/fsext" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/ui/console" @@ -90,6 +91,7 @@ func NewGlobalTestState(tb testing.TB) *GlobalTestState { BinaryName: "k6", CmdArgs: []string{}, Env: map[string]string{"K6_NO_USAGE_REPORT": "true"}, + Events: event.NewEventSystem(100, logger), DefaultFlags: defaultFlags, Flags: defaultFlags, OutMutex: outMutex, From 89fa056643ef9468f193e4f5446abdb786ded956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Fri, 23 Jun 2023 16:50:44 +0200 Subject: [PATCH 13/13] Add benchmarks for run command with and without events See the results here: https://gist.github.com/imiric/b2094c79f45cbdbdcb067671434fde4a --- cmd/tests/cmd_run_test.go | 63 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 2bc44725ca78..0df7021af5dd 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -2165,3 +2165,66 @@ func TestEventSystemError(t *testing.T) { }) } } + +func BenchmarkRun(b *testing.B) { + b.StopTimer() + + for i := 0; i < b.N; i++ { + ts := NewGlobalTestState(b) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(` + export let options = { + vus: 10, + iterations: 100, + } + + export default function () {} + `)) + ts.ExpectedExitCode = 0 + + b.StartTimer() + cmd.ExecuteWithGlobalState(ts.GlobalState) + b.StopTimer() + } +} + +func BenchmarkRunEvents(b *testing.B) { + b.StopTimer() + + for i := 0; i < b.N; i++ { + ts := NewGlobalTestState(b) + + moduleName := fmt.Sprintf("k6/x/testevents-%d", atomic.AddUint64(&uniqueModuleNumber, 1)) + mod := events.New(event.GlobalEvents, event.VUEvents) + modules.Register(moduleName, mod) + + ts.CmdArgs = []string{"k6", "--quiet", "run", "-"} + ts.Stdin = bytes.NewBuffer([]byte(fmt.Sprintf(` + import events from '%s'; + export let options = { + vus: 10, + iterations: 100, + } + + export default function () {} + `, moduleName))) + ts.ExpectedExitCode = 0 + + b.StartTimer() + cmd.ExecuteWithGlobalState(ts.GlobalState) + b.StopTimer() + + doneCh := make(chan struct{}) + go func() { + mod.WG.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + case <-time.After(time.Second): + b.Fatal("timed out") + } + } +}