Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat]: catch all error handler for browser jobs #30013

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 56 additions & 43 deletions heartbeat/monitors/wrappers/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorTimespan(stdMonFields),
addServiceName(stdMonFields),
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, false),
addMonitorStatus(false),
addMonitorDuration,
),
func() jobs.JobWrapper {
Expand All @@ -65,64 +67,75 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []j
func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields) []jobs.Job {
return jobs.WrapAll(
js,
addMonitorMeta(stdMonFields, len(js) > 1),
addMonitorStatus(stdMonFields.Type, true),
addMonitorTimespan(stdMonFields),
vigneshshanmugam marked this conversation as resolved.
Show resolved Hide resolved
addServiceName(stdMonFields),
addMonitorStatus(true),
)
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(stdMonFields stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
func addMonitorMeta(sf stdfields.StdMonitorFields, isMulti bool) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, e := job(event)
addMonitorMetaFields(event, time.Now(), stdMonFields, isMulti)
return cont, e
cont, err := job(event)

id := sf.ID
name := sf.Name
// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}

eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
},
})
return cont, err
}
}
}

func addMonitorMetaFields(event *beat.Event, started time.Time, sf stdfields.StdMonitorFields, isMulti bool) {
id := sf.ID
name := sf.Name
func addMonitorTimespan(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// If multiple jobs are listed for this monitor, we can't have a single ID, so we hash the
// unique URLs to create unique suffixes for the monitor.
if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
eventext.MergeEventFields(event, common.MapStr{
"monitor": common.MapStr{
"timespan": timespan(time.Now(), sf.Schedule, sf.Timeout),
},
})
return cont, err
}
urlHash, _ := hashstructure.Hash(url, nil)
id = fmt.Sprintf("%s-%x", sf.ID, urlHash)
}
}

fieldsToMerge := common.MapStr{
"monitor": common.MapStr{
"timespan": timespan(started, sf.Schedule, sf.Timeout),
},
}
// Browser monitor fields are enriched separately
// in the synthexec package - x-pack/heartbeat/monitors/browser/synthexec/enrich.go
if sf.Type != "browser" {
fieldsToMerge = common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": sf.Type,
"timespan": timespan(started, sf.Schedule, sf.Timeout),
},
}
}
// Add service.name to monitors for APM interop
func addServiceName(sf stdfields.StdMonitorFields) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)

// Add service.name for APM interop
if sf.Service.Name != "" {
fieldsToMerge["service"] = common.MapStr{
"name": sf.Service.Name,
if sf.Service.Name != "" {
eventext.MergeEventFields(event, common.MapStr{
"service": common.MapStr{
"name": sf.Service.Name,
},
})
}
return cont, err
}
}

eventext.MergeEventFields(event, fieldsToMerge)
}

func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr {
Expand All @@ -142,7 +155,7 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration
// by the original Job will be set as a field. The original error will not be
// passed through as a return value. Errors may still be present but only if there
// is an actual error wrapping the error.
func addMonitorStatus(monitorType string, summaryOnly bool) jobs.JobWrapper {
func addMonitorStatus(summaryOnly bool) jobs.JobWrapper {
return func(origJob jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := origJob(event)
Expand Down
10 changes: 5 additions & 5 deletions x-pack/heartbeat/monitors/browser/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig {
}

func (s *Suite) Fields() synthexec.StandardSuiteFields {
_, inline := s.InlineSource()
_, isInline := s.InlineSource()
return synthexec.StandardSuiteFields{
Name: s.suiteCfg.Name,
Id: s.suiteCfg.Id,
Inline: inline,
Type: "browser",
Name: s.suiteCfg.Name,
Id: s.suiteCfg.Id,
IsInline: isInline,
Type: "browser",
}
}

Expand Down
15 changes: 8 additions & 7 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields Stan
})

// Id and name differs for inline and suite monitors
// - We use the monitor id and name for inline journeys
// - We use the monitor id and name for inline journeys ignoring the
// autogenerated `inline`journey id and name.
// - Monitor id/name is concatenated with the journey id/name for
// suite journeys
// suite monitors
id := fields.Id
name := fields.Name
if je.journey != nil {
if !fields.IsInline && je.journey != nil {
id = fmt.Sprintf("%s-%s", id, je.journey.Id)
name = fmt.Sprintf("%s - %s", name, je.journey.Name)
}
Expand All @@ -109,12 +110,12 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields Stan
},
})

