From d2c02aa0f361f515a8a6d8475914e5cb4589c1eb Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 7 Jul 2020 19:11:18 +0200 Subject: [PATCH] Cherry-pick #19227 to 7.x: Add statestore test helpers and unit tests (#19689) Add statestore test helpers and unit tests This change introduces the libbeat/statestore/storetest package and unit tests to the statestore frontend itself. The storetest package provides helpers for writing tests. For example does it emulate a key value store in memory by storing all k/v-pairs in a map[string]interface{}, that can optionally provided by users. The internal storecompliance test-suite is used to validate the storetest package to be fully compatible with the statestore requirements. The addition of the statestore package is split up into multiple changeset to ease review. The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore). Once finalized, the libbeat/statestore package contains: - The statestore frontend and interface for use within Beats - Interfaces for the store backend - A common set of tests store backends need to support - a storetest package for testing new features that require a store. The testing helpers use map[string]interface{} that can be initialized or queried after the test run for validation purposes. - The default memlog backend + tests This change introduces the second last item to libbeat: test helpers and additional unit tests. (cherry picked from commit 372c3ae57d1fa1787034c1169bb43f20a7b2d7be) --- libbeat/statestore/mock_test.go | 89 +++++++ libbeat/statestore/registry_test.go | 113 +++++++++ libbeat/statestore/store_test.go | 223 ++++++++++++++++++ libbeat/statestore/storetest/storetest.go | 214 +++++++++++++++++ .../statestore/storetest/storetest_test.go | 56 +++++ 5 files changed, 695 insertions(+) create mode 100644 libbeat/statestore/mock_test.go create mode 100644 libbeat/statestore/registry_test.go create mode 100644 libbeat/statestore/store_test.go create mode 100644 libbeat/statestore/storetest/storetest.go create mode 100644 libbeat/statestore/storetest/storetest_test.go diff --git a/libbeat/statestore/mock_test.go b/libbeat/statestore/mock_test.go new file mode 100644 index 00000000000..69a1d80303c --- /dev/null +++ b/libbeat/statestore/mock_test.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "github.com/stretchr/testify/mock" + + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +type mockRegistry struct { + mock.Mock +} + +type mockStore struct { + mock.Mock +} + +func newMockRegistry() *mockRegistry { return &mockRegistry{} } + +func (m *mockRegistry) OnAccess(name string) *mock.Call { return m.On("Access", name) } +func (m *mockRegistry) Access(name string) (backend.Store, error) { + args := m.Called(name) + + var store backend.Store + if ifc := args.Get(0); ifc != nil { + store = ifc.(backend.Store) + } + + return store, args.Error(1) +} + +func (m *mockRegistry) OnClose() *mock.Call { return m.On("Close") } +func (m *mockRegistry) Close() error { + args := m.Called() + return args.Error(0) +} + +func newMockStore() *mockStore { return &mockStore{} } + +func (m *mockStore) OnClose() *mock.Call { return m.On("Close") } +func (m *mockStore) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockStore) OnHas(key string) *mock.Call { return m.On("Has", key) } +func (m *mockStore) Has(key string) (bool, error) { + args := m.Called(key) + return args.Bool(0), args.Error(1) +} + +func (m *mockStore) OnGet(key string) *mock.Call { return m.On("Get", key) } +func (m *mockStore) Get(key string, into interface{}) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) OnRemove(key string) *mock.Call { return m.On("Remove", key) } +func (m *mockStore) Remove(key string) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) OnSet(key string) *mock.Call { return m.On("Set", key) } +func (m *mockStore) Set(key string, from interface{}) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + args := m.Called(fn) + return args.Error(0) +} diff --git a/libbeat/statestore/registry_test.go b/libbeat/statestore/registry_test.go new file mode 100644 index 00000000000..bb87f47c095 --- /dev/null +++ b/libbeat/statestore/registry_test.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAccessStore(t *testing.T) { + t.Run("single access", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + mr.OnAccess("test").Once().Return(ms, nil) + ms.OnClose().Once().Return(nil) + + reg := NewRegistry(mr) + store, _ := reg.Get("test") + assert.NoError(t, store.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("shared store instance", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + + // test instance sharing. Store must be opened and closed only once + mr.OnAccess("test").Once().Return(ms, nil) + ms.OnClose().Once().Return(nil) + + reg := NewRegistry(mr) + s1, _ := reg.Get("test") + s2, _ := reg.Get("test") + assert.NoError(t, s1.Close()) + assert.NoError(t, s2.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("close non-shared store needs open", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + + // test instance sharing. Store must be opened and closed only once + mr.OnAccess("test").Twice().Return(ms, nil) + ms.OnClose().Twice().Return(nil) + + reg := NewRegistry(mr) + + store, err := reg.Get("test") + assert.NoError(t, err) + assert.NoError(t, store.Close()) + + store, err = reg.Get("test") + assert.NoError(t, err) + assert.NoError(t, store.Close()) + + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("separate stores are not shared", func(t *testing.T) { + mr := newMockRegistry() + mr.OnClose().Once().Return(nil) + + ms1 := newMockStore() + ms1.OnClose().Once().Return(nil) + mr.OnAccess("s1").Once().Return(ms1, nil) + + ms2 := newMockStore() + ms2.OnClose().Once().Return(nil) + mr.OnAccess("s2").Once().Return(ms2, nil) + + reg := NewRegistry(mr) + s1, err := reg.Get("s1") + assert.NoError(t, err) + s2, err := reg.Get("s2") + assert.NoError(t, err) + assert.NoError(t, s1.Close()) + assert.NoError(t, s2.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms1.AssertExpectations(t) + ms2.AssertExpectations(t) + }) +} diff --git a/libbeat/statestore/store_test.go b/libbeat/statestore/store_test.go new file mode 100644 index 00000000000..7a7833b4939 --- /dev/null +++ b/libbeat/statestore/store_test.go @@ -0,0 +1,223 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/statestore/storetest" +) + +func TestStore_Close(t *testing.T) { + t.Run("close succeeds", func(t *testing.T) { + makeClosedTestStore(t) + }) + t.Run("fails if store has been closed", func(t *testing.T) { + assert.Error(t, makeClosedTestStore(t).Close()) + }) +} + +func TestStore_Has(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + _, err := store.Has("test") + assertClosed(t, err) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + ms.OnHas("test").Return(false, errors.New("oops")) + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + _, err := store.Has("test") + assert.Error(t, err) + }) + t.Run("return result from backend", func(t *testing.T) { + data := map[string]interface{}{"known_key": "test"} + store := makeTestStore(t, data) + defer store.Close() + + got, err := store.Has("known_key") + assert.NoError(t, err) + assert.True(t, got) + + got, err = store.Has("unknown_key") + assert.NoError(t, err) + assert.False(t, got) + }) +} + +func TestStore_Get(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + var tmp interface{} + assertClosed(t, store.Get("test", &tmp)) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + ms.OnGet("test").Return(errors.New("oops")) + var tmp interface{} + err := store.Get("test", &tmp) + assert.Error(t, err) + }) + t.Run("return result from backend", func(t *testing.T) { + data := map[string]interface{}{"known_key": "test"} + store := makeTestStore(t, data) + defer store.Close() + + var got interface{} + err := store.Get("known_key", &got) + assert.NoError(t, err) + assert.Equal(t, "test", got) + }) +} + +func TestStore_Set(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + var tmp interface{} + assertClosed(t, store.Set("test", &tmp)) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + ms.OnSet("test").Return(errors.New("oops")) + err := store.Set("test", nil) + assert.Error(t, err) + }) + t.Run("set key in backend", func(t *testing.T) { + data := map[string]interface{}{} + store := makeTestStore(t, data) + defer store.Close() + + err := store.Set("key", "value") + assert.NoError(t, err) + assert.Equal(t, "value", data["key"]) + }) +} + +func TestStore_Remove(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + assertClosed(t, store.Remove("test")) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + ms.OnRemove("test").Return(errors.New("oops")) + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + assert.Error(t, store.Remove("test")) + }) + t.Run("remove key from backend", func(t *testing.T) { + data := map[string]interface{}{"key": "test"} + store := makeTestStore(t, data) + + err := store.Remove("key") + assert.NoError(t, err) + assert.Equal(t, 0, len(data)) + }) +} + +func TestStore_Each(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + assertClosed(t, store.Each(func(string, ValueDecoder) (bool, error) { + return true, nil + })) + }) + t.Run("correctly iterate pairs", func(t *testing.T) { + data := map[string]interface{}{ + "a": map[string]interface{}{"field": "hello"}, + "b": map[string]interface{}{"field": "test"}, + } + store := makeTestStore(t, data) + defer store.Close() + + got := map[string]interface{}{} + err := store.Each(func(key string, dec ValueDecoder) (bool, error) { + var tmp interface{} + if err := dec.Decode(&tmp); err != nil { + t.Fatalf("failed to read value from store: %v", err) + } + got[key] = tmp + return true, nil + }) + + assert.NoError(t, err) + assert.Equal(t, data, got) + }) +} + +func makeTestStore(t *testing.T, data map[string]interface{}) *Store { + memstore := &storetest.MapStore{Table: data} + reg := NewRegistry(&storetest.MemoryStore{ + Stores: map[string]*storetest.MapStore{ + "test": memstore, + }, + }) + store, err := reg.Get("test") + if err != nil { + t.Fatalf("Failed to create test store: %v", err) + } + return store +} + +func makeTestMockedStore(t *testing.T, ms *mockStore) *Store { + mr := newMockRegistry() + mr.OnAccess("test").Once().Return(ms, nil) + + reg := NewRegistry(mr) + s, err := reg.Get("test") + require.NoError(t, err) + + ms.OnClose().Return(nil) + return s +} + +func makeClosedTestStore(t *testing.T) *Store { + s := makeTestMockedStore(t, newMockStore()) + require.NoError(t, s.Close()) + return s +} + +func assertClosed(t *testing.T, err error) { + if err == nil { + t.Fatal("expected error") + } + if !IsClosed(err) { + t.Fatalf("The error does not seem to indicate a failure because of a closed store. Error: %v", err) + } +} diff --git a/libbeat/statestore/storetest/storetest.go b/libbeat/statestore/storetest/storetest.go new file mode 100644 index 00000000000..cd6d07dac0d --- /dev/null +++ b/libbeat/statestore/storetest/storetest.go @@ -0,0 +1,214 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package storetest provides helpers for testing functionality that requires a statestore. +package storetest + +import ( + "errors" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// MemoryStore provides a dummy backend store that holds all access stores and +// data in memory. The Stores field is accessible for introspection or custom +// initialization. Stores should not be modified while a test is active. +// For validation one can use the statestore API or introspect the tables directly. +// +// The zero value is MemoryStore is a valid store instance. The Stores field +// will be initialized lazily if it has not been setup upfront. +// +// Example: Create store for testing: +// store := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) +type MemoryStore struct { + Stores map[string]*MapStore + mu sync.Mutex +} + +// MapStore implements a single in memory storage. The MapStore holds all +// key-value pairs in a map[string]interface{}. +type MapStore struct { + mu sync.RWMutex + closed bool + Table map[string]interface{} +} + +type valueUnpacker struct { + from interface{} +} + +// CreateValueDecoder creates a backend.ValueDecoder that can be used to unpack +// an value into a custom go type. +func CreateValueDecoder(v interface{}) backend.ValueDecoder { + return valueUnpacker{v} +} + +var errMapStoreClosed = errors.New("store closed") +var errUnknownKey = errors.New("unknown key") + +// NewMemoryStoreBackend creates a new backend.Registry instance that can be +// used with the statestore. +func NewMemoryStoreBackend() *MemoryStore { + return &MemoryStore{} +} + +func (m *MemoryStore) init() { + if m.Stores == nil { + m.Stores = map[string]*MapStore{} + } +} + +// Access returns a MapStore that for the given name. A new store is created +// and registered in the Stores table, if the store name is new to MemoryStore. +func (m *MemoryStore) Access(name string) (backend.Store, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.init() + + store, exists := m.Stores[name] + if !exists { + store = &MapStore{} + m.Stores[name] = store + } else { + store.Reopen() + } + return store, nil +} + +// Close closes the store. +func (m *MemoryStore) Close() error { return nil } + +func (s *MapStore) init() { + if s.Table == nil { + s.Table = map[string]interface{}{} + } +} + +// Reopen marks the MapStore as open in case it has been closed already. All +// key-value pairs and store operations are accessible after reopening the +// store. +func (s *MapStore) Reopen() { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = false +} + +// Close marks the store as closed. The Store API calls like Has, Get, Set, and +// Remove will fail until the store is reopenned. +func (s *MapStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + return nil +} + +// IsClosed returns true if the store is marked as closed. +func (s *MapStore) IsClosed() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.closed +} + +// Has checks if the key value pair is known to the store. +// It returns an error if the store is marked as closed. +func (s *MapStore) Has(key string) (bool, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return false, errMapStoreClosed + } + + s.init() + _, exists := s.Table[key] + return exists, nil +} + +// Get returns a key value pair from the store. An error is returned if the +// store has been closed, the key is unknown, or an decoding error occured. +func (s *MapStore) Get(key string, into interface{}) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + val, exists := s.Table[key] + if !exists { + return errUnknownKey + } + return typeconv.Convert(into, val) +} + +// Set inserts or overwrites a key-value pair. +// An error is returned if the store is marked as closed or the value being +// passed in can not be encoded. +func (s *MapStore) Set(key string, from interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + var tmp interface{} + if err := typeconv.Convert(&tmp, from); err != nil { + return err + } + s.Table[key] = tmp + return nil +} + +// Remove removes a key value pair from the store. +// An error is returned if the store is marked as closed. +func (s *MapStore) Remove(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + delete(s.Table, key) + return nil +} + +// Each iterates all key value pairs in the store calling fn. +// The iteration stops if fn returns false or an error. +// Each returns an error if the store is closed, or fn returns an error. +func (s *MapStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + for k, v := range s.Table { + cont, err := fn(k, CreateValueDecoder(v)) + if !cont || err != nil { + return err + } + } + return nil +} + +func (d valueUnpacker) Decode(to interface{}) error { + return typeconv.Convert(to, d.from) +} diff --git a/libbeat/statestore/storetest/storetest_test.go b/libbeat/statestore/storetest/storetest_test.go new file mode 100644 index 00000000000..58d4aae0b8f --- /dev/null +++ b/libbeat/statestore/storetest/storetest_test.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storetest + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance" +) + +func init() { + logp.DevelopmentSetup() +} + +func TestCompliance(t *testing.T) { + storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) { + return NewMemoryStoreBackend(), nil + }) +} + +func TestStore_IsClosed(t *testing.T) { + t.Run("false by default", func(t *testing.T) { + store := &MapStore{} + assert.False(t, store.IsClosed()) + }) + t.Run("true after close", func(t *testing.T) { + store := &MapStore{} + store.Close() + assert.True(t, store.IsClosed()) + }) + t.Run("true after reopen", func(t *testing.T) { + store := &MapStore{} + store.Close() + store.Reopen() + assert.False(t, store.IsClosed()) + }) +}