Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check license x-pack #11296

Merged
merged 17 commits into from
Mar 20, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- On Google Cloud Engine (GCE) the add_cloud_metadata will now trim the project
info from the cloud.machine.type and cloud.availability_zone. {issue}10968[10968]
- Rename `migration.enabled` config to `migration.6_to_7.enabled`. {pull}11284[11284]
- Beats Xpack now checks for Basic license on connect. {pull}11296[11296]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ func NewClient(
}

client.Connection.onConnectCallback = func() error {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

for _, callback := range globalCallbackRegistry.callbacks {
err := callback(client)
if err != nil {
return err
}
}

if onConnect != nil {
onConnect.mutex.Lock()
defer onConnect.mutex.Unlock()
Expand Down
34 changes: 34 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ type callbacksRegistry struct {
// XXX: it would be fantastic to do this without a package global
var connectCallbackRegistry = newCallbacksRegistry()

// NOTE(ph): We need to refactor this, right now this is the only way to ensure that every calls
// to an ES cluster executes a callback.
var globalCallbackRegistry = newCallbacksRegistry()

// RegisterGlobalCallback register a global callbacks.
func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

// find the next unique key
var key uuid.UUID
var err error
exists := true
for exists {
key, err = uuid.NewV4()
if err != nil {
return uuid.Nil, err
}
_, exists = globalCallbackRegistry.callbacks[key]
}

globalCallbackRegistry.callbacks[key] = callback
return key, nil
}

func newCallbacksRegistry() callbacksRegistry {
return callbacksRegistry{
callbacks: make(map[uuid.UUID]connectCallback),
Expand Down Expand Up @@ -99,6 +124,15 @@ func DeregisterConnectCallback(key uuid.UUID) {
delete(connectCallbackRegistry.callbacks, key)
}

// DeregisterGlobalCallback deregisters a callback for the elasticsearch output
// specified by its key. If a callback does not exist, nothing happens.
func DeregisterGlobalCallback(key uuid.UUID) {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

delete(globalCallbackRegistry.callbacks, key)
}

func makeES(
im outputs.IndexManager,
beat beat.Info,
Expand Down
25 changes: 25 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,28 @@ func TestConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestGlobalConnectCallbacksManagement(t *testing.T) {
f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil }
f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil }
f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil }

_, err := RegisterGlobalCallback(f0)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}
id1, err := RegisterGlobalCallback(f1)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}
id2, err := RegisterGlobalCallback(f2)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}

t.Logf("removing second callback")
DeregisterGlobalCallback(id1)
if _, ok := globalCallbackRegistry.callbacks[id2]; !ok {
t.Fatalf("third callback cannot be retrieved")
}
}
6 changes: 3 additions & 3 deletions x-pack/functionbeat/beater/functionbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/elastic/beats/x-pack/functionbeat/config"
"github.com/elastic/beats/x-pack/functionbeat/core"
_ "github.com/elastic/beats/x-pack/functionbeat/include" // imports features
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/functionbeat/provider"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