// Write suite level fields for suite jobs
if !fields.Inline {
// Write suite level fields for suite monitors
if !fields.IsInline {
eventext.MergeEventFields(event, common.MapStr{
"suite": common.MapStr{
"id": id,
"name": name,
"id": fields.Id,
"name": fields.Name,
},
})
}
Expand Down
100 changes: 95 additions & 5 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func makeStepEvent(typ string, ts float64, name string, index int, status string
}

func TestJourneyEnricher(t *testing.T) {
var stdFields = StandardSuiteFields{
Id: "mysuite",
Name: "mysuite",
Type: "browser",
IsInline: false,
}
journey := &Journey{
Name: "A Journey Name",
Id: "my-journey-id",
Expand Down Expand Up @@ -84,14 +90,22 @@ func TestJourneyEnricher(t *testing.T) {
// on the nil data.
for idx, se := range synthEvents {
e := &beat.Event{}
stdFields := StandardSuiteFields{Inline: false}
t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) {
stdFields.IsInline = false
vigneshshanmugam marked this conversation as resolved.
Show resolved Hide resolved
t.Run(fmt.Sprintf("suites monitor event %d", idx), func(t *testing.T) {
enrichErr := je.enrich(e, se, stdFields)

if se != nil && se.Type != "journey/end" {
// Test that the created event includes the mapped
// version of the event
testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields)
// check suite and monitor meta fields
testslike.Test(t, lookslike.MustCompile(common.MapStr{
"suite.id": stdFields.Id,
"suite.name": stdFields.Name,
"monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id),
"monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name),
}), e.Fields)

require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix())

if se.Error != nil {
Expand All @@ -112,6 +126,82 @@ func TestJourneyEnricher(t *testing.T) {
})
}
})

}

for idx, se := range synthEvents {
e := &beat.Event{}
stdFields.IsInline = true
t.Run(fmt.Sprintf("inline monitor event %d", idx), func(t *testing.T) {
je.enrich(e, se, stdFields)
testslike.Test(t, lookslike.MustCompile(common.MapStr{
"monitor.id": stdFields.Id,
"monitor.name": stdFields.Name,
}), e.Fields)

sv, _ := e.Fields.GetValue("suite")
require.Nil(t, sv)
})
}
}

func TestEnrichConsoleSynthEvents(t *testing.T) {
tests := []struct {
name string
je *journeyEnricher
se *SynthEvent
check func(t *testing.T, e *beat.Event, je *journeyEnricher)
}{
{
"stderr",
&journeyEnricher{},
&SynthEvent{
Type: "stderr",
Payload: common.MapStr{
"message": "Error from synthetics",
},
},
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(common.MapStr{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any value in also testing the monitor.* values that are set here? Also, as a note, you can use lookslike.Strict to ensure that there are no extra fields if that's appropriate. It just wraps the outer lookslike.MustCompile, but you do have to define every field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there would be any value, I added it to denote the case when there are no journeys ran and this was the only message from console.

"synthetics": common.MapStr{
"payload": common.MapStr{
"message": "Error from synthetics",
},
"type": "stderr",
},
})
testslike.Test(t, v, e.Fields)
},
},
{
"stdout",
&journeyEnricher{},
&SynthEvent{
Type: "stdout",
Payload: common.MapStr{
"message": "debug output",
},
},
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(common.MapStr{
"synthetics": common.MapStr{
"payload": common.MapStr{
"message": "debug output",
},
"type": "stdout",
},
})
testslike.Test(t, v, e.Fields)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &beat.Event{}
tt.je.enrichSynthEvent(e, tt.se)
tt.check(t, e, tt.je)
})
}
}

Expand All @@ -132,7 +222,7 @@ func TestEnrichSynthEvent(t *testing.T) {
},
true,
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(map[string]interface{}{
v := lookslike.MustCompile(common.MapStr{
"summary": map[string]int{
"up": 0,
"down": 1,
Expand All @@ -147,7 +237,7 @@ func TestEnrichSynthEvent(t *testing.T) {
&SynthEvent{Type: "journey/end"},
false,
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(map[string]interface{}{
v := lookslike.MustCompile(common.MapStr{
"summary": map[string]int{
"up": 1,
"down": 0,
Expand Down Expand Up @@ -259,7 +349,7 @@ func TestNoSummaryOnAfterHook(t *testing.T) {

for idx, se := range synthEvents {
e := &beat.Event{}
stdFields := StandardSuiteFields{Inline: false}
stdFields := StandardSuiteFields{IsInline: false}
t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) {
enrichErr := je.enrich(e, se, stdFields)

Expand Down
25 changes: 8 additions & 17 deletions x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
)

type ExecMultiplexer struct {
currentJourney *atomic.Bool
eventCounter *atomic.Int
synthEvents chan *SynthEvent
done chan struct{}
eventCounter *atomic.Int
synthEvents chan *SynthEvent
done chan struct{}
}

func (e ExecMultiplexer) Close() {
Expand All @@ -25,23 +24,16 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) {
}

if se.Type == "journey/start" {
e.currentJourney.Store(true)
e.eventCounter.Store(-1)
}
se.index = e.eventCounter.Inc()

if se.Type == "journey/end" || se.Type == "cmd/status" {
e.currentJourney.Store(false)
}
e.synthEvents <- se
vigneshshanmugam marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a weird thing to consider, which is errors between journeys.

We could still need to check for journey/end to account for errors that might happen between journeys. I think we can just set e.currentJourney.store(false) again, no? Then it just gets reported as an error on the suite.

Our long term goal is for heartbeat to really only execute one journey per monitor (with discovery mode) so I'm OK with not handling this for the moment and just tacking those errors on the previous journey. Also, I don't know what an error between journeys would even be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if there would be error between journeys, But with this change we would still get stderr messages on the suite level, wouldnt that be enough to figure out any errors?

}

// SynthEvents returns a read only channel for synth events
func (e ExecMultiplexer) SynthEvents(inline bool) <-chan *SynthEvent {
if inline || e.currentJourney.Load() {
return e.synthEvents
}
return make(chan *SynthEvent)
func (e ExecMultiplexer) SynthEvents() <-chan *SynthEvent {
return e.synthEvents
}

// Done returns a channel that is closed when all output has been received
Expand All @@ -56,9 +48,8 @@ func (e ExecMultiplexer) Wait() {

func NewExecMultiplexer() *ExecMultiplexer {
return &ExecMultiplexer{
currentJourney: atomic.NewBool(false),
eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0
synthEvents: make(chan *SynthEvent),
done: make(chan struct{}),
eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0
synthEvents: make(chan *SynthEvent),
done: make(chan struct{}),
}
}
Loading