Skip to content

Commit

Permalink
Refactor to fix race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor committed Jan 5, 2023
1 parent e793b9e commit c4fbfb1
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 186 deletions.
12 changes: 6 additions & 6 deletions pkg/workloadmeta/collectors/internal/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ type collector struct {
// contToExitInfo caches the exit info of a task to enrich the container deletion event when it's received later.
contToExitInfo map[string]*exitInfo

// Stores a map of image name => image ID.
// Events from containerd contain an image name, but not IDs. When a delete
// event arrives it'll contain a name, but we won't be able to access the ID
// because the image is already gone, that's why we need this map.
knownImages map[string]string
knownImages *knownImages

// Map of image ID => array of repo tags
repoTags map[string][]string
}

func init() {
workloadmeta.RegisterCollector(collectorID, func() workloadmeta.Collector {
return &collector{
contToExitInfo: make(map[string]*exitInfo),
knownImages: make(map[string]string),
knownImages: newKnownImages(),
repoTags: make(map[string][]string),
}
})
}
Expand Down
228 changes: 121 additions & 107 deletions pkg/workloadmeta/collectors/internal/containerd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"fmt"
"strings"

"github.com/DataDog/datadog-agent/pkg/workloadmeta"
log "github.com/cihub/seelog"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
Expand All @@ -24,10 +22,61 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/gogo/protobuf/proto"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"

"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/workloadmeta"
)

const imageTopicPrefix = "/images/"

// Events from containerd contain an image name, but not IDs. When a delete
// event arrives it'll contain a name, but we won't be able to access the ID
// because the image is already gone, that's why we need to keep the IDs =>
// names relationships.
type knownImages struct {
// Store IDs and names in both directions for efficient access
idsByName map[string]string // map name => ID
namesByID map[string]map[string]struct{} // map ID => set of names
}

func newKnownImages() *knownImages {
return &knownImages{
idsByName: make(map[string]string),
namesByID: make(map[string]map[string]struct{}),
}
}

func (images *knownImages) addAssociation(imageName string, imageID string) {
images.idsByName[imageName] = imageID

if images.namesByID[imageID] == nil {
images.namesByID[imageID] = make(map[string]struct{})
}
images.namesByID[imageID][imageName] = struct{}{}
}

func (images *knownImages) deleteAssociation(imageName string, imageID string) {
delete(images.idsByName, imageName)

if images.namesByID[imageID] == nil {
return
}

delete(images.namesByID[imageID], imageName)
if len(images.namesByID[imageID]) == 0 {
delete(images.namesByID, imageID)
}
}

func (images *knownImages) getImageID(imageName string) (string, bool) {
id, found := images.idsByName[imageName]
return id, found
}

func (images *knownImages) isReferenced(imageID string) bool {
return len(images.namesByID[imageID]) > 0
}

func isImageTopic(topic string) bool {
return strings.HasPrefix(topic, imageTopicPrefix)
}
Expand Down Expand Up @@ -56,13 +105,19 @@ func (c *collector) handleImageEvent(ctx context.Context, containerdEvent *conta
return fmt.Errorf("error unmarshaling containerd event: %w", err)
}

imageID, found := c.knownImages[event.Name]
imageID, found := c.knownImages.getImageID(event.Name)
if !found {
// Not necessarily an error. If an image had multiple names, it
// could have already been deleted using other name
return nil
}

c.knownImages.deleteAssociation(event.Name, imageID)

if c.knownImages.isReferenced(imageID) {
// Image is still referenced by a different name. Don't delete the
// image, but update its repo tags.
return c.deleteRepoTagOfImage(ctx, containerdEvent.Namespace, imageID, event.Name)
}

