Skip to content

Commit

Permalink
feat(tagger): implement caching for GenerateContainerIDFromOriginInfo
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <[email protected]>
  • Loading branch information
wdhif committed Dec 19, 2024
1 parent 894a731 commit 0eedc89
Showing 1 changed file with 65 additions and 52 deletions.
117 changes: 65 additions & 52 deletions comp/core/tagger/impl-remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/datadog-agent/pkg/util/cache"
"github.com/DataDog/datadog-agent/pkg/util/common"
grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc"
httputils "github.com/DataDog/datadog-agent/pkg/util/http"
Expand All @@ -44,11 +45,13 @@ import (
const (
noTimeout = 0 * time.Minute
streamRecvTimeout = 10 * time.Minute
cacheExpiration = 1 * time.Minute
)

var errTaggerStreamNotStarted = errors.New("tagger stream not started")

var errTaggerFailedGenerateContainerIDFromOriginInfo = errors.New("tagger failed to generate container ID from origin info")
var (
errTaggerStreamNotStarted = errors.New("tagger stream not started")
errTaggerFailedGenerateContainerIDFromOriginInfo = errors.New("tagger failed to generate container ID from origin info")
)

// Requires defines the dependencies for the remote tagger.
type Requires struct {
Expand Down Expand Up @@ -269,65 +272,75 @@ func (t *remoteTagger) GenerateContainerIDFromOriginInfo(originInfo origindetect
}
}()

expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
expBackoff.MaxInterval = 1 * time.Second
expBackoff.MaxElapsedTime = 15 * time.Second

var containerID string

err := backoff.Retry(func() error {
select {
case <-t.ctx.Done():
return &backoff.PermanentError{Err: errTaggerFailedGenerateContainerIDFromOriginInfo}
default:
}
// Currently we only use the External Data from the Origin Info to generate the container ID.
// Serialize the External Data to a string and use it as the cache key.
initPrefix := ""
if originInfo.ExternalData.Init {
initPrefix = "i/"
}
key := cache.BuildAgentKey("remoteTagger", "container_id", fmt.Sprintf("%s%s/%s", initPrefix, originInfo.ExternalData.PodUID, originInfo.ExternalData.ContainerName))

cachedContainerID, err := cache.GetWithExpiration(key, func() (containerID string, err error) {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
expBackoff.MaxInterval = 1 * time.Second
expBackoff.MaxElapsedTime = 15 * time.Second

err = backoff.Retry(func() error {
select {
case <-t.ctx.Done():
return &backoff.PermanentError{Err: errTaggerFailedGenerateContainerIDFromOriginInfo}
default:
}

// Fetch the auth token
if t.token == "" {
var authError error
t.token, authError = t.options.TokenFetcher()
if authError != nil {
_ = t.log.Errorf("unable to fetch auth token, will possibly retry: %s", authError)
return authError
// Fetch the auth token
if t.token == "" {
var authError error
t.token, authError = t.options.TokenFetcher()
if authError != nil {
_ = t.log.Errorf("unable to fetch auth token, will possibly retry: %s", authError)
return authError
}
}
}

// Create the context with the auth token
t.queryCtx, t.queryCancel = context.WithCancel(
metadata.NewOutgoingContext(t.ctx, metadata.MD{
"authorization": []string{fmt.Sprintf("Bearer %s", t.token)},
}),
)
// Create the context with the auth token
t.queryCtx, t.queryCancel = context.WithCancel(
metadata.NewOutgoingContext(t.ctx, metadata.MD{
"authorization": []string{fmt.Sprintf("Bearer %s", t.token)},
}),
)

// Call the gRPC method to get the container ID from the origin info
containerIDResponse, err := t.client.TaggerGenerateContainerIDFromOriginInfo(t.queryCtx, &pb.GenerateContainerIDFromOriginInfoRequest{
ExternalData: &pb.GenerateContainerIDFromOriginInfoRequest_ExternalData{
Init: &originInfo.ExternalData.Init,
ContainerName: &originInfo.ExternalData.ContainerName,
PodUID: &originInfo.ExternalData.PodUID,
},
})
if err != nil {
_ = t.log.Errorf("unable to generate container ID from origin info, will retry: %s", err)
return err
}

// Call the gRPC method to get the container ID from the origin info
containerIDResponse, err := t.client.TaggerGenerateContainerIDFromOriginInfo(t.queryCtx, &pb.GenerateContainerIDFromOriginInfoRequest{
ExternalData: &pb.GenerateContainerIDFromOriginInfoRequest_ExternalData{
Init: &originInfo.ExternalData.Init,
ContainerName: &originInfo.ExternalData.ContainerName,
PodUID: &originInfo.ExternalData.PodUID,
},
})
if err != nil {
_ = t.log.Errorf("unable to generate container ID from origin info, will retry: %s", err)
return err
}
if containerIDResponse == nil {
_ = t.log.Warnf("unable to generate container ID from origin info, will retry: %s", err)
return errors.New("containerIDResponse is nil")
}
containerID = containerIDResponse.ContainerID

if containerIDResponse == nil {
_ = t.log.Warnf("unable to generate container ID from origin info, will retry: %s", err)
return errors.New("containerIDResponse is nil")
}
containerID = containerIDResponse.ContainerID
t.log.Debugf("Container ID generated successfully from origin info %+v: %s", originInfo, containerID)
return nil
}, expBackoff)

fail = false
t.log.Debugf("Container ID generated successfully from origin info %+v: %s", originInfo, containerID)
return nil
}, expBackoff)
return containerID, err
}, cacheExpiration)

if err != nil {
return "", err
}
return containerID, nil
fail = false
return cachedContainerID, nil
}

// AccumulateTagsFor returns tags for a given entity at the desired cardinality.
Expand Down

0 comments on commit 0eedc89

Please sign in to comment.