Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport][CWS] backport #29666 - fix task tags in fargate #30164

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/security/module/cws.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

var selfTester *selftests.SelfTester
if cfg.SelfTestEnabled {
selfTester, err = selftests.NewSelfTester(cfg, evm.Probe)
Expand All @@ -73,14 +71,21 @@ func NewCWSConsumer(evm *eventmonitor.EventMonitor, cfg *config.RuntimeSecurityC

family, address := config.GetFamilyAddress(cfg.SocketPath)

apiServer, err := NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester)
if err != nil {
return nil, err
}

ctx, cancelFnc := context.WithCancel(context.Background())

c := &CWSConsumer{
config: cfg,
probe: evm.Probe,
statsdClient: evm.StatsdClient,
// internals
ctx: ctx,
cancelFnc: cancelFnc,
apiServer: NewAPIServer(cfg, evm.Probe, opts.MsgSender, evm.StatsdClient, selfTester),
apiServer: apiServer,
rateLimiter: events.NewRateLimiter(cfg, evm.StatsdClient),
sendStatsChan: make(chan chan bool, 1),
grpcServer: NewGRPCServer(family, address),
Expand Down
38 changes: 38 additions & 0 deletions pkg/security/module/ecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

// Package module holds module related files
package module

import (
"context"
"time"

ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata"
)

func getCurrentECSTaskTags() (map[string]string, error) {
client, err := ecsmeta.V3orV4FromCurrentTask()
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

task, err := client.GetTask(ctx)
if err != nil {
return nil, err
}

return map[string]string{
"task_name": task.Family,
"task_family": task.Family,
"task_arn": task.TaskARN,
"task_version": task.Version,
}, nil
}
13 changes: 13 additions & 0 deletions pkg/security/module/noecs_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !docker

// Package module holds module related files
package module

func getCurrentECSTaskTags() (map[string]string, error) {
return nil, nil
}
50 changes: 36 additions & 14 deletions pkg/security/module/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/atomic"

pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/env"
"github.com/DataDog/datadog-agent/pkg/security/common"
"github.com/DataDog/datadog-agent/pkg/security/config"
"github.com/DataDog/datadog-agent/pkg/security/events"
Expand Down Expand Up @@ -108,6 +109,7 @@ type APIServer struct {
policiesStatusLock sync.RWMutex
policiesStatus []*api.PolicyStatus
msgSender MsgSender
ecsTags map[string]string

stopChan chan struct{}
stopper startstop.Stopper
Expand Down Expand Up @@ -200,6 +202,29 @@ func (a *APIServer) dequeue(now time.Time, cb func(msg *pendingMsg) bool) {
})
}

func (a *APIServer) updateMsgTags(msg *api.SecurityEventMessage) {
// apply ecs tag if possible
if a.ecsTags != nil {
for key, value := range a.ecsTags {
if !slices.ContainsFunc(msg.Tags, func(tag string) bool {
return strings.HasPrefix(tag, key+":")
}) {
msg.Tags = append(msg.Tags, key+":"+value)
}
}
}

// look for the service tag if we don't have one yet
if len(msg.Service) == 0 {
for _, tag := range msg.Tags {
if strings.HasPrefix(tag, "service:") {
msg.Service = strings.TrimPrefix(tag, "service:")
break
}
}
}
}

func (a *APIServer) start(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
Expand All @@ -217,18 +242,6 @@ func (a *APIServer) start(ctx context.Context) {
}
}

// recopy tags
hasService := len(msg.service) != 0
for _, tag := range msg.tags {
// look for the service tag if we don't have one yet
if !hasService {
if strings.HasPrefix(tag, "service:") {
msg.service = strings.TrimPrefix(tag, "service:")
hasService = true
}
}
}

data, resolved, err := msg.ToJSON()
if err != nil {
seclog.Errorf("failed to marshal event context: %v", err)
Expand All @@ -248,6 +261,7 @@ func (a *APIServer) start(ctx context.Context) {
Service: msg.service,
Tags: msg.tags,
}
a.updateMsgTags(m)

a.msgSender.Send(m, a.expireEvent)

Expand Down Expand Up @@ -489,7 +503,7 @@ func (a *APIServer) SetCWSConsumer(consumer *CWSConsumer) {
}

// NewAPIServer returns a new gRPC event server
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) *APIServer {
func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSender MsgSender, client statsd.ClientInterface, selfTester *selftests.SelfTester) (*APIServer, error) {
stopper := startstop.NewSerialStopper()

as := &APIServer{
Expand Down Expand Up @@ -522,5 +536,13 @@ func NewAPIServer(cfg *config.RuntimeSecurityConfig, probe *sprobe.Probe, msgSen
}
}

return as
if env.IsECS() || env.IsECSFargate() {
tags, err := getCurrentECSTaskTags()
if err != nil {
return nil, err
}
as.ecsTags = tags
}

return as, nil
}
7 changes: 7 additions & 0 deletions pkg/security/resolvers/tags/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tags

import (
"context"
"strings"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
Expand Down Expand Up @@ -72,6 +73,12 @@ func (t *DefaultResolver) Start(ctx context.Context) error {

// Resolve returns the tags for the given id
func (t *DefaultResolver) Resolve(id string) []string {
// container id for ecs task are composed of task id + container id.
// use only the container id part for the tag resolution.
if els := strings.Split(id, "-"); len(els) == 2 {
id = els[1]
}

entityID := types.NewEntityID(types.ContainerID, id)
tags, _ := t.tagger.Tag(entityID.String(), types.OrchestratorCardinality)
return tags
Expand Down
Loading