Skip to content

Commit

Permalink
Add kubernetes_leaderelection provider (#24913)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2494ae5)
  • Loading branch information
ChrsMark committed Apr 8, 2021
1 parent b161b5b commit cc26357
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 0 deletions.
32 changes: 32 additions & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
runAsUser: 0
resources:
Expand Down Expand Up @@ -649,6 +653,34 @@ rules:
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
namespace: kube-system
name: elastic-agent
subjects:
- kind: ServiceAccount
name: elastic-agent
namespace: kube-system
roleRef:
kind: Role
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: elastic-agent
namespace: kube-system
labels:
k8s-app: elastic-agent
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs: ["get", "create", "update"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
runAsUser: 0
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,17 @@ roleRef:
kind: ClusterRole
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
namespace: kube-system
name: elastic-agent
subjects:
- kind: ServiceAccount
name: elastic-agent
namespace: kube-system
roleRef:
kind: Role
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,17 @@ rules:
- "/metrics"
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: elastic-agent
namespace: kube-system
labels:
k8s-app: elastic-agent
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs: ["get", "create", "update"]
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetessecrets"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// TODO review the need for this
// +build linux darwin windows

package kubernetesleaderelection

// Config for kubernetes_leaderelection provider
type Config struct {
KubeConfig string `config:"kube_config"`
// Name of the leaderelection lease
LeaderLease string `config:"leader_lease"`
}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
c.LeaderLease = "elastic-agent-cluster-leader"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetesleaderelection

import (
"context"
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func init() {
composable.Providers.AddContextProvider("kubernetes_leaderelection", ContextProviderBuilder)
}

type contextProvider struct {
logger *logger.Logger
config *Config
comm corecomp.ContextProviderComm
leaderElection *leaderelection.LeaderElectionConfig
cancelLeaderElection context.CancelFunc
}

// ContextProviderBuilder builds the provider.
func ContextProviderBuilder(logger *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) {
var cfg Config
if c == nil {
c = config.New()
}
err := c.Unpack(&cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
return &contextProvider{logger, &cfg, nil, nil, nil}, nil
}

// Run runs the leaderelection provider.
func (p *contextProvider) Run(comm corecomp.ContextProviderComm) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err)
return nil
}

agentInfo, err := info.NewAgentInfo()
if err != nil {
return err
}
var id string
podName, found := os.LookupEnv("POD_NAME")
if found {
id = "elastic-agent-leader-" + podName
} else {
id = "elastic-agent-leader-" + agentInfo.AgentID()
}

ns, err := kubernetes.InClusterNamespace()
if err != nil {
ns = "default"
}
lease := metav1.ObjectMeta{
Name: p.config.LeaderLease,
Namespace: ns,
}
metaUID := lease.GetObjectMeta().GetUID()
p.leaderElection = &leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: lease,
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
p.logger.Debugf("leader election lock GAINED, id %v", id)
p.startLeading(string(metaUID))
},
OnStoppedLeading: func() {
p.logger.Debugf("leader election lock LOST, id %v", id)
p.stopLeading(string(metaUID))
},
},
}
ctx, cancel := context.WithCancel(context.TODO())
p.cancelLeaderElection = cancel
p.comm = comm
p.startLeaderElector(ctx)

return nil
}

// startLeaderElector starts a Leader Elector in the background with the provided config
func (p *contextProvider) startLeaderElector(ctx context.Context) {
le, err := leaderelection.NewLeaderElector(*p.leaderElection)
if err != nil {
p.logger.Errorf("error while creating Leader Elector: %v", err)
}
p.logger.Debugf("Starting Leader Elector")
go le.Run(ctx)
}

func (p *contextProvider) startLeading(metaUID string) {
mapping := map[string]interface{}{
"leader": true,
}

err := p.comm.Set(mapping)
if err != nil {
p.logger.Errorf("Failed updating leaderelection status to leader TRUE: %s", err)
}
}

func (p *contextProvider) stopLeading(metaUID string) {
mapping := map[string]interface{}{
"leader": false,
}

err := p.comm.Set(mapping)
if err != nil {
p.logger.Errorf("Failed updating leaderelection status to leader FALSE: %s", err)
}
}

// Stop signals the stop channel to force the leader election loop routine to stop.
func (p *contextProvider) Stop() {
if p.cancelLeaderElection != nil {
p.cancelLeaderElection()
}
}

0 comments on commit cc26357

Please sign in to comment.