test: add integration test for engine restart
Peng Yin committed Mar 22, 2018
1 parent 6f16e3c commit 0ce3038
Showing 6 changed files with 170 additions and 35 deletions.
34 changes: 33 additions & 1 deletion agent/engine/common_test.go
@@ -33,7 +33,8 @@ import (
)

const (
containerID = "containerID"
containerID = "containerID"
waitTaskStateChangeDuration = 2 * time.Minute
)

var (
@@ -244,3 +245,34 @@ func waitForStopEvents(t *testing.T, stateChangeEvents <-chan statechange.Event,
default:
}
}

func triggerSteadyStateCheck(times int, task *managedTask) {
for i := 0; i < times; {
err := task.cancelSteadyStateWait()
if err != nil {
// Wait for waitSteady to be invoked
time.Sleep(10 * time.Millisecond)
continue
}
i++
}
}

func waitForContainerHealthStatus(t *testing.T, testTask *api.Task) {
ctx, cancel := context.WithTimeout(context.TODO(), waitTaskStateChangeDuration)
defer cancel()

for {
select {
case <-ctx.Done():
t.Error("Timed out waiting for container health status")
default:
healthStatus := testTask.Containers[0].GetHealthStatus()
if healthStatus.Status.BackendStatus() == "UNKNOWN" {
time.Sleep(time.Second)
continue
}
return
}
}
}
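
Both helpers added to common_test.go share one shape: poll a condition in a loop, sleep between attempts, and give up once a context deadline fires. The standalone sketch below shows only that pattern; pollUntil, the interval, and the one-second condition are hypothetical illustrations, not part of the agent code.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollUntil loops until ready() reports true, sleeping between attempts,
// and returns an error once the context's deadline expires.
func pollUntil(ctx context.Context, interval time.Duration, ready func() bool) error {
	for {
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for condition")
		default:
			if ready() {
				return nil
			}
			time.Sleep(interval)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	start := time.Now()
	err := pollUntil(ctx, 100*time.Millisecond, func() bool {
		// Stand-in condition: flips to true after one second.
		return time.Since(start) > time.Second
	})
	fmt.Println("poll result:", err)
}

waitForContainerHealthStatus applies this loop to the container's backend health status, while triggerSteadyStateCheck applies the retry half of it to cancelSteadyStateWait, retrying every 10 milliseconds until the task has actually registered a wait to cancel.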
8 changes: 4 additions & 4 deletions agent/engine/docker_image_manager_integ_test.go
@@ -54,7 +54,7 @@ func TestIntegImageCleanupHappyCase(t *testing.T) {
cfg.MinimumImageDeletionAge = 1 * time.Second
cfg.NumImagesToDeletePerCycle = 2
// start agent
taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)

imageManager := taskEngine.(*DockerTaskEngine).imageManager.(*dockerImageManager)
imageManager.SetSaver(statemanager.NewNoopStateManager())
@@ -166,7 +166,7 @@ func TestIntegImageCleanupThreshold(t *testing.T) {
// Set to delete three images, but in this test we expect only two images to be removed
cfg.NumImagesToDeletePerCycle = 3
// start agent
taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)

imageManager := taskEngine.(*DockerTaskEngine).imageManager.(*dockerImageManager)
imageManager.SetSaver(statemanager.NewNoopStateManager())
@@ -280,7 +280,7 @@ func TestImageWithSameNameAndDifferentID(t *testing.T) {
// Set low values so this test can complete in a sane amount of time
cfg.MinimumImageDeletionAge = 15 * time.Minute

taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)
defer done()

dockerClient := taskEngine.(*DockerTaskEngine).client
@@ -418,7 +418,7 @@ func TestImageWithSameIDAndDifferentNames(t *testing.T) {
// Set low values so this test can complete in a sane amount of time
cfg.MinimumImageDeletionAge = 15 * time.Minute

taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)
defer done()

dockerClient := taskEngine.(*DockerTaskEngine).client
5 changes: 3 additions & 2 deletions agent/engine/docker_task_engine_test.go
@@ -614,7 +614,8 @@ func TestSteadyStatePoll(t *testing.T) {
// trigger steady state verification
mtasks := taskEngine.(*DockerTaskEngine).managedTasks
for _, task := range mtasks {
task.cancel()
// Trigger the steadyState check at least twice
triggerSteadyStateCheck(4, task)
}

// StopContainer might be invoked if the test execution is slow, during
@@ -1109,7 +1110,7 @@ func TestPauseContainerHappyPath(t *testing.T) {
// trigger steady state verification
mtasks := taskEngine.(*DockerTaskEngine).managedTasks
for _, task := range mtasks {
task.cancel()
triggerSteadyStateCheck(1, task)
}

var wg sync.WaitGroup
107 changes: 86 additions & 21 deletions agent/engine/engine_integ_test.go
@@ -39,9 +39,8 @@ import (
)

const (
testDockerStopTimeout = 2 * time.Second
credentialsIDIntegTest = "credsid"
waitTaskStateChangeDuration = 2 * time.Minute
testDockerStopTimeout = 2 * time.Second
credentialsIDIntegTest = "credsid"
)

func init() {
@@ -66,10 +65,14 @@ func defaultTestConfigIntegTest() *config.Config {
}

func setupWithDefaultConfig(t *testing.T) (TaskEngine, func(), credentials.Manager) {
return setup(defaultTestConfigIntegTest(), t)
return setup(defaultTestConfigIntegTest(), nil, t)
}

func setup(cfg *config.Config, t *testing.T) (TaskEngine, func(), credentials.Manager) {
func setupWithState(t *testing.T, state dockerstate.TaskEngineState) (TaskEngine, func(), credentials.Manager) {
return setup(defaultTestConfigIntegTest(), state, t)
}

func setup(cfg *config.Config, state dockerstate.TaskEngineState, t *testing.T) (TaskEngine, func(), credentials.Manager) {
if os.Getenv("ECS_SKIP_ENGINE_INTEG_TEST") != "" {
t.Skip("ECS_SKIP_ENGINE_INTEG_TEST")
}
@@ -82,7 +85,9 @@ func setup(cfg *config.Config, t *testing.T) (TaskEngine, func(), credentials.Ma
t.Fatalf("Error creating Docker client: %v", err)
}
credentialsManager := credentials.NewManager()
state := dockerstate.NewTaskEngineState()
if state == nil {
state = dockerstate.NewTaskEngineState()
}
imageManager := NewImageManager(cfg, dockerClient, state)
imageManager.SetSaver(statemanager.NewNoopStateManager())
metadataManager := containermetadata.NewManager(dockerClient, cfg)
@@ -224,7 +229,7 @@ func TestSweepContainer(t *testing.T) {
cfg := defaultTestConfigIntegTest()
cfg.TaskCleanupWaitDuration = 1 * time.Minute
cfg.ContainerMetadataEnabled = true
taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)
defer done()

taskArn := "arn:aws:ecs:us-east-1:123456789012:task/testSweepContainer"
@@ -311,19 +316,79 @@ func TestContainerHealthCheck(t *testing.T) {
verifyTaskIsStopped(stateChangeEvents, testTask)
}

func waitForContainerHealthStatus(t *testing.T, testTask *api.Task) {
ctx, _ := context.WithTimeout(context.TODO(), waitTaskStateChangeDuration)
for {
select {
case <-ctx.Done():
t.Error("Timed out waiting for container health status")
default:
healthStatus := testTask.Containers[0].GetHealthStatus()
if healthStatus.Status.BackendStatus() == "UNKNOWN" {
time.Sleep(time.Second)
continue
}
return
}
// TestEngineSynchronize tests that the agent synchronizes the container status on restart
func TestEngineSynchronize(t *testing.T) {
taskEngine, done, _ := setupWithDefaultConfig(t)

taskArn := "arn:aws:ecs:us-east-1:123456789012:task/testEngineSynchronize"
testTask := createTestTask(taskArn)
testTask.Containers[0].Image = testVolumeImage

// Start a task
go taskEngine.AddTask(testTask)
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskRunningStateChange(t, taskEngine)
// Record the container information
state := taskEngine.(*DockerTaskEngine).State()
containersMap, ok := state.ContainerMapByArn(taskArn)
require.True(t, ok, "no container found in the agent state")
require.Len(t, containersMap, 1)
containerIDs := state.GetAllContainerIDs()
require.Len(t, containerIDs, 1)
imageStates := state.AllImageStates()
assert.Len(t, imageStates, 1)
taskEngine.(*DockerTaskEngine).stopEngine()

containerBeforeSync, ok := containersMap[testTask.Containers[0].Name]
assert.True(t, ok, "container not found in the containers map")
// Task and Container restored from state file
containerSaved := &api.Container{
Name: containerBeforeSync.Container.Name,
SentStatusUnsafe: api.ContainerRunning,
DesiredStatusUnsafe: api.ContainerRunning,
}
task := &api.Task{
Arn: taskArn,
Containers: []*api.Container{
containerSaved,
},
KnownStatusUnsafe: api.TaskRunning,
DesiredStatusUnsafe: api.TaskRunning,
SentStatusUnsafe: api.TaskRunning,
}

state = dockerstate.NewTaskEngineState()
state.AddTask(task)
state.AddContainer(&api.DockerContainer{
DockerID: containerBeforeSync.DockerID,
Container: containerSaved,
}, task)
state.AddImageState(imageStates[0])

// Simulate the agent restart
taskEngine, done, _ = setupWithState(t, state)
defer done()

taskEngine.MustInit(context.TODO())

// Check container status/metadata and image information are correctly synchronized
containerIDAfterSync := state.GetAllContainerIDs()
require.Len(t, containerIDAfterSync, 1)
containerAfterSync, ok := state.ContainerByID(containerIDAfterSync[0])
assert.True(t, ok, "no container found in the agent state")

assert.Equal(t, containerAfterSync.Container.GetKnownStatus(), containerBeforeSync.Container.GetKnownStatus())
assert.Equal(t, containerAfterSync.Container.GetLabels(), containerBeforeSync.Container.GetLabels())
assert.Equal(t, containerAfterSync.Container.GetStartedAt(), containerBeforeSync.Container.GetStartedAt())
assert.Equal(t, containerAfterSync.Container.GetCreatedAt(), containerBeforeSync.Container.GetCreatedAt())

imageStateAfterSync := state.AllImageStates()
assert.Len(t, imageStateAfterSync, 1)
assert.Equal(t, *imageStateAfterSync[0], *imageStates[0])

testTask.SetDesiredStatus(api.TaskStopped)
go taskEngine.AddTask(testTask)

verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskStoppedStateChange(t, taskEngine)
}
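
Every pre-existing caller updated in this commit passes nil for setup's new state argument, which preserves the old behavior of creating a fresh TaskEngineState; only the restart test injects a pre-built state through setupWithState. The sketch below contrasts the two calling patterns; it is hypothetical (the test name and task ARN are made up) and assumes it sits alongside the helpers in this file.

func TestSetupStateInjectionSketch(t *testing.T) {
	// Default path: a nil state makes setup() build a fresh
	// dockerstate.TaskEngineState internally.
	_, done, _ := setup(defaultTestConfigIntegTest(), nil, t)
	defer done()

	// Restart path: hand the engine a state populated ahead of time,
	// the way TestEngineSynchronize does above.
	savedTask := createTestTask("arn:aws:ecs:us-east-1:123456789012:task/sketchRestart")
	restored := dockerstate.NewTaskEngineState()
	restored.AddTask(savedTask)

	restoredEngine, restoredDone, _ := setupWithState(t, restored)
	defer restoredDone()
	restoredEngine.MustInit(context.TODO())
}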
6 changes: 3 additions & 3 deletions agent/engine/engine_unix_integ_test.go
@@ -489,7 +489,7 @@ func TestDockerCfgAuth(t *testing.T) {
cfg.EngineAuthType = "dockercfg"

removeImage(testAuthRegistryImage)
taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)
defer done()
defer func() {
cfg.EngineAuthData = config.NewSensitiveRawMessage(nil)
@@ -522,7 +522,7 @@ func TestDockerAuth(t *testing.T) {
cfg.EngineAuthType = ""
}()

taskEngine, done, _ := setup(cfg, t)
taskEngine, done, _ := setup(cfg, nil, t)
defer done()
removeImage(testAuthRegistryImage)

@@ -773,7 +773,7 @@ func TestDockerStopTimeout(t *testing.T) {
defer os.Unsetenv("ECS_CONTAINER_STOP_TIMEOUT")
cfg := defaultTestConfigIntegTest()

taskEngine, _, _ := setup(cfg, t)
taskEngine, _, _ := setup(cfg, nil, t)

dockerTaskEngine := taskEngine.(*DockerTaskEngine)

45 changes: 41 additions & 4 deletions agent/engine/task_manager.go
@@ -31,6 +31,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"

"github.com/cihub/seelog"
"github.com/pkg/errors"
)

const (
@@ -46,8 +47,9 @@ const (
)

var (
_stoppedSentWaitInterval = stoppedSentWaitInterval
_maxStoppedWaitTimes = int(maxStoppedWaitTimes)
_stoppedSentWaitInterval = stoppedSentWaitInterval
_maxStoppedWaitTimes = int(maxStoppedWaitTimes)
taskNotWaitForSteadyStateError = errors.New("managed task: steady state check context is nil")
)

type acsTaskUpdate struct {
@@ -94,8 +96,11 @@ type containerTransition struct {
// task's statuses yourself)
type managedTask struct {
*api.Task
ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
// waitSteadyCancel is the cancel function of the context used to wait for the
// task's steady state; that wait times out after `steadyStateTaskVerifyInterval`
waitSteadyCancel context.CancelFunc
engine *DockerTaskEngine
cfg *config.Config
saver statemanager.Saver
Expand All @@ -121,6 +126,7 @@ type managedTask struct {
_timeOnce sync.Once

resource resources.Resource
lock sync.RWMutex
}

// newManagedTask is a method on DockerTaskEngine to create a new managedTask.
@@ -164,6 +170,13 @@ func (mtask *managedTask) overseeTask() {

// Main infinite loop. This is where we receive messages and dispatch work.
for {
select {
case <-mtask.ctx.Done():
seelog.Infof("Managed task [%s]: parent context cancelled, exit", mtask.Arn)
return
default:
}

// If it's steadyState, just spin until we need to do work
for mtask.steadyState() {
mtask.waitSteady()
@@ -269,6 +282,7 @@ func (mtask *managedTask) waitSteady() {

timeoutCtx, cancel := context.WithTimeout(mtask.ctx, steadyStateTaskVerifyInterval)
defer cancel()
mtask.setWaitSteadyCheckCancelFunc(cancel)
timedOut := mtask.waitEvent(timeoutCtx.Done())

if timedOut {
@@ -277,6 +291,29 @@ func (mtask *managedTask) waitSteady() {
}
}

// cancelSteadyStateWait checks whether the task is waiting for steady state and
// stops the wait if it is. This is currently only used in tests.
func (mtask *managedTask) cancelSteadyStateWait() error {
mtask.lock.Lock()
defer mtask.lock.Unlock()

if mtask.waitSteadyCancel == nil {
return taskNotWaitForSteadyStateError
}

mtask.waitSteadyCancel()
// reset the cancel func to nil to indicate the task isn't waiting for steady state
mtask.waitSteadyCancel = nil
return nil
}

// setWaitSteadyCheckCancelFunc records the steady state check cancel function in mtask
func (mtask *managedTask) setWaitSteadyCheckCancelFunc(cancel context.CancelFunc) {
mtask.lock.Lock()
defer mtask.lock.Unlock()
mtask.waitSteadyCancel = cancel
}

// steadyState returns if the task is in a steady state. Steady state is when task's desired
// and known status are both RUNNING
func (mtask *managedTask) steadyState() bool {
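
The task_manager.go change comes down to one pattern: store the cancel function of the current steady-state wait behind a lock, so another goroutine (in practice, a test) can interrupt that single wait without cancelling the task's parent context, and reset it to nil afterwards so callers can tell that nothing is currently waiting. The standalone sketch below illustrates that pattern; waiter, waitSteady, and cancelWait are hypothetical stand-ins for managedTask, waitSteady, and cancelSteadyStateWait, not the agent's actual implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errNotWaiting = errors.New("not currently waiting for steady state")

// waiter keeps the cancel function of its current wait behind a mutex so a
// second goroutine can cut that wait short without touching the parent context.
type waiter struct {
	ctx    context.Context
	lock   sync.Mutex
	cancel context.CancelFunc
}

func (w *waiter) waitSteady(interval time.Duration) {
	timeoutCtx, cancel := context.WithTimeout(w.ctx, interval)
	defer cancel()

	w.lock.Lock()
	w.cancel = cancel
	w.lock.Unlock()

	// Returns either when the interval elapses or when cancelWait fires.
	<-timeoutCtx.Done()
}

func (w *waiter) cancelWait() error {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.cancel == nil {
		return errNotWaiting
	}
	w.cancel()
	w.cancel = nil // mark that nothing is waiting anymore
	return nil
}

func main() {
	w := &waiter{ctx: context.Background()}

	go func() {
		// Retry until the waiter has registered its cancel function,
		// mirroring triggerSteadyStateCheck in common_test.go.
		for w.cancelWait() != nil {
			time.Sleep(10 * time.Millisecond)
		}
	}()

	start := time.Now()
	w.waitSteady(time.Minute) // returns early thanks to cancelWait
	fmt.Println("wait returned after", time.Since(start))
}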
