Skip to content

Commit

Permalink
Revert "[CONTP-48] Cluster Agent consistent tagging via global tags" (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorentClarret authored Nov 21, 2024
1 parent 8418f10 commit 13d83bd
Show file tree
Hide file tree
Showing 16 changed files with 89 additions and 233 deletions.
6 changes: 3 additions & 3 deletions cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func run(
}

var clusterCheckHandler *clusterchecks.Handler
clusterCheckHandler, err = setupClusterCheck(mainCtx, ac, taggerComp)
clusterCheckHandler, err = setupClusterCheck(mainCtx, ac)
if err == nil {
api.ModifyAPIRouter(func(r *mux.Router) {
dcav1.InstallChecksEndpoints(r, clusteragent.ServerContext{ClusterCheckHandler: clusterCheckHandler})
Expand Down Expand Up @@ -300,8 +300,8 @@ func initializeBBSCache(ctx context.Context) error {
}
}

func setupClusterCheck(ctx context.Context, ac autodiscovery.Component, tagger tagger.Component) (*clusterchecks.Handler, error) {
handler, err := clusterchecks.NewHandler(ac, tagger)
func setupClusterCheck(ctx context.Context, ac autodiscovery.Component) (*clusterchecks.Handler, error) {
handler, err := clusterchecks.NewHandler(ac)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func start(log log.Component,

if config.GetBool("cluster_checks.enabled") {
// Start the cluster check Autodiscovery
clusterCheckHandler, err := setupClusterCheck(mainCtx, ac, taggerComp)
clusterCheckHandler, err := setupClusterCheck(mainCtx, ac)
if err == nil {
api.ModifyAPIRouter(func(r *mux.Router) {
dcav1.InstallChecksEndpoints(r, clusteragent.ServerContext{ClusterCheckHandler: clusterCheckHandler})
Expand Down Expand Up @@ -549,8 +549,8 @@ func start(log log.Component,
return nil
}

func setupClusterCheck(ctx context.Context, ac autodiscovery.Component, tagger tagger.Component) (*clusterchecks.Handler, error) {
handler, err := clusterchecks.NewHandler(ac, tagger)
func setupClusterCheck(ctx context.Context, ac autodiscovery.Component) (*clusterchecks.Handler, error) {
handler, err := clusterchecks.NewHandler(ac)
if err != nil {
return nil, err
}
Expand Down
12 changes: 4 additions & 8 deletions comp/core/tagger/collectors/workloadmeta_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,8 @@ func (c *WorkloadMetaCollector) handleContainer(ev workloadmeta.Event) []*types.
}

// static tags for ECS and EKS Fargate containers
for tag, valueList := range c.staticTags {
for _, value := range valueList {
tagList.AddLow(tag, value)
}
for tag, value := range c.staticTags {
tagList.AddLow(tag, value)
}

// gpu tags from container resource requests
Expand Down Expand Up @@ -401,10 +399,8 @@ func (c *WorkloadMetaCollector) extractTagsFromPodEntity(pod *workloadmeta.Kuber
}

// static tags for EKS Fargate pods
for tag, valueList := range c.staticTags {
for _, value := range valueList {
tagList.AddLow(tag, value)
}
for tag, value := range c.staticTags {
tagList.AddLow(tag, value)
}

low, orch, high, standard := tagList.Compute()
Expand Down
53 changes: 22 additions & 31 deletions comp/core/tagger/collectors/workloadmeta_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type WorkloadMetaCollector struct {
containerEnvAsTags map[string]string
containerLabelsAsTags map[string]string

staticTags map[string][]string // for ECS and EKS Fargate
staticTags map[string]string
k8sResourcesAnnotationsAsTags map[string]map[string]string
k8sResourcesLabelsAsTags map[string]map[string]string
globContainerLabels map[string]glob.Glob
Expand Down Expand Up @@ -91,51 +91,42 @@ func (c *WorkloadMetaCollector) initK8sResourcesMetaAsTags(resourcesLabelsAsTags

// Run runs the continuous event watching loop and sends new tags to the
// tagger based on the events sent by the workloadmeta.
func (c *WorkloadMetaCollector) Run(ctx context.Context, datadogConfig config.Component) {
c.collectStaticGlobalTags(ctx, datadogConfig)
func (c *WorkloadMetaCollector) Run(ctx context.Context) {
c.collectStaticGlobalTags(ctx)
c.stream(ctx)
}

func (c *WorkloadMetaCollector) collectStaticGlobalTags(ctx context.Context, datadogConfig config.Component) {
func (c *WorkloadMetaCollector) collectStaticGlobalTags(ctx context.Context) {
c.staticTags = util.GetStaticTags(ctx)
if _, exists := c.staticTags[clusterTagNamePrefix]; flavor.GetFlavor() == flavor.ClusterAgent && !exists {
// If we are running the cluster agent, we want to set the kube_cluster_name tag as a global tag if we are able
// to read it, for the instances where we are running in an environment where hostname cannot be detected.
if cluster := clustername.GetClusterNameTagValue(ctx, ""); cluster != "" {
if c.staticTags == nil {
c.staticTags = make(map[string][]string, 1)
c.staticTags = make(map[string]string, 1)
}
if _, exists := c.staticTags[clusterTagNamePrefix]; !exists {
c.staticTags[clusterTagNamePrefix] = []string{}
}
c.staticTags[clusterTagNamePrefix] = append(c.staticTags[clusterTagNamePrefix], cluster)
c.staticTags[clusterTagNamePrefix] = cluster
}
}
// These are the global tags that should only be applied to the internal global entity
// Whereas the static tags are applied to containers and pods directly as well.
globalEnvTags := util.GetGlobalEnvTags(datadogConfig)

tagList := taglist.NewTagList()
if len(c.staticTags) > 0 {
tags := taglist.NewTagList()

for _, tags := range []map[string][]string{c.staticTags, globalEnvTags} {
for tagKey, valueList := range tags {
for _, value := range valueList {
tagList.AddLow(tagKey, value)
}
for tag, value := range c.staticTags {
tags.AddLow(tag, value)
}
}

low, orch, high, standard := tagList.Compute()
c.tagProcessor.ProcessTagInfo([]*types.TagInfo{
{
Source: staticSource,
EntityID: common.GetGlobalEntityID(),
HighCardTags: high,
OrchestratorCardTags: orch,
LowCardTags: low,
StandardTags: standard,
},
})
low, orch, high, standard := tags.Compute()
c.tagProcessor.ProcessTagInfo([]*types.TagInfo{
{
Source: staticSource,
EntityID: common.GetGlobalEntityID(),
HighCardTags: high,
OrchestratorCardTags: orch,
LowCardTags: low,
StandardTags: standard,
},
})
}
}

func (c *WorkloadMetaCollector) stream(ctx context.Context) {
Expand Down
16 changes: 8 additions & 8 deletions comp/core/tagger/collectors/workloadmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestHandleKubePod(t *testing.T) {

tests := []struct {
name string
staticTags map[string][]string
staticTags map[string]string
k8sResourcesAnnotationsAsTags map[string]map[string]string
k8sResourcesLabelsAsTags map[string]map[string]string
pod workloadmeta.KubernetesPod
Expand Down Expand Up @@ -789,8 +789,8 @@ func TestHandleKubePod(t *testing.T) {
},
{
name: "static tags",
staticTags: map[string][]string{
"eks_fargate_node": {"foobar"},
staticTags: map[string]string{
"eks_fargate_node": "foobar",
},
pod: workloadmeta.KubernetesPod{
EntityID: podEntityID,
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestHandleKubePodWithoutPvcAsTags(t *testing.T) {

tests := []struct {
name string
staticTags map[string][]string
staticTags map[string]string
labelsAsTags map[string]string
annotationsAsTags map[string]string
nsLabelsAsTags map[string]string
Expand Down Expand Up @@ -1117,7 +1117,7 @@ func TestHandleKubePodNoContainerName(t *testing.T) {

tests := []struct {
name string
staticTags map[string][]string
staticTags map[string]string
labelsAsTags map[string]string
annotationsAsTags map[string]string
nsLabelsAsTags map[string]string
Expand Down Expand Up @@ -1617,7 +1617,7 @@ func TestHandleContainer(t *testing.T) {

tests := []struct {
name string
staticTags map[string][]string
staticTags map[string]string
labelsAsTags map[string]string
envAsTags map[string]string
container workloadmeta.Container
Expand Down Expand Up @@ -2098,8 +2098,8 @@ func TestHandleContainer(t *testing.T) {
},
{
name: "static tags",
staticTags: map[string][]string{
"eks_fargate_node": {"foobar"},
staticTags: map[string]string{
"eks_fargate_node": "foobar",
},
container: workloadmeta.Container{
EntityID: entityID,
Expand Down
2 changes: 1 addition & 1 deletion comp/core/tagger/impl/local_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (t *localTagger) Start(ctx context.Context) error {
)

go t.tagStore.Run(t.ctx)
go t.collector.Run(t.ctx, t.cfg)
go t.collector.Run(t.ctx)

return nil
}
Expand Down
10 changes: 3 additions & 7 deletions pkg/clusteragent/clusterchecks/dispatcher_isolate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (
"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/tagger/mock"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
)

func TestIsolateCheckSuccessful(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)
testDispatcher := newDispatcher(fakeTagger)
testDispatcher := newDispatcher()
testDispatcher.store.nodes["A"] = newNodeStore("A", "")
testDispatcher.store.nodes["A"].workers = pkgconfigsetup.DefaultNumWorkers
testDispatcher.store.nodes["B"] = newNodeStore("B", "")
Expand Down Expand Up @@ -101,8 +99,7 @@ func TestIsolateCheckSuccessful(t *testing.T) {
}

func TestIsolateNonExistentCheckFails(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)
testDispatcher := newDispatcher(fakeTagger)
testDispatcher := newDispatcher()
testDispatcher.store.nodes["A"] = newNodeStore("A", "")
testDispatcher.store.nodes["A"].workers = pkgconfigsetup.DefaultNumWorkers
testDispatcher.store.nodes["B"] = newNodeStore("B", "")
Expand Down Expand Up @@ -180,8 +177,7 @@ func TestIsolateNonExistentCheckFails(t *testing.T) {
}

func TestIsolateCheckOnlyOneRunnerFails(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)
testDispatcher := newDispatcher(fakeTagger)
testDispatcher := newDispatcher()
testDispatcher.store.nodes["A"] = newNodeStore("A", "")
testDispatcher.store.nodes["A"].workers = pkgconfigsetup.DefaultNumWorkers

Expand Down
16 changes: 3 additions & 13 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/clusteragent"
Expand All @@ -36,21 +34,12 @@ type dispatcher struct {
rebalancingPeriod time.Duration
}

func newDispatcher(tagger tagger.Component) *dispatcher {
func newDispatcher() *dispatcher {
d := &dispatcher{
store: newClusterStore(),
}
d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")

// Attach the cluster agent's global tags to all dispatched checks
// as defined in the tagger's workloadmeta collector
var err error
d.extraTags, err = tagger.GlobalTags(types.LowCardinality)
if err != nil {
log.Warnf("Cannot get global tags from the tagger: %v", err)
} else {
log.Debugf("Adding global tags to cluster check dispatcher: %v", d.extraTags)
}
d.extraTags = pkgconfigsetup.Datadog().GetStringSlice("cluster_checks.extra_tags")

excludedChecks := pkgconfigsetup.Datadog().GetStringSlice("cluster_checks.exclude_checks")
// This option will almost always be empty
Expand Down Expand Up @@ -88,6 +77,7 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
return d
}

var err error
d.clcRunnersClient, err = clusteragent.GetCLCRunnerClient()
if err != nil {
log.Warnf("Cannot create CLC runners client, advanced dispatching will be disabled: %v", err)
Expand Down
13 changes: 4 additions & 9 deletions pkg/clusteragent/clusterchecks/dispatcher_rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/tagger/mock"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
Expand Down Expand Up @@ -1378,8 +1377,7 @@ func TestRebalance(t *testing.T) {
checkMetricSamplesWeight = originalMetricSamplesWeight
}()

fakeTagger := mock.SetupFakeTagger(t)
dispatcher := newDispatcher(fakeTagger)
dispatcher := newDispatcher()

// prepare store
dispatcher.store.active = true
Expand Down Expand Up @@ -1435,8 +1433,7 @@ func TestMoveCheck(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)
dispatcher := newDispatcher(fakeTagger)
dispatcher := newDispatcher()

// setup check id
id := checkid.BuildID(tc.check.config.Name, tc.check.config.FastDigest(), tc.check.config.Instances[0], tc.check.config.InitConfig)
Expand Down Expand Up @@ -1480,8 +1477,7 @@ func TestCalculateAvg(t *testing.T) {
checkMetricSamplesWeight = originalMetricSamplesWeight
}()

fakeTagger := mock.SetupFakeTagger(t)
testDispatcher := newDispatcher(fakeTagger)
testDispatcher := newDispatcher()

// The busyness of this node is 3 (1 + 2)
testDispatcher.store.nodes["node1"] = newNodeStore("node1", "")
Expand Down Expand Up @@ -1522,8 +1518,7 @@ func TestRebalanceUsingUtilization(t *testing.T) {
// other tests specific for the checksDistribution struct that test more
// complex scenarios.

fakeTagger := mock.SetupFakeTagger(t)
testDispatcher := newDispatcher(fakeTagger)
testDispatcher := newDispatcher()

testDispatcher.store.active = true
testDispatcher.store.nodes["node1"] = newNodeStore("node1", "")
Expand Down
Loading

0 comments on commit 13d83bd

Please sign in to comment.