Skip to content

Commit

Permalink
[backport][CWS] backport #29666 - fix task tags in fargate (#30164)
Browse files Browse the repository at this point in the history
Co-authored-by: safchain <safchain@gmail.com>
paulcacheux and safchain authored Oct 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 301549b commit 772b13f
Showing 5 changed files with 102 additions and 17 deletions.
11 changes: 8 additions & 3 deletions pkg/security/module/cws.go
Original file line number Diff line number Diff line change
@@ -61,8 +61,6 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

var selfTester *selftests.SelfTester
if cfg.SelfTestEnabled {
selfTester, err = selftests.NewSelfTester(cfg, evm.Probe)
@@ -73,14 +71,21 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC

family, address := config.GetFamilyAddress(cfg.SocketPath)

apiServer, err := NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester)
if err != nil {
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

c := &CWSConsumer{
config: cfg,
probe: evm.Probe,
statsdClient: evm.StatsdClient,
// internals
ctx: ctx,
cancelFnc: cancelFnc,
apiServer: NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester),
apiServer: apiServer,
rateLimiter: events.NewRateLimiter(cfg, evm.StatsdClient),
sendStatsChan: make(chan chan bool, 1),
grpcServer: NewGRPCServer(family, address),
38 changes: 38 additions & 0 deletions pkg/security/module/ecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

// Package module holds module related files
package module

import (
"context"
"time"

ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata"
)

func getCurrentECSTaskTags() (map[string]string, error) {
client, err := ecsmeta.V3orV4FromCurrentTask()
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

task, err := client.GetTask(ctx)
if err != nil {
return nil, err
}

return map[string]string{
"task_name": task.Family,
"task_family": task.Family,
"task_arn": task.TaskARN,
"task_version": task.Version,
}, nil
}
13 changes: 13 additions & 0 deletions pkg/security/module/noecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !docker

// Package module holds module related files
package module

func getCurrentECSTaskTags() (map[string]string, error) {
return nil, nil
}
50 changes: 36 additions & 14 deletions pkg/security/module/server.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import (
"go.uber.org/atomic"

pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/env"
"github.com/DataDog/datadog-agent/pkg/security/common"
"github.com/DataDog/datadog-agent/pkg/security/config"
"github.com/DataDog/datadog-agent/pkg/security/events"
@@ -108,6 +109,7 @@ type APIServer struct {
policiesStatusLock sync.RWMutex
policiesStatus []*api.PolicyStatus
msgSender MsgSender
ecsTags map[string]string

stopChan chan struct{}
stopper startstop.Stopper
@@ -200,6 +202,29 @@ func (a *APIServer) dequeue(now time.Time, cb func(msg *pendingMsg) bool) {
})
}

func (a *APIServer) updateMsgTags(msg *api.SecurityEventMessage) {
// apply ecs tag if possible
if a.ecsTags != nil {
for key, value := range a.ecsTags {
if !slices.ContainsFunc(msg.Tags, func(tag string) bool {
return strings.HasPrefix(tag, key+":")
}) {
msg.Tags = append(msg.Tags, key+":"+value)
}
}
}

// look for the service tag if we don't have one yet
if len(msg.Service) == 0 {
for _, tag := range msg.Tags {
if strings.HasPrefix(tag, "service:") {
msg.Service = strings.TrimPrefix(tag, "service:")
break
}
}
}
}

func (a *APIServer) start(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
@@ -217,18 +242,6 @@ func (a *APIServer) start(ctx context.Context) {
}
}

// recopy tags
hasService := len(msg.service) != 0
for _, tag := range msg.tags {
// look for the service tag if we don't have one yet
if !hasService {
if strings.HasPrefix(tag, "service:") {
msg.service = strings.TrimPrefix(tag, "service:")
hasService = true
}
}
}

data, resolved, err := msg.ToJSON()
if err != nil {
seclog.Errorf("failed to marshal event context: %v", err)
@@ -248,6 +261,7 @@ func (a *APIServer) start(ctx context.Context) {
Service: msg.service,
Tags: msg.tags,
}
a.updateMsgTags(m)

a.msgSender.Send(m, a.expireEvent)

@@ -489,7 +503,7 @@ func (a *APIServer) SetCWSConsumer(consumer *CWSConsumer) {
}

// NewAPIServer returns a new gRPC event server
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) *APIServer {
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) (*APIServer, error) {
stopper := startstop.NewSerialStopper()

as := &APIServer{
@@ -522,5 +536,13 @@ func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSen
}
}

return as
if env.IsECS() || env.IsECSFargate() {
tags, err := getCurrentECSTaskTags()
if err != nil {
return nil, err
}
as.ecsTags = tags
}

return as, nil
}
7 changes: 7 additions & 0 deletions pkg/security/resolvers/tags/resolver.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ package tags

import (
"context"
"strings"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
@@ -72,6 +73,12 @@ func (t *DefaultResolver) Start(ctx context.Context) error {

// Resolve returns the tags for the given id
func (t *DefaultResolver) Resolve(id string) []string {
// container id for ecs task are composed of task id + container id.
// use only the container id part for the tag resolution.
if els := strings.Split(id, "-"); len(els) == 2 {
id = els[1]
}

entityID := types.NewEntityID(types.ContainerID, id)
tags, _ := t.tagger.Tag(entityID.String(), types.OrchestratorCardinality)
return tags

0 comments on commit 772b13f

Please sign in to comment.