diff --git a/CHANGELOG.md b/CHANGELOG.md index a5bbc8d3b6..810ca21caf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ Versioning](http://semver.org/spec/v2.0.0.html). ### Fixed - Fixed the agent `--annotations` and `--labels` flags. +### Added +- Added the message bus to Tessend in order to track Tessen configuration changes from the API. +- Added a performance optimizing `Count()` function to the generic store. + ## [5.5.1] - 2019-04-15 ### Changed diff --git a/backend/apid/actions/tessen.go b/backend/apid/actions/tessen.go index 5de126e5fc..49a6ef1835 100644 --- a/backend/apid/actions/tessen.go +++ b/backend/apid/actions/tessen.go @@ -2,6 +2,7 @@ package actions import ( corev2 "github.com/sensu/sensu-go/api/core/v2" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/store" "golang.org/x/net/context" ) @@ -9,12 +10,14 @@ import ( // TessenController exposes actions which a viewer can perform type TessenController struct { store store.TessenConfigStore + bus messaging.MessageBus } // NewTessenController returns a new TessenController -func NewTessenController(store store.TessenConfigStore) TessenController { +func NewTessenController(store store.TessenConfigStore, bus messaging.MessageBus) TessenController { return TessenController{ store: store, + bus: bus, } } @@ -29,6 +32,11 @@ func (c TessenController) CreateOrUpdate(ctx context.Context, config *corev2.Tes } } + // Publish to Tessend + if err := c.bus.Publish(messaging.TopicTessen, config); err != nil { + return NewError(InternalErr, err) + } + return nil } diff --git a/backend/apid/actions/tessen_test.go b/backend/apid/actions/tessen_test.go index b9198ff2e6..5387085661 100644 --- a/backend/apid/actions/tessen_test.go +++ b/backend/apid/actions/tessen_test.go @@ -7,6 +7,7 @@ import ( corev2 "github.com/sensu/sensu-go/api/core/v2" "github.com/sensu/sensu-go/backend/store" + "github.com/sensu/sensu-go/testing/mockbus" "github.com/sensu/sensu-go/testing/mockstore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -16,10 +17,12 @@ func TestNewTessenController(t *testing.T) { assert := assert.New(t) store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) assert.NotNil(actions) assert.Equal(store, actions.store) + assert.Equal(bus, actions.bus) } func TestCreateOrUpdateTessenConfig(t *testing.T) { @@ -28,6 +31,7 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { ctx context.Context argument *corev2.TessenConfig storeErr error + busErr error expectedErr bool expectedErrCode ErrCode }{ @@ -52,11 +56,20 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { expectedErr: true, expectedErrCode: InternalErr, }, + { + name: "Bus Error", + ctx: context.Background(), + argument: corev2.DefaultTessenConfig(), + busErr: errors.New("the bus has a flat tire"), + expectedErr: true, + expectedErrCode: InternalErr, + }, } for _, tc := range testCases { store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) t.Run(tc.name, func(t *testing.T) { assert := assert.New(t) @@ -65,6 +78,8 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { On("CreateOrUpdateTessenConfig", mock.Anything, mock.Anything). Return(tc.storeErr) + bus.On("Publish", mock.Anything, mock.Anything).Return(tc.busErr) + err := actions.CreateOrUpdate(tc.ctx, tc.argument) if tc.expectedErr { @@ -114,7 +129,8 @@ func TestGetTessenConfig(t *testing.T) { for _, tc := range testCases { store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) t.Run(tc.name, func(t *testing.T) { assert := assert.New(t) diff --git a/backend/apid/apid.go b/backend/apid/apid.go index 62c85adcf0..4cad66d84f 100644 --- a/backend/apid/apid.go +++ b/backend/apid/apid.go @@ -258,7 +258,7 @@ func registerRestrictedResources(router *mux.Router, store store.Store, getter t routers.NewRolesRouter(store), routers.NewRoleBindingsRouter(store), routers.NewSilencedRouter(store), - routers.NewTessenRouter(actions.NewTessenController(store)), + routers.NewTessenRouter(actions.NewTessenController(store, bus)), routers.NewUsersRouter(store), ) } diff --git a/backend/backend.go b/backend/backend.go index ba17f194fb..d1f20ec26d 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -243,6 +243,7 @@ func Initialize(config *Config) (*Backend, error) { Store: store, RingPool: ringPool, Client: b.Client, + Bus: bus, }) if err != nil { return nil, fmt.Errorf("error initializing %s: %s", tessen.Name(), err) diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go index 0553f19423..a0647ec3f4 100644 --- a/backend/messaging/message_bus.go +++ b/backend/messaging/message_bus.go @@ -22,6 +22,9 @@ const ( // TopicSubscriptions is the topic prefix for each subscription TopicSubscriptions = "sensu:check" + + // TopicTessen is the topic prefix for tessen api events to Tessend. + TopicTessen = "sensu:tessen" ) // A Subscriber receives messages via a channel. diff --git a/backend/store/etcd/entity_store.go b/backend/store/etcd/entity_store.go index a66eb6bacb..cdd4e46bb0 100644 --- a/backend/store/etcd/entity_store.go +++ b/backend/store/etcd/entity_store.go @@ -23,7 +23,8 @@ func getEntityPath(entity *corev2.Entity) string { return entityKeyBuilder.WithResource(entity).Build(entity.Name) } -func getEntitiesPath(ctx context.Context, name string) string { +// GetEntitiesPath gets the path of the entity store +func GetEntitiesPath(ctx context.Context, name string) string { return entityKeyBuilder.WithContext(ctx).Build(name) } @@ -42,7 +43,7 @@ func (s *Store) DeleteEntityByName(ctx context.Context, name string) error { return errors.New("must specify name") } - _, err := s.client.Delete(ctx, getEntitiesPath(ctx, name)) + _, err := s.client.Delete(ctx, GetEntitiesPath(ctx, name)) return err } @@ -52,7 +53,7 @@ func (s *Store) GetEntityByName(ctx context.Context, name string) (*corev2.Entit return nil, errors.New("must specify name") } - resp, err := s.client.Get(ctx, getEntitiesPath(ctx, name), clientv3.WithLimit(1)) + resp, err := s.client.Get(ctx, GetEntitiesPath(ctx, name), clientv3.WithLimit(1)) if err != nil { return nil, err } @@ -76,7 +77,7 @@ func (s *Store) GetEntityByName(ctx context.Context, name string) (*corev2.Entit // GetEntities returns the entities for the namespace in the supplied context. func (s *Store) GetEntities(ctx context.Context, pred *store.SelectionPredicate) ([]*corev2.Entity, error) { entities := []*corev2.Entity{} - err := List(ctx, s.client, getEntitiesPath, &entities, pred) + err := List(ctx, s.client, GetEntitiesPath, &entities, pred) return entities, err } diff --git a/backend/store/etcd/store.go b/backend/store/etcd/store.go index 67b3f3f6e4..101c81c93a 100644 --- a/backend/store/etcd/store.go +++ b/backend/store/etcd/store.go @@ -254,6 +254,22 @@ func Update(ctx context.Context, client *clientv3.Client, key, namespace string, return nil } +// Count retrieves the count of all keys from storage under the +// provided prefix key, while supporting all namespaces. +func Count(ctx context.Context, client *clientv3.Client, key string) (int64, error) { + opts := []clientv3.OpOption{ + clientv3.WithCountOnly(), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), + } + + resp, err := client.Get(ctx, key, opts...) + if err != nil { + return 0, err + } + + return resp.Count, nil +} + func getKey(key string) clientv3.Op { return clientv3.OpGet(key) } diff --git a/backend/store/etcd/store_test.go b/backend/store/etcd/store_test.go index aaa8481dcc..2b75ef1e8a 100644 --- a/backend/store/etcd/store_test.go +++ b/backend/store/etcd/store_test.go @@ -378,3 +378,41 @@ func TestUpdate(t *testing.T) { assert.Equal(t, 2, obj.Revision) }) } + +func TestCount(t *testing.T) { + testWithEtcdStore(t, func(s *Store) { + // Create a second namespace + require.NoError(t, s.CreateNamespace(context.Background(), types.FixtureNamespace("acme"))) + + // Create a bunch of keys everywhere + obj1 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj1", Namespace: "default"}} + ctx := context.WithValue(context.Background(), types.NamespaceKey, "default") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/default/obj1", "default", obj1)) + + obj2 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj2", Namespace: "acme"}} + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/acme/obj2", "acme", obj2)) + + obj3 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj3", Namespace: "acme"}} + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/acme/obj3", "acme", obj3)) + + // We should have 1 object when listing keys under the default namespace + ctx = context.WithValue(context.Background(), types.NamespaceKey, "default") + count, err := Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(1), count) + + // We should have 2 objects when listing keys under the acme namespace + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + count, err = Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(2), count) + + // We should have 3 objects when listing through all namespaces + ctx = context.WithValue(context.Background(), types.NamespaceKey, "") + count, err = Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(3), count) + }) +} diff --git a/backend/tessend/tessend.go b/backend/tessend/tessend.go index cd9ec8f950..2087fdd1a4 100644 --- a/backend/tessend/tessend.go +++ b/backend/tessend/tessend.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strconv" @@ -14,6 +15,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/google/uuid" corev2 "github.com/sensu/sensu-go/api/core/v2" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/ringv2" "github.com/sensu/sensu-go/backend/store" "github.com/sensu/sensu-go/backend/store/etcd" @@ -22,9 +24,17 @@ import ( ) const ( + // componentName identifies Tessend as the component/daemon implemented in this + // package. + componentName = "tessend" + // tessenURL is the http endpoint for the tessen service. tessenURL = "https://tessen.sensu.io/v2/data" + // tessenIntervalHeader is the name of the header that the tessen service + // will return to update the reporting interval of the tessen daemon. + tessenIntervalHeader = "tessen-reporting-interval" + // ringUpdateInterval is the interval, in seconds, that TessenD will // update the ring with any added/removed cluster members. ringUpdateInterval = 450 * time.Second @@ -36,16 +46,19 @@ const ( // Tessend is the tessen daemon. type Tessend struct { - interval uint32 - store store.Store - ctx context.Context - cancel context.CancelFunc - errChan chan error - ring *ringv2.Ring - interrupt chan *corev2.TessenConfig - client *clientv3.Client - url string - backendID string + interval uint32 + store store.Store + ctx context.Context + cancel context.CancelFunc + errChan chan error + ring *ringv2.Ring + interrupt chan *corev2.TessenConfig + client *clientv3.Client + url string + backendID string + bus messaging.MessageBus + messageChan chan interface{} + subscription messaging.Subscription } // Option is a functional option. @@ -56,17 +69,20 @@ type Config struct { Store store.Store RingPool *ringv2.Pool Client *clientv3.Client + Bus messaging.MessageBus } // New creates a new TessenD. func New(c Config, opts ...Option) (*Tessend, error) { t := &Tessend{ - interval: corev2.DefaultTessenInterval, - store: c.Store, - client: c.Client, - errChan: make(chan error, 1), - url: tessenURL, - backendID: uuid.New().String(), + interval: corev2.DefaultTessenInterval, + store: c.Store, + client: c.Client, + errChan: make(chan error, 1), + url: tessenURL, + backendID: uuid.New().String(), + bus: c.Bus, + messageChan: make(chan interface{}, 1), } t.ctx, t.cancel = context.WithCancel(context.Background()) t.interrupt = make(chan *corev2.TessenConfig, 1) @@ -93,6 +109,13 @@ func (t *Tessend) Start() error { return err } + sub, err := t.bus.Subscribe(messaging.TopicTessen, componentName, t) + t.subscription = sub + if err != nil { + return err + } + + go t.startMessageHandler() go t.startWatcher() go t.startRingUpdates() go t.start(tessen) @@ -111,8 +134,11 @@ func (t *Tessend) Stop() error { } else { logger.WithField("key", t.backendID).Debug("removed a key from the ring") } + if err := t.subscription.Cancel(); err != nil { + logger.WithError(err).Error("unable to unsubscribe from message bus") + } t.cancel() - close(t.errChan) + close(t.messageChan) return nil } @@ -123,7 +149,38 @@ func (t *Tessend) Err() <-chan error { // Name returns the daemon name. func (t *Tessend) Name() string { - return "tessend" + return componentName +} + +// Receiver returns the tessen receiver channel. +func (t *Tessend) Receiver() chan<- interface{} { + return t.messageChan +} + +func (t *Tessend) startMessageHandler() { + for { + msg, ok := <-t.messageChan + if !ok { + logger.Debug("tessen message channel closed") + return + } + + tessen, ok := msg.(*corev2.TessenConfig) + if !ok { + logger.WithField("msg", msg).Errorf("received non-TessenConfig on tessen message channel") + return + } + + data := t.getDataPayload() + t.getTessenConfigMetrics(time.Now().UTC().Unix(), tessen, data) + logger.WithFields(logrus.Fields{ + "url": t.url, + "id": data.Cluster.ID, + "opt-out": tessen.OptOut, + data.Metrics.Points[0].Name: data.Metrics.Points[0].Value, + }).Info("sending opt-out status event to tessen") + _ = t.send(data) + } } // startWatcher watches the TessenConfig store for changes to the opt-out configuration. @@ -264,7 +321,8 @@ func (t *Tessend) enabled(tessen *corev2.TessenConfig) bool { // Errors are logged and tessen continues to the best of its ability. func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { // collect data - data := t.collect(time.Now().UTC().Unix()) + data := t.getDataPayload() + t.getTessenMetrics(time.Now().UTC().Unix(), data) logger.WithFields(logrus.Fields{ "url": t.url, @@ -274,19 +332,14 @@ func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { }).Info("sending data to tessen") // send data - resp, err := t.send(data) - if err != nil { - logger.WithError(err).Error("tessen phone-home service failed") - return - } - if resp.StatusCode >= 400 { - body, _ := ioutil.ReadAll(resp.Body) - logger.Errorf("bad status: %d (%q)", resp.StatusCode, string(body)) + respHeader := t.send(data) + if respHeader == "" { + logger.Debug("no tessen response header") return } // parse the response header for an integer value - interval, err := strconv.ParseUint(resp.Header.Get("tessen-reporting-interval"), 10, 32) + interval, err := strconv.ParseUint(respHeader, 10, 32) if err != nil { logger.Debugf("invalid tessen response header: %v", err) return @@ -307,35 +360,25 @@ func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { } } -// collect data and populate the data payload -func (t *Tessend) collect(now int64) *Data { +// getDataPayload retrieves cluster, version, and license information +// and returns the populated data payload. +func (t *Tessend) getDataPayload() *Data { var clusterID string - var entityCount, backendCount float64 - // collect client count - entities, err := t.store.GetEntities(t.ctx, &store.SelectionPredicate{}) + // collect cluster id + cluster, err := t.client.Cluster.MemberList(t.ctx) if err != nil { - logger.WithError(err).Error("unable to retrieve client count") + logger.WithError(err).Error("unable to retrieve cluster id") } - if entities != nil { - entityCount = float64(len(entities)) - } - - // collect server count and cluster id - servers, err := t.client.Cluster.MemberList(t.ctx) - if err != nil { - logger.WithError(err).Error("unable to retrieve cluster information") - } - if servers != nil { - clusterID = fmt.Sprintf("%x", servers.Header.ClusterId) - backendCount = float64(len(servers.Members)) + if cluster != nil { + clusterID = fmt.Sprintf("%x", cluster.Header.ClusterId) } // collect license information wrapper := &Wrapper{} err = etcd.Get(t.ctx, t.client, licenseStorePath, wrapper) if err != nil { - logger.Debugf("cannot retrieve license: %v", err) + logger.WithError(err).Debug("unable to retrieve license") } // populate data payload @@ -345,27 +388,82 @@ func (t *Tessend) collect(now int64) *Data { Version: version.Semver(), License: wrapper.Value.License, }, - Metrics: corev2.Metrics{ - Points: []*corev2.MetricPoint{ - &corev2.MetricPoint{ - Name: "entity_count", - Value: entityCount, - Timestamp: now, - }, - &corev2.MetricPoint{ - Name: "backend_count", - Value: backendCount, - Timestamp: now, - }, + } + + return data +} + +// getTessenMetrics populates the data payload with # backends/entities. +func (t *Tessend) getTessenMetrics(now int64, data *Data) { + var entityCount, backendCount float64 + + // collect entity count + entities, err := etcd.Count(t.ctx, t.client, etcd.GetEntitiesPath(t.ctx, "")) + if err != nil { + logger.WithError(err).Error("unable to retrieve entity count") + } + entityCount = float64(entities) + + // collect backend count + cluster, err := t.client.Cluster.MemberList(t.ctx) + if err != nil { + logger.WithError(err).Error("unable to retrieve backend count") + } + if cluster != nil { + backendCount = float64(len(cluster.Members)) + } + + // populate data payload + data.Metrics = corev2.Metrics{ + Points: []*corev2.MetricPoint{ + &corev2.MetricPoint{ + Name: "entity_count", + Value: entityCount, + Timestamp: now, + }, + &corev2.MetricPoint{ + Name: "backend_count", + Value: backendCount, + Timestamp: now, }, }, } +} - return data +// getTessenConfigMetrics populates the data payload with an opt-out status event. +func (t *Tessend) getTessenConfigMetrics(now int64, tessen *corev2.TessenConfig, data *Data) { + data.Metrics = corev2.Metrics{ + Points: []*corev2.MetricPoint{ + &corev2.MetricPoint{ + Name: "tessen_config_update", + Value: 1, + Timestamp: now, + Tags: []*corev2.MetricTag{ + &corev2.MetricTag{ + Name: "opt_out", + Value: strconv.FormatBool(tessen.OptOut), + }, + }, + }, + }, + } } -// send the data payload to the tessen url -func (t *Tessend) send(data *Data) (*http.Response, error) { +// send sends the data payload to the tessen url and retrieves the interval response header. +func (t *Tessend) send(data *Data) string { b, _ := json.Marshal(data) - return http.Post(t.url, "application/json", bytes.NewBuffer(b)) + resp, err := http.Post(t.url, "application/json", bytes.NewBuffer(b)) + // TODO(nikki): special case logs on a per error basis + if err != nil { + logger.WithError(err).Error("tessen phone-home service failed") + return "" + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 4096)) + logger.Errorf("bad status: %d (%q)", resp.StatusCode, string(body)) + return "" + } + + return resp.Header.Get(tessenIntervalHeader) } diff --git a/backend/tessend/tessend_test.go b/backend/tessend/tessend_test.go index c315297a0e..d8dd05a8b1 100644 --- a/backend/tessend/tessend_test.go +++ b/backend/tessend/tessend_test.go @@ -3,6 +3,7 @@ package tessend import ( + "fmt" "net/http" "net/http/httptest" "testing" @@ -10,13 +11,21 @@ import ( corev2 "github.com/sensu/sensu-go/api/core/v2" "github.com/sensu/sensu-go/backend/etcd" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/ringv2" - etcdstore "github.com/sensu/sensu-go/backend/store/etcd" + "github.com/sensu/sensu-go/backend/store" + "github.com/sensu/sensu-go/testing/mockstore" + "github.com/sensu/sensu-go/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func newTessendTest(t *testing.T) *Tessend { + bus, err := messaging.NewWizardBus(messaging.WizardBusConfig{}) + require.NoError(t, err) + require.NoError(t, bus.Start()) + e, cleanup := etcd.NewTestEtcd(t) defer cleanup() @@ -26,15 +35,43 @@ func newTessendTest(t *testing.T) *Tessend { } defer client.Close() + s := &mockstore.MockStore{} + pred := &store.SelectionPredicate{} + ch := make(<-chan store.WatchEventTessenConfig) + s.On("GetEntities", mock.Anything, pred).Return([]*types.Entity{ + types.FixtureEntity("entity1"), + types.FixtureEntity("entity2"), + }, nil) + s.On("CreateOrUpdateTessenConfig", mock.Anything, mock.Anything).Return(fmt.Errorf("foo")) + s.On("GetTessenConfig", mock.Anything, mock.Anything).Return(corev2.DefaultTessenConfig(), fmt.Errorf("foo")) + s.On("GetTessenConfigWatcher", mock.Anything).Return(ch) + tessend, err := New(Config{ - Store: etcdstore.NewStore(client, e.Name()), + Store: s, Client: client, RingPool: ringv2.NewPool(client), + Bus: bus, }) require.NoError(t, err) return tessend } +func TestTessendBus(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.NotEmpty(t, r.Body) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + tessend := newTessendTest(t) + tessend.url = ts.URL + require.NoError(t, tessend.Start()) + require.NoError(t, tessend.bus.Publish(messaging.TopicTessen, corev2.DefaultTessenConfig())) + time.Sleep(2 * time.Second) + assert.NoError(t, tessend.Stop()) + assert.Equal(t, tessend.Name(), "tessend") +} + func TestTessend200(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("tessen-reporting-interval", "1800")