Skip to content

Commit

Permalink
[eclipse-kanto#53] Unit test the resources clean up implementation
Browse files Browse the repository at this point in the history
Covered the containerd client internal logic.
Covered the resources monitoring mechanism.
Optimized watched resources disposal.
Improved input parameters readability for the containerd internal client initialization.

Signed-off-by: Konstantina Gramatova <[email protected]>
  • Loading branch information
konstantina-gramatova committed Sep 19, 2022
1 parent 483e508 commit 0b706c0
Show file tree
Hide file tree
Showing 8 changed files with 808 additions and 52 deletions.
2 changes: 1 addition & 1 deletion containerm/ctr/ctrd_client_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/eclipse-kanto/container-management/containerm/util"
)

func newContainerdClient(namespace, socket, rootExec, metaPath string, registryConfigs map[string]*RegistryConfig, imageDecKeys, imageDecRecipients []string, runcRuntime types.Runtime, imageExpiry time.Duration, imageExpiryDisable bool, leaseID string) (ContainerAPIClient, error) {
func newContainerdClient(namespace string, socket string, rootExec string, metaPath string, registryConfigs map[string]*RegistryConfig, imageDecKeys, imageDecRecipients []string, runcRuntime types.Runtime, imageExpiry time.Duration, imageExpiryDisable bool, leaseID string) (ContainerAPIClient, error) {

//ensure storage
err := util.MkDir(rootExec)
Expand Down
168 changes: 164 additions & 4 deletions containerm/ctr/ctrd_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ func TestClientInternalProcessEvents(t *testing.T) {
}

func TestClientInternalIsImageUsed(t *testing.T) {
testImgRef := "test.image/ref:latest"
const testImgRef = "test.image/ref:latest"
entryDigest := digest.NewDigest(digest.SHA256, sha256.New())

testCases := map[string]struct {
Expand Down Expand Up @@ -1200,7 +1200,7 @@ func TestClientInternalIsImageUsed(t *testing.T) {
}

func TestClientInternalRemoveUnusedImage(t *testing.T) {
testImgRef := "test.image/ref:latest"
const testImgRef = "test.image/ref:latest"
entryDigest := digest.NewDigest(digest.SHA256, sha256.New())

testCases := map[string]struct {
Expand All @@ -1222,6 +1222,15 @@ func TestClientInternalRemoveUnusedImage(t *testing.T) {
return errImageIsInUse
},
},
"test_not_used_delete_not_found_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, imageMock *mocksContainerd.MockImage) error {
imageMock.EXPECT().Name().Return(testImgRef).Times(2)
imageMock.EXPECT().RootFS(ctx).Return([]digest.Digest{entryDigest}, nil)
spiMock.EXPECT().ListSnapshots(ctx, fmt.Sprintf(snapshotsWalkFilterFormat, entryDigest.String())).Return(nil, nil)
spiMock.EXPECT().DeleteImage(ctx, testImgRef).Return(errdefs.ErrNotFound)
return nil
},
},
"test_not_used_delete_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, imageMock *mocksContainerd.MockImage) error {
imageMock.EXPECT().Name().Return(testImgRef).Times(2)
Expand Down Expand Up @@ -1266,7 +1275,7 @@ func TestClientInternalRemoveUnusedImage(t *testing.T) {
}

func TestClientInternalHandleImageExpired(t *testing.T) {
testImgRef := "test.image/ref:latest"
const testImgRef = "test.image/ref:latest"
entryDigest := digest.NewDigest(digest.SHA256, sha256.New())

testCases := map[string]struct {
Expand Down Expand Up @@ -1294,6 +1303,15 @@ func TestClientInternalHandleImageExpired(t *testing.T) {
return err
},
},
"test_used_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(imageMock, nil)
imageMock.EXPECT().Name().Return(testImgRef).Times(2)
imageMock.EXPECT().RootFS(ctx).Return([]digest.Digest{entryDigest}, nil)
spiMock.EXPECT().ListSnapshots(ctx, fmt.Sprintf(snapshotsWalkFilterFormat, entryDigest.String())).Return([]snapshots.Info{{}}, nil)
return nil
},
},
"test_no_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(imageMock, nil)
Expand Down Expand Up @@ -1329,7 +1347,7 @@ func TestClientInternalHandleImageExpired(t *testing.T) {
}

func TestClientInternalManageImageExpiry(t *testing.T) {
testImgRef := "test.image/ref:latest"
const testImgRef = "test.image/ref:latest"
entryDigest := digest.NewDigest(digest.SHA256, sha256.New())

testCases := map[string]struct {
Expand Down Expand Up @@ -1421,3 +1439,145 @@ func TestClientInternalManageImageExpiry(t *testing.T) {
})
}
}
func TestClientInternalHandleImageExpiryOnRemove(t *testing.T) {
const (
testImgRef = "test.image/ref:latest"
imagesExpiry = 2 * time.Hour
)

testCases := map[string]struct {
imagesExpiryDisabled bool
mockExec func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error
}{
"test_get_image_error": {
imagesExpiryDisabled: false,
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, _ *MockresourcesWatcher, _ *mocksContainerd.MockImage) error {
err := log.NewError("test error")
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(nil, err)
return err
},
},
"test_manage_expiry_error": {
imagesExpiryDisabled: false,
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(imageMock, nil)
imageMock.EXPECT().Name().Return(testImgRef)
imageMock.EXPECT().Metadata().Return(images.Image{CreatedAt: time.Now().Add(-5 * time.Minute)})
err := log.NewError("test error")
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(err)
return err
},
},
"test_manage_expiry_no_error": {
imagesExpiryDisabled: false,
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(imageMock, nil)
imageMock.EXPECT().Name().Return(testImgRef)
imageMock.EXPECT().Metadata().Return(images.Image{CreatedAt: time.Now().Add(-5 * time.Minute)})
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(nil)
return nil
},
},
"test_disabled": {
imagesExpiryDisabled: true,
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().GetImage(ctx, testImgRef).Return(imageMock, nil).Times(0)
imageMock.EXPECT().Name().Return(testImgRef).Times(0)
imageMock.EXPECT().Metadata().Return(images.Image{CreatedAt: time.Now().Add(-5 * time.Minute)}).Times(0)
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(nil).Times(0)
return nil
},
},
}

