Skip to content

Commit

Permalink
[receiver/awscontainerinsightreceiver] Add option to only use config …
Browse files Browse the repository at this point in the history
…maps as EKS leader election lock resource
  • Loading branch information
sky333999 committed Apr 28, 2023
1 parent 228dbf9 commit 201c8f5
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 17 deletions.
6 changes: 6 additions & 0 deletions receiver/awscontainerinsightreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ type Config struct {
// election process for EKS Container Insights. The elected leader is responsible for scraping cluster level metrics.
// The default value is "otel-container-insight-clusterleader".
LeaderLockName string `mapstructure:"leader_lock_name"`

// LeaderLockUsingConfigMapOnly is an optional attribute to override the default behavior when obtaining a locking resource to be used during the leader
// election process for EKS Container Insights. By default, the leader election logic prefers a Lease and alternatively the combination of Lease & ConfigMap.
// When this flag is set to true, the leader election logic will be forced to use ConfigMap only. This flag mainly exists for backwards compatibility.
// The default value is false.
LeaderLockUsingConfigMapOnly bool `mapstructure:"leader_lock_using_config_map_only"`
}
11 changes: 11 additions & 0 deletions receiver/awscontainerinsightreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func TestLoadConfig(t *testing.T) {
LeaderLockName: "override-container-insight-clusterleader",
},
},
{
id: component.NewIDWithName(typeStr, "leader_lock_using_config_map_only"),
expected: &Config{
CollectionInterval: 60 * time.Second,
ContainerOrchestrator: "eks",
TagService: true,
PrefFullPodName: false,
LeaderLockName: "otel-container-insight-clusterleader",
LeaderLockUsingConfigMapOnly: true,
},
},
}

for _, tt := range tests {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package k8sapiserver

import (
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

type ConfigMapLock struct {
// ConfigMapMeta should contain a Name and a Namespace of a
// ConfigMapMeta object that the LeaderElector will attempt to lead.
ConfigMapMeta metav1.ObjectMeta
Client corev1client.ConfigMapsGetter
LockConfig resourcelock.ResourceLockConfig
cm *v1.ConfigMap
}

// Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
var record resourcelock.LeaderElectionRecord
var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
recordStr, found := cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]
recordBytes := []byte(recordStr)
if found {
if err := json.Unmarshal(recordBytes, &record); err != nil {
return nil, nil, err
}
}
return &record, recordBytes, nil
}

// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
Annotations: map[string]string{
resourcelock.LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
}, metav1.CreateOptions{})
return err
}

// Update will update an existing annotation on a given resource.
func (cml *ConfigMapLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
if cml.cm == nil {
return errors.New("configmap not initialized, call get or create first")
}
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
cml.cm.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] = string(recordBytes)
cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
if err != nil {
return err
}
cml.cm = cm
return nil
}

// RecordEvent in leader election while adding meta-data
func (cml *ConfigMapLock) RecordEvent(s string) {
if cml.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
subject := &v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}
// Populate the type meta, so we don't have to get it from the schema
subject.Kind = "ConfigMap"
subject.APIVersion = v1.SchemeGroupVersion.String()
cml.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events)
}

// Describe is used to convert details on current resource lock
// into a string
func (cml *ConfigMapLock) Describe() string {
return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
}

// Identity returns the Identity of the lock
func (cml *ConfigMapLock) Identity() string {
return cml.LockConfig.Identity
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ type K8sAPIServer struct {
clusterNameProvider clusterNameProvider
cancel context.CancelFunc

mu sync.Mutex
leading bool
leaderLockName string
mu sync.Mutex
leading bool
leaderLockName string
leaderLockUsingConfigMapOnly bool

k8sClient K8sClient // *k8sclient.K8sClient
epClient k8sclient.EpClient
Expand All @@ -97,6 +98,12 @@ func WithLeaderLockName(name string) Option {
}
}

func WithLeaderLockUsingConfigMapOnly(leaderLockUsingConfigMapOnly bool) Option {
return func(server *K8sAPIServer) {
server.leaderLockUsingConfigMapOnly = leaderLockUsingConfigMapOnly
}
}

// New creates a k8sApiServer which can generate cluster-level metrics
func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options ...Option) (*K8sAPIServer, error) {
k := &K8sAPIServer{
Expand Down Expand Up @@ -218,6 +225,11 @@ func (k *K8sAPIServer) init() error {
return errors.New("environment variable K8S_NAMESPACE is not set in k8s deployment config")
}

resourceLockConfig := resourcelock.ResourceLockConfig{
Identity: k.nodeName,
EventRecorder: k.createRecorder(k.leaderLockName, lockNamespace),
}

clientSet := k.k8sClient.GetClientSet()
configMapInterface := clientSet.CoreV1().ConfigMaps(lockNamespace)
if configMap, err := configMapInterface.Get(ctx, k.leaderLockName, metav1.GetOptions{}); configMap == nil || err != nil {
Expand All @@ -232,18 +244,29 @@ func (k *K8sAPIServer) init() error {
k.logger.Info(fmt.Sprintf("configMap: %v, err: %v", configMap, err))
}

lock, err := resourcelock.New(
resourcelock.ConfigMapsLeasesResourceLock,
lockNamespace, k.leaderLockName,
clientSet.CoreV1(),
clientSet.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: k.nodeName,
EventRecorder: k.createRecorder(k.leaderLockName, lockNamespace),
})
if err != nil {
k.logger.Warn("Failed to create resource lock", zap.Error(err))
return err
var lock resourcelock.Interface
if k.leaderLockUsingConfigMapOnly {
lock = &ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: lockNamespace,
Name: k.leaderLockName,
},
Client: clientSet.CoreV1(),
LockConfig: resourceLockConfig,
}
} else {
l, err := resourcelock.New(
resourcelock.ConfigMapsLeasesResourceLock,
lockNamespace, k.leaderLockName,
clientSet.CoreV1(),
clientSet.CoordinationV1(),
resourceLockConfig)
if err != nil {
k.logger.Warn("Failed to create resource lock", zap.Error(err))
return err
} else {
lock = l
}
}

go k.startLeaderElection(ctx, lock)
Expand Down
3 changes: 2 additions & 1 deletion receiver/awscontainerinsightreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone
if err != nil {
return err
}
acir.k8sapiserver, err = k8sapiserver.New(hostinfo, acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName))
acir.k8sapiserver, err = k8sapiserver.New(hostinfo, acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName),
k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion receiver/awscontainerinsightreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ awscontainerinsightreceiver/collection_interval_settings:
awscontainerinsightreceiver/cluster_name:
cluster_name: override_cluster
awscontainerinsightreceiver/leader_lock_name:
leader_lock_name: override-container-insight-clusterleader
leader_lock_name: override-container-insight-clusterleader
awscontainerinsightreceiver/leader_lock_using_config_map_only:
leader_lock_using_config_map_only: true

0 comments on commit 201c8f5

Please sign in to comment.