Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable sending host tags with metrics for a configurable duration #22467

Merged
merged 18 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions comp/core/tagger/collectors/workloadmeta_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func (c *WorkloadMetaCollector) processEvents(evBundle workloadmeta.EventBundle)
tagInfos = append(tagInfos, c.handleECSTask(ev)...)
case workloadmeta.KindContainerImageMetadata:
tagInfos = append(tagInfos, c.handleContainerImage(ev)...)
case workloadmeta.KindHost:
tagInfos = append(tagInfos, c.handleHostTags(ev)...)
case workloadmeta.KindProcess:
// tagInfos = append(tagInfos, c.handleProcess(ev)...) No tags for now
case workloadmeta.KindKubernetesDeployment:
Expand Down Expand Up @@ -283,6 +285,17 @@ func (c *WorkloadMetaCollector) handleContainerImage(ev workloadmeta.Event) []*T
}
}

func (c *WorkloadMetaCollector) handleHostTags(ev workloadmeta.Event) []*TagInfo {
hostTags := ev.Entity.(*workloadmeta.HostTags)
return []*TagInfo{
{
Source: hostSource,
Entity: GlobalEntityID,
LowCardTags: hostTags.HostTags,
},
}
}

func (c *WorkloadMetaCollector) labelsToTags(labels map[string]string, tags *utils.TagList) {
// standard tags from labels
c.extractFromMapWithFn(labels, standardDockerLabels, tags.AddStandard)
Expand Down Expand Up @@ -708,6 +721,8 @@ func buildTaggerEntityID(entityID workloadmeta.EntityID) string {
return fmt.Sprintf("process://%s", entityID.ID)
case workloadmeta.KindKubernetesDeployment:
return fmt.Sprintf("deployment://%s", entityID.ID)
case workloadmeta.KindHost:
return fmt.Sprintf("host://%s", entityID.ID)
default:
log.Errorf("can't recognize entity %q with kind %q; trying %s://%s as tagger entity",
entityID.ID, entityID.Kind, entityID.ID, entityID.Kind)
Expand Down
1 change: 1 addition & 0 deletions comp/core/tagger/collectors/workloadmeta_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
containerSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindContainer)
containerImageSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindContainerImageMetadata)
processSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindProcess)
hostSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindHost)

clusterTagNamePrefix = "kube_cluster_name"
)
Expand Down
5 changes: 4 additions & 1 deletion comp/core/workloadmeta/collectors/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@
package collectors

import (
"go.uber.org/fx"

cf_container "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/cloudfoundry/container"
cf_vm "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/cloudfoundry/vm"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/containerd"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/docker"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecs"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecsfargate"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/host"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubeapiserver"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubelet"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubemetadata"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/podman"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/remote/processcollector"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/remote/workloadmeta"
"go.uber.org/fx"
)

// GetCatalog returns the set of FX options to populate the catalog
Expand All @@ -39,6 +41,7 @@ func GetCatalog() fx.Option {
podman.GetFxOptions(),
workloadmeta.GetFxOptions(),
processcollector.GetFxOptions(),
host.GetFxOptions(),
}

// remove nil options
Expand Down
117 changes: 117 additions & 0 deletions comp/core/workloadmeta/collectors/internal/host/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package host implements the host tag Workloadmeta collector.
package host

import (
"context"

"github.com/benbjohnson/clock"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags"
"github.com/DataDog/datadog-agent/pkg/util/log"

"go.uber.org/fx"
)

const id = "host"

type dependencies struct {
fx.In

Config config.Component
}

type collector struct {
store workloadmeta.Component
catalog workloadmeta.AgentType
config config.Component
clock clock.Clock
timeoutTimer *clock.Timer
}

// GetFxOptions returns the FX framework options for the collector
func GetFxOptions() fx.Option {
return fx.Provide(NewCollector)
}

// NewCollector returns a new host collector provider and an error
func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) {
return workloadmeta.CollectorProvider{
Collector: &collector{
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
config: deps.Config,
clock: clock.New(),
},
}, nil
}

func (c *collector) Start(_ context.Context, store workloadmeta.Component) error {

c.store = store

duration := c.config.GetDuration("expected_tags_duration")
if duration <= 0 {
return nil
}

log.Debugf("Adding host tags to metrics for %v", duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a warning here telling users that they set the duration too low. Was that not useful?

Copy link
Member Author

@gh123man gh123man Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 1 min constraint is no longer relevant with the new design. I suppose I could add back a warning about it being < 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.timeoutTimer = c.clock.Timer(duration)

return nil
}

func (c *collector) Pull(ctx context.Context) error {
// Feature is disabled or timeout has previously occurred
if c.timeoutTimer == nil {
return nil
}

// Timeout reached - expire any host tags in the store
if c.resetTimerIfTimedOut() {
c.store.Notify(makeEvent([]string{}))
return nil
}

tags := hostMetadataUtils.Get(ctx, false, c.config).System
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick question hostmetadata package is not a component yet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Host metadata is - but this function still depends on a bunch of non-component bits. So I assume that's the reason it has not yet been converted.
I think this would be on @DataDog/agent-shared-components roadmap.

c.store.Notify(makeEvent(tags))
return nil
}

func (c *collector) GetID() string {
return id
}

func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
return c.catalog
}

