Skip to content

Commit

Permalink
address review and improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshanmugam committed Feb 1, 2022
1 parent 0d6f438 commit c0b4cc0
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 68 deletions.
4 changes: 2 additions & 2 deletions x-pack/heartbeat/monitors/browser/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (s *Suite) FilterJourneys() synthexec.FilterJourneyConfig {
return s.suiteCfg.FilterJourneys
}

func (s *Suite) Fields() synthexec.StandardSuiteFields {
func (s *Suite) Fields() synthexec.StdSuiteFields {
_, isInline := s.InlineSource()
return synthexec.StandardSuiteFields{
return synthexec.StdSuiteFields{
Name: s.suiteCfg.Name,
Id: s.suiteCfg.Id,
IsInline: isInline,
Expand Down
7 changes: 4 additions & 3 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

type enricher func(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error
type enricher func(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error

type streamEnricher struct {
je *journeyEnricher
}

func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error {
func (e *streamEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if e.je == nil || (se != nil && se.Type == "journey/start") {
e.je = newJourneyEnricher()
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func makeUuid() string {
return u.String()
}

func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StandardSuiteFields) error {
func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent, fields StdSuiteFields) error {
if se == nil {
return nil
}
Expand Down Expand Up @@ -162,6 +162,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e
// In that case we always want to issue an update op
event.Meta.Put(events.FieldMetaOpType, events.OpTypeCreate)
}

eventext.MergeEventFields(event, se.ToMap())

if je.urlFields == nil {
Expand Down
138 changes: 82 additions & 56 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"
)

func makeStepEvent(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent {
Expand All @@ -34,7 +35,7 @@ func makeStepEvent(typ string, ts float64, name string, index int, status string
}

func TestJourneyEnricher(t *testing.T) {
var stdFields = StandardSuiteFields{
var stdFields = StdSuiteFields{
Id: "mysuite",
Name: "mysuite",
Type: "browser",
Expand Down Expand Up @@ -83,64 +84,83 @@ func TestJourneyEnricher(t *testing.T) {
journeyEnd,
}

je := &journeyEnricher{}

// We need an expectation for each input
// plus a final expectation for the summary which comes
// on the nil data.
for idx, se := range synthEvents {
e := &beat.Event{}
stdFields.IsInline = false
t.Run(fmt.Sprintf("suites monitor event %d", idx), func(t *testing.T) {
enrichErr := je.enrich(e, se, stdFields)

if se != nil && se.Type != "journey/end" {
// Test that the created event includes the mapped
// version of the event
testslike.Test(t, lookslike.MustCompile(se.ToMap()), e.Fields)
// check suite and monitor meta fields
testslike.Test(t, lookslike.MustCompile(common.MapStr{
"suite.id": stdFields.Id,
"suite.name": stdFields.Name,
"monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id),
"monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name),
}), e.Fields)

require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix())

if se.Error != nil {
require.Equal(t, stepError(se.Error), enrichErr)
}
} else { // journey end gets a summary
require.Equal(t, stepError(syntherr), enrichErr)
suiteValidator := func() validator.Validator {
return lookslike.MustCompile(common.MapStr{
"suite.id": stdFields.Id,
"suite.name": stdFields.Name,
"monitor.id": fmt.Sprintf("%s-%s", stdFields.Id, journey.Id),
"monitor.name": fmt.Sprintf("%s - %s", stdFields.Name, journey.Name),
})
}
inlineValidator := func() validator.Validator {
return lookslike.MustCompile(common.MapStr{
"monitor.id": stdFields.Id,
"monitor.name": stdFields.Name,
})
}
commonValidator := func(se *SynthEvent) validator.Validator {
var v []validator.Validator

u, _ := url.Parse(url1)
t.Run("summary", func(t *testing.T) {
v := lookslike.MustCompile(common.MapStr{
"synthetics.type": "heartbeat/summary",
"url": wrappers.URLFields(u),
"monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond),
})
// We need an expectation for each input plus a final
// expectation for the summary which comes on the nil data.
if se.Type != "journey/end" {
// Test that the created event includes the mapped
// version of the event
v = append(v, lookslike.MustCompile(se.ToMap()))
} else {
u, _ := url.Parse(url1)
// journey end gets a summary
v = append(v, lookslike.MustCompile(common.MapStr{
"synthetics.type": "heartbeat/summary",
"url": wrappers.URLFields(u),
"monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond),
}))
}
return lookslike.Compose(v...)
}

testslike.Test(t, v, e.Fields)
})
je := &journeyEnricher{}
check := func(t *testing.T, se *SynthEvent, ssf StdSuiteFields) {
e := &beat.Event{}
t.Run(fmt.Sprintf("event: %s", se.Type), func(t *testing.T) {
enrichErr := je.enrich(e, se, ssf)
if se.Error != nil {
require.Equal(t, stepError(se.Error), enrichErr)
}
})
if ssf.IsInline {
sv, _ := e.Fields.GetValue("suite")
require.Nil(t, sv)
testslike.Test(t, inlineValidator(), e.Fields)
} else {
testslike.Test(t, suiteValidator(), e.Fields)
}
testslike.Test(t, commonValidator(se), e.Fields)

require.Equal(t, se.Timestamp().Unix(), e.Timestamp.Unix())
})
}

for idx, se := range synthEvents {
e := &beat.Event{}
stdFields.IsInline = true
t.Run(fmt.Sprintf("inline monitor event %d", idx), func(t *testing.T) {
je.enrich(e, se, stdFields)
testslike.Test(t, lookslike.MustCompile(common.MapStr{
"monitor.id": stdFields.Id,
"monitor.name": stdFields.Name,
}), e.Fields)
tests := []struct {
name string
isInline bool
se []*SynthEvent
}{
{
name: "suite monitor",
isInline: false,
},
{
name: "inline monitor",
isInline: true,
},
}

sv, _ := e.Fields.GetValue("suite")
require.Nil(t, sv)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stdFields.IsInline = tt.isInline
for _, se := range synthEvents {
check(t, se, stdFields)
}
})
}
}
Expand All @@ -160,14 +180,17 @@ func TestEnrichConsoleSynthEvents(t *testing.T) {
Payload: common.MapStr{
"message": "Error from synthetics",
},
PackageVersion: "1.0.0",
},
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(common.MapStr{
"synthetics": common.MapStr{
"payload": common.MapStr{
"message": "Error from synthetics",
},
"type": "stderr",
"type": "stderr",
"package_version": "1.0.0",
"index": 0,
},
})
testslike.Test(t, v, e.Fields)
Expand All @@ -181,17 +204,20 @@ func TestEnrichConsoleSynthEvents(t *testing.T) {
Payload: common.MapStr{
"message": "debug output",
},
PackageVersion: "1.0.0",
},
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(common.MapStr{
"synthetics": common.MapStr{
"payload": common.MapStr{
"message": "debug output",
},
"type": "stdout",
"type": "stdout",
"package_version": "1.0.0",
"index": 0,
},
})
testslike.Test(t, v, e.Fields)
testslike.Test(t, lookslike.Strict(v), e.Fields)
},
},
}
Expand Down Expand Up @@ -349,7 +375,7 @@ func TestNoSummaryOnAfterHook(t *testing.T) {

for idx, se := range synthEvents {
e := &beat.Event{}
stdFields := StandardSuiteFields{IsInline: false}
stdFields := StdSuiteFields{IsInline: false}
t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) {
enrichErr := je.enrich(e, se, stdFields)

Expand Down
20 changes: 13 additions & 7 deletions x-pack/heartbeat/monitors/browser/synthexec/synthexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const debugSelector = "synthexec"

type StandardSuiteFields struct {
type StdSuiteFields struct {
Name string
Id string
Type string
Expand All @@ -39,7 +39,7 @@ type FilterJourneyConfig struct {
}

// SuiteJob will run a single journey by name from the given suite.
func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields, extraArgs ...string) (jobs.Job, error) {
func SuiteJob(ctx context.Context, suitePath string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields, extraArgs ...string) (jobs.Job, error) {
// Run the command in the given suitePath, use '.' as the first arg since the command runs
// in the correct dir
cmdFactory, err := suiteCommandFactory(suitePath, extraArgs...)
Expand Down Expand Up @@ -71,7 +71,7 @@ func suiteCommandFactory(suitePath string, args ...string) (func() *exec.Cmd, er
}

// InlineJourneyJob returns a job that runs the given source as a single journey.
func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StandardSuiteFields, extraArgs ...string) jobs.Job {
func InlineJourneyJob(ctx context.Context, script string, params common.MapStr, fields StdSuiteFields, extraArgs ...string) jobs.Job {
newCmd := func() *exec.Cmd {
return exec.Command("elastic-synthetics", append(extraArgs, "--inline")...)
}
Expand All @@ -82,7 +82,7 @@ func InlineJourneyJob(ctx context.Context, script string, params common.MapStr,
// startCmdJob adapts commands into a heartbeat job. This is a little awkward given that the command's output is
// available via a sequence of events in the multiplexer, while heartbeat jobs are tail recursive continuations.
// Here, we adapt one to the other, where each recursive job pulls another item off the chan until none are left.
func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StandardSuiteFields) jobs.Job {
func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string, params common.MapStr, filterJourneys FilterJourneyConfig, fields StdSuiteFields) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
mpx, err := runCmd(ctx, newCmd(), stdinStr, params, filterJourneys)
if err != nil {
Expand All @@ -95,7 +95,7 @@ func startCmdJob(ctx context.Context, newCmd func() *exec.Cmd, stdinStr *string,

// readResultsJob adapts the output of an ExecMultiplexer into a Job, that uses continuations
// to read all output.
func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StandardSuiteFields) jobs.Job {
func readResultsJob(ctx context.Context, synthEvents <-chan *SynthEvent, enrich enricher, fields StdSuiteFields) jobs.Job {
return func(event *beat.Event) (conts []jobs.Job, err error) {
se := <-synthEvents
err = enrich(event, se, fields)
Expand Down Expand Up @@ -160,14 +160,20 @@ func runCmd(
wg := sync.WaitGroup{}

// Send stdout into the output
stdoutPipe, _ := cmd.StdoutPipe()
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("could not open stdout pipe: %w", err)
}
wg.Add(1)
go func() {
scanToSynthEvents(stdoutPipe, stdoutToSynthEvent, mpx.writeSynthEvent)
wg.Done()
}()

stderrPipe, _ := cmd.StderrPipe()
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("could not open stderr pipe: %w", err)
}
wg.Add(1)
go func() {
scanToSynthEvents(stderrPipe, stderrToSynthEvent, mpx.writeSynthEvent)
Expand Down

0 comments on commit c0b4cc0

Please sign in to comment.