From b6c95f7a91655f1d09adec6c8b80135a05f20d0d Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Tue, 25 Jan 2022 23:13:11 -0800 Subject: [PATCH 1/4] [Heartbeat]: catch all error handler for browser jobs --- heartbeat/monitors/wrappers/wrappers.go | 24 ++++----- x-pack/heartbeat/monitors/browser/suite.go | 14 ++++- .../monitors/browser/synthexec/enrich.go | 38 +++++++++---- .../monitors/browser/synthexec/enrich_test.go | 6 ++- .../browser/synthexec/execmultiplexer.go | 26 +++++---- .../monitors/browser/synthexec/synthexec.go | 54 ++++++++++--------- .../browser/synthexec/synthexec_test.go | 2 +- 7 files changed, 103 insertions(+), 61 deletions(-) diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index c9e0c79c105..41fd9251470 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -97,23 +97,23 @@ func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.Std id = fmt.Sprintf("%s-%x", sf.ID, urlHash) } - // Allow jobs to override the ID, useful for browser suites - // which do this logic on their own - if v, _ := event.GetValue("monitor.id"); v != nil { - id = fmt.Sprintf("%s-%s", sf.ID, v.(string)) - } - if v, _ := event.GetValue("monitor.name"); v != nil { - name = fmt.Sprintf("%s - %s", sf.Name, v.(string)) - } - fieldsToMerge := common.MapStr{ "monitor": common.MapStr{ - "id": id, - "name": name, - "type": sf.Type, "timespan": timespan(started, sf.Schedule, sf.Timeout), }, } + // Browser monitor fields are enriched separately + // in the synthexec package - x-pack/heartbeat/monitors/browser/synthexec/enrich.go + if sf.Type != "browser" { + fieldsToMerge = common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + "type": sf.Type, + "timespan": timespan(started, sf.Schedule, sf.Timeout), + }, + } + } // Add service.name for APM interop if sf.Service.Name != "" { diff --git a/x-pack/heartbeat/monitors/browser/suite.go b/x-pack/heartbeat/monitors/browser/suite.go index aff137baeca..e928ec42852 100644 --- a/x-pack/heartbeat/monitors/browser/suite.go +++ b/x-pack/heartbeat/monitors/browser/suite.go @@ -66,6 +66,16 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig { return s.suiteCfg.FilterJourneys } +func (s *Suite) Fields() synthexec.StandardSuiteFields { + _, inline := s.InlineSource() + return synthexec.StandardSuiteFields{ + Name: s.suiteCfg.Name, + Id: s.suiteCfg.Id, + Inline: inline, + Type: "browser", + } +} + func (s *Suite) Close() error { if s.suiteCfg.Source.ActiveMemo != nil { s.suiteCfg.Source.ActiveMemo.Close() @@ -102,14 +112,14 @@ func (s *Suite) extraArgs() []string { func (s *Suite) jobs() []jobs.Job { var j jobs.Job if src, ok := s.InlineSource(); ok { - j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.extraArgs()...) + j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.Fields(), s.extraArgs()...) } else { j = func(event *beat.Event) ([]jobs.Job, error) { err := s.Fetch() if err != nil { return nil, fmt.Errorf("could not fetch for suite job: %w", err) } - sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.extraArgs()...) + sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.Fields(), s.extraArgs()...) if err != nil { return nil, err } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index bec551a7947..76a24b26eb9 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -18,18 +18,18 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -type enricher func(event *beat.Event, se *SynthEvent) error +type enricher func(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error type streamEnricher struct { je *journeyEnricher } -func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { +func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error { if e.je == nil || (se != nil && se.Type == "journey/start") { e.je = newJourneyEnricher() } - return e.je.enrich(event, se) + return e.je.enrich(event, se, fields) } // journeyEnricher holds state across received SynthEvents retaining fields @@ -62,7 +62,7 @@ func makeUuid() string { return u.String() } -func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { +func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error { if se == nil { return nil } @@ -84,20 +84,40 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { } eventext.MergeEventFields(event, common.MapStr{ + "event": common.MapStr{ + "type": se.Type, + }, "monitor": common.MapStr{ "check_group": je.checkGroup, }, }) - // Inline jobs have no journey + + // Id and name differs for inline and suite monitors + // - We use the monitor id and name for inline journeys + // - Monitor id/name is concatenated with the journey id/name for + // suite journeys + id := fields.Id + name := fields.Name if je.journey != nil { + id = fmt.Sprintf("%s-%s", id, je.journey.Id) + name = fmt.Sprintf("%s - %s", name, je.journey.Name) + } + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + }, + }) + + // Write suite level fields for suite jobs + if !fields.Inline { eventext.MergeEventFields(event, common.MapStr{ - "monitor": common.MapStr{ - "id": je.journey.Id, - "name": je.journey.Name, + "suite": common.MapStr{ + "id": id, + "name": name, }, }) } - return je.enrichSynthEvent(event, se) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index f2c8ba25dca..a2a721f72cb 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -84,8 +84,9 @@ func TestJourneyEnricher(t *testing.T) { // on the nil data. for idx, se := range synthEvents { e := &beat.Event{} + stdFields := StandardSuiteFields{Inline: false} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) + enrichErr := je.enrich(e, se, stdFields) if se != nil && se.Type != "journey/end" { // Test that the created event includes the mapped @@ -258,8 +259,9 @@ func TestNoSummaryOnAfterHook(t *testing.T) { for idx, se := range synthEvents { e := &beat.Event{} + stdFields := StandardSuiteFields{Inline: false} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) + enrichErr := je.enrich(e, se, stdFields) if se != nil && se.Type == "cmd/status" { t.Run("no summary in cmd/status", func(t *testing.T) { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index 07de0143c38..b47284ea6d2 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -25,23 +25,27 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { } if se.Type == "journey/start" { - e.currentJourney.Store(true) + // e.currentJourney.Store(true) e.eventCounter.Store(-1) } - hasCurrentJourney := e.currentJourney.Load() - if se.Type == "journey/end" || se.Type == "cmd/status" { - e.currentJourney.Store(false) - } - se.index = e.eventCounter.Inc() - if hasCurrentJourney { - e.synthEvents <- se - } + // hasCurrentJourney := e.currentJourney.Load() + // if se.Type == "journey/end" || se.Type == "cmd/status" { + // e.currentJourney.Store(false) + // } + // If its an inline monitor, we pipe the events to the SynthEvents channel + // as we can associate the monitors, same is not the case with suite monitors + // if e.Inline.Load() || hasCurrentJourney { + e.synthEvents <- se + // } } // SynthEvents returns a read only channel for synth events -func (e ExecMultiplexer) SynthEvents() <-chan *SynthEvent { - return e.synthEvents +func (e ExecMultiplexer) SynthEvents(inline bool) <-chan *SynthEvent { + if inline || e.currentJourney.Load() { + return e.synthEvents + } + return make(chan *SynthEvent) } // Done returns a channel that is closed when all output has been received diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index 039a3480c80..397559f27a1 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -26,13 +26,20 @@ import ( const debugSelector = "synthexec" +type StandardSuiteFields struct { + Name string + Id string + Type string + Inline bool +} + type FilterJourneyConfig struct { Tags []string `config:"tags"` Match string `config:"match"` } // SuiteJob will run a single journey by name from the given suite. -func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, extraArgs ...string) (jobs.Job, error) { +func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields, extraArgs ...string) (jobs.Job, error) { // Run the command in the given suitePath, use '.' as the first arg since the command runs // in the correct dir cmdFactory, err := suiteCommandFactory(suitePath, extraArgs...) @@ -40,7 +47,7 @@ func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filte return nil, err } - return startCmdJob(ctx, cmdFactory, nil, params, filterJourneys), nil + return startCmdJob(ctx, cmdFactory, nil, params, filterJourneys, fields), nil } func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, error) { @@ -64,40 +71,38 @@ func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, er } // InlineJourneyJob returns a job that runs the given source as a single journey. -func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, extraArgs ...string) jobs.Job { +func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StandardSuiteFields, extraArgs ...string) jobs.Job { newCmd := func() *exec.Cmd { return exec.Command("elastic-synthetics", append(extraArgs, "--inline")...) } - return startCmdJob(ctx, newCmd, &script, params, FilterJourneyConfig{}) + return startCmdJob(ctx, newCmd, &script, params, FilterJourneyConfig{}, fields) } // startCmdJob adapts commands into a heartbeat job. This is a little awkward given that the command's output is // available via a sequence of events in the multiplexer, while heartbeat jobs are tail recursive continuations. // Here, we adapt one to the other, where each recursive job pulls another item off the chan until none are left. -func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig) jobs.Job { +func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { mpx, err := runCmd(ctx, newCmd(), stdinStr, params, filterJourneys) if err != nil { return nil, err } senr := streamEnricher{} - return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), senr.enrich)}, nil + return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(fields.Inline), senr.enrich, fields)}, nil } } // readResultsJob adapts the output of an ExecMultiplexer into a Job, that uses continuations // to read all output. -func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher) jobs.Job { +func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StandardSuiteFields) jobs.Job { return func(event *beat.Event) (conts []jobs.Job, err error) { - select { - case se := <-synthEvents: - err = enrich(event, se) - if se != nil { - return []jobs.Job{readResultsJob(ctx, synthEvents, enrich)}, err - } else { - return nil, err - } + se := <-synthEvents + err = enrich(event, se, fields) + if se != nil { + return []jobs.Job{readResultsJob(ctx, synthEvents, enrich, fields)}, err + } else { + return nil, err } } } @@ -111,6 +116,7 @@ func runCmd( params common.MapStr, filterJourneys FilterJourneyConfig, ) (mpx *ExecMultiplexer, err error) { + isInlineJob := stdinStr != nil mpx = NewExecMultiplexer() // Setup a pipe for JSON structured output jsonReader, jsonWriter, err := os.Pipe() @@ -147,7 +153,7 @@ func runCmd( logp.Info("Running command: %s in directory: '%s'", loggableCmd.String(), cmd.Dir) - if stdinStr != nil { + if isInlineJob { logp.Debug(debugSelector, "Using stdin str %s", *stdinStr) cmd.Stdin = strings.NewReader(*stdinStr) } @@ -155,14 +161,14 @@ func runCmd( wg := sync.WaitGroup{} // Send stdout into the output - stdoutPipe, err := cmd.StdoutPipe() + stdoutPipe, _ := cmd.StdoutPipe() wg.Add(1) go func() { scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent) wg.Done() }() - stderrPipe, err := cmd.StderrPipe() + stderrPipe, _ := cmd.StderrPipe() wg.Add(1) go func() { scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent) @@ -217,11 +223,6 @@ func scanToSynthEvents(rdr io.ReadCloser, transform func(bytes []byte, text stri scanner.Buffer(buf, 1024*1024*40) // Max 50MiB Buffer for scanner.Scan() { - if scanner.Err() != nil { - logp.Warn("Error scanning results %s", scanner.Err()) - return scanner.Err() - } - se, err := transform(scanner.Bytes(), scanner.Text()) if err != nil { logp.Warn("error parsing line: %s for line: %s", err, scanner.Text()) @@ -232,6 +233,11 @@ func scanToSynthEvents(rdr io.ReadCloser, transform func(bytes []byte, text stri } } + if scanner.Err() != nil { + logp.Warn("error scanning synthetics runner results %s", scanner.Err()) + return scanner.Err() + } + return nil } @@ -270,7 +276,7 @@ func jsonToSynthEvent(bytes []byte, text string) (res *SynthEvent, err error) { } if res.Type == "" { - return nil, fmt.Errorf("Unmarshal succeeded, but no type found for: %s", text) + return nil, fmt.Errorf("unmarshal succeeded, but no type found for: %s", text) } return } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go index c32130c45ba..86db73fa58f 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go @@ -110,7 +110,7 @@ func TestRunCmd(t *testing.T) { Loop: for { select { - case se := <-mpx.SynthEvents(): + case se := <-mpx.SynthEvents(false): if se == nil { break Loop } From d145c0d9fe9f6527a7c78bd0d98171dbc0ec3a88 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Wed, 26 Jan 2022 09:03:01 -0800 Subject: [PATCH 2/4] fix wrapper tests --- heartbeat/monitors/wrappers/wrappers_test.go | 57 +++++++++++-------- .../browser/synthexec/execmultiplexer.go | 14 ++--- .../monitors/browser/synthexec/synthexec.go | 3 +- .../browser/synthexec/synthexec_test.go | 2 +- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/heartbeat/monitors/wrappers/wrappers_test.go b/heartbeat/monitors/wrappers/wrappers_test.go index b84b18f65a9..049b6ebd8fb 100644 --- a/heartbeat/monitors/wrappers/wrappers_test.go +++ b/heartbeat/monitors/wrappers/wrappers_test.go @@ -57,8 +57,6 @@ var testMonFields = stdfields.StdMonitorFields{ } var testBrowserMonFields = stdfields.StdMonitorFields{ - ID: "myid", - Name: "myname", Type: "browser", Schedule: schedule.MustParse("@every 1s"), Timeout: 1, @@ -396,6 +394,18 @@ func TestTimespan(t *testing.T) { } } +type BrowserMonitor struct { + id string + name string + checkGroup string +} + +var inlineMonitorValues = BrowserMonitor{ + id: "inline", + name: "inline", + checkGroup: "inline-check-group", +} + func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) @@ -403,16 +413,18 @@ func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { eventext.MergeEventFields(event, common.MapStr{ "url": URLFields(parsed), "monitor": common.MapStr{ - "check_group": "inline-check-group", + "type": "browser", + "id": inlineMonitorValues.id, + "name": inlineMonitorValues.name, + "check_group": inlineMonitorValues.checkGroup, }, }) return nil, nil } } -// Inline browser jobs function very similarly to lightweight jobs -// in that they don't override the ID. -// They do not, however, get a summary field added, nor duration. +// Browser inline jobs monitor information should not be altered +// by the wrappers as they are handled separately in synth enricher func TestInlineBrowserJob(t *testing.T) { fields := testBrowserMonFields testCommonWrap(t, testDef{ @@ -425,10 +437,10 @@ func TestInlineBrowserJob(t *testing.T) { urlValidator(t, "http://foo.com"), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "id": testMonFields.ID, - "name": testMonFields.Name, - "type": fields.Type, - "check_group": "inline-check-group", + "type": "browser", + "id": inlineMonitorValues.id, + "name": inlineMonitorValues.name, + "check_group": inlineMonitorValues.checkGroup, }, }), hbtestllext.MonitorTimespanValidator, @@ -439,13 +451,9 @@ func TestInlineBrowserJob(t *testing.T) { }) } -var suiteBrowserJobValues = struct { - id string - name string - checkGroup string -}{ - id: "journey_1", - name: "Journey 1", +var suiteMonitorValues = BrowserMonitor{ + id: "suite-journey_1", + name: "suite-Journey 1", checkGroup: "journey-1-check-group", } @@ -456,9 +464,10 @@ func makeSuiteBrowserJob(t *testing.T, u string, summary bool, suiteErr error) j eventext.MergeEventFields(event, common.MapStr{ "url": URLFields(parsed), "monitor": common.MapStr{ - "id": suiteBrowserJobValues.id, - "name": suiteBrowserJobValues.name, - "check_group": suiteBrowserJobValues.checkGroup, + "type": "browser", + "id": suiteMonitorValues.id, + "name": suiteMonitorValues.name, + "check_group": suiteMonitorValues.checkGroup, }, }) if summary { @@ -482,10 +491,10 @@ func TestSuiteBrowserJob(t *testing.T) { urlU, _ := url.Parse(urlStr) expectedMonFields := lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "id": fmt.Sprintf("%s-%s", testMonFields.ID, suiteBrowserJobValues.id), - "name": fmt.Sprintf("%s - %s", testMonFields.Name, suiteBrowserJobValues.name), - "type": fields.Type, - "check_group": suiteBrowserJobValues.checkGroup, + "type": "browser", + "id": suiteMonitorValues.id, + "name": suiteMonitorValues.name, + "check_group": suiteMonitorValues.checkGroup, "timespan": common.MapStr{ "gte": hbtestllext.IsTime, "lt": hbtestllext.IsTime, diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index b47284ea6d2..d08dd263461 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -25,19 +25,15 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { } if se.Type == "journey/start" { - // e.currentJourney.Store(true) + e.currentJourney.Store(true) e.eventCounter.Store(-1) } se.index = e.eventCounter.Inc() - // hasCurrentJourney := e.currentJourney.Load() - // if se.Type == "journey/end" || se.Type == "cmd/status" { - // e.currentJourney.Store(false) - // } - // If its an inline monitor, we pipe the events to the SynthEvents channel - // as we can associate the monitors, same is not the case with suite monitors - // if e.Inline.Load() || hasCurrentJourney { + + if se.Type == "journey/end" || se.Type == "cmd/status" { + e.currentJourney.Store(false) + } e.synthEvents <- se - // } } // SynthEvents returns a read only channel for synth events diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index 397559f27a1..6fcead07dbb 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -116,7 +116,6 @@ func runCmd( params common.MapStr, filterJourneys FilterJourneyConfig, ) (mpx *ExecMultiplexer, err error) { - isInlineJob := stdinStr != nil mpx = NewExecMultiplexer() // Setup a pipe for JSON structured output jsonReader, jsonWriter, err := os.Pipe() @@ -153,7 +152,7 @@ func runCmd( logp.Info("Running command: %s in directory: '%s'", loggableCmd.String(), cmd.Dir) - if isInlineJob { + if stdinStr != nil { logp.Debug(debugSelector, "Using stdin str %s", *stdinStr) cmd.Stdin = strings.NewReader(*stdinStr) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go index 86db73fa58f..4c09cbe8f53 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go @@ -110,7 +110,7 @@ func TestRunCmd(t *testing.T) { Loop: for { select { - case se := <-mpx.SynthEvents(false): + case se := <-mpx.SynthEvents(true): if se == nil { break Loop } From 0d6f43844404e7bb810846ecd03f5fa72c153d5e Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Fri, 28 Jan 2022 17:37:07 -0800 Subject: [PATCH 3/4] add tests and consume all events --- heartbeat/monitors/wrappers/wrappers.go | 99 +++++++++-------- x-pack/heartbeat/monitors/browser/suite.go | 10 +- .../monitors/browser/synthexec/enrich.go | 15 +-- .../monitors/browser/synthexec/enrich_test.go | 100 +++++++++++++++++- .../browser/synthexec/execmultiplexer.go | 25 ++--- .../browser/synthexec/execmultiplexer_test.go | 24 ++--- .../monitors/browser/synthexec/synthexec.go | 12 +-- .../browser/synthexec/synthexec_test.go | 2 +- 8 files changed, 186 insertions(+), 101 deletions(-) diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index 41fd9251470..2790de08ffd 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -50,8 +50,10 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j return jobs.WrapAllSeparately( jobs.WrapAll( js, + addMonitorTimespan(stdMonFields), + addServiceName(stdMonFields), addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(stdMonFields.Type, false), + addMonitorStatus(false), addMonitorDuration, ), func() jobs.JobWrapper { @@ -65,64 +67,75 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job { return jobs.WrapAll( js, - addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(stdMonFields.Type, true), + addMonitorTimespan(stdMonFields), + addServiceName(stdMonFields), + addMonitorStatus(true), ) } // addMonitorMeta adds the id, name, and type fields to the monitor. -func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper { +func addMonitorMeta(sf stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper { return func(job jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { - cont, e := job(event) - addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti) - return cont, e + cont, err := job(event) + + id := sf.ID + name := sf.Name + // If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the + // unique URLs to create unique suffixes for the monitor. + if isMulti { + url, err := event.GetValue("url.full") + if err != nil { + logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) + url = "n/a" + } + urlHash, _ := hashstructure.Hash(url, nil) + id = fmt.Sprintf("%s-%x", sf.ID, urlHash) + } + + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "id": id, + "name": name, + "type": sf.Type, + }, + }) + return cont, err } } } -func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) { - id := sf.ID - name := sf.Name +func addMonitorTimespan(sf stdfields.StdMonitorFields) jobs.JobWrapper { + return func(origJob jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + cont, err := origJob(event) - // If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the - // unique URLs to create unique suffixes for the monitor. - if isMulti { - url, err := event.GetValue("url.full") - if err != nil { - logp.Error(errors.Wrap(err, "Mandatory url.full key missing!")) - url = "n/a" + eventext.MergeEventFields(event, common.MapStr{ + "monitor": common.MapStr{ + "timespan": timespan(time.Now(), sf.Schedule, sf.Timeout), + }, + }) + return cont, err } - urlHash, _ := hashstructure.Hash(url, nil) - id = fmt.Sprintf("%s-%x", sf.ID, urlHash) } +} - fieldsToMerge := common.MapStr{ - "monitor": common.MapStr{ - "timespan": timespan(started, sf.Schedule, sf.Timeout), - }, - } - // Browser monitor fields are enriched separately - // in the synthexec package - x-pack/heartbeat/monitors/browser/synthexec/enrich.go - if sf.Type != "browser" { - fieldsToMerge = common.MapStr{ - "monitor": common.MapStr{ - "id": id, - "name": name, - "type": sf.Type, - "timespan": timespan(started, sf.Schedule, sf.Timeout), - }, - } - } +// Add service.name to monitors for APM interop +func addServiceName(sf stdfields.StdMonitorFields) jobs.JobWrapper { + return func(origJob jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + cont, err := origJob(event) - // Add service.name for APM interop - if sf.Service.Name != "" { - fieldsToMerge["service"] = common.MapStr{ - "name": sf.Service.Name, + if sf.Service.Name != "" { + eventext.MergeEventFields(event, common.MapStr{ + "service": common.MapStr{ + "name": sf.Service.Name, + }, + }) + } + return cont, err } } - - eventext.MergeEventFields(event, fieldsToMerge) } func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr { @@ -142,7 +155,7 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration // by the original Job will be set as a field. The original error will not be // passed through as a return value. Errors may still be present but only if there // is an actual error wrapping the error. -func addMonitorStatus(monitorType string, summaryOnly bool) jobs.JobWrapper { +func addMonitorStatus(summaryOnly bool) jobs.JobWrapper { return func(origJob jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { cont, err := origJob(event) diff --git a/x-pack/heartbeat/monitors/browser/suite.go b/x-pack/heartbeat/monitors/browser/suite.go index e928ec42852..1561096d856 100644 --- a/x-pack/heartbeat/monitors/browser/suite.go +++ b/x-pack/heartbeat/monitors/browser/suite.go @@ -67,12 +67,12 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig { } func (s *Suite) Fields() synthexec.StandardSuiteFields { - _, inline := s.InlineSource() + _, isInline := s.InlineSource() return synthexec.StandardSuiteFields{ - Name: s.suiteCfg.Name, - Id: s.suiteCfg.Id, - Inline: inline, - Type: "browser", + Name: s.suiteCfg.Name, + Id: s.suiteCfg.Id, + IsInline: isInline, + Type: "browser", } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 76a24b26eb9..3e3f39702c3 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -93,12 +93,13 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields Stan }) // Id and name differs for inline and suite monitors - // - We use the monitor id and name for inline journeys + // - We use the monitor id and name for inline journeys ignoring the + // autogenerated `inline`journey id and name. // - Monitor id/name is concatenated with the journey id/name for - // suite journeys + // suite monitors id := fields.Id name := fields.Name - if je.journey != nil { + if !fields.IsInline && je.journey != nil { id = fmt.Sprintf("%s-%s", id, je.journey.Id) name = fmt.Sprintf("%s - %s", name, je.journey.Name) } @@ -109,12 +110,12 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields Stan }, }) - // Write suite level fields for suite jobs - if !fields.Inline { + // Write suite level fields for suite monitors + if !fields.IsInline { eventext.MergeEventFields(event, common.MapStr{ "suite": common.MapStr{ - "id": id, - "name": name, + "id": fields.Id, + "name": fields.Name, }, }) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index a2a721f72cb..18f775b5e92 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -34,6 +34,12 @@ func makeStepEvent(typ string, ts float64, name string, index int, status string } func TestJourneyEnricher(t *testing.T) { + var stdFields = StandardSuiteFields{ + Id: "mysuite", + Name: "mysuite", + Type: "browser", + IsInline: false, + } journey := &Journey{ Name: "A Journey Name", Id: "my-journey-id", @@ -84,14 +90,22 @@ func TestJourneyEnricher(t *testing.T) { // on the nil data. for idx, se := range synthEvents { e := &beat.Event{} - stdFields := StandardSuiteFields{Inline: false} - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { + stdFields.IsInline = false + t.Run(fmt.Sprintf("suites monitor event %d", idx), func(t *testing.T) { enrichErr := je.enrich(e, se, stdFields) if se != nil && se.Type != "journey/end" { // Test that the created event includes the mapped // version of the event testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields) + // check suite and monitor meta fields + testslike.Test(t, lookslike.MustCompile(common.MapStr{ + "suite.id": stdFields.Id, + "suite.name": stdFields.Name, + "monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id), + "monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name), + }), e.Fields) + require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix()) if se.Error != nil { @@ -112,6 +126,82 @@ func TestJourneyEnricher(t *testing.T) { }) } }) + + } + + for idx, se := range synthEvents { + e := &beat.Event{} + stdFields.IsInline = true + t.Run(fmt.Sprintf("inline monitor event %d", idx), func(t *testing.T) { + je.enrich(e, se, stdFields) + testslike.Test(t, lookslike.MustCompile(common.MapStr{ + "monitor.id": stdFields.Id, + "monitor.name": stdFields.Name, + }), e.Fields) + + sv, _ := e.Fields.GetValue("suite") + require.Nil(t, sv) + }) + } +} + +func TestEnrichConsoleSynthEvents(t *testing.T) { + tests := []struct { + name string + je *journeyEnricher + se *SynthEvent + check func(t *testing.T, e *beat.Event, je *journeyEnricher) + }{ + { + "stderr", + &journeyEnricher{}, + &SynthEvent{ + Type: "stderr", + Payload: common.MapStr{ + "message": "Error from synthetics", + }, + }, + func(t *testing.T, e *beat.Event, je *journeyEnricher) { + v := lookslike.MustCompile(common.MapStr{ + "synthetics": common.MapStr{ + "payload": common.MapStr{ + "message": "Error from synthetics", + }, + "type": "stderr", + }, + }) + testslike.Test(t, v, e.Fields) + }, + }, + { + "stdout", + &journeyEnricher{}, + &SynthEvent{ + Type: "stdout", + Payload: common.MapStr{ + "message": "debug output", + }, + }, + func(t *testing.T, e *beat.Event, je *journeyEnricher) { + v := lookslike.MustCompile(common.MapStr{ + "synthetics": common.MapStr{ + "payload": common.MapStr{ + "message": "debug output", + }, + "type": "stdout", + }, + }) + testslike.Test(t, v, e.Fields) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &beat.Event{} + tt.je.enrichSynthEvent(e, tt.se) + tt.check(t, e, tt.je) + }) } } @@ -132,7 +222,7 @@ func TestEnrichSynthEvent(t *testing.T) { }, true, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(map[string]interface{}{ + v := lookslike.MustCompile(common.MapStr{ "summary": map[string]int{ "up": 0, "down": 1, @@ -147,7 +237,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "journey/end"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(map[string]interface{}{ + v := lookslike.MustCompile(common.MapStr{ "summary": map[string]int{ "up": 1, "down": 0, @@ -259,7 +349,7 @@ func TestNoSummaryOnAfterHook(t *testing.T) { for idx, se := range synthEvents { e := &beat.Event{} - stdFields := StandardSuiteFields{Inline: false} + stdFields := StandardSuiteFields{IsInline: false} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { enrichErr := je.enrich(e, se, stdFields) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index d08dd263461..568916140f1 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -9,10 +9,9 @@ import ( ) type ExecMultiplexer struct { - currentJourney *atomic.Bool - eventCounter *atomic.Int - synthEvents chan *SynthEvent - done chan struct{} + eventCounter *atomic.Int + synthEvents chan *SynthEvent + done chan struct{} } func (e ExecMultiplexer) Close() { @@ -25,23 +24,16 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { } if se.Type == "journey/start" { - e.currentJourney.Store(true) e.eventCounter.Store(-1) } se.index = e.eventCounter.Inc() - if se.Type == "journey/end" || se.Type == "cmd/status" { - e.currentJourney.Store(false) - } e.synthEvents <- se } // SynthEvents returns a read only channel for synth events -func (e ExecMultiplexer) SynthEvents(inline bool) <-chan *SynthEvent { - if inline || e.currentJourney.Load() { - return e.synthEvents - } - return make(chan *SynthEvent) +func (e ExecMultiplexer) SynthEvents() <-chan *SynthEvent { + return e.synthEvents } // Done returns a channel that is closed when all output has been received @@ -56,9 +48,8 @@ func (e ExecMultiplexer) Wait() { func NewExecMultiplexer() *ExecMultiplexer { return &ExecMultiplexer{ - currentJourney: atomic.NewBool(false), - eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 - synthEvents: make(chan *SynthEvent), - done: make(chan struct{}), + eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 + synthEvents: make(chan *SynthEvent), + done: make(chan struct{}), } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go index ec85a6b5222..310da4967a3 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go @@ -18,7 +18,7 @@ func TestExecMultiplexer(t *testing.T) { var testJourneys []*Journey var testEvents []*SynthEvent time := float64(0) - for jIdx := 0; jIdx < 4; jIdx++ { + for jIdx := 0; jIdx < 3; jIdx++ { time++ // fake time to make events seem spaced out journey := &Journey{ Name: fmt.Sprintf("J%d", jIdx), @@ -44,21 +44,11 @@ func TestExecMultiplexer(t *testing.T) { TimestampEpochMicros: time, }) } - - // We want one of the test journeys to end with a cmd/status indicating it failed - if jIdx != 4 { - testEvents = append(testEvents, &SynthEvent{ - Journey: journey, - Type: "journey/end", - TimestampEpochMicros: time, - }) - } else { - testEvents = append(testEvents, &SynthEvent{ - Journey: journey, - Type: "cmd/status", - TimestampEpochMicros: time, - }) - } + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "journey/end", + TimestampEpochMicros: time, + }) } // Write the test events in another go routine since writes block @@ -86,7 +76,7 @@ Loop: i := 0 // counter for index, resets on journey change for _, se := range results { require.Equal(t, i, se.index) - if se.Type == "journey/end" || se.Type == "cmd/status" { + if se.Type == "journey/end" { i = 0 } else { i++ diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index 6fcead07dbb..c293a944324 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -27,10 +27,10 @@ import ( const debugSelector = "synthexec" type StandardSuiteFields struct { - Name string - Id string - Type string - Inline bool + Name string + Id string + Type string + IsInline bool } type FilterJourneyConfig struct { @@ -89,7 +89,7 @@ func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, return nil, err } senr := streamEnricher{} - return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(fields.Inline), senr.enrich, fields)}, nil + return []jobs.Job{readResultsJob(ctx, mpx.SynthEvents(), senr.enrich, fields)}, nil } } @@ -250,7 +250,7 @@ func lineToSynthEventFactory(typ string) func(bytes []byte, text string) (res *S return &SynthEvent{ Type: typ, TimestampEpochMicros: float64(time.Now().UnixMicro()), - Payload: map[string]interface{}{ + Payload: common.MapStr{ "message": text, }, }, nil diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go index 4c09cbe8f53..c32130c45ba 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go @@ -110,7 +110,7 @@ func TestRunCmd(t *testing.T) { Loop: for { select { - case se := <-mpx.SynthEvents(true): + case se := <-mpx.SynthEvents(): if se == nil { break Loop } From c0b4cc0ead5ccc608567b97aad17ae1992a78a77 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Mon, 31 Jan 2022 17:01:27 -0800 Subject: [PATCH 4/4] address review and improve test --- x-pack/heartbeat/monitors/browser/suite.go | 4 +- .../monitors/browser/synthexec/enrich.go | 7 +- .../monitors/browser/synthexec/enrich_test.go | 138 +++++++++++------- .../monitors/browser/synthexec/synthexec.go | 20 ++- 4 files changed, 101 insertions(+), 68 deletions(-) diff --git a/x-pack/heartbeat/monitors/browser/suite.go b/x-pack/heartbeat/monitors/browser/suite.go index 1561096d856..c4f52921b41 100644 --- a/x-pack/heartbeat/monitors/browser/suite.go +++ b/x-pack/heartbeat/monitors/browser/suite.go @@ -66,9 +66,9 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig { return s.suiteCfg.FilterJourneys } -func (s *Suite) Fields() synthexec.StandardSuiteFields { +func (s *Suite) Fields() synthexec.StdSuiteFields { _, isInline := s.InlineSource() - return synthexec.StandardSuiteFields{ + return synthexec.StdSuiteFields{ Name: s.suiteCfg.Name, Id: s.suiteCfg.Id, IsInline: isInline, diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 3e3f39702c3..3e01c387cb7 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -18,13 +18,13 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -type enricher func(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error +type enricher func(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error type streamEnricher struct { je *journeyEnricher } -func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error { +func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error { if e.je == nil || (se != nil && se.Type == "journey/start") { e.je = newJourneyEnricher() } @@ -62,7 +62,7 @@ func makeUuid() string { return u.String() } -func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error { +func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error { if se == nil { return nil } @@ -162,6 +162,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e // In that case we always want to issue an update op event.Meta.Put(events.FieldMetaOpType, events.OpTypeCreate) } + eventext.MergeEventFields(event, se.ToMap()) if je.urlFields == nil { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 18f775b5e92..da2f8980dc8 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" ) func makeStepEvent(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent { @@ -34,7 +35,7 @@ func makeStepEvent(typ string, ts float64, name string, index int, status string } func TestJourneyEnricher(t *testing.T) { - var stdFields = StandardSuiteFields{ + var stdFields = StdSuiteFields{ Id: "mysuite", Name: "mysuite", Type: "browser", @@ -83,64 +84,83 @@ func TestJourneyEnricher(t *testing.T) { journeyEnd, } - je := &journeyEnricher{} - - // We need an expectation for each input - // plus a final expectation for the summary which comes - // on the nil data. - for idx, se := range synthEvents { - e := &beat.Event{} - stdFields.IsInline = false - t.Run(fmt.Sprintf("suites monitor event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se, stdFields) - - if se != nil && se.Type != "journey/end" { - // Test that the created event includes the mapped - // version of the event - testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields) - // check suite and monitor meta fields - testslike.Test(t, lookslike.MustCompile(common.MapStr{ - "suite.id": stdFields.Id, - "suite.name": stdFields.Name, - "monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id), - "monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name), - }), e.Fields) - - require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix()) - - if se.Error != nil { - require.Equal(t, stepError(se.Error), enrichErr) - } - } else { // journey end gets a summary - require.Equal(t, stepError(syntherr), enrichErr) + suiteValidator := func() validator.Validator { + return lookslike.MustCompile(common.MapStr{ + "suite.id": stdFields.Id, + "suite.name": stdFields.Name, + "monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id), + "monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name), + }) + } + inlineValidator := func() validator.Validator { + return lookslike.MustCompile(common.MapStr{ + "monitor.id": stdFields.Id, + "monitor.name": stdFields.Name, + }) + } + commonValidator := func(se *SynthEvent) validator.Validator { + var v []validator.Validator - u, _ := url.Parse(url1) - t.Run("summary", func(t *testing.T) { - v := lookslike.MustCompile(common.MapStr{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) + // We need an expectation for each input plus a final + // expectation for the summary which comes on the nil data. + if se.Type != "journey/end" { + // Test that the created event includes the mapped + // version of the event + v = append(v, lookslike.MustCompile(se.ToMap())) + } else { + u, _ := url.Parse(url1) + // journey end gets a summary + v = append(v, lookslike.MustCompile(common.MapStr{ + "synthetics.type": "heartbeat/summary", + "url": wrappers.URLFields(u), + "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), + })) + } + return lookslike.Compose(v...) + } - testslike.Test(t, v, e.Fields) - }) + je := &journeyEnricher{} + check := func(t *testing.T, se *SynthEvent, ssf StdSuiteFields) { + e := &beat.Event{} + t.Run(fmt.Sprintf("event: %s", se.Type), func(t *testing.T) { + enrichErr := je.enrich(e, se, ssf) + if se.Error != nil { + require.Equal(t, stepError(se.Error), enrichErr) } - }) + if ssf.IsInline { + sv, _ := e.Fields.GetValue("suite") + require.Nil(t, sv) + testslike.Test(t, inlineValidator(), e.Fields) + } else { + testslike.Test(t, suiteValidator(), e.Fields) + } + testslike.Test(t, commonValidator(se), e.Fields) + require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix()) + }) } - for idx, se := range synthEvents { - e := &beat.Event{} - stdFields.IsInline = true - t.Run(fmt.Sprintf("inline monitor event %d", idx), func(t *testing.T) { - je.enrich(e, se, stdFields) - testslike.Test(t, lookslike.MustCompile(common.MapStr{ - "monitor.id": stdFields.Id, - "monitor.name": stdFields.Name, - }), e.Fields) + tests := []struct { + name string + isInline bool + se []*SynthEvent + }{ + { + name: "suite monitor", + isInline: false, + }, + { + name: "inline monitor", + isInline: true, + }, + } - sv, _ := e.Fields.GetValue("suite") - require.Nil(t, sv) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stdFields.IsInline = tt.isInline + for _, se := range synthEvents { + check(t, se, stdFields) + } }) } } @@ -160,6 +180,7 @@ func TestEnrichConsoleSynthEvents(t *testing.T) { Payload: common.MapStr{ "message": "Error from synthetics", }, + PackageVersion: "1.0.0", }, func(t *testing.T, e *beat.Event, je *journeyEnricher) { v := lookslike.MustCompile(common.MapStr{ @@ -167,7 +188,9 @@ func TestEnrichConsoleSynthEvents(t *testing.T) { "payload": common.MapStr{ "message": "Error from synthetics", }, - "type": "stderr", + "type": "stderr", + "package_version": "1.0.0", + "index": 0, }, }) testslike.Test(t, v, e.Fields) @@ -181,6 +204,7 @@ func TestEnrichConsoleSynthEvents(t *testing.T) { Payload: common.MapStr{ "message": "debug output", }, + PackageVersion: "1.0.0", }, func(t *testing.T, e *beat.Event, je *journeyEnricher) { v := lookslike.MustCompile(common.MapStr{ @@ -188,10 +212,12 @@ func TestEnrichConsoleSynthEvents(t *testing.T) { "payload": common.MapStr{ "message": "debug output", }, - "type": "stdout", + "type": "stdout", + "package_version": "1.0.0", + "index": 0, }, }) - testslike.Test(t, v, e.Fields) + testslike.Test(t, lookslike.Strict(v), e.Fields) }, }, } @@ -349,7 +375,7 @@ func TestNoSummaryOnAfterHook(t *testing.T) { for idx, se := range synthEvents { e := &beat.Event{} - stdFields := StandardSuiteFields{IsInline: false} + stdFields := StdSuiteFields{IsInline: false} t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { enrichErr := je.enrich(e, se, stdFields) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index c293a944324..b2b8d9c612d 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -26,7 +26,7 @@ import ( const debugSelector = "synthexec" -type StandardSuiteFields struct { +type StdSuiteFields struct { Name string Id string Type string @@ -39,7 +39,7 @@ type FilterJourneyConfig struct { } // SuiteJob will run a single journey by name from the given suite. -func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields, extraArgs ...string) (jobs.Job, error) { +func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields, extraArgs ...string) (jobs.Job, error) { // Run the command in the given suitePath, use '.' as the first arg since the command runs // in the correct dir cmdFactory, err := suiteCommandFactory(suitePath, extraArgs...) @@ -71,7 +71,7 @@ func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, er } // InlineJourneyJob returns a job that runs the given source as a single journey. -func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StandardSuiteFields, extraArgs ...string) jobs.Job { +func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StdSuiteFields, extraArgs ...string) jobs.Job { newCmd := func() *exec.Cmd { return exec.Command("elastic-synthetics", append(extraArgs, "--inline")...) } @@ -82,7 +82,7 @@ func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, // startCmdJob adapts commands into a heartbeat job. This is a little awkward given that the command's output is // available via a sequence of events in the multiplexer, while heartbeat jobs are tail recursive continuations. // Here, we adapt one to the other, where each recursive job pulls another item off the chan until none are left. -func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields) jobs.Job { +func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { mpx, err := runCmd(ctx, newCmd(), stdinStr, params, filterJourneys) if err != nil { @@ -95,7 +95,7 @@ func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, // readResultsJob adapts the output of an ExecMultiplexer into a Job, that uses continuations // to read all output. -func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StandardSuiteFields) jobs.Job { +func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StdSuiteFields) jobs.Job { return func(event *beat.Event) (conts []jobs.Job, err error) { se := <-synthEvents err = enrich(event, se, fields) @@ -160,14 +160,20 @@ func runCmd( wg := sync.WaitGroup{} // Send stdout into the output - stdoutPipe, _ := cmd.StdoutPipe() + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("could not open stdout pipe: %w", err) + } wg.Add(1) go func() { scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent) wg.Done() }() - stderrPipe, _ := cmd.StderrPipe() + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("could not open stderr pipe: %w", err) + } wg.Add(1) go func() { scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent)