Skip to content

Commit

Permalink
[CM] Allow to unenroll a beats (elastic#9729)
Browse files Browse the repository at this point in the history
When a Beat is unenrolled for CM it will receive a 404. Usually Beats
will threat any errors returned by CM to be transient and will use a
cached version of the configuration, this commit change the logic if a 404 is returned by CM
we will clean the cache and remove any running configuration.

We will log this event as either the beats did not find any
configuration or was unenrolled from CM.

If the error is transient, the Beat will pickup the change on the next
fetch, if its permanent we will log each fetch.

Fixes: elastic#9452

Need backport to 6.5, 6.6, 6.x

(cherry picked from commit 5187335)
  • Loading branch information
ph committed Jan 13, 2019
1 parent 0427e38 commit c881c71
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff]
*Affecting all Beats*

- Enforce validation for the Central Management access token. {issue}9621[9621]
- Allow to unenroll a Beat from the UI. {issue}9452[9452]

*Auditbeat*

Expand Down
15 changes: 14 additions & 1 deletion x-pack/libbeat/management/api/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"net/http"
"reflect"

"errors"

"github.com/elastic/beats/libbeat/common/reload"

"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/common"
)

var errConfigurationNotFound = errors.New("no configuration found, you need to enroll your Beat")

// ConfigBlock stores a piece of config from central management
type ConfigBlock struct {
Raw map[string]interface{}
Expand Down Expand Up @@ -58,7 +62,11 @@ func (c *Client) Configuration(accessToken string, beatUUID uuid.UUID, configOK
} `json:"configuration_blocks"`
}{}
url := fmt.Sprintf("/api/beats/agent/%s/configuration?validSetting=%t", beatUUID, configOK)
_, err := c.request("GET", url, nil, headers, &resp)
statusCode, err := c.request("GET", url, nil, headers, &resp)
if statusCode == http.StatusNotFound {
return nil, errConfigurationNotFound
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -88,3 +96,8 @@ func ConfigBlocksEqual(a, b ConfigBlocks) bool {

return reflect.DeepEqual(a, b)
}

// IsConfigurationNotFound returns true if the configuration was not found.
func IsConfigurationNotFound(err error) bool {
return err == errConfigurationNotFound
}
21 changes: 21 additions & 0 deletions x-pack/libbeat/management/api/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,24 @@ func TestConfigBlocksEqual(t *testing.T) {
})
}
}

func TestUnEnroll(t *testing.T) {
beatUUID, err := uuid.NewV4()
if err != nil {
t.Fatalf("error while generating Beat UUID: %v", err)
}

server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check correct path is used
assert.Equal(t, "/api/beats/agent/"+beatUUID.String()+"/configuration", r.URL.Path)

// Check enrollment token is correct
assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-access-token"))

http.NotFound(w, r)
}))
defer server.Close()

_, err = client.Configuration("thisismyenrollmenttoken", beatUUID, false)
assert.True(t, IsConfigurationNotFound(err))
}
5 changes: 5 additions & 0 deletions x-pack/libbeat/management/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,8 @@ func (c *Cache) Save() error {
// move temporary file into final location
return file.SafeFileRotate(path, tempFile)
}

// HasConfig returns true if configs are cached.
func (c *Cache) HasConfig() bool {
return len(c.Configs) > 0
}
38 changes: 38 additions & 0 deletions x-pack/libbeat/management/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package management

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/x-pack/libbeat/management/api"
)

func TestHasConfig(t *testing.T) {
tests := map[string]struct {
configs api.ConfigBlocks
expected bool
}{
"with config": {
configs: api.ConfigBlocks{
api.ConfigBlocksWithType{Type: "metricbeat "},
},
expected: true,
},
"without config": {
configs: api.ConfigBlocks{},
expected: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cache := Cache{Configs: test.configs}
assert.Equal(t, test.expected, cache.HasConfig())
})
}
}
11 changes: 10 additions & 1 deletion x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,17 @@ func (cm *ConfigManager) worker() {
func (cm *ConfigManager) fetch() bool {
cm.logger.Debug("Retrieving new configurations from Kibana")
configs, err := cm.client.Configuration(cm.config.AccessToken, cm.beatUUID, cm.cache.ConfigOK)

if api.IsConfigurationNotFound(err) {
if cm.cache.HasConfig() {
cm.logger.Error("Disabling all running configuration because no configurations were found for this Beat, the endpoint returned a 404 or the beat is not enrolled with central management")
cm.cache.Configs = api.ConfigBlocks{}
}
return true
}

if err != nil {
cm.logger.Errorf("error retriving new configurations, will use cached ones: %s", err)
cm.logger.Errorf("error retrieving new configurations, will use cached ones: %s", err)
return false
}

Expand Down
67 changes: 67 additions & 0 deletions x-pack/libbeat/management/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,70 @@ func TestConfigValidate(t *testing.T) {
})
}
}
func responseText(s string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, s)
}
}

func TestUnEnroll(t *testing.T) {
registry := reload.NewRegistry()
id, err := uuid.NewV4()
if err != nil {
t.Fatalf("error while generating id: %v", err)
}
accessToken := "footoken"
reloadable := reloadable{
reloaded: make(chan *reload.ConfigWithMeta, 1),
}
registry.MustRegister("test.blocks", &reloadable)

mux := http.NewServeMux()
i := 0
responses := []http.HandlerFunc{ // Initial load
responseText(`{"configuration_blocks":[{"type":"test.blocks","config":{"module":"apache2"}}]}`),
http.NotFound,
}
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
responses[i](w, r)
i++
}))

server := httptest.NewServer(mux)

c, err := api.ConfigFromURL(server.URL, common.NewConfig())
if err != nil {
t.Fatal(err)
}

config := &Config{
Enabled: true,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
}

manager, err := NewConfigManagerWithConfig(config, registry, id)
if err != nil {
t.Fatal(err)
}

manager.Start()

// On first reload we will get apache2 module
config1 := <-reloadable.reloaded
assert.Equal(t, &reload.ConfigWithMeta{
Config: common.MustNewConfigFrom(map[string]interface{}{
"module": "apache2",
}),
}, config1)

// Get a nil config, even if the block is not part of the payload
config2 := <-reloadable.reloaded
var nilConfig *reload.ConfigWithMeta
assert.Equal(t, nilConfig, config2)

// Cleanup
manager.Stop()
os.Remove(paths.Resolve(paths.Data, "management.yml"))
}

0 comments on commit c881c71

Please sign in to comment.