Skip to content

Commit

Permalink
[CWS] fix task tags in fargate (#29666)
Browse files Browse the repository at this point in the history
  • Loading branch information
safchain authored Oct 2, 2024
1 parent 9ce8a82 commit d4e9db1
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 17 deletions.
11 changes: 8 additions & 3 deletions pkg/security/module/cws.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

var selfTester *selftests.SelfTester
if cfg.SelfTestEnabled {
selfTester, err = selftests.NewSelfTester(cfg, evm.Probe)
Expand All @@ -73,14 +71,21 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC

family, address := config.GetFamilyAddress(cfg.SocketPath)

apiServer, err := NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester)
if err != nil {
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

c := &CWSConsumer{
config: cfg,
probe: evm.Probe,
statsdClient: evm.StatsdClient,
// internals
ctx: ctx,
cancelFnc: cancelFnc,
apiServer: NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester),
apiServer: apiServer,
rateLimiter: events.NewRateLimiter(cfg, evm.StatsdClient),
sendStatsChan: make(chan chan bool, 1),
grpcServer: NewGRPCServer(family, address),
Expand Down
38 changes: 38 additions & 0 deletions pkg/security/module/ecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

// Package module holds module related files
package module

import (
"context"
"time"

ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata"
)

func getCurrentECSTaskTags() (map[string]string, error) {
client, err := ecsmeta.V3orV4FromCurrentTask()
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

task, err := client.GetTask(ctx)
if err != nil {
return nil, err
}

return map[string]string{
"task_name": task.Family,
"task_family": task.Family,
"task_arn": task.TaskARN,
"task_version": task.Version,
}, nil
}
13 changes: 13 additions & 0 deletions pkg/security/module/noecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !docker

// Package module holds module related files
package module

func getCurrentECSTaskTags() (map[string]string, error) {
return nil, nil
}
51 changes: 37 additions & 14 deletions pkg/security/module/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/mailru/easyjson"
"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/config/env"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/security/common"
"github.com/DataDog/datadog-agent/pkg/security/config"
Expand Down Expand Up @@ -124,6 +125,7 @@ type APIServer struct {
policiesStatusLock sync.RWMutex
policiesStatus []*api.PolicyStatus
msgSender MsgSender
ecsTags map[string]string

stopChan chan struct{}
stopper startstop.Stopper
Expand Down Expand Up @@ -216,6 +218,29 @@ func (a *APIServer) dequeue(now time.Time, cb func(msg *pendingMsg) bool) {
})
}

func (a *APIServer) updateMsgTags(msg *api.SecurityEventMessage) {
// apply ecs tag if possible
if a.ecsTags != nil {
for key, value := range a.ecsTags {
if !slices.ContainsFunc(msg.Tags, func(tag string) bool {
return strings.HasPrefix(tag, key+":")
}) {
msg.Tags = append(msg.Tags, key+":"+value)
}
}
}

// look for the service tag if we don't have one yet
if len(msg.Service) == 0 {
for _, tag := range msg.Tags {
if strings.HasPrefix(tag, "service:") {
msg.Service = strings.TrimPrefix(tag, "service:")
break
}
}
}
}

func (a *APIServer) start(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
Expand All @@ -233,18 +258,6 @@ func (a *APIServer) start(ctx context.Context) {
}
}

// recopy tags
hasService := len(msg.service) != 0
for _, tag := range msg.tags {
// look for the service tag if we don't have one yet
if !hasService {
if strings.HasPrefix(tag, "service:") {
msg.service = strings.TrimPrefix(tag, "service:")
hasService = true
}
}
}

// not fully resolved, retry
if !msg.isResolved() && msg.retry < maxRetry {
return false
Expand All @@ -264,6 +277,7 @@ func (a *APIServer) start(ctx context.Context) {
Service: msg.service,
Tags: msg.tags,
}
a.updateMsgTags(m)

a.msgSender.Send(m, a.expireEvent)

Expand Down Expand Up @@ -389,6 +403,7 @@ func (a *APIServer) SendEvent(rule *rules.Rule, event events.Event, extTagsCb fu
Service: service,
Tags: tags,
}
a.updateMsgTags(m)

a.msgSender.Send(m, a.expireEvent)
}
Expand Down Expand Up @@ -526,7 +541,7 @@ func (a *APIServer) SetCWSConsumer(consumer *CWSConsumer) {
}

// NewAPIServer returns a new gRPC event server
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) *APIServer {
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) (*APIServer, error) {
stopper := startstop.NewSerialStopper()

as := &APIServer{
Expand Down Expand Up @@ -559,5 +574,13 @@ func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSen
}
}

return as
if env.IsECS() || env.IsECSFargate() {
tags, err := getCurrentECSTaskTags()
if err != nil {
return nil, err
}
as.ecsTags = tags
}

return as, nil
}
7 changes: 7 additions & 0 deletions pkg/security/resolvers/tags/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tags

import (
"context"
"strings"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
Expand Down Expand Up @@ -72,6 +73,12 @@ func (t *DefaultResolver) Start(ctx context.Context) error {

// Resolve returns the tags for the given id
func (t *DefaultResolver) Resolve(id string) []string {
// container id for ecs task are composed of task id + container id.
// use only the container id part for the tag resolution.
if els := strings.Split(id, "-"); len(els) == 2 {
id = els[1]
}

entityID := types.NewEntityID(types.ContainerID, id)
tags, _ := t.tagger.Tag(entityID.String(), types.OrchestratorCardinality)
return tags
Expand Down

0 comments on commit d4e9db1

Please sign in to comment.