var (
Expand Down Expand Up @@ -82,7 +82,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error {
defer manager.Stop()

// Wait until we receive the initial license.
if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, checkLicense); err != nil {
if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, licenser.BasicAndAboveOrTrial); err != nil {
return err
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea

// Make the client aware of the current license, the client will accept sending events to the
// pipeline until the client is closed or if the license change and is not valid.
licenseAware := core.NewLicenseAwareClient(client, checkLicense)
licenseAware := core.NewLicenseAwareClient(client, licenser.BasicAndAboveOrTrial)
if err := manager.AddWatcher(licenseAware); err != nil {
return nil, err
}
Expand Down
14 changes: 0 additions & 14 deletions x-pack/functionbeat/beater/license.go

This file was deleted.

2 changes: 1 addition & 1 deletion x-pack/functionbeat/core/license_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

var errInvalidLicense = errors.New("invalid license detected, cannot publish events")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/functionbeat/core/license_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

type dummySyncClient struct{ EventCount int }
Expand Down
5 changes: 5 additions & 0 deletions x-pack/libbeat/cmd/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ package cmd

import (
"github.com/elastic/beats/libbeat/cmd"
"github.com/elastic/beats/libbeat/logp"

// register central management
"github.com/elastic/beats/x-pack/libbeat/licenser"
_ "github.com/elastic/beats/x-pack/libbeat/management"
)

const licenseDebugK = "license"

// AddXPack extends the given root folder with XPack features
func AddXPack(root *cmd.BeatsRootCmd, name string) {
licenser.Enforce(logp.NewLogger(licenseDebugK), licenser.BasicAndAboveOrTrial)
root.AddCommand(genEnrollCmd(name, ""))
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func CheckTrial(log *logp.Logger, license License) bool {
log.Error("Trial license is expired")
return false
}
log.Info("Trial license active")
return true
}
return false
Expand All @@ -27,10 +28,11 @@ func CheckTrial(log *logp.Logger, license License) bool {
// CheckLicenseCover check that the current license cover the requested license.
func CheckLicenseCover(licenseType LicenseType) func(*logp.Logger, License) bool {
return func(log *logp.Logger, license License) bool {
log.Debugf("Checking that license cover %s", licenseType)
log.Debug("Checking that license covers %s", licenseType)
if license.Cover(licenseType) && license.IsActive() {
return true
}
log.Infof("License is active for %s", licenseType)
return false
}
}
Expand All @@ -48,3 +50,8 @@ func Validate(log *logp.Logger, license License, checks ...CheckFunc) bool {
}
return false
}

// BasicAndAboveOrTrial return true if the license is basic or if the license is trial and active.
func BasicAndAboveOrTrial(log *logp.Logger, license License) bool {
return CheckBasic(log, license) || CheckTrial(log, license)
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,18 @@ func (f *ElasticFetcher) Fetch() (*License, error) {
status, body, err := f.client.Request("GET", xPackURL, "", params, nil)
// When we are running an OSS release of elasticsearch the _xpack endpoint will return a 405,
// "Method Not Allowed", so we return the default OSS license.
if status == http.StatusBadRequest {
f.log.Debug("Received 'Bad request' (400) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusMethodNotAllowed {
f.log.Debug("Received 'Method Not allowed' (405) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusUnauthorized {
return nil, errors.New("Unauthorized access, could not connect to the xpack endpoint, verify your credentials")
return nil, errors.New("unauthorized access, could not connect to the xpack endpoint, verify your credentials")
}

if status != http.StatusOK {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func getTestClient() *elasticsearch.Client {
client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: host,
Index: outil.MakeSelector(),
Username: cli.GetEnvOr("ES_USER", ""),
Password: cli.GetEnvOr("ES_PASS", ""),
Username: "myelastic", // NOTE: I will refactor this in a followup PR
Password: "changeme",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tsg, @andrewkroh I will do a followup on this, I didn't want to waste time on docker-compose environment variable and slow the merge. Since we need to add integration test for this new scenario we can fix it at the same time.

Timeout: 60 * time.Second,
CompressionLevel: 3,
}, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv
}

func TestParseJSON(t *testing.T) {
t.Run("OSS release of Elasticsearch", func(t *testing.T) {
t.Run("OSS release of Elasticsearch (Code: 405)", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Method Not Allowed", 405)
}
Expand All @@ -52,6 +52,23 @@ func TestParseJSON(t *testing.T) {
assert.Equal(t, OSSLicense, oss)
})

t.Run("OSS release of Elasticsearch (Code: 400)", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Bad Request", 400)
}
s, c := newServerClientPair(t, h)
defer s.Close()
defer c.Close()

fetcher := NewElasticFetcher(c)
oss, err := fetcher.Fetch()
if assert.NoError(t, err) {
return
}

assert.Equal(t, OSSLicense, oss)
})

t.Run("malformed JSON", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello bad JSON"))
Expand All @@ -75,7 +92,7 @@ func TestParseJSON(t *testing.T) {

fetcher := NewElasticFetcher(c)
_, err := fetcher.Fetch()
assert.Equal(t, err.Error(), "Unauthorized access, could not connect to the xpack endpoint, verify your credentials")
assert.Equal(t, err.Error(), "unauthorized access, could not connect to the xpack endpoint, verify your credentials")
})

t.Run("any error from the server", func(t *testing.T) {
Expand Down
45 changes: 45 additions & 0 deletions x-pack/libbeat/licenser/es_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package licenser

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

// Enforce setups the corresponding callbacks in libbeat to verify the license on the
// remote elasticsearch cluster.
func Enforce(log *logp.Logger, checks ...CheckFunc) {
cb := func(client *elasticsearch.Client) error {
fetcher := NewElasticFetcher(client)
license, err := fetcher.Fetch()

if err != nil {
return errors.Wrapf(err, "cannot retrieve the elasticsearch license")
}

if license == OSSLicense {
return errors.New("this Elastic licensed beat requires an Elasticsearch server with X-Pack " +
"and a free basic license. Please use the Elasticsearch distribution that includes " +
"X-Pack or download the Apache 2.0 licensed beat distribution that does not include " +
"X-Pack features")
}

if !Validate(log, *license, checks...) {
return fmt.Errorf(
"invalid license found, requires a basic or a valid trial license and received %s",
ph marked this conversation as resolved.
Show resolved Hide resolved
license.Get(),
)
}

return nil
}

elasticsearch.RegisterGlobalCallback(cb)
}
File renamed without changes.