diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b93ea8aec304..481be6931047 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -18,6 +18,13 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Allow Central Management to send events back to kibana. {issue}9382[9382] - Fix panic if fields settting is used to configure `hosts.x` fields. {issue}10824[10824] {pull}10935[10935] - Introduce query.default_field as part of the template. {pull}11205[11205] +- Initialize the Paths before the keystore and save the keystore into `data/{beatname}.keystore`. {pull}10706[10706] +- Add `cleanup_timeout` option to docker autodiscover, to wait some time before removing configurations after a container is stopped. {issue]10374[10374] {pull}10905[10905] +- On Google Cloud Engine (GCE) the add_cloud_metadata will now trim the project + info from the cloud.machine.type and cloud.availability_zone. {issue}10968[10968] +- Empty `meta.json` file will be treated as a missing meta file. {issue}8558[8558] +- Rename `migration.enabled` config to `migration.6_to_7.enabled`. {pull}11284[11284] +- Beats Xpack now checks for Basic license on connect. {pull}11296[11296] *Auditbeat* diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index b2c8c6516527..6b60740f50ba 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -224,6 +224,16 @@ func NewClient( } client.Connection.onConnectCallback = func() error { + globalCallbackRegistry.mutex.Lock() + defer globalCallbackRegistry.mutex.Unlock() + + for _, callback := range globalCallbackRegistry.callbacks { + err := callback(client) + if err != nil { + return err + } + } + if onConnect != nil { onConnect.mutex.Lock() defer onConnect.mutex.Unlock() @@ -726,7 +736,7 @@ func (conn *Connection) Ping() (string, error) { } debugf("Ping status code: %v", status) - logp.Info("Connected to Elasticsearch version %s", response.Version.Number) + logp.Info("Attempting to connect to Elasticsearch version %s", response.Version.Number) return response.Version.Number, nil } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index ddc4d925f791..ac0b9b0106dc 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -61,6 +61,31 @@ type callbacksRegistry struct { // XXX: it would be fantastic to do this without a package global var connectCallbackRegistry = newCallbacksRegistry() +// NOTE(ph): We need to refactor this, right now this is the only way to ensure that every calls +// to an ES cluster executes a callback. +var globalCallbackRegistry = newCallbacksRegistry() + +// RegisterGlobalCallback register a global callbacks. +func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) { + globalCallbackRegistry.mutex.Lock() + defer globalCallbackRegistry.mutex.Unlock() + + // find the next unique key + var key uuid.UUID + var err error + exists := true + for exists { + key, err = uuid.NewV4() + if err != nil { + return uuid.Nil, err + } + _, exists = globalCallbackRegistry.callbacks[key] + } + + globalCallbackRegistry.callbacks[key] = callback + return key, nil +} + func newCallbacksRegistry() callbacksRegistry { return callbacksRegistry{ callbacks: make(map[uuid.UUID]connectCallback), @@ -99,6 +124,15 @@ func DeregisterConnectCallback(key uuid.UUID) { delete(connectCallbackRegistry.callbacks, key) } +// DeregisterGlobalCallback deregisters a callback for the elasticsearch output +// specified by its key. If a callback does not exist, nothing happens. +func DeregisterGlobalCallback(key uuid.UUID) { + globalCallbackRegistry.mutex.Lock() + defer globalCallbackRegistry.mutex.Unlock() + + delete(globalCallbackRegistry.callbacks, key) +} + func makeES( beat beat.Info, observer outputs.Observer, diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go index db5a4a7d493d..0fd3ae91db30 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch_test.go +++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go @@ -46,3 +46,28 @@ func TestConnectCallbacksManagement(t *testing.T) { t.Fatalf("third callback cannot be retrieved") } } + +func TestGlobalConnectCallbacksManagement(t *testing.T) { + f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil } + f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil } + f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil } + + _, err := RegisterGlobalCallback(f0) + if err != nil { + t.Fatalf("error while registering callback: %v", err) + } + id1, err := RegisterGlobalCallback(f1) + if err != nil { + t.Fatalf("error while registering callback: %v", err) + } + id2, err := RegisterGlobalCallback(f2) + if err != nil { + t.Fatalf("error while registering callback: %v", err) + } + + t.Logf("removing second callback") + DeregisterGlobalCallback(id1) + if _, ok := globalCallbackRegistry.callbacks[id2]; !ok { + t.Fatalf("third callback cannot be retrieved") + } +} diff --git a/x-pack/functionbeat/beater/functionbeat.go b/x-pack/functionbeat/beater/functionbeat.go index 61b61cf03442..4e1944885feb 100644 --- a/x-pack/functionbeat/beater/functionbeat.go +++ b/x-pack/functionbeat/beater/functionbeat.go @@ -20,8 +20,8 @@ import ( "github.com/elastic/beats/x-pack/functionbeat/config" "github.com/elastic/beats/x-pack/functionbeat/core" _ "github.com/elastic/beats/x-pack/functionbeat/include" // imports features - "github.com/elastic/beats/x-pack/functionbeat/licenser" "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/libbeat/licenser" ) var ( @@ -82,7 +82,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error { defer manager.Stop() // Wait until we receive the initial license. - if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, checkLicense); err != nil { + if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, licenser.BasicAndAboveOrTrial); err != nil { return err } @@ -154,7 +154,7 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea // Make the client aware of the current license, the client will accept sending events to the // pipeline until the client is closed or if the license change and is not valid. - licenseAware := core.NewLicenseAwareClient(client, checkLicense) + licenseAware := core.NewLicenseAwareClient(client, licenser.BasicAndAboveOrTrial) if err := manager.AddWatcher(licenseAware); err != nil { return nil, err } diff --git a/x-pack/functionbeat/beater/license.go b/x-pack/functionbeat/beater/license.go deleted file mode 100644 index 1294a9331c42..000000000000 --- a/x-pack/functionbeat/beater/license.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package beater - -import ( - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/licenser" -) - -func checkLicense(log *logp.Logger, license licenser.License) bool { - return licenser.CheckBasic(log, license) || licenser.CheckTrial(log, license) -} diff --git a/x-pack/functionbeat/core/license_client.go b/x-pack/functionbeat/core/license_client.go index bcccc2bdf11f..6d499987deb0 100644 --- a/x-pack/functionbeat/core/license_client.go +++ b/x-pack/functionbeat/core/license_client.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/licenser" + "github.com/elastic/beats/x-pack/libbeat/licenser" ) var errInvalidLicense = errors.New("invalid license detected, cannot publish events") diff --git a/x-pack/functionbeat/core/license_client_test.go b/x-pack/functionbeat/core/license_client_test.go index e5b90f2d0acb..e4f92001f10f 100644 --- a/x-pack/functionbeat/core/license_client_test.go +++ b/x-pack/functionbeat/core/license_client_test.go @@ -11,7 +11,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/licenser" + "github.com/elastic/beats/x-pack/libbeat/licenser" ) type dummySyncClient struct{ EventCount int } diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go index 715f3d2df5c6..b55427ae4036 100644 --- a/x-pack/libbeat/cmd/inject.go +++ b/x-pack/libbeat/cmd/inject.go @@ -6,12 +6,17 @@ package cmd import ( "github.com/elastic/beats/libbeat/cmd" + "github.com/elastic/beats/libbeat/logp" // register central management + "github.com/elastic/beats/x-pack/libbeat/licenser" _ "github.com/elastic/beats/x-pack/libbeat/management" ) +const licenseDebugK = "license" + // AddXPack extends the given root folder with XPack features func AddXPack(root *cmd.BeatsRootCmd, name string) { + licenser.Enforce(logp.NewLogger(licenseDebugK), licenser.BasicAndAboveOrTrial) root.AddCommand(genEnrollCmd(name, "")) } diff --git a/x-pack/functionbeat/licenser/callback_watcher.go b/x-pack/libbeat/licenser/callback_watcher.go similarity index 100% rename from x-pack/functionbeat/licenser/callback_watcher.go rename to x-pack/libbeat/licenser/callback_watcher.go diff --git a/x-pack/functionbeat/licenser/callback_watcher_test.go b/x-pack/libbeat/licenser/callback_watcher_test.go similarity index 100% rename from x-pack/functionbeat/licenser/callback_watcher_test.go rename to x-pack/libbeat/licenser/callback_watcher_test.go diff --git a/x-pack/functionbeat/licenser/check.go b/x-pack/libbeat/licenser/check.go similarity index 79% rename from x-pack/functionbeat/licenser/check.go rename to x-pack/libbeat/licenser/check.go index 422ba88c85bf..b41383f27e1f 100644 --- a/x-pack/functionbeat/licenser/check.go +++ b/x-pack/libbeat/licenser/check.go @@ -19,6 +19,7 @@ func CheckTrial(log *logp.Logger, license License) bool { log.Error("Trial license is expired") return false } + log.Info("Trial license active") return true } return false @@ -27,10 +28,11 @@ func CheckTrial(log *logp.Logger, license License) bool { // CheckLicenseCover check that the current license cover the requested license. func CheckLicenseCover(licenseType LicenseType) func(*logp.Logger, License) bool { return func(log *logp.Logger, license License) bool { - log.Debugf("Checking that license cover %s", licenseType) + log.Debug("Checking that license covers %s", licenseType) if license.Cover(licenseType) && license.IsActive() { return true } + log.Infof("License is active for %s", licenseType) return false } } @@ -48,3 +50,8 @@ func Validate(log *logp.Logger, license License, checks ...CheckFunc) bool { } return false } + +// BasicAndAboveOrTrial return true if the license is basic or if the license is trial and active. +func BasicAndAboveOrTrial(log *logp.Logger, license License) bool { + return CheckBasic(log, license) || CheckTrial(log, license) +} diff --git a/x-pack/functionbeat/licenser/check_test.go b/x-pack/libbeat/licenser/check_test.go similarity index 100% rename from x-pack/functionbeat/licenser/check_test.go rename to x-pack/libbeat/licenser/check_test.go diff --git a/x-pack/functionbeat/licenser/elastic_fetcher.go b/x-pack/libbeat/licenser/elastic_fetcher.go similarity index 96% rename from x-pack/functionbeat/licenser/elastic_fetcher.go rename to x-pack/libbeat/licenser/elastic_fetcher.go index 7b8d05fd6255..854591ccb083 100644 --- a/x-pack/functionbeat/licenser/elastic_fetcher.go +++ b/x-pack/libbeat/licenser/elastic_fetcher.go @@ -114,13 +114,18 @@ func (f *ElasticFetcher) Fetch() (*License, error) { status, body, err := f.client.Request("GET", xPackURL, "", params, nil) // When we are running an OSS release of elasticsearch the _xpack endpoint will return a 405, // "Method Not Allowed", so we return the default OSS license. + if status == http.StatusBadRequest { + f.log.Debug("Received 'Bad request' (400) response from server, fallback to OSS license") + return OSSLicense, nil + } + if status == http.StatusMethodNotAllowed { f.log.Debug("Received 'Method Not allowed' (405) response from server, fallback to OSS license") return OSSLicense, nil } if status == http.StatusUnauthorized { - return nil, errors.New("Unauthorized access, could not connect to the xpack endpoint, verify your credentials") + return nil, errors.New("unauthorized access, could not connect to the xpack endpoint, verify your credentials") } if status != http.StatusOK { diff --git a/x-pack/functionbeat/licenser/elastic_fetcher_integration_test.go b/x-pack/libbeat/licenser/elastic_fetcher_integration_test.go similarity index 93% rename from x-pack/functionbeat/licenser/elastic_fetcher_integration_test.go rename to x-pack/libbeat/licenser/elastic_fetcher_integration_test.go index b8def938cc30..73e978361bd6 100644 --- a/x-pack/functionbeat/licenser/elastic_fetcher_integration_test.go +++ b/x-pack/libbeat/licenser/elastic_fetcher_integration_test.go @@ -27,8 +27,8 @@ func getTestClient() *elasticsearch.Client { client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ URL: host, Index: outil.MakeSelector(), - Username: cli.GetEnvOr("ES_USER", ""), - Password: cli.GetEnvOr("ES_PASS", ""), + Username: "myelastic", // NOTE: I will refactor this in a followup PR + Password: "changeme", Timeout: 60 * time.Second, CompressionLevel: 3, }, nil) diff --git a/x-pack/functionbeat/licenser/elastic_fetcher_test.go b/x-pack/libbeat/licenser/elastic_fetcher_test.go similarity index 89% rename from x-pack/functionbeat/licenser/elastic_fetcher_test.go rename to x-pack/libbeat/licenser/elastic_fetcher_test.go index 4c71008ce299..708ae1d833fe 100644 --- a/x-pack/functionbeat/licenser/elastic_fetcher_test.go +++ b/x-pack/libbeat/licenser/elastic_fetcher_test.go @@ -35,7 +35,7 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv } func TestParseJSON(t *testing.T) { - t.Run("OSS release of Elasticsearch", func(t *testing.T) { + t.Run("OSS release of Elasticsearch (Code: 405)", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method Not Allowed", 405) } @@ -52,6 +52,23 @@ func TestParseJSON(t *testing.T) { assert.Equal(t, OSSLicense, oss) }) + t.Run("OSS release of Elasticsearch (Code: 400)", func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Bad Request", 400) + } + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + oss, err := fetcher.Fetch() + if assert.NoError(t, err) { + return + } + + assert.Equal(t, OSSLicense, oss) + }) + t.Run("malformed JSON", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("hello bad JSON")) @@ -75,7 +92,7 @@ func TestParseJSON(t *testing.T) { fetcher := NewElasticFetcher(c) _, err := fetcher.Fetch() - assert.Equal(t, err.Error(), "Unauthorized access, could not connect to the xpack endpoint, verify your credentials") + assert.Equal(t, err.Error(), "unauthorized access, could not connect to the xpack endpoint, verify your credentials") }) t.Run("any error from the server", func(t *testing.T) { diff --git a/x-pack/libbeat/licenser/es_callback.go b/x-pack/libbeat/licenser/es_callback.go new file mode 100644 index 000000000000..f854bfd9d85e --- /dev/null +++ b/x-pack/libbeat/licenser/es_callback.go @@ -0,0 +1,44 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +// Enforce setups the corresponding callbacks in libbeat to verify the license on the +// remote elasticsearch cluster. +func Enforce(log *logp.Logger, checks ...CheckFunc) { + cb := func(client *elasticsearch.Client) error { + fetcher := NewElasticFetcher(client) + license, err := fetcher.Fetch() + + if err != nil { + return errors.Wrapf(err, "cannot retrieve the elasticsearch license") + } + + if license == OSSLicense { + return errors.New("This Beat requires the default distribution of Elasticsearch. Please " + + "upgrade to the default distribution of Elasticsearch from elastic.co, or downgrade to " + + "the oss-only distribution of beats") + } + + if !Validate(log, *license, checks...) { + return fmt.Errorf( + "invalid license found, requires a basic or a valid trial license and received %s", + license.Get(), + ) + } + + return nil + } + + elasticsearch.RegisterGlobalCallback(cb) +} diff --git a/x-pack/functionbeat/licenser/license.go b/x-pack/libbeat/licenser/license.go similarity index 100% rename from x-pack/functionbeat/licenser/license.go rename to x-pack/libbeat/licenser/license.go diff --git a/x-pack/functionbeat/licenser/license_test.go b/x-pack/libbeat/licenser/license_test.go similarity index 100% rename from x-pack/functionbeat/licenser/license_test.go rename to x-pack/libbeat/licenser/license_test.go diff --git a/x-pack/functionbeat/licenser/licensetype_string.go b/x-pack/libbeat/licenser/licensetype_string.go similarity index 100% rename from x-pack/functionbeat/licenser/licensetype_string.go rename to x-pack/libbeat/licenser/licensetype_string.go diff --git a/x-pack/functionbeat/licenser/manager.go b/x-pack/libbeat/licenser/manager.go similarity index 100% rename from x-pack/functionbeat/licenser/manager.go rename to x-pack/libbeat/licenser/manager.go diff --git a/x-pack/functionbeat/licenser/manager_test.go b/x-pack/libbeat/licenser/manager_test.go similarity index 100% rename from x-pack/functionbeat/licenser/manager_test.go rename to x-pack/libbeat/licenser/manager_test.go diff --git a/x-pack/functionbeat/licenser/state_string.go b/x-pack/libbeat/licenser/state_string.go similarity index 100% rename from x-pack/functionbeat/licenser/state_string.go rename to x-pack/libbeat/licenser/state_string.go diff --git a/x-pack/functionbeat/licenser/testdata/x-pack-trial-6.4.0.json b/x-pack/libbeat/licenser/testdata/x-pack-trial-6.4.0.json similarity index 100% rename from x-pack/functionbeat/licenser/testdata/x-pack-trial-6.4.0.json rename to x-pack/libbeat/licenser/testdata/x-pack-trial-6.4.0.json diff --git a/x-pack/functionbeat/licenser/testdata/xpack-6.4.0.json b/x-pack/libbeat/licenser/testdata/xpack-6.4.0.json similarity index 100% rename from x-pack/functionbeat/licenser/testdata/xpack-6.4.0.json rename to x-pack/libbeat/licenser/testdata/xpack-6.4.0.json diff --git a/x-pack/functionbeat/licenser/types.go b/x-pack/libbeat/licenser/types.go similarity index 100% rename from x-pack/functionbeat/licenser/types.go rename to x-pack/libbeat/licenser/types.go