Skip to content

Commit

Permalink
[Heartbeat]: catch all error handler for browser jobs (#30013) (#30126)
Browse files Browse the repository at this point in the history
* [Heartbeat]: catch all error handler for browser jobs

* fix wrapper tests

* add tests and consume all events

* address review and improve test

(cherry picked from commit 267f8cb)

Co-authored-by: Vignesh Shanmugam <[email protected]>
  • Loading branch information
mergify[bot] and vigneshshanmugam authored Feb 1, 2022
1 parent a6903b3 commit e38c43a
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 164 deletions.
99 changes: 56 additions & 43 deletions heartbeat/monitors/wrappers/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorTimespan(stdMonFields),
addServiceName(stdMonFields),
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, false),
addMonitorStatus(false),
addMonitorDuration,
),
func() jobs.JobWrapper {
Expand All @@ -65,64 +67,75 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
return jobs.WrapAll(
js,
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, true),
addMonitorTimespan(stdMonFields),
addServiceName(stdMonFields),
addMonitorStatus(true),
)
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
func addMonitorMeta(sf stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, e := job(event)
addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti)
return cont, e
cont, err := job(event)

id := sf.ID
name := sf.Name
// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}

eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
},
})
return cont, err
}
}
}

func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) {
id := sf.ID
name := sf.Name
func addMonitorTimespan(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"timespan": timespan(time.Now(), sf.Schedule, sf.Timeout),
},
})
return cont, err
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}

// Allow jobs to override the ID, useful for browser suites
// which do this logic on their own
if v, _ := event.GetValue("monitor.id"); v != nil {
id = fmt.Sprintf("%s-%s", sf.ID, v.(string))
}
if v, _ := event.GetValue("monitor.name"); v != nil {
name = fmt.Sprintf("%s - %s", sf.Name, v.(string))
}
}

fieldsToMerge := common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
"timespan": timespan(started, sf.Schedule, sf.Timeout),
},
}
// Add service.name to monitors for APM interop
func addServiceName(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// Add service.name for APM interop
if sf.Service.Name != "" {
fieldsToMerge["service"] = common.MapStr{
"name": sf.Service.Name,
if sf.Service.Name != "" {
eventext.MergeEventFields(event, common.MapStr{
"service": common.MapStr{
"name": sf.Service.Name,
},
})
}
return cont, err
}
}

eventext.MergeEventFields(event, fieldsToMerge)
}

func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr {
Expand All @@ -142,7 +155,7 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration
// by the original Job will be set as a field. The original error will not be
// passed through as a return value. Errors may still be present but only if there
// is an actual error wrapping the error.
func addMonitorStatus(monitorType string, summaryOnly bool) jobs.JobWrapper {
func addMonitorStatus(summaryOnly bool) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)
Expand Down
57 changes: 33 additions & 24 deletions heartbeat/monitors/wrappers/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ var testMonFields = stdfields.StdMonitorFields{
}

var testBrowserMonFields = stdfields.StdMonitorFields{
ID: "myid",
Name: "myname",
Type: "browser",
Schedule: schedule.MustParse("@every 1s"),
Timeout: 1,
Expand Down Expand Up @@ -396,23 +394,37 @@ func TestTimespan(t *testing.T) {
}
}

type BrowserMonitor struct {
id string
name string
checkGroup string
}

var inlineMonitorValues = BrowserMonitor{
id: "inline",
name: "inline",
checkGroup: "inline-check-group",
}

func makeInlineBrowserJob(t *testing.T, u string) jobs.Job {
parsed, err := url.Parse(u)
require.NoError(t, err)
return func(event *beat.Event) (i []jobs.Job, e error) {
eventext.MergeEventFields(event, common.MapStr{
"url": URLFields(parsed),
"monitor": common.MapStr{
"check_group": "inline-check-group",
"type": "browser",
"id": inlineMonitorValues.id,
"name": inlineMonitorValues.name,
"check_group": inlineMonitorValues.checkGroup,
},
})
return nil, nil
}
}

// Inline browser jobs function very similarly to lightweight jobs
// in that they don't override the ID.
// They do not, however, get a summary field added, nor duration.
// Browser inline jobs monitor information should not be altered
// by the wrappers as they are handled separately in synth enricher
func TestInlineBrowserJob(t *testing.T) {
fields := testBrowserMonFields
testCommonWrap(t, testDef{
Expand All @@ -425,10 +437,10 @@ func TestInlineBrowserJob(t *testing.T) {
urlValidator(t, "http://foo.com"),
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": testMonFields.ID,
"name": testMonFields.Name,
"type": fields.Type,
"check_group": "inline-check-group",
"type": "browser",
"id": inlineMonitorValues.id,
"name": inlineMonitorValues.name,
"check_group": inlineMonitorValues.checkGroup,
},
}),
hbtestllext.MonitorTimespanValidator,
Expand All @@ -439,13 +451,9 @@ func TestInlineBrowserJob(t *testing.T) {
})
}

