Skip to content

Commit

Permalink
Add leader election for autodiscover (elastic#20281)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Aug 10, 2020
1 parent 7ec32e4 commit 9ab9b97
Show file tree
Hide file tree
Showing 18 changed files with 309 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added the `max_cached_sessions` option to the script processor. {pull}19562[19562]
- Add support for DNS over TLS for the dns_processor. {pull}19321[19321]
- Set index.max_docvalue_fields_search in index template to increase value to 200 fields. {issue}20215[20215]
- Add leader election for Kubernetes autodiscover. {pull}20281[20281]
- Add capability of enriching process metadata with contianer id also for non-privileged containers in `add_process_metadata` processor. {pull}19767[19767]

*Auditbeat*
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ rules:
- "/metrics"
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- '*'
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ require (
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
golang.org/x/tools v0.0.0-20200806022845-90696ccdc692
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewAutodiscover(
// Init providers
var providers []Provider
for _, providerCfg := range config.Providers {
provider, err := Registry.BuildProvider(bus, providerCfg, keystore)
provider, err := Registry.BuildProvider(name, bus, providerCfg, keystore)
if err != nil {
return nil, errors.Wrap(err, "error in autodiscover provider settings")
}
Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestAutodiscover(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestAutodiscoverHash(t *testing.T) {
busChan := make(chan bus.Bus, 1)

Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -323,7 +323,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Provider interface {
}

// ProviderBuilder creates a new provider based on the given config and returns it
type ProviderBuilder func(bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error)
type ProviderBuilder func(string, bus.Bus, uuid.UUID, *common.Config, keystore.Keystore) (Provider, error)

// AddProvider registers a new ProviderBuilder
func (r *registry) AddProvider(name string, provider ProviderBuilder) error {
Expand Down Expand Up @@ -70,7 +70,7 @@ func (r *registry) GetProvider(name string) ProviderBuilder {
}

// BuildProvider reads provider configuration and instantiate one
func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) {
func (r *registry) BuildProvider(beatName string, bus bus.Bus, c *common.Config, keystore keystore.Keystore) (Provider, error) {
var config ProviderConfig
err := c.Unpack(&config)
if err != nil {
Expand All @@ -87,5 +87,5 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config, keystore keystor
return nil, err
}

return builder(bus, uuid, c, keystore)
return builder(beatName, bus, uuid, c, keystore)
}
8 changes: 7 additions & 1 deletion libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ type Provider struct {
}

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) {
func AutodiscoverBuilder(
beatName string,
bus bus.Bus,
uuid uuid.UUID,
c *common.Config,
keystore keystore.Keystore,
) (autodiscover.Provider, error) {
logger := logp.NewLogger("docker")

errWrap := func(err error) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDockerStart(t *testing.T) {
s := &template.MapperSettings{nil, nil}
config.Templates = *s
k, _ := keystore.NewFileKeystore("test")
provider, err := AutodiscoverBuilder(bus, UUID, common.MustNewConfigFrom(config), k)
provider, err := AutodiscoverBuilder("mockBeat", bus, UUID, common.MustNewConfigFrom(config), k)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion libbeat/autodiscover/providers/jolokia/jolokia.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ type Provider struct {

// AutodiscoverBuilder builds a Jolokia Discovery autodiscover provider, it fails if
// there is some problem with the configuration
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config, keystore keystore.Keystore) (autodiscover.Provider, error) {
func AutodiscoverBuilder(
beatName string,
bus bus.Bus,
uuid uuid.UUID,
c *common.Config,
keystore keystore.Keystore,
) (autodiscover.Provider, error) {
errWrap := func(err error) error {
return errors.Wrap(err, "error setting up jolokia autodiscover provider")
}
Expand Down
7 changes: 7 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {
// Scope can be either node or cluster.
Scope string `config:"scope"`
Resource string `config:"resource"`
// Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster
Unique bool `config:"unique"`
LeaderLease string `config:"leader_lease"`

Prefix string `config:"prefix"`
Hints *common.Config `config:"hints"`
Expand All @@ -60,6 +63,7 @@ func defaultConfig() *Config {
Resource: "pod",
CleanupTimeout: 60 * time.Second,
Prefix: "co.elastic",
Unique: false,
}
}

Expand Down Expand Up @@ -98,6 +102,9 @@ func (c *Config) Validate() error {
if c.Scope != "node" && c.Scope != "cluster" {
return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
}
if c.Unique && c.Scope != "cluster" {
logp.L().Warnf("can only set `unique` when scope is `cluster`")
}

return nil
}
Loading

0 comments on commit 9ab9b97

Please sign in to comment.