From d4e9db1893d990e9a1688372dad955a2de41f468 Mon Sep 17 00:00:00 2001 From: Sylvain Afchain Date: Wed, 2 Oct 2024 10:05:55 +0200 Subject: [PATCH] [CWS] fix task tags in fargate (#29666) --- pkg/security/module/cws.go | 11 ++++-- pkg/security/module/ecs_tags.go | 38 ++++++++++++++++++ pkg/security/module/noecs_tags.go | 13 +++++++ pkg/security/module/server.go | 51 ++++++++++++++++++------- pkg/security/resolvers/tags/resolver.go | 7 ++++ 5 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 pkg/security/module/ecs_tags.go create mode 100644 pkg/security/module/noecs_tags.go diff --git a/pkg/security/module/cws.go b/pkg/security/module/cws.go index 863a5dedc01a5..6cd8aa2220378 100644 --- a/pkg/security/module/cws.go +++ b/pkg/security/module/cws.go @@ -61,8 +61,6 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC return nil, err } - ctx, cancelFnc := context.WithCancel(context.Background()) - var selfTester *selftests.SelfTester if cfg.SelfTestEnabled { selfTester, err = selftests.NewSelfTester(cfg, evm.Probe) @@ -73,6 +71,13 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC family, address := config.GetFamilyAddress(cfg.SocketPath) + apiServer, err := NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester) + if err != nil { + return nil, err + } + + ctx, cancelFnc := context.WithCancel(context.Background()) + c := &CWSConsumer{ config: cfg, probe: evm.Probe, @@ -80,7 +85,7 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC // internals ctx: ctx, cancelFnc: cancelFnc, - apiServer: NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester), + apiServer: apiServer, rateLimiter: events.NewRateLimiter(cfg, evm.StatsdClient), sendStatsChan: make(chan chan bool, 1), grpcServer: NewGRPCServer(family, address), diff --git a/pkg/security/module/ecs_tags.go b/pkg/security/module/ecs_tags.go new file mode 100644 index 0000000000000..6d9db97c81c7e --- /dev/null +++ b/pkg/security/module/ecs_tags.go @@ -0,0 +1,38 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +// Package module holds module related files +package module + +import ( + "context" + "time" + + ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata" +) + +func getCurrentECSTaskTags() (map[string]string, error) { + client, err := ecsmeta.V3orV4FromCurrentTask() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) + defer cancel() + + task, err := client.GetTask(ctx) + if err != nil { + return nil, err + } + + return map[string]string{ + "task_name": task.Family, + "task_family": task.Family, + "task_arn": task.TaskARN, + "task_version": task.Version, + }, nil +} diff --git a/pkg/security/module/noecs_tags.go b/pkg/security/module/noecs_tags.go new file mode 100644 index 0000000000000..93e2a2467d4b1 --- /dev/null +++ b/pkg/security/module/noecs_tags.go @@ -0,0 +1,13 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build !docker + +// Package module holds module related files +package module + +func getCurrentECSTaskTags() (map[string]string, error) { + return nil, nil +} diff --git a/pkg/security/module/server.go b/pkg/security/module/server.go index dc314d9d42f10..61ebecea1bbe0 100644 --- a/pkg/security/module/server.go +++ b/pkg/security/module/server.go @@ -21,6 +21,7 @@ import ( "github.com/mailru/easyjson" "go.uber.org/atomic" + "github.com/DataDog/datadog-agent/pkg/config/env" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/security/common" "github.com/DataDog/datadog-agent/pkg/security/config" @@ -124,6 +125,7 @@ type APIServer struct { policiesStatusLock sync.RWMutex policiesStatus []*api.PolicyStatus msgSender MsgSender + ecsTags map[string]string stopChan chan struct{} stopper startstop.Stopper @@ -216,6 +218,29 @@ func (a *APIServer) dequeue(now time.Time, cb func(msg *pendingMsg) bool) { }) } +func (a *APIServer) updateMsgTags(msg *api.SecurityEventMessage) { + // apply ecs tag if possible + if a.ecsTags != nil { + for key, value := range a.ecsTags { + if !slices.ContainsFunc(msg.Tags, func(tag string) bool { + return strings.HasPrefix(tag, key+":") + }) { + msg.Tags = append(msg.Tags, key+":"+value) + } + } + } + + // look for the service tag if we don't have one yet + if len(msg.Service) == 0 { + for _, tag := range msg.Tags { + if strings.HasPrefix(tag, "service:") { + msg.Service = strings.TrimPrefix(tag, "service:") + break + } + } + } +} + func (a *APIServer) start(ctx context.Context) { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() @@ -233,18 +258,6 @@ func (a *APIServer) start(ctx context.Context) { } } - // recopy tags - hasService := len(msg.service) != 0 - for _, tag := range msg.tags { - // look for the service tag if we don't have one yet - if !hasService { - if strings.HasPrefix(tag, "service:") { - msg.service = strings.TrimPrefix(tag, "service:") - hasService = true - } - } - } - // not fully resolved, retry if !msg.isResolved() && msg.retry < maxRetry { return false @@ -264,6 +277,7 @@ func (a *APIServer) start(ctx context.Context) { Service: msg.service, Tags: msg.tags, } + a.updateMsgTags(m) a.msgSender.Send(m, a.expireEvent) @@ -389,6 +403,7 @@ func (a *APIServer) SendEvent(rule *rules.Rule, event events.Event, extTagsCb fu Service: service, Tags: tags, } + a.updateMsgTags(m) a.msgSender.Send(m, a.expireEvent) } @@ -526,7 +541,7 @@ func (a *APIServer) SetCWSConsumer(consumer *CWSConsumer) { } // NewAPIServer returns a new gRPC event server -func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) *APIServer { +func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) (*APIServer, error) { stopper := startstop.NewSerialStopper() as := &APIServer{ @@ -559,5 +574,13 @@ func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSen } } - return as + if env.IsECS() || env.IsECSFargate() { + tags, err := getCurrentECSTaskTags() + if err != nil { + return nil, err + } + as.ecsTags = tags + } + + return as, nil } diff --git a/pkg/security/resolvers/tags/resolver.go b/pkg/security/resolvers/tags/resolver.go index ab0285529e524..caa06d60626cf 100644 --- a/pkg/security/resolvers/tags/resolver.go +++ b/pkg/security/resolvers/tags/resolver.go @@ -8,6 +8,7 @@ package tags import ( "context" + "strings" "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote" taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry" @@ -72,6 +73,12 @@ func (t *DefaultResolver) Start(ctx context.Context) error { // Resolve returns the tags for the given id func (t *DefaultResolver) Resolve(id string) []string { + // container id for ecs task are composed of task id + container id. + // use only the container id part for the tag resolution. + if els := strings.Split(id, "-"); len(els) == 2 { + id = els[1] + } + entityID := types.NewEntityID(types.ContainerID, id) tags, _ := t.tagger.Tag(entityID.String(), types.OrchestratorCardinality) return tags