From 5f078f4a09a5209c996b9ef044d30516b30923c2 Mon Sep 17 00:00:00 2001 From: Nikki Attea Date: Thu, 18 Apr 2019 14:51:49 -0700 Subject: [PATCH 1/4] Added the message bus to Tessend in order to track Tessen configuration changes from the API Signed-off-by: Nikki Attea --- CHANGELOG.md | 3 + backend/apid/actions/tessen.go | 10 +- backend/apid/actions/tessen_test.go | 22 ++- backend/apid/apid.go | 2 +- backend/backend.go | 1 + backend/messaging/message_bus.go | 3 + backend/tessend/tessend.go | 230 ++++++++++++++++++++-------- backend/tessend/tessend_test.go | 41 ++++- 8 files changed, 242 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5bbc8d3b6..7c5f703ba8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ Versioning](http://semver.org/spec/v2.0.0.html). ### Fixed - Fixed the agent `--annotations` and `--labels` flags. +### Added +- Added the message bus to Tessend in order to track Tessen configuration changes from the API. + ## [5.5.1] - 2019-04-15 ### Changed diff --git a/backend/apid/actions/tessen.go b/backend/apid/actions/tessen.go index 5de126e5fc..49a6ef1835 100644 --- a/backend/apid/actions/tessen.go +++ b/backend/apid/actions/tessen.go @@ -2,6 +2,7 @@ package actions import ( corev2 "github.com/sensu/sensu-go/api/core/v2" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/store" "golang.org/x/net/context" ) @@ -9,12 +10,14 @@ import ( // TessenController exposes actions which a viewer can perform type TessenController struct { store store.TessenConfigStore + bus messaging.MessageBus } // NewTessenController returns a new TessenController -func NewTessenController(store store.TessenConfigStore) TessenController { +func NewTessenController(store store.TessenConfigStore, bus messaging.MessageBus) TessenController { return TessenController{ store: store, + bus: bus, } } @@ -29,6 +32,11 @@ func (c TessenController) CreateOrUpdate(ctx context.Context, config *corev2.Tes } } + // Publish to Tessend + if err := c.bus.Publish(messaging.TopicTessen, config); err != nil { + return NewError(InternalErr, err) + } + return nil } diff --git a/backend/apid/actions/tessen_test.go b/backend/apid/actions/tessen_test.go index b9198ff2e6..5387085661 100644 --- a/backend/apid/actions/tessen_test.go +++ b/backend/apid/actions/tessen_test.go @@ -7,6 +7,7 @@ import ( corev2 "github.com/sensu/sensu-go/api/core/v2" "github.com/sensu/sensu-go/backend/store" + "github.com/sensu/sensu-go/testing/mockbus" "github.com/sensu/sensu-go/testing/mockstore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -16,10 +17,12 @@ func TestNewTessenController(t *testing.T) { assert := assert.New(t) store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) assert.NotNil(actions) assert.Equal(store, actions.store) + assert.Equal(bus, actions.bus) } func TestCreateOrUpdateTessenConfig(t *testing.T) { @@ -28,6 +31,7 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { ctx context.Context argument *corev2.TessenConfig storeErr error + busErr error expectedErr bool expectedErrCode ErrCode }{ @@ -52,11 +56,20 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { expectedErr: true, expectedErrCode: InternalErr, }, + { + name: "Bus Error", + ctx: context.Background(), + argument: corev2.DefaultTessenConfig(), + busErr: errors.New("the bus has a flat tire"), + expectedErr: true, + expectedErrCode: InternalErr, + }, } for _, tc := range testCases { store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) t.Run(tc.name, func(t *testing.T) { assert := assert.New(t) @@ -65,6 +78,8 @@ func TestCreateOrUpdateTessenConfig(t *testing.T) { On("CreateOrUpdateTessenConfig", mock.Anything, mock.Anything). Return(tc.storeErr) + bus.On("Publish", mock.Anything, mock.Anything).Return(tc.busErr) + err := actions.CreateOrUpdate(tc.ctx, tc.argument) if tc.expectedErr { @@ -114,7 +129,8 @@ func TestGetTessenConfig(t *testing.T) { for _, tc := range testCases { store := &mockstore.MockStore{} - actions := NewTessenController(store) + bus := &mockbus.MockBus{} + actions := NewTessenController(store, bus) t.Run(tc.name, func(t *testing.T) { assert := assert.New(t) diff --git a/backend/apid/apid.go b/backend/apid/apid.go index 62c85adcf0..4cad66d84f 100644 --- a/backend/apid/apid.go +++ b/backend/apid/apid.go @@ -258,7 +258,7 @@ func registerRestrictedResources(router *mux.Router, store store.Store, getter t routers.NewRolesRouter(store), routers.NewRoleBindingsRouter(store), routers.NewSilencedRouter(store), - routers.NewTessenRouter(actions.NewTessenController(store)), + routers.NewTessenRouter(actions.NewTessenController(store, bus)), routers.NewUsersRouter(store), ) } diff --git a/backend/backend.go b/backend/backend.go index ba17f194fb..d1f20ec26d 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -243,6 +243,7 @@ func Initialize(config *Config) (*Backend, error) { Store: store, RingPool: ringPool, Client: b.Client, + Bus: bus, }) if err != nil { return nil, fmt.Errorf("error initializing %s: %s", tessen.Name(), err) diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go index 0553f19423..a0647ec3f4 100644 --- a/backend/messaging/message_bus.go +++ b/backend/messaging/message_bus.go @@ -22,6 +22,9 @@ const ( // TopicSubscriptions is the topic prefix for each subscription TopicSubscriptions = "sensu:check" + + // TopicTessen is the topic prefix for tessen api events to Tessend. + TopicTessen = "sensu:tessen" ) // A Subscriber receives messages via a channel. diff --git a/backend/tessend/tessend.go b/backend/tessend/tessend.go index cd9ec8f950..cdf3bd5079 100644 --- a/backend/tessend/tessend.go +++ b/backend/tessend/tessend.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -14,6 +15,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/google/uuid" corev2 "github.com/sensu/sensu-go/api/core/v2" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/ringv2" "github.com/sensu/sensu-go/backend/store" "github.com/sensu/sensu-go/backend/store/etcd" @@ -22,9 +24,17 @@ import ( ) const ( + // componentName identifies Tessend as the component/daemon implemented in this + // package. + componentName = "tessend" + // tessenURL is the http endpoint for the tessen service. tessenURL = "https://tessen.sensu.io/v2/data" + // tessenIntervalHeader is the name of the header that the tessen service + // will return to update the reporting interval of the tessen daemon. + tessenIntervalHeader = "tessen-reporting-interval" + // ringUpdateInterval is the interval, in seconds, that TessenD will // update the ring with any added/removed cluster members. ringUpdateInterval = 450 * time.Second @@ -36,16 +46,19 @@ const ( // Tessend is the tessen daemon. type Tessend struct { - interval uint32 - store store.Store - ctx context.Context - cancel context.CancelFunc - errChan chan error - ring *ringv2.Ring - interrupt chan *corev2.TessenConfig - client *clientv3.Client - url string - backendID string + interval uint32 + store store.Store + ctx context.Context + cancel context.CancelFunc + errChan chan error + ring *ringv2.Ring + interrupt chan *corev2.TessenConfig + client *clientv3.Client + url string + backendID string + bus messaging.MessageBus + messageChan chan interface{} + subscription messaging.Subscription } // Option is a functional option. @@ -56,17 +69,20 @@ type Config struct { Store store.Store RingPool *ringv2.Pool Client *clientv3.Client + Bus messaging.MessageBus } // New creates a new TessenD. func New(c Config, opts ...Option) (*Tessend, error) { t := &Tessend{ - interval: corev2.DefaultTessenInterval, - store: c.Store, - client: c.Client, - errChan: make(chan error, 1), - url: tessenURL, - backendID: uuid.New().String(), + interval: corev2.DefaultTessenInterval, + store: c.Store, + client: c.Client, + errChan: make(chan error, 1), + url: tessenURL, + backendID: uuid.New().String(), + bus: c.Bus, + messageChan: make(chan interface{}, 1), } t.ctx, t.cancel = context.WithCancel(context.Background()) t.interrupt = make(chan *corev2.TessenConfig, 1) @@ -93,6 +109,13 @@ func (t *Tessend) Start() error { return err } + sub, err := t.bus.Subscribe(messaging.TopicTessen, componentName, t) + t.subscription = sub + if err != nil { + return err + } + + go t.startMessageHandler() go t.startWatcher() go t.startRingUpdates() go t.start(tessen) @@ -111,8 +134,11 @@ func (t *Tessend) Stop() error { } else { logger.WithField("key", t.backendID).Debug("removed a key from the ring") } + if err := t.subscription.Cancel(); err != nil { + logger.WithError(err).Error("unable to unsubscribe from message bus") + } t.cancel() - close(t.errChan) + close(t.messageChan) return nil } @@ -123,7 +149,43 @@ func (t *Tessend) Err() <-chan error { // Name returns the daemon name. func (t *Tessend) Name() string { - return "tessend" + return componentName +} + +// Receiver returns the tessen receiver channel. +func (t *Tessend) Receiver() chan<- interface{} { + return t.messageChan +} + +func (t *Tessend) startMessageHandler() { + for { + select { + case msg, ok := <-t.messageChan: + if !ok { + select { + case t.errChan <- errors.New("tessen message channel closed"): + default: + } + return + } + + tessen, ok := msg.(*corev2.TessenConfig) + if !ok { + logger.WithField("msg", msg).Errorf("received non-TessenConfig on tessen message channel") + return + } + + data := t.getDataPayload() + t.getTessenConfigMetrics(time.Now().UTC().Unix(), tessen, data) + logger.WithFields(logrus.Fields{ + "url": t.url, + "id": data.Cluster.ID, + "opt-out": tessen.OptOut, + data.Metrics.Points[0].Name: data.Metrics.Points[0].Value, + }).Info("sending opt-out status event to tessen") + _ = t.send(data) + } + } } // startWatcher watches the TessenConfig store for changes to the opt-out configuration. @@ -264,7 +326,8 @@ func (t *Tessend) enabled(tessen *corev2.TessenConfig) bool { // Errors are logged and tessen continues to the best of its ability. func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { // collect data - data := t.collect(time.Now().UTC().Unix()) + data := t.getDataPayload() + t.getTessenMetrics(time.Now().UTC().Unix(), data) logger.WithFields(logrus.Fields{ "url": t.url, @@ -274,19 +337,14 @@ func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { }).Info("sending data to tessen") // send data - resp, err := t.send(data) - if err != nil { - logger.WithError(err).Error("tessen phone-home service failed") - return - } - if resp.StatusCode >= 400 { - body, _ := ioutil.ReadAll(resp.Body) - logger.Errorf("bad status: %d (%q)", resp.StatusCode, string(body)) + respHeader := t.send(data) + if respHeader == "" { + logger.Debug("no tessen response header") return } // parse the response header for an integer value - interval, err := strconv.ParseUint(resp.Header.Get("tessen-reporting-interval"), 10, 32) + interval, err := strconv.ParseUint(respHeader, 10, 32) if err != nil { logger.Debugf("invalid tessen response header: %v", err) return @@ -307,35 +365,25 @@ func (t *Tessend) collectAndSend(tessen *corev2.TessenConfig) { } } -// collect data and populate the data payload -func (t *Tessend) collect(now int64) *Data { +// getDataPayload retrieves cluster, version, and license information +// and returns the populated data payload. +func (t *Tessend) getDataPayload() *Data { var clusterID string - var entityCount, backendCount float64 - - // collect client count - entities, err := t.store.GetEntities(t.ctx, &store.SelectionPredicate{}) - if err != nil { - logger.WithError(err).Error("unable to retrieve client count") - } - if entities != nil { - entityCount = float64(len(entities)) - } - // collect server count and cluster id - servers, err := t.client.Cluster.MemberList(t.ctx) + // collect cluster id + cluster, err := t.client.Cluster.MemberList(t.ctx) if err != nil { - logger.WithError(err).Error("unable to retrieve cluster information") + logger.WithError(err).Error("unable to retrieve cluster id") } - if servers != nil { - clusterID = fmt.Sprintf("%x", servers.Header.ClusterId) - backendCount = float64(len(servers.Members)) + if cluster != nil { + clusterID = fmt.Sprintf("%x", cluster.Header.ClusterId) } // collect license information wrapper := &Wrapper{} err = etcd.Get(t.ctx, t.client, licenseStorePath, wrapper) if err != nil { - logger.Debugf("cannot retrieve license: %v", err) + logger.WithError(err).Debug("unable to retrieve license") } // populate data payload @@ -345,27 +393,83 @@ func (t *Tessend) collect(now int64) *Data { Version: version.Semver(), License: wrapper.Value.License, }, - Metrics: corev2.Metrics{ - Points: []*corev2.MetricPoint{ - &corev2.MetricPoint{ - Name: "entity_count", - Value: entityCount, - Timestamp: now, - }, - &corev2.MetricPoint{ - Name: "backend_count", - Value: backendCount, - Timestamp: now, - }, + } + + return data +} + +// getTessenMetrics populates the data payload with # backends/entities. +func (t *Tessend) getTessenMetrics(now int64, data *Data) { + var entityCount, backendCount float64 + + // collect entity count + entities, err := t.store.GetEntities(t.ctx, &store.SelectionPredicate{}) + if err != nil { + logger.WithError(err).Error("unable to retrieve entity count") + } + if entities != nil { + entityCount = float64(len(entities)) + } + + // collect backend count + cluster, err := t.client.Cluster.MemberList(t.ctx) + if err != nil { + logger.WithError(err).Error("unable to retrieve backend count") + } + if cluster != nil { + backendCount = float64(len(cluster.Members)) + } + + // populate data payload + data.Metrics = corev2.Metrics{ + Points: []*corev2.MetricPoint{ + &corev2.MetricPoint{ + Name: "entity_count", + Value: entityCount, + Timestamp: now, + }, + &corev2.MetricPoint{ + Name: "backend_count", + Value: backendCount, + Timestamp: now, }, }, } +} - return data +// getTessenConfigMetrics populates the data payload with an opt-out status event. +func (t *Tessend) getTessenConfigMetrics(now int64, tessen *corev2.TessenConfig, data *Data) { + data.Metrics = corev2.Metrics{ + Points: []*corev2.MetricPoint{ + &corev2.MetricPoint{ + Name: "tessen_config_update", + Value: 1, + Timestamp: now, + Tags: []*corev2.MetricTag{ + &corev2.MetricTag{ + Name: "opt_out", + Value: strconv.FormatBool(tessen.OptOut), + }, + }, + }, + }, + } } -// send the data payload to the tessen url -func (t *Tessend) send(data *Data) (*http.Response, error) { +// send sends the data payload to the tessen url and retrieves the interval response header. +func (t *Tessend) send(data *Data) string { b, _ := json.Marshal(data) - return http.Post(t.url, "application/json", bytes.NewBuffer(b)) + resp, err := http.Post(t.url, "application/json", bytes.NewBuffer(b)) + // TODO(nikki): special case logs on a per error basis + if err != nil { + logger.WithError(err).Error("tessen phone-home service failed") + return "" + } + if resp.StatusCode >= 400 { + body, _ := ioutil.ReadAll(resp.Body) + logger.Errorf("bad status: %d (%q)", resp.StatusCode, string(body)) + return "" + } + + return resp.Header.Get(tessenIntervalHeader) } diff --git a/backend/tessend/tessend_test.go b/backend/tessend/tessend_test.go index c315297a0e..d8dd05a8b1 100644 --- a/backend/tessend/tessend_test.go +++ b/backend/tessend/tessend_test.go @@ -3,6 +3,7 @@ package tessend import ( + "fmt" "net/http" "net/http/httptest" "testing" @@ -10,13 +11,21 @@ import ( corev2 "github.com/sensu/sensu-go/api/core/v2" "github.com/sensu/sensu-go/backend/etcd" + "github.com/sensu/sensu-go/backend/messaging" "github.com/sensu/sensu-go/backend/ringv2" - etcdstore "github.com/sensu/sensu-go/backend/store/etcd" + "github.com/sensu/sensu-go/backend/store" + "github.com/sensu/sensu-go/testing/mockstore" + "github.com/sensu/sensu-go/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func newTessendTest(t *testing.T) *Tessend { + bus, err := messaging.NewWizardBus(messaging.WizardBusConfig{}) + require.NoError(t, err) + require.NoError(t, bus.Start()) + e, cleanup := etcd.NewTestEtcd(t) defer cleanup() @@ -26,15 +35,43 @@ func newTessendTest(t *testing.T) *Tessend { } defer client.Close() + s := &mockstore.MockStore{} + pred := &store.SelectionPredicate{} + ch := make(<-chan store.WatchEventTessenConfig) + s.On("GetEntities", mock.Anything, pred).Return([]*types.Entity{ + types.FixtureEntity("entity1"), + types.FixtureEntity("entity2"), + }, nil) + s.On("CreateOrUpdateTessenConfig", mock.Anything, mock.Anything).Return(fmt.Errorf("foo")) + s.On("GetTessenConfig", mock.Anything, mock.Anything).Return(corev2.DefaultTessenConfig(), fmt.Errorf("foo")) + s.On("GetTessenConfigWatcher", mock.Anything).Return(ch) + tessend, err := New(Config{ - Store: etcdstore.NewStore(client, e.Name()), + Store: s, Client: client, RingPool: ringv2.NewPool(client), + Bus: bus, }) require.NoError(t, err) return tessend } +func TestTessendBus(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.NotEmpty(t, r.Body) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + tessend := newTessendTest(t) + tessend.url = ts.URL + require.NoError(t, tessend.Start()) + require.NoError(t, tessend.bus.Publish(messaging.TopicTessen, corev2.DefaultTessenConfig())) + time.Sleep(2 * time.Second) + assert.NoError(t, tessend.Stop()) + assert.Equal(t, tessend.Name(), "tessend") +} + func TestTessend200(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("tessen-reporting-interval", "1800") From ee35d341e6e4dcd94300b879c753b74256c27e67 Mon Sep 17 00:00:00 2001 From: Nikki Attea Date: Thu, 18 Apr 2019 15:11:03 -0700 Subject: [PATCH 2/4] Add default Signed-off-by: Nikki Attea --- backend/tessend/tessend.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/tessend/tessend.go b/backend/tessend/tessend.go index cdf3bd5079..0a506046e9 100644 --- a/backend/tessend/tessend.go +++ b/backend/tessend/tessend.go @@ -184,6 +184,7 @@ func (t *Tessend) startMessageHandler() { data.Metrics.Points[0].Name: data.Metrics.Points[0].Value, }).Info("sending opt-out status event to tessen") _ = t.send(data) + default: } } } From dca9861905abc3b6f031cce8f8af3fb8ac8f5e93 Mon Sep 17 00:00:00 2001 From: Nikki Attea Date: Thu, 18 Apr 2019 15:42:29 -0700 Subject: [PATCH 3/4] Don't use select block Signed-off-by: Nikki Attea --- backend/tessend/tessend.go | 45 ++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/backend/tessend/tessend.go b/backend/tessend/tessend.go index 0a506046e9..4f59a8d018 100644 --- a/backend/tessend/tessend.go +++ b/backend/tessend/tessend.go @@ -159,33 +159,30 @@ func (t *Tessend) Receiver() chan<- interface{} { func (t *Tessend) startMessageHandler() { for { - select { - case msg, ok := <-t.messageChan: - if !ok { - select { - case t.errChan <- errors.New("tessen message channel closed"): - default: - } - return - } - - tessen, ok := msg.(*corev2.TessenConfig) - if !ok { - logger.WithField("msg", msg).Errorf("received non-TessenConfig on tessen message channel") - return + msg, ok := <-t.messageChan + if !ok { + select { + case t.errChan <- errors.New("tessen message channel closed"): + default: } + return + } - data := t.getDataPayload() - t.getTessenConfigMetrics(time.Now().UTC().Unix(), tessen, data) - logger.WithFields(logrus.Fields{ - "url": t.url, - "id": data.Cluster.ID, - "opt-out": tessen.OptOut, - data.Metrics.Points[0].Name: data.Metrics.Points[0].Value, - }).Info("sending opt-out status event to tessen") - _ = t.send(data) - default: + tessen, ok := msg.(*corev2.TessenConfig) + if !ok { + logger.WithField("msg", msg).Errorf("received non-TessenConfig on tessen message channel") + return } + + data := t.getDataPayload() + t.getTessenConfigMetrics(time.Now().UTC().Unix(), tessen, data) + logger.WithFields(logrus.Fields{ + "url": t.url, + "id": data.Cluster.ID, + "opt-out": tessen.OptOut, + data.Metrics.Points[0].Name: data.Metrics.Points[0].Value, + }).Info("sending opt-out status event to tessen") + _ = t.send(data) } } From 118384001ca4e6309c071ae41d23c0b99c441a15 Mon Sep 17 00:00:00 2001 From: Nikki Attea Date: Mon, 22 Apr 2019 16:22:48 -0700 Subject: [PATCH 4/4] Add Count function to the generic store Signed-off-by: Nikki Attea --- CHANGELOG.md | 1 + backend/store/etcd/entity_store.go | 9 +++---- backend/store/etcd/store.go | 16 +++++++++++++ backend/store/etcd/store_test.go | 38 ++++++++++++++++++++++++++++++ backend/tessend/tessend.go | 16 +++++-------- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c5f703ba8..810ca21caf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Versioning](http://semver.org/spec/v2.0.0.html). ### Added - Added the message bus to Tessend in order to track Tessen configuration changes from the API. +- Added a performance optimizing `Count()` function to the generic store. ## [5.5.1] - 2019-04-15 diff --git a/backend/store/etcd/entity_store.go b/backend/store/etcd/entity_store.go index a66eb6bacb..cdd4e46bb0 100644 --- a/backend/store/etcd/entity_store.go +++ b/backend/store/etcd/entity_store.go @@ -23,7 +23,8 @@ func getEntityPath(entity *corev2.Entity) string { return entityKeyBuilder.WithResource(entity).Build(entity.Name) } -func getEntitiesPath(ctx context.Context, name string) string { +// GetEntitiesPath gets the path of the entity store +func GetEntitiesPath(ctx context.Context, name string) string { return entityKeyBuilder.WithContext(ctx).Build(name) } @@ -42,7 +43,7 @@ func (s *Store) DeleteEntityByName(ctx context.Context, name string) error { return errors.New("must specify name") } - _, err := s.client.Delete(ctx, getEntitiesPath(ctx, name)) + _, err := s.client.Delete(ctx, GetEntitiesPath(ctx, name)) return err } @@ -52,7 +53,7 @@ func (s *Store) GetEntityByName(ctx context.Context, name string) (*corev2.Entit return nil, errors.New("must specify name") } - resp, err := s.client.Get(ctx, getEntitiesPath(ctx, name), clientv3.WithLimit(1)) + resp, err := s.client.Get(ctx, GetEntitiesPath(ctx, name), clientv3.WithLimit(1)) if err != nil { return nil, err } @@ -76,7 +77,7 @@ func (s *Store) GetEntityByName(ctx context.Context, name string) (*corev2.Entit // GetEntities returns the entities for the namespace in the supplied context. func (s *Store) GetEntities(ctx context.Context, pred *store.SelectionPredicate) ([]*corev2.Entity, error) { entities := []*corev2.Entity{} - err := List(ctx, s.client, getEntitiesPath, &entities, pred) + err := List(ctx, s.client, GetEntitiesPath, &entities, pred) return entities, err } diff --git a/backend/store/etcd/store.go b/backend/store/etcd/store.go index 67b3f3f6e4..101c81c93a 100644 --- a/backend/store/etcd/store.go +++ b/backend/store/etcd/store.go @@ -254,6 +254,22 @@ func Update(ctx context.Context, client *clientv3.Client, key, namespace string, return nil } +// Count retrieves the count of all keys from storage under the +// provided prefix key, while supporting all namespaces. +func Count(ctx context.Context, client *clientv3.Client, key string) (int64, error) { + opts := []clientv3.OpOption{ + clientv3.WithCountOnly(), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), + } + + resp, err := client.Get(ctx, key, opts...) + if err != nil { + return 0, err + } + + return resp.Count, nil +} + func getKey(key string) clientv3.Op { return clientv3.OpGet(key) } diff --git a/backend/store/etcd/store_test.go b/backend/store/etcd/store_test.go index aaa8481dcc..2b75ef1e8a 100644 --- a/backend/store/etcd/store_test.go +++ b/backend/store/etcd/store_test.go @@ -378,3 +378,41 @@ func TestUpdate(t *testing.T) { assert.Equal(t, 2, obj.Revision) }) } + +func TestCount(t *testing.T) { + testWithEtcdStore(t, func(s *Store) { + // Create a second namespace + require.NoError(t, s.CreateNamespace(context.Background(), types.FixtureNamespace("acme"))) + + // Create a bunch of keys everywhere + obj1 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj1", Namespace: "default"}} + ctx := context.WithValue(context.Background(), types.NamespaceKey, "default") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/default/obj1", "default", obj1)) + + obj2 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj2", Namespace: "acme"}} + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/acme/obj2", "acme", obj2)) + + obj3 := &genericObject{ObjectMeta: corev2.ObjectMeta{Name: "obj3", Namespace: "acme"}} + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + require.NoError(t, Create(ctx, s.client, "/sensu.io/generic/acme/obj3", "acme", obj3)) + + // We should have 1 object when listing keys under the default namespace + ctx = context.WithValue(context.Background(), types.NamespaceKey, "default") + count, err := Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(1), count) + + // We should have 2 objects when listing keys under the acme namespace + ctx = context.WithValue(context.Background(), types.NamespaceKey, "acme") + count, err = Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(2), count) + + // We should have 3 objects when listing through all namespaces + ctx = context.WithValue(context.Background(), types.NamespaceKey, "") + count, err = Count(ctx, s.client, getGenericObjectsPath(ctx, "")) + require.NoError(t, err) + assert.Equal(t, int64(3), count) + }) +} diff --git a/backend/tessend/tessend.go b/backend/tessend/tessend.go index 4f59a8d018..2087fdd1a4 100644 --- a/backend/tessend/tessend.go +++ b/backend/tessend/tessend.go @@ -4,8 +4,8 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" + "io" "io/ioutil" "net/http" "strconv" @@ -161,10 +161,7 @@ func (t *Tessend) startMessageHandler() { for { msg, ok := <-t.messageChan if !ok { - select { - case t.errChan <- errors.New("tessen message channel closed"): - default: - } + logger.Debug("tessen message channel closed") return } @@ -401,13 +398,11 @@ func (t *Tessend) getTessenMetrics(now int64, data *Data) { var entityCount, backendCount float64 // collect entity count - entities, err := t.store.GetEntities(t.ctx, &store.SelectionPredicate{}) + entities, err := etcd.Count(t.ctx, t.client, etcd.GetEntitiesPath(t.ctx, "")) if err != nil { logger.WithError(err).Error("unable to retrieve entity count") } - if entities != nil { - entityCount = float64(len(entities)) - } + entityCount = float64(entities) // collect backend count cluster, err := t.client.Cluster.MemberList(t.ctx) @@ -463,8 +458,9 @@ func (t *Tessend) send(data *Data) string { logger.WithError(err).Error("tessen phone-home service failed") return "" } + defer resp.Body.Close() if resp.StatusCode >= 400 { - body, _ := ioutil.ReadAll(resp.Body) + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 4096)) logger.Errorf("bad status: %d (%q)", resp.StatusCode, string(body)) return "" }