func (c *collector) resetTimerIfTimedOut() bool {
select {
case <-c.timeoutTimer.C:
c.timeoutTimer = nil
return true
default:
return false
}
}

func makeEvent(tags []string) []workloadmeta.CollectorEvent {
return []workloadmeta.CollectorEvent{
{
Type: workloadmeta.EventTypeSet,
Source: workloadmeta.SourceHost,
Entity: &workloadmeta.HostTags{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindHost,
ID: id,
},
HostTags: tags,
},
}}
}
72 changes: 72 additions & 0 deletions comp/core/workloadmeta/collectors/internal/host/host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package host implements the host tag Workloadmeta collector.
package host

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"

"go.uber.org/fx"
)

type testDeps struct {
fx.In

Config config.Component
Wml workloadmeta.Mock
}

func TestHostCollector(t *testing.T) {
expectedTags := []string{"tag1:value1", "tag2", "tag3"}
ctx := context.TODO()

overrides := map[string]interface{}{
"tags": expectedTags,
"expected_tags_duration": "10m",
}

deps := fxutil.Test[testDeps](t, fx.Options(
fx.Replace(config.MockParams{Overrides: overrides}),
core.MockBundle(),
fx.Supply(workloadmeta.NewParams()),
fx.Supply(context.Background()),
workloadmeta.MockModule(),
))

eventChan := deps.Wml.SubscribeToEvents()

mockClock := clock.NewMock()
c := collector{
config: deps.Config,
clock: mockClock,
}

c.Start(ctx, deps.Wml)
c.Pull(ctx)

assertTags(t, (<-eventChan).Entity, expectedTags)

mockClock.Add(11 * time.Minute)
mockClock.WaitForAllTimers()
c.Pull(ctx)

assertTags(t, (<-eventChan).Entity, []string{})
}

func assertTags(t *testing.T, entity workloadmeta.Entity, expectedTags []string) {
e := entity.(*workloadmeta.HostTags)
assert.ElementsMatch(t, e.HostTags, expectedTags)
}
3 changes: 3 additions & 0 deletions comp/core/workloadmeta/component_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Mock interface {

// GetConfig returns a Config Reader for the internal injected config
GetNotifiedEvents() []CollectorEvent

// SubscribeToEvents returns a channel that receives events
SubscribeToEvents() chan CollectorEvent
}

// MockModule defines the fx options for the mock component.
Expand Down
48 changes: 47 additions & 1 deletion comp/core/workloadmeta/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ package workloadmeta

import (
"fmt"
langUtil "github.com/DataDog/datadog-agent/pkg/languagedetection/util"
"io"
"strings"
"time"

langUtil "github.com/DataDog/datadog-agent/pkg/languagedetection/util"

"github.com/CycloneDX/cyclonedx-go"
"github.com/mohae/deepcopy"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -45,6 +46,7 @@ const (
KindECSTask Kind = "ecs_task"
KindContainerImageMetadata Kind = "container_image_metadata"
KindProcess Kind = "process"
KindHost Kind = "host"
)

// Source is the source name of an entity.
Expand Down Expand Up @@ -82,6 +84,9 @@ const (
// SourceLanguageDetectionServer represents container languages
// detected by node agents
SourceLanguageDetectionServer Source = "language_detection_server"

// SourceHost represents entities detected by the host such as host tags.
SourceHost Source = "host"
)

// ContainerRuntime is the container runtime used by a container.
Expand Down Expand Up @@ -1007,6 +1012,47 @@ func (p Process) String(verbose bool) string { //nolint:revive // TODO fix reviv
return sb.String()
}

// HostTags is an Entity that represents host tags
type HostTags struct {
EntityID

HostTags []string
}

var _ Entity = &HostTags{}

// GetID implements Entity#GetID.
func (p HostTags) GetID() EntityID {
return p.EntityID
}

// DeepCopy implements Entity#DeepCopy.
func (p HostTags) DeepCopy() Entity {
cp := deepcopy.Copy(p).(HostTags)
return &cp
}

// Merge implements Entity#Merge.
func (p *HostTags) Merge(e Entity) error {
otherHost, ok := e.(*HostTags)
if !ok {
return fmt.Errorf("cannot merge Host metadata with different kind %T", e)
}

return merge(p, otherHost)
}

// String implements Entity#String.
func (p HostTags) String(verbose bool) string {
var sb strings.Builder

_, _ = fmt.Fprintln(&sb, "----------- Entity ID -----------")
_, _ = fmt.Fprint(&sb, p.EntityID.String(verbose))
_, _ = fmt.Fprintln(&sb, "Host Tags:", sliceToString(p.HostTags))

return sb.String()
}

// CollectorEvent is an event generated by a metadata collector, to be handled
// by the metadata store.
type CollectorEvent struct {
Expand Down
Loading
Loading