From ae2569fdf9844d964bc168057ee7bfb8cb00a391 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 4 Oct 2018 12:03:38 -0400 Subject: [PATCH] Additional Heartbeat Reload Tests (#8228) (#8548) * Adds a mocked test of the Monitor type * Minimizes the required mockable area by introducing a PipelineConnector type that is easier to mock than a Pipeline. * Introduces a number of useful mocks for future tests potentially. (cherry picked from commit 0417f98558ee4db6e8f041666992e805ed1f6521) --- heartbeat/monitors/mocks_test.go | 124 +++++++++++++++++++++++++++++ heartbeat/monitors/monitor.go | 20 ++--- heartbeat/monitors/monitor_test.go | 69 ++++++++++++++++ heartbeat/monitors/plugin.go | 7 ++ heartbeat/monitors/task.go | 2 +- libbeat/beat/pipeline.go | 9 ++- 6 files changed, 218 insertions(+), 13 deletions(-) create mode 100644 heartbeat/monitors/mocks_test.go create mode 100644 heartbeat/monitors/monitor_test.go diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go new file mode 100644 index 00000000000..04fedc32cd8 --- /dev/null +++ b/heartbeat/monitors/mocks_test.go @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +type MockBeatClient struct { + publishes []beat.Event + closed bool + mtx sync.Mutex +} + +func (c *MockBeatClient) Publish(e beat.Event) { + c.PublishAll([]beat.Event{e}) +} + +func (c *MockBeatClient) PublishAll(events []beat.Event) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, e := range events { + c.publishes = append(c.publishes, e) + } +} + +func (c *MockBeatClient) Close() error { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.closed { + return fmt.Errorf("mock client already closed") + } + + c.closed = true + return nil +} + +func (c *MockBeatClient) Publishes() []beat.Event { + c.mtx.Lock() + defer c.mtx.Unlock() + + dst := make([]beat.Event, len(c.publishes)) + copy(dst, c.publishes) + return dst +} + +type MockPipelineConnector struct { + clients []*MockBeatClient + mtx sync.Mutex +} + +func (pc *MockPipelineConnector) Connect() (beat.Client, error) { + return pc.ConnectWith(beat.ClientConfig{}) +} + +func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, error) { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + c := &MockBeatClient{} + + pc.clients = append(pc.clients, c) + + return c, nil +} + +func createMockJob(name string, cfg *common.Config) ([]Job, error) { + j := MakeSimpleJob(JobSettings{}, func() (common.MapStr, error) { + return common.MapStr{ + "foo": "bar", + }, nil + }) + + return []Job{j}, nil +} + +func mockPluginBuilder() pluginBuilder { + return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) { + c := common.Config{} + j, err := createMockJob("test", &c) + return j, err + }} +} + +func mockPluginsReg() *pluginsReg { + reg := newPluginsReg() + reg.add(mockPluginBuilder()) + return reg +} + +func mockPluginConf(t *testing.T, schedule string, url string) *common.Config { + conf, err := common.NewConfigFrom(map[string]interface{}{ + "type": "test", + "urls": []string{url}, + "schedule": schedule, + }) + require.NoError(t, err) + + return conf +} diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index b371e48cf07..d363e156096 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -50,7 +50,7 @@ type Monitor struct { watchPollTasks []*task watch watcher.Watch - pipeline beat.Pipeline + pipelineConnector beat.PipelineConnector } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -70,7 +70,7 @@ var ErrWatchesDisabled = errors.New("watch poll files are only allowed in heartb func newMonitor( config *common.Config, registrar *pluginsReg, - pipeline beat.Pipeline, + pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, allowWatches bool, ) (*Monitor, error) { @@ -84,17 +84,17 @@ func newMonitor( monitorPlugin, found := registrar.get(mpi.Type) if !found { - return nil, fmt.Errorf("monitor type %v does not exist", mpi.Type) + return nil, fmt.Errorf("monitor type %v does not exist, valid types are %v", mpi.Type, registrar.monitorNames()) } m := &Monitor{ - name: monitorPlugin.name, - scheduler: scheduler, - jobTasks: []*task{}, - pipeline: pipeline, - watchPollTasks: []*task{}, - internalsMtx: sync.Mutex{}, - config: config, + name: monitorPlugin.name, + scheduler: scheduler, + jobTasks: []*task{}, + pipelineConnector: pipelineConnector, + watchPollTasks: []*task{}, + internalsMtx: sync.Mutex{}, + config: config, } jobs, err := monitorPlugin.create(config) diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go new file mode 100644 index 00000000000..332137020a7 --- /dev/null +++ b/heartbeat/monitors/monitor_test.go @@ -0,0 +1,69 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/heartbeat/scheduler" +) + +func TestMonitor(t *testing.T) { + serverMonConf := mockPluginConf(t, "@every 1ms", "http://example.net") + reg := mockPluginsReg() + pipelineConnector := &MockPipelineConnector{} + + sched := scheduler.New(1) + err := sched.Start() + require.NoError(t, err) + defer sched.Stop() + + mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false) + require.NoError(t, err) + + mon.Start() + defer mon.Stop() + + require.Equal(t, 1, len(pipelineConnector.clients)) + pcClient := pipelineConnector.clients[0] + + timeout := time.Second + start := time.Now() + success := false + for time.Since(start) < timeout && !success { + count := len(pcClient.Publishes()) + if count >= 1 { + success = true + } else { + // Let's yield this goroutine so we don't spin + // This could (possibly?) lock on a single core system otherwise + time.Sleep(time.Microsecond) + } + } + + if !success { + t.Fatalf("No publishes detected!") + } + + mon.Stop() + assert.Equal(t, true, pcClient.closed) +} diff --git a/heartbeat/monitors/plugin.go b/heartbeat/monitors/plugin.go index 8a3286287d0..688732fd47c 100644 --- a/heartbeat/monitors/plugin.go +++ b/heartbeat/monitors/plugin.go @@ -121,6 +121,13 @@ func (r *pluginsReg) String() string { return fmt.Sprintf("globalPluginsReg, monitor: %v", strings.Join(monitors, ", ")) } +func (r *pluginsReg) monitorNames() []string { + names := make([]string, 0, len(r.monitors)) + for k := range r.monitors { + names = append(names, k) + } + return names +} func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) { return e.builder(e.name, cfg) diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 8f32db39b07..cd81673436b 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -123,7 +123,7 @@ func (t *task) makeSchedulerTaskFunc() scheduler.TaskFunc { // Start schedules this task for execution. func (t *task) Start() { var err error - t.client, err = t.monitor.pipeline.ConnectWith(beat.ClientConfig{ + t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ EventMetadata: t.config.EventMetadata, Processor: t.processors, }) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 823ae8de20b..444daae7efa 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -24,11 +24,16 @@ import ( ) type Pipeline interface { - Connect() (Client, error) - ConnectWith(ClientConfig) (Client, error) + PipelineConnector SetACKHandler(PipelineACKHandler) error } +// PipelineConnector creates a publishing Client. This is typically backed by a Pipeline. +type PipelineConnector interface { + ConnectWith(ClientConfig) (Client, error) + Connect() (Client, error) +} + // Client holds a connection to the beats publisher pipeline type Client interface { Publish(Event)