for testCaseName, testCaseData := range testCases {
t.Run(testCaseName, func(t *testing.T) {
t.Log(testCaseName)

ctrl := gomock.NewController(t)
defer ctrl.Finish()
// init mocks
spiMock := mocksCtrd.NewMockcontainerdSpi(ctrl)
watcherMock := NewMockresourcesWatcher(ctrl)
imageMock := mocksContainerd.NewMockImage(ctrl)

ctx := context.Background()
ctrdClient := &containerdClient{
spi: spiMock,
imagesWatcher: watcherMock,
imageExpiry: imagesExpiry,
imageExpiryDisable: testCaseData.imagesExpiryDisabled,
}
// mock exec
expectedErr := testCaseData.mockExec(ctx, spiMock, watcherMock, imageMock)

err := ctrdClient.handleImageExpiryOnRemove(ctx, testImgRef)
testutil.AssertError(t, expectedErr, err)
})
}
}

func TestClientInternalInitImagesExpiryManagement(t *testing.T) {
const (
testImgRef = "test.image/ref:latest"
imagesExpiry = 2 * time.Hour
)

testCases := map[string]struct {
mockExec func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error
}{
"test_list_images_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, _ *MockresourcesWatcher, _ *mocksContainerd.MockImage) error {
err := log.NewError("test error")
spiMock.EXPECT().ListImages(ctx).Return(nil, err)
return err
},
},
"test_watch_image_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().ListImages(ctx).Return([]containerd.Image{imageMock, imageMock}, nil)
imageMock.EXPECT().Name().Return(testImgRef).Times(4)
imageMock.EXPECT().Metadata().Return(images.Image{CreatedAt: time.Now().Add(-5 * time.Minute)}).Times(2) // not expired
err := log.NewError("test error")
gomock.InOrder(
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(err), // only first one fails
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(nil),
)
return nil
},
},
"test_no_error": {
mockExec: func(ctx context.Context, spiMock *mocksCtrd.MockcontainerdSpi, watcherMock *MockresourcesWatcher, imageMock *mocksContainerd.MockImage) error {
spiMock.EXPECT().ListImages(ctx).Return([]containerd.Image{imageMock, imageMock}, nil)
imageMock.EXPECT().Name().Return(testImgRef).Times(4)
imageMock.EXPECT().Metadata().Return(images.Image{CreatedAt: time.Now().Add(-5 * time.Minute)}).Times(2) // not expired
watcherMock.EXPECT().Watch(testImgRef, gomock.Any(), gomock.Any()).Return(nil).Times(2)
return nil
},
},
}
for testCaseName, testCaseData := range testCases {
t.Run(testCaseName, func(t *testing.T) {
t.Log(testCaseName)

ctrl := gomock.NewController(t)
defer ctrl.Finish()
// init mocks
spiMock := mocksCtrd.NewMockcontainerdSpi(ctrl)
watcherMock := NewMockresourcesWatcher(ctrl)
imageMock := mocksContainerd.NewMockImage(ctrl)

ctx := context.Background()
ctrdClient := &containerdClient{
spi: spiMock,
imagesWatcher: watcherMock,
imageExpiry: imagesExpiry,
}
// mock exec
expectedErr := testCaseData.mockExec(ctx, spiMock, watcherMock, imageMock)

err := ctrdClient.initImagesExpiryManagement(ctx)
testutil.AssertError(t, expectedErr, err)
})
}
}
34 changes: 19 additions & 15 deletions containerm/ctr/ctrd_resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ type watchInfo struct {

type resWatcher struct {
sync.Mutex
watchCache map[string]watchInfo
watchCacheLock sync.RWMutex
watcherCtx context.Context
watcherCtxCancel context.CancelFunc
watchCache map[string]watchInfo
watchCacheLock sync.RWMutex
watcherCtx context.Context
watcherCtxCancel context.CancelFunc
watchCacheWaitGroup *sync.WaitGroup
}

func newResourcesWatcher(ctx context.Context) resourcesWatcher {
watcher := &resWatcher{
watchCache: make(map[string]watchInfo),
watchCacheLock: sync.RWMutex{},
watchCache: make(map[string]watchInfo),
watchCacheLock: sync.RWMutex{},
watchCacheWaitGroup: &sync.WaitGroup{},
}
watcher.watcherCtx, watcher.watcherCtxCancel = context.WithCancel(ctx)
return watcher
Expand All @@ -72,17 +74,20 @@ func (watcher *resWatcher) Watch(resourceID string, duration time.Duration, expi
expiredHandler: expiredHandler,
}
watcher.watchCache[info.resourceID] = info
watcher.watchCacheWaitGroup.Add(1)
go func(ctx context.Context, info watchInfo) {
defer watcher.watchCacheWaitGroup.Done()
select {
case <-info.timer.C:
if info.expiredHandler != nil {
if err := info.expiredHandler(ctx, info.resourceID); err != nil {
log.WarnErr(err, "error while handling monitoring expiry for resource %s", info.resourceID)
}
}
watcher.cleanCache(info.resourceID)
watcher.cleanCache(info.resourceID, false)
log.Debug("successfully processed expired resource %s", info.resourceID)
case <-ctx.Done():
watcher.cleanCache(info.resourceID, true)
log.Debug("cancelled monitoring for resource %s", info.resourceID)
}
log.Debug("finished watch process for resource %s", info.resourceID)
Expand All @@ -97,23 +102,22 @@ func (watcher *resWatcher) Dispose() {
log.Debug("resource watcher is disposing")
watcher.watcherCtxCancel()

watcher.watchCacheLock.RLock()
defer watcher.watchCacheLock.RUnlock()
log.Debug("waiting for monitoring routines to finish")
watcher.watchCacheWaitGroup.Wait()

for infoKey, info := range watcher.watchCache {
log.Debug("stopping monitoring for resource %s", infoKey)
info.timer.Stop()
}
log.Debug("resource watcher disposed")
}
func (watcher *resWatcher) cleanCache(id string) {
func (watcher *resWatcher) cleanCache(id string, withStop bool) {
watcher.watchCacheLock.Lock()
defer watcher.watchCacheLock.Unlock()
info, ok := watcher.watchCache[id]
if ok {
if withStop && info.timer.Stop() {
log.Debug("stopped monitoring timer for resource %s", info.resourceID)
}
delete(watcher.watchCache, id)
log.Debug("removed watch cache for resource %s", info.resourceID)
} else {
log.Debug("no watch cache to remove for resource %s", info.resourceID)
log.Warn("no watch cache found for resource %s", info.resourceID)
}
}
Loading

0 comments on commit 0b706c0

Please sign in to comment.