Add kubernetes_leaderelection provider #24913

Merged (11 commits) on Apr 8, 2021
10 changes: 10 additions & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
@@ -39,6 +39,10 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: spec.nodeName
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
         securityContext:
           runAsUser: 0
         resources:
@@ -650,6 +654,12 @@ rules:
   - "/metrics"
   verbs:
   - get
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - '*'
 ---
 apiVersion: v1
 kind: ServiceAccount
@@ -38,6 +38,10 @@ spec:
           valueFrom:
             fieldRef:
               fieldPath: spec.nodeName
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
         securityContext:
           runAsUser: 0
         resources:
@@ -33,3 +33,9 @@ rules:
   - "/metrics"
   verbs:
   - get
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - '*'
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
@@ -11,6 +11,7 @@ import (
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
+    _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection"
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetessecrets"
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
     _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
@@ -0,0 +1,15 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// TODO review the need for this
// +build linux darwin windows

package kubernetesleaderelection

// Config for kubernetes_leaderelection provider
type Config struct {
    KubeConfig string `config:"kube_config"`
    // Name of the leaderelection lease
    LeaderLease string `config:"leader_lease"`
Contributor:

So by default this is off when running Elastic Agent. Is that the behavior we want? It requires a specific setting under the providers top-level key, and is not something Fleet could make use of at the moment.

I think we should have this enabled by default, by giving the lease a default value.

Member Author:

Agreed, I will add it.
}
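A minimal sketch of the suggested default, assuming the Config shape above; the defaultConfig helper and the lease name are illustrative placeholders, not values taken from this PR:

// defaultConfig returns a Config pre-populated with an assumed lease name so
// that the provider is active even when nothing is set under the providers key.
func defaultConfig() *Config {
    return &Config{
        LeaderLease: "elastic-agent-cluster-leader", // hypothetical default
    }
}

The builder could then unpack the user configuration on top of this value, so an explicit leader_lease setting still wins.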
@@ -0,0 +1,163 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetesleaderelection

import (
    "context"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"

    "github.com/elastic/beats/v7/libbeat/common/kubernetes"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func init() {
    composable.Providers.AddDynamicProvider("kubernetes_leaderelection", DynamicProviderBuilder)
Contributor:

I question whether this should be a dynamic provider. I think it should be a context provider. That way leader election can affect all the other vars, even dynamic ones discovered by the main kubernetes provider.

Member Author:

👍🏼 OK, I can change it.
}
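As a rough illustration of the context-provider variant discussed above (a sketch only, not code from this change): registration would go through the registry's context-provider hook, and the leader flag would become a single context variable that conditions on any input can reference, e.g. ${kubernetes_leaderelection.leader}. The contextProvider type and ContextProviderBuilder names below are assumptions, and the Set call assumes the ContextProviderComm interface used by the agent's other context providers.

// Sketch: register as a context provider instead of a dynamic provider.
func init() {
    composable.Providers.AddContextProvider("kubernetes_leaderelection", ContextProviderBuilder)
}

// contextProvider is a hypothetical counterpart of dynamicProvider that keeps
// a ContextProviderComm instead of a DynamicProviderComm.
type contextProvider struct {
    comm composable.ContextProviderComm
}

// On gaining or losing the lock, the provider would flip one variable rather
// than calling AddOrUpdate with processors.
func (p *contextProvider) startLeading() {
    p.comm.Set(map[string]interface{}{"leader": true})
}

func (p *contextProvider) stopLeading() {
    p.comm.Set(map[string]interface{}{"leader": false})
}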

type dynamicProvider struct {
    logger               *logger.Logger
    config               *Config
    comm                 composable.DynamicProviderComm
    leaderElection       *leaderelection.LeaderElectionConfig
    cancelLeaderElection context.CancelFunc
}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
    var cfg Config
    if c == nil {
        c = config.New()
    }
    err := c.Unpack(&cfg)
    if err != nil {
        return nil, errors.New(err, "failed to unpack configuration")
    }
    return &dynamicProvider{logger, &cfg, nil, nil, nil}, nil
}

// Run runs the leaderelection dynamic provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
    client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
    if err != nil {
        // not fatal: log at debug level and do nothing
        p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err)
        return nil
    }
    if p.config.LeaderLease == "" {
        p.logger.Debugf("Kubernetes leaderelection provider skipped, no leader lease defined")
        return nil
    }

    agentInfo, err := info.NewAgentInfo()
    if err != nil {
        return err
    }
    var id string
    podName, found := os.LookupEnv("POD_NAME")
    if found {
        id = "elastic-agent-leader-" + podName
    } else {
        id = "elastic-agent-leader-" + agentInfo.AgentID()
    }

    ns, err := kubernetes.InClusterNamespace()
Contributor:

Should there also be a configuration value for this? Might we want to run it in a different namespace?

Member Author:

The convention here is that we create the lease object in the same namespace as the one where the Agent is running. We also add a Role so that the Agent only has access to leases in that namespace, see #24913 (comment).

So I think we should keep it as is and not expose this as an option to the user.

Contributor:

+1
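For reference, in-cluster namespace detection conventionally reads the namespace file that Kubernetes mounts into every pod from its service account. A helper equivalent to kubernetes.InClusterNamespace might look roughly like this sketch (it needs "io/ioutil" and "strings"; this is not the libbeat implementation):

// inClusterNamespace reads the namespace of the pod the process runs in from
// the standard service account mount path.
func inClusterNamespace() (string, error) {
    data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
    if err != nil {
        return "", err
    }
    return strings.TrimSpace(string(data)), nil
}

The code just below falls back to "default" when this lookup fails, for example when running outside a cluster against a kubeconfig.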

    if err != nil {
        ns = "default"
    }
    lease := metav1.ObjectMeta{
        Name:      p.config.LeaderLease,
        Namespace: ns,
    }
    metaUID := lease.GetObjectMeta().GetUID()
    p.leaderElection = &leaderelection.LeaderElectionConfig{
        Lock: &resourcelock.LeaseLock{
            LeaseMeta: lease,
            Client:    client.CoordinationV1(),
            LockConfig: resourcelock.ResourceLockConfig{
                Identity: id,
            },
        },
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                p.logger.Debugf("leader election lock GAINED, id %v", id)
                p.startLeading(string(metaUID))
            },
            OnStoppedLeading: func() {
                p.logger.Debugf("leader election lock LOST, id %v", id)
                p.stopLeading(string(metaUID))
            },
        },
    }
    ctx, cancel := context.WithCancel(context.TODO())
    p.cancelLeaderElection = cancel
    p.comm = comm
    p.startLeaderElector(ctx)

    return nil
}

// startLeaderElector starts a Leader Elector in the background with the provided config
func (p *dynamicProvider) startLeaderElector(ctx context.Context) {
    le, err := leaderelection.NewLeaderElector(*p.leaderElection)
    if err != nil {
        p.logger.Errorf("error while creating Leader Elector: %v", err)
        return // without a valid Leader Elector there is nothing to run
    }
    p.logger.Debugf("Starting Leader Elector")
    go le.Run(ctx)
}

func (p *dynamicProvider) startLeading(metaUID string) {
    mapping := map[string]interface{}{
        "leader": true,
    }

    processors := []map[string]interface{}{
        {
            "add_fields": map[string]interface{}{
                "fields": mapping,
                "target": "kubernetes_leaderelection",
            },
        },
    }

    p.comm.AddOrUpdate(metaUID, 0, mapping, processors)
}

func (p *dynamicProvider) stopLeading(metaUID string) {
    mapping := map[string]interface{}{
        "leader": false,
    }

    processors := []map[string]interface{}{
        {
            "add_fields": map[string]interface{}{
                "fields": mapping,
                "target": "kubernetes_leaderelection",
            },
        },
    }

    p.comm.AddOrUpdate(metaUID, 0, mapping, processors)
}

// Stop cancels the leader election context, stopping the leader election loop routine.
func (p *dynamicProvider) Stop() {
    if p.cancelLeaderElection != nil {
        p.cancelLeaderElection()
    }
}