diff --git a/libbeat/statestore/mock_test.go b/libbeat/statestore/mock_test.go new file mode 100644 index 00000000000..69a1d80303c --- /dev/null +++ b/libbeat/statestore/mock_test.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "github.com/stretchr/testify/mock" + + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +type mockRegistry struct { + mock.Mock +} + +type mockStore struct { + mock.Mock +} + +func newMockRegistry() *mockRegistry { return &mockRegistry{} } + +func (m *mockRegistry) OnAccess(name string) *mock.Call { return m.On("Access", name) } +func (m *mockRegistry) Access(name string) (backend.Store, error) { + args := m.Called(name) + + var store backend.Store + if ifc := args.Get(0); ifc != nil { + store = ifc.(backend.Store) + } + + return store, args.Error(1) +} + +func (m *mockRegistry) OnClose() *mock.Call { return m.On("Close") } +func (m *mockRegistry) Close() error { + args := m.Called() + return args.Error(0) +} + +func newMockStore() *mockStore { return &mockStore{} } + +func (m *mockStore) OnClose() *mock.Call { return m.On("Close") } +func (m *mockStore) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockStore) OnHas(key string) *mock.Call { return m.On("Has", key) } +func (m *mockStore) Has(key string) (bool, error) { + args := m.Called(key) + return args.Bool(0), args.Error(1) +} + +func (m *mockStore) OnGet(key string) *mock.Call { return m.On("Get", key) } +func (m *mockStore) Get(key string, into interface{}) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) OnRemove(key string) *mock.Call { return m.On("Remove", key) } +func (m *mockStore) Remove(key string) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) OnSet(key string) *mock.Call { return m.On("Set", key) } +func (m *mockStore) Set(key string, from interface{}) error { + args := m.Called(key) + return args.Error(0) +} + +func (m *mockStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + args := m.Called(fn) + return args.Error(0) +} diff --git a/libbeat/statestore/registry_test.go b/libbeat/statestore/registry_test.go new file mode 100644 index 00000000000..bb87f47c095 --- /dev/null +++ b/libbeat/statestore/registry_test.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAccessStore(t *testing.T) { + t.Run("single access", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + mr.OnAccess("test").Once().Return(ms, nil) + ms.OnClose().Once().Return(nil) + + reg := NewRegistry(mr) + store, _ := reg.Get("test") + assert.NoError(t, store.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("shared store instance", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + + // test instance sharing. Store must be opened and closed only once + mr.OnAccess("test").Once().Return(ms, nil) + ms.OnClose().Once().Return(nil) + + reg := NewRegistry(mr) + s1, _ := reg.Get("test") + s2, _ := reg.Get("test") + assert.NoError(t, s1.Close()) + assert.NoError(t, s2.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("close non-shared store needs open", func(t *testing.T) { + mr := newMockRegistry() + ms := newMockStore() + mr.OnClose().Once().Return(nil) + + // test instance sharing. Store must be opened and closed only once + mr.OnAccess("test").Twice().Return(ms, nil) + ms.OnClose().Twice().Return(nil) + + reg := NewRegistry(mr) + + store, err := reg.Get("test") + assert.NoError(t, err) + assert.NoError(t, store.Close()) + + store, err = reg.Get("test") + assert.NoError(t, err) + assert.NoError(t, store.Close()) + + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms.AssertExpectations(t) + }) + + t.Run("separate stores are not shared", func(t *testing.T) { + mr := newMockRegistry() + mr.OnClose().Once().Return(nil) + + ms1 := newMockStore() + ms1.OnClose().Once().Return(nil) + mr.OnAccess("s1").Once().Return(ms1, nil) + + ms2 := newMockStore() + ms2.OnClose().Once().Return(nil) + mr.OnAccess("s2").Once().Return(ms2, nil) + + reg := NewRegistry(mr) + s1, err := reg.Get("s1") + assert.NoError(t, err) + s2, err := reg.Get("s2") + assert.NoError(t, err) + assert.NoError(t, s1.Close()) + assert.NoError(t, s2.Close()) + assert.NoError(t, reg.Close()) + + mr.AssertExpectations(t) + ms1.AssertExpectations(t) + ms2.AssertExpectations(t) + }) +} diff --git a/libbeat/statestore/store_test.go b/libbeat/statestore/store_test.go new file mode 100644 index 00000000000..7a7833b4939 --- /dev/null +++ b/libbeat/statestore/store_test.go @@ -0,0 +1,223 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package statestore + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/statestore/storetest" +) + +func TestStore_Close(t *testing.T) { + t.Run("close succeeds", func(t *testing.T) { + makeClosedTestStore(t) + }) + t.Run("fails if store has been closed", func(t *testing.T) { + assert.Error(t, makeClosedTestStore(t).Close()) + }) +} + +func TestStore_Has(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + _, err := store.Has("test") + assertClosed(t, err) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + ms.OnHas("test").Return(false, errors.New("oops")) + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + _, err := store.Has("test") + assert.Error(t, err) + }) + t.Run("return result from backend", func(t *testing.T) { + data := map[string]interface{}{"known_key": "test"} + store := makeTestStore(t, data) + defer store.Close() + + got, err := store.Has("known_key") + assert.NoError(t, err) + assert.True(t, got) + + got, err = store.Has("unknown_key") + assert.NoError(t, err) + assert.False(t, got) + }) +} + +func TestStore_Get(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + var tmp interface{} + assertClosed(t, store.Get("test", &tmp)) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + ms.OnGet("test").Return(errors.New("oops")) + var tmp interface{} + err := store.Get("test", &tmp) + assert.Error(t, err) + }) + t.Run("return result from backend", func(t *testing.T) { + data := map[string]interface{}{"known_key": "test"} + store := makeTestStore(t, data) + defer store.Close() + + var got interface{} + err := store.Get("known_key", &got) + assert.NoError(t, err) + assert.Equal(t, "test", got) + }) +} + +func TestStore_Set(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + var tmp interface{} + assertClosed(t, store.Set("test", &tmp)) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + ms.OnSet("test").Return(errors.New("oops")) + err := store.Set("test", nil) + assert.Error(t, err) + }) + t.Run("set key in backend", func(t *testing.T) { + data := map[string]interface{}{} + store := makeTestStore(t, data) + defer store.Close() + + err := store.Set("key", "value") + assert.NoError(t, err) + assert.Equal(t, "value", data["key"]) + }) +} + +func TestStore_Remove(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + assertClosed(t, store.Remove("test")) + }) + t.Run("error is passed through", func(t *testing.T) { + ms := newMockStore() + ms.OnRemove("test").Return(errors.New("oops")) + defer ms.AssertExpectations(t) + + store := makeTestMockedStore(t, ms) + defer store.Close() + + assert.Error(t, store.Remove("test")) + }) + t.Run("remove key from backend", func(t *testing.T) { + data := map[string]interface{}{"key": "test"} + store := makeTestStore(t, data) + + err := store.Remove("key") + assert.NoError(t, err) + assert.Equal(t, 0, len(data)) + }) +} + +func TestStore_Each(t *testing.T) { + t.Run("fails if store has been closed", func(t *testing.T) { + store := makeClosedTestStore(t) + assertClosed(t, store.Each(func(string, ValueDecoder) (bool, error) { + return true, nil + })) + }) + t.Run("correctly iterate pairs", func(t *testing.T) { + data := map[string]interface{}{ + "a": map[string]interface{}{"field": "hello"}, + "b": map[string]interface{}{"field": "test"}, + } + store := makeTestStore(t, data) + defer store.Close() + + got := map[string]interface{}{} + err := store.Each(func(key string, dec ValueDecoder) (bool, error) { + var tmp interface{} + if err := dec.Decode(&tmp); err != nil { + t.Fatalf("failed to read value from store: %v", err) + } + got[key] = tmp + return true, nil + }) + + assert.NoError(t, err) + assert.Equal(t, data, got) + }) +} + +func makeTestStore(t *testing.T, data map[string]interface{}) *Store { + memstore := &storetest.MapStore{Table: data} + reg := NewRegistry(&storetest.MemoryStore{ + Stores: map[string]*storetest.MapStore{ + "test": memstore, + }, + }) + store, err := reg.Get("test") + if err != nil { + t.Fatalf("Failed to create test store: %v", err) + } + return store +} + +func makeTestMockedStore(t *testing.T, ms *mockStore) *Store { + mr := newMockRegistry() + mr.OnAccess("test").Once().Return(ms, nil) + + reg := NewRegistry(mr) + s, err := reg.Get("test") + require.NoError(t, err) + + ms.OnClose().Return(nil) + return s +} + +func makeClosedTestStore(t *testing.T) *Store { + s := makeTestMockedStore(t, newMockStore()) + require.NoError(t, s.Close()) + return s +} + +func assertClosed(t *testing.T, err error) { + if err == nil { + t.Fatal("expected error") + } + if !IsClosed(err) { + t.Fatalf("The error does not seem to indicate a failure because of a closed store. Error: %v", err) + } +} diff --git a/libbeat/statestore/storetest/storetest.go b/libbeat/statestore/storetest/storetest.go new file mode 100644 index 00000000000..cd6d07dac0d --- /dev/null +++ b/libbeat/statestore/storetest/storetest.go @@ -0,0 +1,214 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package storetest provides helpers for testing functionality that requires a statestore. +package storetest + +import ( + "errors" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// MemoryStore provides a dummy backend store that holds all access stores and +// data in memory. The Stores field is accessible for introspection or custom +// initialization. Stores should not be modified while a test is active. +// For validation one can use the statestore API or introspect the tables directly. +// +// The zero value is MemoryStore is a valid store instance. The Stores field +// will be initialized lazily if it has not been setup upfront. +// +// Example: Create store for testing: +// store := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) +type MemoryStore struct { + Stores map[string]*MapStore + mu sync.Mutex +} + +// MapStore implements a single in memory storage. The MapStore holds all +// key-value pairs in a map[string]interface{}. +type MapStore struct { + mu sync.RWMutex + closed bool + Table map[string]interface{} +} + +type valueUnpacker struct { + from interface{} +} + +// CreateValueDecoder creates a backend.ValueDecoder that can be used to unpack +// an value into a custom go type. +func CreateValueDecoder(v interface{}) backend.ValueDecoder { + return valueUnpacker{v} +} + +var errMapStoreClosed = errors.New("store closed") +var errUnknownKey = errors.New("unknown key") + +// NewMemoryStoreBackend creates a new backend.Registry instance that can be +// used with the statestore. +func NewMemoryStoreBackend() *MemoryStore { + return &MemoryStore{} +} + +func (m *MemoryStore) init() { + if m.Stores == nil { + m.Stores = map[string]*MapStore{} + } +} + +// Access returns a MapStore that for the given name. A new store is created +// and registered in the Stores table, if the store name is new to MemoryStore. +func (m *MemoryStore) Access(name string) (backend.Store, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.init() + + store, exists := m.Stores[name] + if !exists { + store = &MapStore{} + m.Stores[name] = store + } else { + store.Reopen() + } + return store, nil +} + +// Close closes the store. +func (m *MemoryStore) Close() error { return nil } + +func (s *MapStore) init() { + if s.Table == nil { + s.Table = map[string]interface{}{} + } +} + +// Reopen marks the MapStore as open in case it has been closed already. All +// key-value pairs and store operations are accessible after reopening the +// store. +func (s *MapStore) Reopen() { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = false +} + +// Close marks the store as closed. The Store API calls like Has, Get, Set, and +// Remove will fail until the store is reopenned. +func (s *MapStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + return nil +} + +// IsClosed returns true if the store is marked as closed. +func (s *MapStore) IsClosed() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.closed +} + +// Has checks if the key value pair is known to the store. +// It returns an error if the store is marked as closed. +func (s *MapStore) Has(key string) (bool, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return false, errMapStoreClosed + } + + s.init() + _, exists := s.Table[key] + return exists, nil +} + +// Get returns a key value pair from the store. An error is returned if the +// store has been closed, the key is unknown, or an decoding error occured. +func (s *MapStore) Get(key string, into interface{}) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + val, exists := s.Table[key] + if !exists { + return errUnknownKey + } + return typeconv.Convert(into, val) +} + +// Set inserts or overwrites a key-value pair. +// An error is returned if the store is marked as closed or the value being +// passed in can not be encoded. +func (s *MapStore) Set(key string, from interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + var tmp interface{} + if err := typeconv.Convert(&tmp, from); err != nil { + return err + } + s.Table[key] = tmp + return nil +} + +// Remove removes a key value pair from the store. +// An error is returned if the store is marked as closed. +func (s *MapStore) Remove(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + delete(s.Table, key) + return nil +} + +// Each iterates all key value pairs in the store calling fn. +// The iteration stops if fn returns false or an error. +// Each returns an error if the store is closed, or fn returns an error. +func (s *MapStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.closed { + return errMapStoreClosed + } + + s.init() + for k, v := range s.Table { + cont, err := fn(k, CreateValueDecoder(v)) + if !cont || err != nil { + return err + } + } + return nil +} + +func (d valueUnpacker) Decode(to interface{}) error { + return typeconv.Convert(to, d.from) +} diff --git a/libbeat/statestore/storetest/storetest_test.go b/libbeat/statestore/storetest/storetest_test.go new file mode 100644 index 00000000000..58d4aae0b8f --- /dev/null +++ b/libbeat/statestore/storetest/storetest_test.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storetest + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance" +) + +func init() { + logp.DevelopmentSetup() +} + +func TestCompliance(t *testing.T) { + storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) { + return NewMemoryStoreBackend(), nil + }) +} + +func TestStore_IsClosed(t *testing.T) { + t.Run("false by default", func(t *testing.T) { + store := &MapStore{} + assert.False(t, store.IsClosed()) + }) + t.Run("true after close", func(t *testing.T) { + store := &MapStore{} + store.Close() + assert.True(t, store.IsClosed()) + }) + t.Run("true after reopen", func(t *testing.T) { + store := &MapStore{} + store.Close() + store.Reopen() + assert.False(t, store.IsClosed()) + }) +}