Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.17](backport #30013) [Heartbeat]: catch all error handler for browser jobs #30126

Merged
merged 1 commit into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 56 additions & 43 deletions heartbeat/monitors/wrappers/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorTimespan(stdMonFields),
addServiceName(stdMonFields),
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, false),
addMonitorStatus(false),
addMonitorDuration,
),
func() jobs.JobWrapper {
Expand All @@ -65,64 +67,75 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
return jobs.WrapAll(
js,
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, true),
addMonitorTimespan(stdMonFields),
addServiceName(stdMonFields),
addMonitorStatus(true),
)
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
func addMonitorMeta(sf stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, e := job(event)
addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti)
return cont, e
cont, err := job(event)

id := sf.ID
name := sf.Name
// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}

eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
},
})
return cont, err
}
}
}

func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) {
id := sf.ID
name := sf.Name
func addMonitorTimespan(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"timespan": timespan(time.Now(), sf.Schedule, sf.Timeout),
},
})
return cont, err
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}

// Allow jobs to override the ID, useful for browser suites
// which do this logic on their own
if v, _ := event.GetValue("monitor.id"); v != nil {
id = fmt.Sprintf("%s-%s", sf.ID, v.(string))
}
if v, _ := event.GetValue("monitor.name"); v != nil {
name = fmt.Sprintf("%s - %s", sf.Name, v.(string))
}
}

fieldsToMerge := common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
"timespan": timespan(started, sf.Schedule, sf.Timeout),
},
}
// Add service.name to monitors for APM interop
func addServiceName(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// Add service.name for APM interop
if sf.Service.Name != "" {
fieldsToMerge["service"] = common.MapStr{
"name": sf.Service.Name,
if sf.Service.Name != "" {
eventext.MergeEventFields(event, common.MapStr{
"service": common.MapStr{
"name": sf.Service.Name,
},
})
}
return cont, err
}
}

eventext.MergeEventFields(event, fieldsToMerge)
}

func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr {
Expand All @@ -142,7 +155,7 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration
// by the original Job will be set as a field. The original error will not be
// passed through as a return value. Errors may still be present but only if there
// is an actual error wrapping the error.
func addMonitorStatus(monitorType string, summaryOnly bool) jobs.JobWrapper {
func addMonitorStatus(summaryOnly bool) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)
Expand Down
57 changes: 33 additions & 24 deletions heartbeat/monitors/wrappers/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ var testMonFields = stdfields.StdMonitorFields{
}

var testBrowserMonFields = stdfields.StdMonitorFields{
ID: "myid",
Name: "myname",
Type: "browser",
Schedule: schedule.MustParse("@every 1s"),
Timeout: 1,
Expand Down Expand Up @@ -396,23 +394,37 @@ func TestTimespan(t *testing.T) {
}
}

type BrowserMonitor struct {
id string
name string
checkGroup string
}

var inlineMonitorValues = BrowserMonitor{
id: "inline",
name: "inline",
checkGroup: "inline-check-group",
}

func makeInlineBrowserJob(t *testing.T, u string) jobs.Job {
parsed, err := url.Parse(u)
require.NoError(t, err)
return func(event *beat.Event) (i []jobs.Job, e error) {
eventext.MergeEventFields(event, common.MapStr{
"url": URLFields(parsed),
"monitor": common.MapStr{
"check_group": "inline-check-group",
"type": "browser",
"id": inlineMonitorValues.id,
"name": inlineMonitorValues.name,
"check_group": inlineMonitorValues.checkGroup,
},
})
return nil, nil
}
}

// Inline browser jobs function very similarly to lightweight jobs
// in that they don't override the ID.
// They do not, however, get a summary field added, nor duration.
// Browser inline jobs monitor information should not be altered
// by the wrappers as they are handled separately in synth enricher
func TestInlineBrowserJob(t *testing.T) {
fields := testBrowserMonFields
testCommonWrap(t, testDef{
Expand All @@ -425,10 +437,10 @@ func TestInlineBrowserJob(t *testing.T) {
urlValidator(t, "http://foo.com"),
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": testMonFields.ID,
"name": testMonFields.Name,
"type": fields.Type,
"check_group": "inline-check-group",
"type": "browser",
"id": inlineMonitorValues.id,
"name": inlineMonitorValues.name,
"check_group": inlineMonitorValues.checkGroup,
},
}),
hbtestllext.MonitorTimespanValidator,
Expand All @@ -439,13 +451,9 @@ func TestInlineBrowserJob(t *testing.T) {
})
}