var suiteBrowserJobValues = struct {
id string
name string
checkGroup string
}{
id: "journey_1",
name: "Journey 1",
var suiteMonitorValues = BrowserMonitor{
id: "suite-journey_1",
name: "suite-Journey 1",
checkGroup: "journey-1-check-group",
}

Expand All @@ -456,9 +464,10 @@ func makeSuiteBrowserJob(t *testing.T, u string, summary bool, suiteErr error) j
eventext.MergeEventFields(event, common.MapStr{
"url": URLFields(parsed),
"monitor": common.MapStr{
"id": suiteBrowserJobValues.id,
"name": suiteBrowserJobValues.name,
"check_group": suiteBrowserJobValues.checkGroup,
"type": "browser",
"id": suiteMonitorValues.id,
"name": suiteMonitorValues.name,
"check_group": suiteMonitorValues.checkGroup,
},
})
if summary {
Expand All @@ -482,10 +491,10 @@ func TestSuiteBrowserJob(t *testing.T) {
urlU, _ := url.Parse(urlStr)
expectedMonFields := lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": fmt.Sprintf("%s-%s", testMonFields.ID, suiteBrowserJobValues.id),
"name": fmt.Sprintf("%s - %s", testMonFields.Name, suiteBrowserJobValues.name),
"type": fields.Type,
"check_group": suiteBrowserJobValues.checkGroup,
"type": "browser",
"id": suiteMonitorValues.id,
"name": suiteMonitorValues.name,
"check_group": suiteMonitorValues.checkGroup,
"timespan": common.MapStr{
"gte": hbtestllext.IsTime,
"lt": hbtestllext.IsTime,
Expand Down
14 changes: 12 additions & 2 deletions x-pack/heartbeat/monitors/browser/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig {
return s.suiteCfg.FilterJourneys
}

func (s *Suite) Fields() synthexec.StdSuiteFields {
_, isInline := s.InlineSource()
return synthexec.StdSuiteFields{
Name: s.suiteCfg.Name,
Id: s.suiteCfg.Id,
IsInline: isInline,
Type: "browser",
}
}

func (s *Suite) Close() error {
if s.suiteCfg.Source.ActiveMemo != nil {
s.suiteCfg.Source.ActiveMemo.Close()
Expand Down Expand Up @@ -102,14 +112,14 @@ func (s *Suite) extraArgs() []string {
func (s *Suite) jobs() []jobs.Job {
var j jobs.Job
if src, ok := s.InlineSource(); ok {
j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.extraArgs()...)
j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.Fields(), s.extraArgs()...)
} else {
j = func(event *beat.Event) ([]jobs.Job, error) {
err := s.Fetch()
if err != nil {
return nil, fmt.Errorf("could not fetch for suite job: %w", err)
}
sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.extraArgs()...)
sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.Fields(), s.extraArgs()...)
if err != nil {
return nil, err
}
Expand Down
42 changes: 32 additions & 10 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

type enricher func(event *beat.Event, se *SynthEvent) error
type enricher func(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error

type streamEnricher struct {
je *journeyEnricher
}

func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error {
func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if e.je == nil || (se != nil && se.Type == "journey/start") {
e.je = newJourneyEnricher()
}

return e.je.enrich(event, se)
return e.je.enrich(event, se, fields)
}

// journeyEnricher holds state across received SynthEvents retaining fields
Expand Down Expand Up @@ -62,7 +62,7 @@ func makeUuid() string {
return u.String()
}

func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error {
func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if se == nil {
return nil
}
Expand All @@ -84,20 +84,41 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error {
}

eventext.MergeEventFields(event, common.MapStr{
"event": common.MapStr{
"type": se.Type,
},
"monitor": common.MapStr{
"check_group": je.checkGroup,
},
})
// Inline jobs have no journey
if je.journey != nil {

// Id and name differs for inline and suite monitors
// - We use the monitor id and name for inline journeys ignoring the
// autogenerated `inline`journey id and name.
// - Monitor id/name is concatenated with the journey id/name for
// suite monitors
id := fields.Id
name := fields.Name
if !fields.IsInline && je.journey != nil {
id = fmt.Sprintf("%s-%s", id, je.journey.Id)
name = fmt.Sprintf("%s - %s", name, je.journey.Name)
}
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
},
})

// Write suite level fields for suite monitors
if !fields.IsInline {
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": je.journey.Id,
"name": je.journey.Name,
"suite": common.MapStr{
"id": fields.Id,
"name": fields.Name,
},
})
}

return je.enrichSynthEvent(event, se)
}

Expand Down Expand Up @@ -141,6 +162,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e
// In that case we always want to issue an update op
event.Meta.Put(events.FieldMetaOpType, events.OpTypeCreate)
}

eventext.MergeEventFields(event, se.ToMap())

if je.urlFields == nil {
Expand Down
Loading

0 comments on commit e38c43a

Please sign in to comment.