From eddd0fed41607856d07d604b2455e97a40f92f16 Mon Sep 17 00:00:00 2001 From: Vignesh Shanmugam Date: Mon, 31 Jan 2022 20:56:58 -0800 Subject: [PATCH] [Heartbeat]: catch all error handler for browser jobs (#30013) * [Heartbeat]: catch all error handler for browser jobs * fix wrapper tests * add tests and consume all events * address review and improve test (cherry picked from commit 267f8cb34853f3fa2c0c6c3cdd33242ca88494a9) --- heartbeat/monitors/wrappers/wrappers.go | 99 +++++----- heartbeat/monitors/wrappers/wrappers_test.go | 57 +++--- x-pack/heartbeat/monitors/browser/suite.go | 14 +- .../monitors/browser/synthexec/enrich.go | 42 ++++- .../monitors/browser/synthexec/enrich_test.go | 176 +++++++++++++++--- .../browser/synthexec/execmultiplexer.go | 25 +-- .../browser/synthexec/execmultiplexer_test.go | 24 +-- .../monitors/browser/synthexec/synthexec.go | 55 +++--- 8 files changed, 328 insertions(+), 164 deletions(-) diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index c9e0c79c105f..2790de08ffde 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -50,8 +50,10 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j return jobs.WrapAllSeparately( jobs.WrapAll( js, + addMonitorTimespan(stdMonFields), + addServiceName(stdMonFields), addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(stdMonFields.Type, false), + addMonitorStatus(false), addMonitorDuration, ), func() jobs.JobWrapper { @@ -65,64 +67,75 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job { return jobs.WrapAll( js, - addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(stdMonFields.Type, true), + addMonitorTimespan(stdMonFields), + addServiceName(stdMonFields), + addMonitorStatus(true), ) } // addMonitorMeta adds the id, name, and type fields to the monitor. -func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper { +func addMonitorMeta(sf stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper { return func(job jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { - cont, e := job(event) - addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti) - return cont, e + cont, err := job(event) + + id := sf.ID + name := sf.Name + // If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the + // unique URLs to create unique suffixes for the monitor. + if isMulti { + url, err := event.GetValue("url.full") + if err != nil { + logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) + url = "n/a" + } + urlHash, _ := hashstructure.Hash(url, nil) + id = fmt.Sprintf("%s-%x", sf.ID, urlHash) + } + + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + "type": sf.Type, + }, + }) + return cont, err } } } -func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) { - id := sf.ID - name := sf.Name +func addMonitorTimespan(sf stdfields.StdMonitorFields) jobs.JobWrapper { + return func(origJob jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + cont, err := origJob(event) - // If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the - // unique URLs to create unique suffixes for the monitor. - if isMulti { - url, err := event.GetValue("url.full") - if err != nil { - logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) - url = "n/a" + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "timespan": timespan(time.Now(), sf.Schedule, sf.Timeout), + }, + }) + return cont, err } - urlHash, _ := hashstructure.Hash(url, nil) - id = fmt.Sprintf("%s-%x", sf.ID, urlHash) - } - - // Allow jobs to override the ID, useful for browser suites - // which do this logic on their own - if v, _ := event.GetValue("monitor.id"); v != nil { - id = fmt.Sprintf("%s-%s", sf.ID, v.(string)) - } - if v, _ := event.GetValue("monitor.name"); v != nil { - name = fmt.Sprintf("%s - %s", sf.Name, v.(string)) } +} - fieldsToMerge := common.MapStr{ - "monitor": common.MapStr{ - "id": id, - "name": name, - "type": sf.Type, - "timespan": timespan(started, sf.Schedule, sf.Timeout), - }, - } +// Add service.name to monitors for APM interop +func addServiceName(sf stdfields.StdMonitorFields) jobs.JobWrapper { + return func(origJob jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + cont, err := origJob(event) - // Add service.name for APM interop - if sf.Service.Name != "" { - fieldsToMerge["service"] = common.MapStr{ - "name": sf.Service.Name, + if sf.Service.Name != "" { + eventext.MergeEventFields(event, common.MapStr{ + "service": common.MapStr{ + "name": sf.Service.Name, + }, + }) + } + return cont, err } } - - eventext.MergeEventFields(event, fieldsToMerge) } func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr { @@ -142,7 +155,7 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration // by the original Job will be set as a field. The original error will not be // passed through as a return value. Errors may still be present but only if there // is an actual error wrapping the error. -func addMonitorStatus(monitorType string, summaryOnly bool) jobs.JobWrapper { +func addMonitorStatus(summaryOnly bool) jobs.JobWrapper { return func(origJob jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { cont, err := origJob(event) diff --git a/heartbeat/monitors/wrappers/wrappers_test.go b/heartbeat/monitors/wrappers/wrappers_test.go index b84b18f65a92..049b6ebd8fba 100644 --- a/heartbeat/monitors/wrappers/wrappers_test.go +++ b/heartbeat/monitors/wrappers/wrappers_test.go @@ -57,8 +57,6 @@ var testMonFields = stdfields.StdMonitorFields{ } var testBrowserMonFields = stdfields.StdMonitorFields{ - ID: "myid", - Name: "myname", Type: "browser", Schedule: schedule.MustParse("@every 1s"), Timeout: 1, @@ -396,6 +394,18 @@ func TestTimespan(t *testing.T) { } } +type BrowserMonitor struct { + id string + name string + checkGroup string +} + +var inlineMonitorValues = BrowserMonitor{ + id: "inline", + name: "inline", + checkGroup: "inline-check-group", +} + func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) @@ -403,16 +413,18 @@ func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { eventext.MergeEventFields(event, common.MapStr{ "url": URLFields(parsed), "monitor": common.MapStr{ - "check_group": "inline-check-group", + "type": "browser", + "id": inlineMonitorValues.id, + "name": inlineMonitorValues.name, + "check_group": inlineMonitorValues.checkGroup, }, }) return nil, nil } } -// Inline browser jobs function very similarly to lightweight jobs -// in that they don't override the ID. -// They do not, however, get a summary field added, nor duration. +// Browser inline jobs monitor information should not be altered +// by the wrappers as they are handled separately in synth enricher func TestInlineBrowserJob(t *testing.T) { fields := testBrowserMonFields testCommonWrap(t, testDef{ @@ -425,10 +437,10 @@ func TestInlineBrowserJob(t *testing.T) { urlValidator(t, "http://foo.com"), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "id": testMonFields.ID, - "name": testMonFields.Name, - "type": fields.Type, - "check_group": "inline-check-group", + "type": "browser", + "id": inlineMonitorValues.id, + "name": inlineMonitorValues.name, + "check_group": inlineMonitorValues.checkGroup, }, }), hbtestllext.MonitorTimespanValidator, @@ -439,13 +451,9 @@ func TestInlineBrowserJob(t *testing.T) { }) } -var suiteBrowserJobValues = struct { - id string - name string - checkGroup string -}{ - id: "journey_1", - name: "Journey 1", +var suiteMonitorValues = BrowserMonitor{ + id: "suite-journey_1", + name: "suite-Journey 1", checkGroup: "journey-1-check-group", } @@ -456,9 +464,10 @@ func makeSuiteBrowserJob(t *testing.T, u string, summary bool, suiteErr error) j eventext.MergeEventFields(event, common.MapStr{ "url": URLFields(parsed), "monitor": common.MapStr{ - "id": suiteBrowserJobValues.id, - "name": suiteBrowserJobValues.name, - "check_group": suiteBrowserJobValues.checkGroup, + "type": "browser", + "id": suiteMonitorValues.id, + "name": suiteMonitorValues.name, + "check_group": suiteMonitorValues.checkGroup, }, }) if summary { @@ -482,10 +491,10 @@ func TestSuiteBrowserJob(t *testing.T) { urlU, _ := url.Parse(urlStr) expectedMonFields := lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "id": fmt.Sprintf("%s-%s", testMonFields.ID, suiteBrowserJobValues.id), - "name": fmt.Sprintf("%s - %s", testMonFields.Name, suiteBrowserJobValues.name), - "type": fields.Type, - "check_group": suiteBrowserJobValues.checkGroup, + "type": "browser", + "id": suiteMonitorValues.id, + "name": suiteMonitorValues.name, + "check_group": suiteMonitorValues.checkGroup, "timespan": common.MapStr{ "gte": hbtestllext.IsTime, "lt": hbtestllext.IsTime, diff --git a/x-pack/heartbeat/monitors/browser/suite.go b/x-pack/heartbeat/monitors/browser/suite.go index aff137baeca8..c4f52921b41d 100644 --- a/x-pack/heartbeat/monitors/browser/suite.go +++ b/x-pack/heartbeat/monitors/browser/suite.go @@ -66,6 +66,16 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig { return s.suiteCfg.FilterJourneys } +func (s *Suite) Fields() synthexec.StdSuiteFields { + _, isInline := s.InlineSource() + return synthexec.StdSuiteFields{ + Name: s.suiteCfg.Name, + Id: s.suiteCfg.Id, + IsInline: isInline, + Type: "browser", + } +} + func (s *Suite) Close() error { if s.suiteCfg.Source.ActiveMemo != nil { s.suiteCfg.Source.ActiveMemo.Close() @@ -102,14 +112,14 @@ func (s *Suite) extraArgs() []string { func (s *Suite) jobs() []jobs.Job { var j jobs.Job if src, ok := s.InlineSource(); ok { - j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.extraArgs()...) + j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.Fields(), s.extraArgs()...) } else { j = func(event *beat.Event) ([]jobs.Job, error) { err := s.Fetch() if err != nil { return nil, fmt.Errorf("could not fetch for suite job: %w", err) } - sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.extraArgs()...) + sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.Fields(), s.extraArgs()...) if err != nil { return nil, err } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index bec551a7947e..3e01c387cb70 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -18,18 +18,18 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -type enricher func(event *beat.Event, se *SynthEvent) error +type enricher func(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error type streamEnricher struct { je *journeyEnricher } -func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { +func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error { if e.je == nil || (se != nil && se.Type == "journey/start") { e.je = newJourneyEnricher() } - return e.je.enrich(event, se) + return e.je.enrich(event, se, fields) } // journeyEnricher holds state across received SynthEvents retaining fields @@ -62,7 +62,7 @@ func makeUuid() string { return u.String() } -func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { +func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error { if se == nil { return nil } @@ -84,20 +84,41 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { } eventext.MergeEventFields(event, common.MapStr{ + "event": common.MapStr{ + "type": se.Type, + }, "monitor": common.MapStr{ "check_group": je.checkGroup, }, }) - // Inline jobs have no journey - if je.journey != nil { + + // Id and name differs for inline and suite monitors + // - We use the monitor id and name for inline journeys ignoring the + // autogenerated `inline`journey id and name. + // - Monitor id/name is concatenated with the journey id/name for + // suite monitors + id := fields.Id + name := fields.Name + if !fields.IsInline && je.journey != nil { + id = fmt.Sprintf("%s-%s", id, je.journey.Id) + name = fmt.Sprintf("%s - %s", name, je.journey.Name) + } + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + }, + }) + + // Write suite level fields for suite monitors + if !fields.IsInline { eventext.MergeEventFields(event, common.MapStr{ - "monitor": common.MapStr{ - "id": je.journey.Id, - "name": je.journey.Name, + "suite": common.MapStr{ + "id": fields.Id, + "name": fields.Name, }, }) } - return je.enrichSynthEvent(event, se) } @@ -141,6 +162,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e // In that case we always want to issue an update op event.Meta.Put(events.FieldMetaOpType, events.OpTypeCreate) } + eventext.MergeEventFields(event, se.ToMap()) if je.urlFields == nil { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index f2c8ba25dca9..da2f8980dc81 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" ) func makeStepEvent(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent { @@ -34,6 +35,12 @@ func makeStepEvent(typ string, ts float64, name string, index int, status string } func TestJourneyEnricher(t *testing.T) { + var stdFields = StdSuiteFields{ + Id: "mysuite", + Name: "mysuite", + Type: "browser", + IsInline: false, + } journey := &Journey{ Name: "A Journey Name", Id: "my-journey-id", @@ -77,39 +84,149 @@ func TestJourneyEnricher(t *testing.T) { journeyEnd, } - je := &journeyEnricher{} + suiteValidator := func() validator.Validator { + return lookslike.MustCompile(common.MapStr{ + "suite.id": stdFields.Id, + "suite.name": stdFields.Name, + "monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id), + "monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name), + }) + } + inlineValidator := func() validator.Validator { + return lookslike.MustCompile(common.MapStr{ + "monitor.id": stdFields.Id, + "monitor.name": stdFields.Name, + }) + } + commonValidator := func(se *SynthEvent) validator.Validator { + var v []validator.Validator - // We need an expectation for each input - // plus a final expectation for the summary which comes - // on the nil data. - for idx, se := range synthEvents { + // We need an expectation for each input plus a final + // expectation for the summary which comes on the nil data. + if se.Type != "journey/end" { + // Test that the created event includes the mapped + // version of the event + v = append(v, lookslike.MustCompile(se.ToMap())) + } else { + u, _ := url.Parse(url1) + // journey end gets a summary + v = append(v, lookslike.MustCompile(common.MapStr{ + "synthetics.type": "heartbeat/summary", + "url": wrappers.URLFields(u), + "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), + })) + } + return lookslike.Compose(v...) + } + + je := &journeyEnricher{} + check := func(t *testing.T, se *SynthEvent, ssf StdSuiteFields) { e := &beat.Event{} - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) + t.Run(fmt.Sprintf("event: %s", se.Type), func(t *testing.T) { + enrichErr := je.enrich(e, se, ssf) + if se.Error != nil { + require.Equal(t, stepError(se.Error), enrichErr) + } + if ssf.IsInline { + sv, _ := e.Fields.GetValue("suite") + require.Nil(t, sv) + testslike.Test(t, inlineValidator(), e.Fields) + } else { + testslike.Test(t, suiteValidator(), e.Fields) + } + testslike.Test(t, commonValidator(se), e.Fields) - if se != nil && se.Type != "journey/end" { - // Test that the created event includes the mapped - // version of the event - testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields) - require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix()) + require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix()) + }) + } - if se.Error != nil { - require.Equal(t, stepError(se.Error), enrichErr) - } - } else { // journey end gets a summary - require.Equal(t, stepError(syntherr), enrichErr) + tests := []struct { + name string + isInline bool + se []*SynthEvent + }{ + { + name: "suite monitor", + isInline: false, + }, + { + name: "inline monitor", + isInline: true, + }, + } - u, _ := url.Parse(url1) - t.Run("summary", func(t *testing.T) { - v := lookslike.MustCompile(common.MapStr{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stdFields.IsInline = tt.isInline + for _, se := range synthEvents { + check(t, se, stdFields) + } + }) + } +} - testslike.Test(t, v, e.Fields) +func TestEnrichConsoleSynthEvents(t *testing.T) { + tests := []struct { + name string + je *journeyEnricher + se *SynthEvent + check func(t *testing.T, e *beat.Event, je *journeyEnricher) + }{ + { + "stderr", + &journeyEnricher{}, + &SynthEvent{ + Type: "stderr", + Payload: common.MapStr{ + "message": "Error from synthetics", + }, + PackageVersion: "1.0.0", + }, + func(t *testing.T, e *beat.Event, je *journeyEnricher) { + v := lookslike.MustCompile(common.MapStr{ + "synthetics": common.MapStr{ + "payload": common.MapStr{ + "message": "Error from synthetics", + }, + "type": "stderr", + "package_version": "1.0.0", + "index": 0, + }, }) - } + testslike.Test(t, v, e.Fields) + }, + }, + { + "stdout", + &journeyEnricher{}, + &SynthEvent{ + Type: "stdout", + Payload: common.MapStr{ + "message": "debug output", + }, + PackageVersion: "1.0.0", + }, + func(t *testing.T, e *beat.Event, je *journeyEnricher) { + v := lookslike.MustCompile(common.MapStr{ + "synthetics": common.MapStr{ + "payload": common.MapStr{ + "message": "debug output", + }, + "type": "stdout", + "package_version": "1.0.0", + "index": 0, + }, + }) + testslike.Test(t, lookslike.Strict(v), e.Fields) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &beat.Event{} + tt.je.enrichSynthEvent(e, tt.se) + tt.check(t, e, tt.je) }) } } @@ -131,7 +248,7 @@ func TestEnrichSynthEvent(t *testing.T) { }, true, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(map[string]interface{}{ + v := lookslike.MustCompile(common.MapStr{ "summary": map[string]int{ "up": 0, "down": 1, @@ -146,7 +263,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "journey/end"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(map[string]interface{}{ + v := lookslike.MustCompile(common.MapStr{ "summary": map[string]int{ "up": 1, "down": 0, @@ -258,8 +375,9 @@ func TestNoSummaryOnAfterHook(t *testing.T) { for idx, se := range synthEvents { e := &beat.Event{} + stdFields := StdSuiteFields{IsInline: false} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) + enrichErr := je.enrich(e, se, stdFields) if se != nil && se.Type == "cmd/status" { t.Run("no summary in cmd/status", func(t *testing.T) { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index 07de0143c38b..568916140f14 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -9,10 +9,9 @@ import ( ) type ExecMultiplexer struct { - currentJourney *atomic.Bool - eventCounter *atomic.Int - synthEvents chan *SynthEvent - done chan struct{} + eventCounter *atomic.Int + synthEvents chan *SynthEvent + done chan struct{} } func (e ExecMultiplexer) Close() { @@ -25,18 +24,11 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { } if se.Type == "journey/start" { - e.currentJourney.Store(true) e.eventCounter.Store(-1) } - hasCurrentJourney := e.currentJourney.Load() - if se.Type == "journey/end" || se.Type == "cmd/status" { - e.currentJourney.Store(false) - } - se.index = e.eventCounter.Inc() - if hasCurrentJourney { - e.synthEvents <- se - } + + e.synthEvents <- se } // SynthEvents returns a read only channel for synth events @@ -56,9 +48,8 @@ func (e ExecMultiplexer) Wait() { func NewExecMultiplexer() *ExecMultiplexer { return &ExecMultiplexer{ - currentJourney: atomic.NewBool(false), - eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 - synthEvents: make(chan *SynthEvent), - done: make(chan struct{}), + eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 + synthEvents: make(chan *SynthEvent), + done: make(chan struct{}), } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go index ec85a6b52229..310da4967a3f 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go @@ -18,7 +18,7 @@ func TestExecMultiplexer(t *testing.T) { var testJourneys []*Journey var testEvents []*SynthEvent time := float64(0) - for jIdx := 0; jIdx < 4; jIdx++ { + for jIdx := 0; jIdx < 3; jIdx++ { time++ // fake time to make events seem spaced out journey := &Journey{ Name: fmt.Sprintf("J%d", jIdx), @@ -44,21 +44,11 @@ func TestExecMultiplexer(t *testing.T) { TimestampEpochMicros: time, }) } - - // We want one of the test journeys to end with a cmd/status indicating it failed - if jIdx != 4 { - testEvents = append(testEvents, &SynthEvent{ - Journey: journey, - Type: "journey/end", - TimestampEpochMicros: time, - }) - } else { - testEvents = append(testEvents, &SynthEvent{ - Journey: journey, - Type: "cmd/status", - TimestampEpochMicros: time, - }) - } + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "journey/end", + TimestampEpochMicros: time, + }) } // Write the test events in another go routine since writes block @@ -86,7 +76,7 @@ Loop: i := 0 // counter for index, resets on journey change for _, se := range results { require.Equal(t, i, se.index) - if se.Type == "journey/end" || se.Type == "cmd/status" { + if se.Type == "journey/end" { i = 0 } else { i++ diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index 039a3480c800..b2b8d9c612de 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -26,13 +26,20 @@ import ( const debugSelector = "synthexec" +type StdSuiteFields struct { + Name string + Id string + Type string + IsInline bool +} + type FilterJourneyConfig struct { Tags []string `config:"tags"` Match string `config:"match"` } // SuiteJob will run a single journey by name from the given suite. -func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, extraArgs ...string) (jobs.Job, error) { +func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields, extraArgs ...string) (jobs.Job, error) { // Run the command in the given suitePath, use '.' as the first arg since the command runs // in the correct dir cmdFactory, err := suiteCommandFactory(suitePath, extraArgs...) @@ -40,7 +47,7 @@ func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filte return nil, err } - return startCmdJob(ctx, cmdFactory, nil, params, filterJourneys), nil + return startCmdJob(ctx, cmdFactory, nil, params, filterJourneys, fields), nil } func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, error) { @@ -64,40 +71,38 @@ func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, er } // InlineJourneyJob returns a job that runs the given source as a single journey. -func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, extraArgs ...string) jobs.Job { +func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StdSuiteFields, extraArgs ...string) jobs.Job { newCmd := func() *exec.Cmd { return exec.Command("elastic-synthetics", append(extraArgs, "--inline")...) } - return startCmdJob(ctx, newCmd, &script, params, FilterJourneyConfig{}) + return startCmdJob(ctx, newCmd, &script, params, FilterJourneyConfig{}, fields) } // startCmdJob adapts commands into a heartbeat job. This is a little awkward given that the command's output is // available via a sequence of events in the multiplexer, while heartbeat jobs are tail recursive continuations. // Here, we adapt one to the other, where each recursive job pulls another item off the chan until none are left. -func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig) jobs.Job { +func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { mpx, err := runCmd(ctx, newCmd(), stdinStr, params, filterJourneys) if err != nil { return nil, err } senr := streamEnricher{} - return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), senr.enrich)}, nil + return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), senr.enrich, fields)}, nil } } // readResultsJob adapts the output of an ExecMultiplexer into a Job, that uses continuations // to read all output. -func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher) jobs.Job { +func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StdSuiteFields) jobs.Job { return func(event *beat.Event) (conts []jobs.Job, err error) { - select { - case se := <-synthEvents: - err = enrich(event, se) - if se != nil { - return []jobs.Job{readResultsJob(ctx, synthEvents, enrich)}, err - } else { - return nil, err - } + se := <-synthEvents + err = enrich(event, se, fields) + if se != nil { + return []jobs.Job{readResultsJob(ctx, synthEvents, enrich, fields)}, err + } else { + return nil, err } } } @@ -156,6 +161,9 @@ func runCmd( // Send stdout into the output stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("could not open stdout pipe: %w", err) + } wg.Add(1) go func() { scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent) @@ -163,6 +171,9 @@ func runCmd( }() stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("could not open stderr pipe: %w", err) + } wg.Add(1) go func() { scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent) @@ -217,11 +228,6 @@ func scanToSynthEvents(rdr io.ReadCloser, transform func(bytes []byte, text stri scanner.Buffer(buf, 1024*1024*40) // Max 50MiB Buffer for scanner.Scan() { - if scanner.Err() != nil { - logp.Warn("Error scanning results %s", scanner.Err()) - return scanner.Err() - } - se, err := transform(scanner.Bytes(), scanner.Text()) if err != nil { logp.Warn("error parsing line: %s for line: %s", err, scanner.Text()) @@ -232,6 +238,11 @@ func scanToSynthEvents(rdr io.ReadCloser, transform func(bytes []byte, text stri } } + if scanner.Err() != nil { + logp.Warn("error scanning synthetics runner results %s", scanner.Err()) + return scanner.Err() + } + return nil } @@ -245,7 +256,7 @@ func lineToSynthEventFactory(typ string) func(bytes []byte, text string) (res *S return &SynthEvent{ Type: typ, TimestampEpochMicros: float64(time.Now().UnixMicro()), - Payload: map[string]interface{}{ + Payload: common.MapStr{ "message": text, }, }, nil @@ -270,7 +281,7 @@ func jsonToSynthEvent(bytes []byte, text string) (res *SynthEvent, err error) { } if res.Type == "" { - return nil, fmt.Errorf("Unmarshal succeeded, but no type found for: %s", text) + return nil, fmt.Errorf("unmarshal succeeded, but no type found for: %s", text) } return }