Skip to content

Commit

Permalink
Additional Heartbeat Reload Tests (#8228)
Browse files Browse the repository at this point in the history
* Adds a mocked test of the Monitor type
* Minimizes the required mockable area by introducing a PipelineConnector type that is easier to mock than a Pipeline.
* Introduces a number of useful mocks for future tests potentially.
  • Loading branch information
andrewvc authored Oct 3, 2018
1 parent 7798003 commit 0417f98
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 13 deletions.
124 changes: 124 additions & 0 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitors

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

type MockBeatClient struct {
publishes []beat.Event
closed bool
mtx sync.Mutex
}

func (c *MockBeatClient) Publish(e beat.Event) {
c.PublishAll([]beat.Event{e})
}

func (c *MockBeatClient) PublishAll(events []beat.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

for _, e := range events {
c.publishes = append(c.publishes, e)
}
}

func (c *MockBeatClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()

if c.closed {
return fmt.Errorf("mock client already closed")
}

c.closed = true
return nil
}

func (c *MockBeatClient) Publishes() []beat.Event {
c.mtx.Lock()
defer c.mtx.Unlock()

dst := make([]beat.Event, len(c.publishes))
copy(dst, c.publishes)
return dst
}

type MockPipelineConnector struct {
clients []*MockBeatClient
mtx sync.Mutex
}

func (pc *MockPipelineConnector) Connect() (beat.Client, error) {
return pc.ConnectWith(beat.ClientConfig{})
}

func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, error) {
pc.mtx.Lock()
defer pc.mtx.Unlock()

c := &MockBeatClient{}

pc.clients = append(pc.clients, c)

return c, nil
}

func createMockJob(name string, cfg *common.Config) ([]Job, error) {
j := MakeSimpleJob(JobSettings{}, func() (common.MapStr, error) {
return common.MapStr{
"foo": "bar",
}, nil
})

return []Job{j}, nil
}

func mockPluginBuilder() pluginBuilder {
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) {
c := common.Config{}
j, err := createMockJob("test", &c)
return j, err
}}
}

func mockPluginsReg() *pluginsReg {
reg := newPluginsReg()
reg.add(mockPluginBuilder())
return reg
}

func mockPluginConf(t *testing.T, schedule string, url string) *common.Config {
conf, err := common.NewConfigFrom(map[string]interface{}{
"type": "test",
"urls": []string{url},
"schedule": schedule,
})
require.NoError(t, err)

return conf
}
20 changes: 10 additions & 10 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Monitor struct {
watchPollTasks []*task
watch watcher.Watch

pipeline beat.Pipeline
pipelineConnector beat.PipelineConnector
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand All @@ -70,7 +70,7 @@ var ErrWatchesDisabled = errors.New("watch poll files are only allowed in heartb
func newMonitor(
config *common.Config,
registrar *pluginsReg,
pipeline beat.Pipeline,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
) (*Monitor, error) {
Expand All @@ -84,17 +84,17 @@ func newMonitor(

monitorPlugin, found := registrar.get(mpi.Type)
if !found {
return nil, fmt.Errorf("monitor type %v does not exist", mpi.Type)
return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", mpi.Type, registrar.monitorNames())
}

m := &Monitor{
name: monitorPlugin.name,
scheduler: scheduler,
jobTasks: []*task{},
pipeline: pipeline,
watchPollTasks: []*task{},
internalsMtx: sync.Mutex{},
config: config,
name: monitorPlugin.name,
scheduler: scheduler,
jobTasks: []*task{},
pipelineConnector: pipelineConnector,
watchPollTasks: []*task{},
internalsMtx: sync.Mutex{},
config: config,
}

jobs, err := monitorPlugin.create(config)
Expand Down
69 changes: 69 additions & 0 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitors

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/scheduler"
)

func TestMonitor(t *testing.T) {
serverMonConf := mockPluginConf(t, "@every 1ms", "http://example.net")
reg := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1)
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false)
require.NoError(t, err)

mon.Start()
defer mon.Stop()

require.Equal(t, 1, len(pipelineConnector.clients))
pcClient := pipelineConnector.clients[0]

timeout := time.Second
start := time.Now()
success := false
for time.Since(start) < timeout && !success {
count := len(pcClient.Publishes())
if count >= 1 {
success = true
} else {
// Let's yield this goroutine so we don't spin
// This could (possibly?) lock on a single core system otherwise
time.Sleep(time.Microsecond)
}
}

if !success {
t.Fatalf("No publishes detected!")
}

mon.Stop()
assert.Equal(t, true, pcClient.closed)
}
7 changes: 7 additions & 0 deletions heartbeat/monitors/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func (r *pluginsReg) String() string {
return fmt.Sprintf("globalPluginsReg, monitor: %v",
strings.Join(monitors, ", "))
}
func (r *pluginsReg) monitorNames() []string {
names := make([]string, 0, len(r.monitors))
for k := range r.monitors {
names = append(names, k)
}
return names
}

func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) {
return e.builder(e.name, cfg)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (t *task) makeSchedulerTaskFunc() scheduler.TaskFunc {
// Start schedules this task for execution.
func (t *task) Start() {
var err error
t.client, err = t.monitor.pipeline.ConnectWith(beat.ClientConfig{
t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
})
Expand Down
9 changes: 7 additions & 2 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ import (
)

type Pipeline interface {
Connect() (Client, error)
ConnectWith(ClientConfig) (Client, error)
PipelineConnector
SetACKHandler(PipelineACKHandler) error
}

// PipelineConnector creates a publishing Client. This is typically backed by a Pipeline.
type PipelineConnector interface {
ConnectWith(ClientConfig) (Client, error)
Connect() (Client, error)
}

// Client holds a connection to the beats publisher pipeline
type Client interface {
Publish(Event)
Expand Down

0 comments on commit 0417f98

Please sign in to comment.