Skip to content

Commit

Permalink
[CONTP-312] duplicate pod and deployment metadata with address aliasi…
Browse files Browse the repository at this point in the history
…ng (#27506)
  • Loading branch information
adel121 authored Jul 15, 2024
1 parent a8f7ee2 commit e1ae6d8
Show file tree
Hide file tree
Showing 15 changed files with 520 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ func TestHandleKubeMetadata(t *testing.T) {
"namespace_security": "critical",
},
},
GVR: schema.GroupVersionResource{
GVR: &schema.GroupVersionResource{
Version: "v1",
Resource: "namespaces",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,26 @@ package kubeapiserver

import (
"context"
"regexp"
"strings"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
languagedetectionUtil "github.com/DataDog/datadog-agent/pkg/languagedetection/util"
ddkube "github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// deploymentFilter filters out deployments that can't be used for unified service tagging or process language detection
type deploymentFilter struct{}

func (f *deploymentFilter) filteredOut(entity workloadmeta.Entity) bool {
deployment := entity.(*workloadmeta.KubernetesDeployment)
return deployment == nil
}

func newDeploymentStore(ctx context.Context, wlm workloadmeta.Component, _ config.Reader, client kubernetes.Interface) (*cache.Reflector, *reflectorStore) {
func newDeploymentStore(ctx context.Context, wlm workloadmeta.Component, cfg config.Reader, client kubernetes.Interface) (*cache.Reflector, *reflectorStore) {
deploymentListerWatcher := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, options)
Expand All @@ -43,7 +39,7 @@ func newDeploymentStore(ctx context.Context, wlm workloadmeta.Component, _ confi
},
}

deploymentStore := newDeploymentReflectorStore(wlm)
deploymentStore := newDeploymentReflectorStore(wlm, cfg)
deploymentReflector := cache.NewNamedReflector(
componentName,
deploymentListerWatcher,
Expand All @@ -54,21 +50,41 @@ func newDeploymentStore(ctx context.Context, wlm workloadmeta.Component, _ confi
return deploymentReflector, deploymentStore
}

func newDeploymentReflectorStore(wlmetaStore workloadmeta.Component) *reflectorStore {
func newDeploymentReflectorStore(wlmetaStore workloadmeta.Component, cfg config.Reader) *reflectorStore {
annotationsExclude := cfg.GetStringSlice("cluster_agent.kubernetes_resources_collection.deployment_annotations_exclude")
parser, err := newdeploymentParser(annotationsExclude)
if err != nil {
_ = log.Errorf("unable to parse all deployment_annotations_exclude: %v, err:", err)
parser, _ = newdeploymentParser(nil)
}

store := &reflectorStore{
wlmetaStore: wlmetaStore,
seen: make(map[string]workloadmeta.EntityID),
parser: newdeploymentParser(),
filter: &deploymentFilter{},
seen: make(map[string][]workloadmeta.EntityID),
parser: parser,
}

return store
}

type deploymentParser struct{}
type deploymentParser struct {
annotationsFilter []*regexp.Regexp
gvr *schema.GroupVersionResource
}

func newdeploymentParser() objectParser {
return deploymentParser{}
func newdeploymentParser(annotationsExclude []string) (objectParser, error) {
filters, err := parseFilters(annotationsExclude)
if err != nil {
return nil, err
}
return deploymentParser{
annotationsFilter: filters,
gvr: &schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
},
}, nil
}

func updateContainerLanguage(cl languagedetectionUtil.ContainersLanguages, container languagedetectionUtil.Container, languages string) {
Expand All @@ -81,7 +97,7 @@ func updateContainerLanguage(cl languagedetectionUtil.ContainersLanguages, conta
}
}

func (p deploymentParser) Parse(obj interface{}) workloadmeta.Entity {
func (p deploymentParser) Parse(obj interface{}) []workloadmeta.Entity {
deployment := obj.(*appsv1.Deployment)
containerLanguages := make(languagedetectionUtil.ContainersLanguages)

Expand All @@ -100,14 +116,32 @@ func (p deploymentParser) Parse(obj interface{}) workloadmeta.Entity {
}
}

return &workloadmeta.KubernetesDeployment{
deploymentEntity := &workloadmeta.KubernetesDeployment{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesDeployment,
ID: deployment.Namespace + "/" + deployment.Name, // we use the namespace/name as id to make it easier for the admission controller to retrieve the corresponding deployment
},
EntityMeta: workloadmeta.EntityMeta{
Name: deployment.Name,
Namespace: deployment.Namespace,
Labels: deployment.Labels,
Annotations: filterMapStringKey(deployment.Annotations, p.annotationsFilter),
},
Env: deployment.Labels[ddkube.EnvTagLabelKey],
Service: deployment.Labels[ddkube.ServiceTagLabelKey],
Version: deployment.Labels[ddkube.VersionTagLabelKey],
InjectableLanguages: containerLanguages,
}

return []workloadmeta.Entity{
deploymentEntity,
&workloadmeta.KubernetesMetadata{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesMetadata,
ID: string(util.GenerateKubeMetadataEntityID("apps", "deployments", deployment.Namespace, deployment.Name)),
},
EntityMeta: deploymentEntity.EntityMeta,
GVR: p.gvr,
},
}
}
Loading

0 comments on commit e1ae6d8

Please sign in to comment.