var suiteBrowserJobValues = struct {
id string
name string
checkGroup string
}{
id: "journey_1",
name: "Journey 1",
var suiteMonitorValues = BrowserMonitor{
id: "suite-journey_1",
name: "suite-Journey 1",
checkGroup: "journey-1-check-group",
}

Expand All @@ -456,9 +464,10 @@ func makeSuiteBrowserJob(t *testing.T, u string, summary bool, suiteErr error) j
eventext.MergeEventFields(event, common.MapStr{
"url": URLFields(parsed),
"monitor": common.MapStr{
"id": suiteBrowserJobValues.id,
"name": suiteBrowserJobValues.name,
"check_group": suiteBrowserJobValues.checkGroup,
"type": "browser",
"id": suiteMonitorValues.id,
"name": suiteMonitorValues.name,
"check_group": suiteMonitorValues.checkGroup,
},
})
if summary {
Expand All @@ -482,10 +491,10 @@ func TestSuiteBrowserJob(t *testing.T) {
urlU, _ := url.Parse(urlStr)
expectedMonFields := lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": fmt.Sprintf("%s-%s", testMonFields.ID, suiteBrowserJobValues.id),
"name": fmt.Sprintf("%s - %s", testMonFields.Name, suiteBrowserJobValues.name),
"type": fields.Type,
"check_group": suiteBrowserJobValues.checkGroup,
"type": "browser",
"id": suiteMonitorValues.id,
"name": suiteMonitorValues.name,
"check_group": suiteMonitorValues.checkGroup,
"timespan": common.MapStr{
"gte": hbtestllext.IsTime,
"lt": hbtestllext.IsTime,
Expand Down
14 changes: 12 additions & 2 deletions x-pack/heartbeat/monitors/browser/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig {
return s.suiteCfg.FilterJourneys
}

func (s *Suite) Fields() synthexec.StdSuiteFields {
_, isInline := s.InlineSource()
return synthexec.StdSuiteFields{
Name: s.suiteCfg.Name,
Id: s.suiteCfg.Id,
IsInline: isInline,
Type: "browser",
}
}

func (s *Suite) Close() error {
if s.suiteCfg.Source.ActiveMemo != nil {
s.suiteCfg.Source.ActiveMemo.Close()
Expand Down Expand Up @@ -102,14 +112,14 @@ func (s *Suite) extraArgs() []string {
func (s *Suite) jobs() []jobs.Job {
var j jobs.Job
if src, ok := s.InlineSource(); ok {
j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.extraArgs()...)
j = synthexec.InlineJourneyJob(context.TODO(), src, s.Params(), s.Fields(), s.extraArgs()...)
} else {
j = func(event *beat.Event) ([]jobs.Job, error) {
err := s.Fetch()
if err != nil {
return nil, fmt.Errorf("could not fetch for suite job: %w", err)
}
sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.extraArgs()...)
sj, err := synthexec.SuiteJob(context.TODO(), s.Workdir(), s.Params(), s.FilterJourneys(), s.Fields(), s.extraArgs()...)
if err != nil {
return nil, err
}
Expand Down
42 changes: 32 additions & 10 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

type enricher func(event *beat.Event, se *SynthEvent) error
type enricher func(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error

type streamEnricher struct {
je *journeyEnricher
}

func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error {
func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if e.je == nil || (se != nil && se.Type == "journey/start") {
e.je = newJourneyEnricher()
}

return e.je.enrich(event, se)
return e.je.enrich(event, se, fields)
}

// journeyEnricher holds state across received SynthEvents retaining fields
Expand Down Expand Up @@ -62,7 +62,7 @@ func makeUuid() string {
return u.String()
}

func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error {
func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if se == nil {
return nil
}
Expand All @@ -84,20 +84,41 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error {
}

eventext.MergeEventFields(event, common.MapStr{
"event": common.MapStr{
"type": se.Type,
},
"monitor": common.MapStr{
"check_group": je.checkGroup,
},
})
// Inline jobs have no journey
if je.journey != nil {

// Id and name differs for inline and suite monitors
// - We use the monitor id and name for inline journeys ignoring the
// autogenerated `inline`journey id and name.
// - Monitor id/name is concatenated with the journey id/name for
// suite monitors
id := fields.Id
name := fields.Name
if !fields.IsInline && je.journey != nil {
id = fmt.Sprintf("%s-%s", id, je.journey.Id)
name = fmt.Sprintf("%s - %s", name, je.journey.Name)
}
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
},
})

// Write suite level fields for suite monitors
if !fields.IsInline {
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": je.journey.Id,
"name": je.journey.Name,
"suite": common.MapStr{
"id": fields.Id,
"name": fields.Name,
},
})
}

return je.enrichSynthEvent(event, se)
}

Expand Down Expand Up @@ -141,6 +162,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e
// In that case we always want to issue an update op
event.Meta.Put(events.FieldMetaOpType, events.OpTypeCreate)
}

eventext.MergeEventFields(event, se.ToMap())

if je.urlFields == nil {
Expand Down
Loading