c.store.Notify([]workloadmeta.CollectorEvent{
{
Type: workloadmeta.EventTypeUnset,
Expand All @@ -76,8 +131,6 @@ func (c *collector) handleImageEvent(ctx context.Context, containerdEvent *conta
},
})

delete(c.knownImages, event.Name)

return nil
default:
return fmt.Errorf("unknown containerd image event topic %s, ignoring", containerdEvent.Topic)
Expand All @@ -103,7 +156,7 @@ func (c *collector) notifyEventForImage(ctx context.Context, namespace string, i

layers, err := getLayersWithHistory(ctxWithNamespace, img.ContentStore(), manifest)
if err != nil {
_ = log.Warnf("error while getting layers with history: %s", err)
log.Warnf("error while getting layers with history: %s", err)

// Not sure if the layers and history are always available. Instead of
// returning an error, collect the image without this information.
Expand All @@ -115,6 +168,15 @@ func (c *collector) notifyEventForImage(ctx context.Context, namespace string, i
}

imageName := img.Name()
shortName := ""
parsedImg, err := workloadmeta.NewContainerImage(imageName)
if err == nil {
// Don't set a short name. We know that some images handled here contain
// "sha256" in the name, and those don't have a short name.
} else {
shortName = parsedImg.ShortName
}

imageID := manifest.Config.Digest.String()

// We can get "create" events for images that already exist. That happens
Expand All @@ -125,41 +187,35 @@ func (c *collector) notifyEventForImage(ctx context.Context, namespace string, i
// name is a digest, in other is something with the same format as
// datadog/agent:7, and sometimes there's a temporary name prefixed with
// "import-".
// When that happens, give precedence to the name with repo and tag instead
// of the name that includes a digest. This is just to show names that are
// more user-friendly (the digests are already present in other attributes
// like ID, and repo digest).
existingImg, err := c.store.GetImage(imageID)
if err == nil {
updatedImg := existingImg.DeepCopy().(*workloadmeta.ContainerImageMetadata) // Avoid race conditions
changed := c.updateContainerImageMetadata(updatedImg, imageName)

if changed {
c.store.Notify([]workloadmeta.CollectorEvent{
{
Type: workloadmeta.EventTypeSet,
Source: workloadmeta.SourceRuntime,
Entity: updatedImg,
},
})

c.updateKnownImages(imageName, imageID)
if strings.Contains(imageName, "sha256:") && !strings.Contains(existingImg.Name, "sha256:") {
imageName = existingImg.Name
shortName = existingImg.ShortName
}

return nil
}

shortName := ""
parsedImg, err := workloadmeta.NewContainerImage(imageName)
if err != nil {
_ = log.Warn("Can't get image short name")
} else {
shortName = parsedImg.ShortName
}

var repoDigests []string
var repoTags []string
if strings.Contains(imageName, "@sha256:") {
repoDigests = append(repoDigests, imageName)
} else {
repoDigests = append(repoDigests, imageName+"@"+img.Target().Digest.String())
repoTags = append(repoTags, imageName)

repoTagAlreadyPresent := false
for _, repoTag := range c.repoTags[imageID] {
if repoTag == imageName {
repoTagAlreadyPresent = true
break
}
}

if !repoTagAlreadyPresent {
c.repoTags[imageID] = append(c.repoTags[imageID], imageName)
}
}

var totalSizeBytes int64 = 0
Expand Down Expand Up @@ -187,7 +243,7 @@ func (c *collector) notifyEventForImage(ctx context.Context, namespace string, i
Labels: img.Labels(),
},
ShortName: shortName,
RepoTags: repoTags,
RepoTags: c.repoTags[imageID],
RepoDigests: repoDigests,
MediaType: manifest.MediaType,
SizeBytes: totalSizeBytes,
Expand All @@ -206,39 +262,45 @@ func (c *collector) notifyEventForImage(ctx context.Context, namespace string, i
},
})

c.updateKnownImages(imageName, imageID)
return c.updateKnownImages(ctx, namespace, imageName, imageID)
}

// Updates the map with the image name => image ID relationships and also the repo tags
func (c *collector) updateKnownImages(ctx context.Context, namespace string, imageName string, newImageID string) error {
oldImageID, found := c.knownImages.getImageID(imageName)
c.knownImages.addAssociation(imageName, newImageID)

// If the image name is already pointing to an ID, we need to delete the name from
// the repo tags of the image with that ID.
if found && newImageID != oldImageID {
c.knownImages.deleteAssociation(imageName, oldImageID)
return c.deleteRepoTagOfImage(ctx, namespace, oldImageID, imageName)
}

return nil
}

func (c *collector) updateKnownImages(imageName string, newImageID string) {
currentImageID, found := c.knownImages[imageName]
func (c *collector) deleteRepoTagOfImage(ctx context.Context, namespace string, imageID string, repoTagToDelete string) error {
repoTagDeleted := false

// If the image name is already pointing to an ID, we need to delete it from
// the repo tags of the image with that ID.
if found {
existingImg, err := c.store.GetImage(currentImageID)
if err == nil {
existingImgCopy := existingImg.DeepCopy().(*workloadmeta.ContainerImageMetadata) // Avoid race conditions
for i, repoTag := range existingImgCopy.RepoTags {
if repoTag == imageName {
existingImgCopy.RepoTags = append(existingImgCopy.RepoTags[:i], existingImgCopy.RepoTags[i+1:]...)

c.store.Notify([]workloadmeta.CollectorEvent{
{
Type: workloadmeta.EventTypeSet,
Source: workloadmeta.SourceRuntime,
Entity: existingImgCopy,
},
})

break
}
}
for i, repoTag := range c.repoTags[imageID] {
if repoTag == repoTagToDelete {
c.repoTags[imageID] = append(c.repoTags[imageID][:i], c.repoTags[imageID][i+1:]...)
repoTagDeleted = true
break
}
}

c.knownImages[imageName] = newImageID
if repoTagDeleted && len(c.repoTags[imageID]) > 0 {
// We need to notify to workloadmeta that the image has changed.
// Updating workloadmeta entities directly is not thread-safe, that's
// why we generate an update event here instead.
if err := c.handleImageCreateOrUpdate(ctx, namespace, c.repoTags[imageID][len(c.repoTags[imageID])-1]); err != nil {
return err
}
}

return nil
}

func getLayersWithHistory(ctx context.Context, store content.Store, manifest ocispec.Manifest) ([]workloadmeta.ContainerImageLayer, error) {
Expand Down Expand Up @@ -294,51 +356,3 @@ func getLayersWithHistory(ctx context.Context, store content.Store, manifest oci

return layers, nil
}

// updateContainerImageMetadata Updates the given container image metadata so
// that it takes into account the new image name that refers to it. Returns a
// boolean that indicates if the image metadata was updated.
// There are 2 possible changes to be made:
// 1. Some environments refer to the same image with multiple names. When
// that's the case, give precedence to the name with repo and tag instead of
// the name that includes a digest. This is just to show names that are more
// user-friendly (the digests are already present in other attributes like ID,
// and repo digest).
// 2. Add the new name to the repo tags of the image. Notice that repo names
// are not digests.
func (c *collector) updateContainerImageMetadata(imageMetadata *workloadmeta.ContainerImageMetadata, newName string) bool {
if strings.Contains(newName, "sha256:") {
// Nothing to do. It's not going to replace the current name, and it's
// not a repo tag.
return false
}

changed := false

// If the current name is a digest and the new one is not, replace.
if strings.Contains(imageMetadata.Name, "sha256:") {
imageMetadata.Name = newName

parsedName, err := workloadmeta.NewContainerImage(newName)
if err == nil {
imageMetadata.ShortName = parsedName.ShortName
}

changed = true
}

// Add new repo tag if not already present.
addNewRepoTag := true
for _, existingRepoTag := range imageMetadata.RepoTags {
if existingRepoTag == newName {
addNewRepoTag = false
break
}
}
if addNewRepoTag {
imageMetadata.RepoTags = append(imageMetadata.RepoTags, newName)
changed = true
}

return changed
}
Loading

0 comments on commit c4fbfb1

Please sign in to comment.