From cc26357b9ae1fead2581828199167e59c744883b Mon Sep 17 00:00:00 2001
From: Chris Mark
Date: Thu, 8 Apr 2021 11:14:33 +0300
Subject: [PATCH] Add kubernetes_leaderelection provider (#24913)

(cherry picked from commit 2494ae509b1036d39bd1bbb4ec751c50347c9aff)
---
 .../elastic-agent-standalone-kubernetes.yml   |  32 ++++
 .../elastic-agent-standalone-daemonset.yaml   |   4 +
 ...elastic-agent-standalone-role-binding.yaml |  14 ++
 .../elastic-agent-standalone-role.yaml        |  14 ++
 x-pack/elastic-agent/pkg/agent/cmd/include.go |   1 +
 .../kubernetesleaderelection/config.go        |  20 +++
 .../kubernetes_leaderelection.go              | 153 +++++++++++++++++
 7 files changed, 238 insertions(+)
 create mode 100644 x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/config.go
 create mode 100644 x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go

diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
index b92b0cb5c586..21d468a34b2e 100644
--- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
+++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
@@ -35,6 +35,10 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: spec.nodeName
+          - name: POD_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.name
         securityContext:
           runAsUser: 0
         resources:
@@ -649,6 +653,34 @@ rules:
   verbs:
   - get
 ---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  namespace: kube-system
+  name: elastic-agent
+subjects:
+  - kind: ServiceAccount
+    name: elastic-agent
+    namespace: kube-system
+roleRef:
+  kind: Role
+  name: elastic-agent
+  apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: elastic-agent
+  namespace: kube-system
+  labels:
+    k8s-app: elastic-agent
+rules:
+  - apiGroups:
+      - coordination.k8s.io
+    resources:
+      - leases
+    verbs: ["get", "create", "update"]
+---
 apiVersion: v1
 kind: ServiceAccount
 metadata:
diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset.yaml
index e97e07439263..f23e2cb1534b 100644
--- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset.yaml
+++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset.yaml
@@ -38,6 +38,10 @@ spec:
             valueFrom:
               fieldRef:
                 fieldPath: spec.nodeName
+          - name: POD_NAME
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.name
         securityContext:
           runAsUser: 0
         resources:
diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role-binding.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role-binding.yaml
index b352b2901d0d..f053b2463371 100644
--- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role-binding.yaml
+++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role-binding.yaml
@@ -10,3 +10,17 @@ roleRef:
   kind: ClusterRole
   name: elastic-agent
   apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  namespace: kube-system
+  name: elastic-agent
+subjects:
+  - kind: ServiceAccount
+    name: elastic-agent
+    namespace: kube-system
+roleRef:
+  kind: Role
+  name: elastic-agent
+  apiGroup: rbac.authorization.k8s.io
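
Note: the Role added above grants get/create/update on coordination.k8s.io leases, which is what the client-go LeaseLock used by the provider below needs. Once a leader is elected, the lease it holds can be inspected in kube-system; it looks roughly like the following sketch (holder name and timestamp are hypothetical illustrations, not output from this patch):

    apiVersion: coordination.k8s.io/v1
    kind: Lease
    metadata:
      name: elastic-agent-cluster-leader
      namespace: kube-system
    spec:
      holderIdentity: elastic-agent-leader-elastic-agent-x4zkw  # "elastic-agent-leader-" + POD_NAME
      leaseDurationSeconds: 15                                  # matches LeaseDuration in the provider
      renewTime: "2021-04-08T08:14:33.000000Z"
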
diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml
index 13b3554b83cc..11089d0cc7af 100644
--- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml
+++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml
@@ -38,3 +38,17 @@ rules:
     - "/metrics"
   verbs:
     - get
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: elastic-agent
+  namespace: kube-system
+  labels:
+    k8s-app: elastic-agent
+rules:
+  - apiGroups:
+      - coordination.k8s.io
+    resources:
+      - leases
+    verbs: ["get", "create", "update"]
diff --git a/x-pack/elastic-agent/pkg/agent/cmd/include.go b/x-pack/elastic-agent/pkg/agent/cmd/include.go
index 5bc763c6df06..f28b5cb11b50 100644
--- a/x-pack/elastic-agent/pkg/agent/cmd/include.go
+++ b/x-pack/elastic-agent/pkg/agent/cmd/include.go
@@ -11,6 +11,7 @@ import (
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
+	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetessecrets"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
 	_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/config.go
new file mode 100644
index 000000000000..a7f71cc32b5d
--- /dev/null
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/config.go
@@ -0,0 +1,20 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// TODO review the need for this
+// +build linux darwin windows
+
+package kubernetesleaderelection
+
+// Config for kubernetes_leaderelection provider
+type Config struct {
+    KubeConfig string `config:"kube_config"`
+    // Name of the leaderelection lease
+    LeaderLease string `config:"leader_lease"`
+}
+
+// InitDefaults initializes the default values for the config.
+func (c *Config) InitDefaults() {
+    c.LeaderLease = "elastic-agent-cluster-leader"
+}
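
For reference, the two Config fields above would unpack from the provider's section of a standalone agent policy. A minimal sketch, assuming the standard provider configuration layout (the kubeconfig path is a hypothetical value; leader_lease falls back to elastic-agent-cluster-leader via InitDefaults when unset):

    providers:
      kubernetes_leaderelection:
        # Optional path to a kubeconfig file; when unset, the
        # in-cluster configuration is used.
        kube_config: /etc/kubernetes/agent.kubeconfig
        # Name of the Lease object to contend for.
        leader_lease: elastic-agent-cluster-leader
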
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go
new file mode 100644
index 000000000000..acb5e732b824
--- /dev/null
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go
@@ -0,0 +1,153 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package kubernetesleaderelection
+
+import (
+    "context"
+    "os"
+    "time"
+
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/leaderelection"
+    "k8s.io/client-go/tools/leaderelection/resourcelock"
+
+    "github.com/elastic/beats/v7/libbeat/common/kubernetes"
+    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
+    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
+    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
+    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
+    corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
+    "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
+)
+
+func init() {
+    composable.Providers.AddContextProvider("kubernetes_leaderelection", ContextProviderBuilder)
+}
+
+type contextProvider struct {
+    logger               *logger.Logger
+    config               *Config
+    comm                 corecomp.ContextProviderComm
+    leaderElection       *leaderelection.LeaderElectionConfig
+    cancelLeaderElection context.CancelFunc
+}
+
+// ContextProviderBuilder builds the provider.
+func ContextProviderBuilder(logger *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) {
+    var cfg Config
+    if c == nil {
+        c = config.New()
+    }
+    err := c.Unpack(&cfg)
+    if err != nil {
+        return nil, errors.New(err, "failed to unpack configuration")
+    }
+    return &contextProvider{logger, &cfg, nil, nil, nil}, nil
+}
+
+// Run runs the leaderelection provider.
+func (p *contextProvider) Run(comm corecomp.ContextProviderComm) error {
+    client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
+    if err != nil {
+        // Do not fail the agent if no client can be built; just skip this provider.
+        p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err)
+        return nil
+    }
+
+    agentInfo, err := info.NewAgentInfo()
+    if err != nil {
+        return err
+    }
+    // Prefer the pod name as the candidate identity; fall back to the agent ID.
+    var id string
+    podName, found := os.LookupEnv("POD_NAME")
+    if found {
+        id = "elastic-agent-leader-" + podName
+    } else {
+        id = "elastic-agent-leader-" + agentInfo.AgentID()
+    }
+
+    ns, err := kubernetes.InClusterNamespace()
+    if err != nil {
+        ns = "default"
+    }
+    lease := metav1.ObjectMeta{
+        Name:      p.config.LeaderLease,
+        Namespace: ns,
+    }
+    // Note: the UID of this locally built ObjectMeta stays empty until the Lease is fetched from the API server.
+    metaUID := lease.GetObjectMeta().GetUID()
+    p.leaderElection = &leaderelection.LeaderElectionConfig{
+        Lock: &resourcelock.LeaseLock{
+            LeaseMeta: lease,
+            Client:    client.CoordinationV1(),
+            LockConfig: resourcelock.ResourceLockConfig{
+                Identity: id,
+            },
+        },
+        ReleaseOnCancel: true,
+        LeaseDuration:   15 * time.Second,
+        RenewDeadline:   10 * time.Second,
+        RetryPeriod:     2 * time.Second,
+        Callbacks: leaderelection.LeaderCallbacks{
+            OnStartedLeading: func(ctx context.Context) {
+                p.logger.Debugf("leader election lock GAINED, id %v", id)
+                p.startLeading(string(metaUID))
+            },
+            OnStoppedLeading: func() {
+                p.logger.Debugf("leader election lock LOST, id %v", id)
+                p.stopLeading(string(metaUID))
+            },
+        },
+    }
+    ctx, cancel := context.WithCancel(context.Background())
+    p.cancelLeaderElection = cancel
+    p.comm = comm
+    p.startLeaderElector(ctx)
+
+    return nil
+}
+
+// startLeaderElector starts a Leader Elector in the background with the provided config.
+func (p *contextProvider) startLeaderElector(ctx context.Context) {
+    le, err := leaderelection.NewLeaderElector(*p.leaderElection)
+    if err != nil {
+        p.logger.Errorf("error while creating Leader Elector: %v", err)
+        return
+    }
+    p.logger.Debugf("Starting Leader Elector")
+    go le.Run(ctx)
+}
+
+// startLeading publishes leader: true into the provider context.
+func (p *contextProvider) startLeading(metaUID string) {
+    mapping := map[string]interface{}{
"leader": true, + } + + err := p.comm.Set(mapping) + if err != nil { + p.logger.Errorf("Failed updating leaderelection status to leader TRUE: %s", err) + } +} + +func (p *contextProvider) stopLeading(metaUID string) { + mapping := map[string]interface{}{ + "leader": false, + } + + err := p.comm.Set(mapping) + if err != nil { + p.logger.Errorf("Failed updating leaderelection status to leader FALSE: %s", err) + } +} + +// Stop signals the stop channel to force the leader election loop routine to stop. +func (p *contextProvider) Stop() { + if p.cancelLeaderElection != nil { + p.cancelLeaderElection() + } +}
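
With this provider registered, every agent pod in the DaemonSet contends for the lease, and only the current holder exposes kubernetes_leaderelection.leader as true in its policy context. A standalone policy can use that variable as a condition so cluster-scope data is collected by exactly one pod. A minimal sketch, assuming the standalone condition syntax; the input shown is an illustrative placeholder, not part of this patch:

    inputs:
      - type: kubernetes/metrics
        # Evaluates to true only on the pod currently holding the lease.
        condition: ${kubernetes_leaderelection.leader} == true
        ...

When the lease changes hands, OnStoppedLeading flips the variable to false on the old leader and OnStartedLeading sets it to true on the new one, so conditioned inputs stop and start accordingly.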