Stop container when dependencies are non-resolvable (#3218)
angelcar authored May 25, 2022
1 parent 23c5d34 commit 2dd337e
Showing 3 changed files with 207 additions and 47 deletions.
57 changes: 39 additions & 18 deletions agent/engine/dependencygraph/graph.go
@@ -47,24 +47,45 @@ const (
 var (
 	// CredentialsNotResolvedErr is the error where a container needs to wait for
 	// credentials before it can process by agent
-	CredentialsNotResolvedErr = errors.New("dependency graph: container execution credentials not available")
+	CredentialsNotResolvedErr = &dependencyError{err: errors.New("dependency graph: container execution credentials not available")}
 	// DependentContainerNotResolvedErr is the error where a dependent container isn't in expected state
-	DependentContainerNotResolvedErr = errors.New("dependency graph: dependent container not in expected state")
+	DependentContainerNotResolvedErr = &dependencyError{err: errors.New("dependency graph: dependent container not in expected state")}
 	// ContainerPastDesiredStatusErr is the error where the container status is bigger than desired status
-	ContainerPastDesiredStatusErr = errors.New("container transition: container status is equal or greater than desired status")
+	ContainerPastDesiredStatusErr = &dependencyError{err: errors.New("container transition: container status is equal or greater than desired status")}
 	// ErrContainerDependencyNotResolved is when the container's dependencies
 	// on other containers are not resolved
-	ErrContainerDependencyNotResolved = errors.New("dependency graph: dependency on containers not resolved")
+	ErrContainerDependencyNotResolved = &dependencyError{err: errors.New("dependency graph: dependency on containers not resolved")}
 	// ErrResourceDependencyNotResolved is when the container's dependencies
 	// on task resources are not resolved
-	ErrResourceDependencyNotResolved = errors.New("dependency graph: dependency on resources not resolved")
+	ErrResourceDependencyNotResolved = &dependencyError{err: errors.New("dependency graph: dependency on resources not resolved")}
 	// ResourcePastDesiredStatusErr is the error where the task resource known status is bigger than desired status
-	ResourcePastDesiredStatusErr = errors.New("task resource transition: task resource status is equal or greater than desired status")
+	ResourcePastDesiredStatusErr = &dependencyError{err: errors.New("task resource transition: task resource status is equal or greater than desired status")}
 	// ErrContainerDependencyNotResolvedForResource is when the resource's dependencies
 	// on other containers are not resolved
-	ErrContainerDependencyNotResolvedForResource = errors.New("dependency graph: resource's dependency on containers not resolved")
+	ErrContainerDependencyNotResolvedForResource = &dependencyError{err: errors.New("dependency graph: resource's dependency on containers not resolved")}
 )
 
+// DependencyError represents an error of a container dependency. These errors can be either terminal or non-terminal.
+// Terminal dependency errors indicate that a given dependency can never be fulfilled (e.g. a container with a SUCCESS
+// dependency has stopped with an exit code other than zero).
+type DependencyError interface {
+	Error() string
+	IsTerminal() bool
+}
+
+type dependencyError struct {
+	err        error
+	isTerminal bool
+}
+
+func (de *dependencyError) Error() string {
+	return de.err.Error()
+}
+
+func (de *dependencyError) IsTerminal() bool {
+	return de.isTerminal
+}
+
 // ValidDependencies takes a task and verifies that it is possible to allow all
 // containers within it to reach the desired status by proceeding in some
 // order.
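To make the new contract concrete, here is a minimal sketch of how a caller can branch on IsTerminal(); shouldStopContainer is a hypothetical helper and the import path is assumed from this repository's layout. Note that the package-level sentinel errors above leave isTerminal at its zero value, so they stay non-terminal and retryable:

```go
package main

import (
	"fmt"

	"github.com/aws/amazon-ecs-agent/agent/engine/dependencygraph"
)

// shouldStopContainer is a hypothetical helper: a terminal dependency error
// can never resolve, so the caller gives up instead of retrying next tick.
func shouldStopContainer(err dependencygraph.DependencyError) bool {
	return err != nil && err.IsTerminal()
}

func main() {
	// Sentinels such as CredentialsNotResolvedErr are built without
	// isTerminal: true, so they are retried rather than treated as fatal.
	fmt.Println(shouldStopContainer(dependencygraph.CredentialsNotResolvedErr)) // false
}
```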
@@ -124,7 +145,7 @@ func DependenciesAreResolved(target *apicontainer.Container,
 	id string,
 	manager credentials.Manager,
 	resources []taskresource.TaskResource,
-	cfg *config.Config) (*apicontainer.DependsOn, error) {
+	cfg *config.Config) (*apicontainer.DependsOn, DependencyError) {
 	if !executionCredentialsResolved(target, id, manager) {
 		return nil, CredentialsNotResolvedErr
 	}
@@ -244,7 +265,7 @@ func verifyStatusResolvable(target *apicontainer.Container, existingContainers m
 // (map from name to container). The `resolves` function passed should return true if the named container is resolved.
 
 func verifyContainerOrderingStatusResolvable(target *apicontainer.Container, existingContainers map[string]*apicontainer.Container,
-	cfg *config.Config, resolves func(*apicontainer.Container, *apicontainer.Container, string, *config.Config) bool) (*apicontainer.DependsOn, error) {
+	cfg *config.Config, resolves func(*apicontainer.Container, *apicontainer.Container, string, *config.Config) bool) (*apicontainer.DependsOn, DependencyError) {
 
 	targetGoal := target.GetDesiredStatus()
 	targetKnown := target.GetKnownStatus()
@@ -260,44 +281,44 @@ func verifyContainerOrderingStatusResolvable(target *apicontainer.Container, exi
 	for _, dependency := range targetDependencies {
 		dependencyContainer, ok := existingContainers[dependency.ContainerName]
 		if !ok {
-			return nil, fmt.Errorf("dependency graph: container ordering dependency [%v] for target [%v] does not exist.", dependencyContainer, target)
+			return nil, &dependencyError{err: fmt.Errorf("dependency graph: container ordering dependency [%v] for target [%v] does not exist.", dependencyContainer, target), isTerminal: true}
 		}
 
 		// We want to check whether the dependency container has timed out only if target has not been created yet.
 		// If the target is already created, then everything is normal and dependency can be and is resolved.
 		// However, if dependency container has already stopped, then it cannot time out.
 		if targetKnown < apicontainerstatus.ContainerCreated && dependencyContainer.GetKnownStatus() != apicontainerstatus.ContainerStopped {
 			if hasDependencyTimedOut(dependencyContainer, dependency.Condition) {
-				return nil, fmt.Errorf("dependency graph: container ordering dependency [%v] for target [%v] has timed out.", dependencyContainer, target)
+				return nil, &dependencyError{err: fmt.Errorf("dependency graph: container ordering dependency [%v] for target [%v] has timed out.", dependencyContainer, target), isTerminal: true}
 			}
 		}
 
 		// We want to fail fast if the dependency container has stopped but did not exit successfully because target container
 		// can then never progress to its desired state when the dependency condition is 'SUCCESS'
 		if dependency.Condition == successCondition && dependencyContainer.GetKnownStatus() == apicontainerstatus.ContainerStopped &&
 			!hasDependencyStoppedSuccessfully(dependencyContainer) {
-			return nil, fmt.Errorf("dependency graph: failed to resolve container ordering dependency [%v] for target [%v] as dependency did not exit successfully.", dependencyContainer, target)
+			return nil, &dependencyError{err: fmt.Errorf("dependency graph: failed to resolve container ordering dependency [%v] for target [%v] as dependency did not exit successfully.", dependencyContainer, target), isTerminal: true}
 		}
 
 		// For any of the dependency conditions - START/COMPLETE/SUCCESS/HEALTHY, if the dependency container has
 		// not started and will not start in the future, this dependency can never be resolved.
 		if dependencyContainer.HasNotAndWillNotStart() {
-			return nil, fmt.Errorf("dependency graph: failed to resolve container ordering dependency [%v] for target [%v] because dependency will never start", dependencyContainer, target)
+			return nil, &dependencyError{err: fmt.Errorf("dependency graph: failed to resolve container ordering dependency [%v] for target [%v] because dependency will never start", dependencyContainer, target), isTerminal: true}
 		}
 
 		if !resolves(target, dependencyContainer, dependency.Condition, cfg) {
 			blockedDependency = &dependency
 		}
 	}
 	if blockedDependency != nil {
-		return blockedDependency, fmt.Errorf("dependency graph: failed to resolve the container ordering dependency [%v] for target [%v]", blockedDependency, target)
+		return blockedDependency, &dependencyError{err: fmt.Errorf("dependency graph: failed to resolve the container ordering dependency [%v] for target [%v]", blockedDependency, target)}
 	}
 	return nil, nil
 }
 
 func verifyTransitionDependenciesResolved(target *apicontainer.Container,
 	existingContainers map[string]*apicontainer.Container,
-	existingResources map[string]taskresource.TaskResource) error {
+	existingResources map[string]taskresource.TaskResource) DependencyError {
 
 	if !verifyContainerDependenciesResolved(target, existingContainers) {
 		return ErrContainerDependencyNotResolved
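All four fail-fast cases in the hunk above construct their error with isTerminal: true, while a merely unresolved dependency (the blockedDependency case at the end) stays non-terminal. A distilled restatement of that decision, with illustrative names that are not part of the agent's API:

```go
// orderingErrorIsTerminal restates the fail-fast logic above: a missing
// dependency container, a timed-out dependency, a SUCCESS dependency that
// exited non-zero, and a dependency that has not and will never start can
// never be satisfied, so the error is terminal. Anything else is retried.
func orderingErrorIsTerminal(dependencyExists, timedOut, successDependencyFailed, willNeverStart bool) bool {
	return !dependencyExists || timedOut || successDependencyFailed || willNeverStart
}
```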
@@ -471,7 +492,7 @@ func verifyContainerOrderingStatus(dependsOnContainer *apicontainer.Container) b
 		dependsOnContainerDesiredStatus == dependsOnContainer.GetSteadyStateStatus()
 }
 
-func verifyShutdownOrder(target *apicontainer.Container, existingContainers map[string]*apicontainer.Container) error {
+func verifyShutdownOrder(target *apicontainer.Container, existingContainers map[string]*apicontainer.Container) DependencyError {
 	// We considered adding this to the task state, but this will be at most 45 loops,
 	// so we err'd on the side of having less state.
 	missingShutdownDependencies := []string{}
@@ -493,8 +514,8 @@ func verifyContainerOrderingStatusResolvable(target *apicontainer.Container, exi
 		return nil
 	}
 
-	return fmt.Errorf("dependency graph: target %s needs other containers stopped before it can stop: [%s]",
-		target.Name, strings.Join(missingShutdownDependencies, "], ["))
+	return &dependencyError{err: fmt.Errorf("dependency graph: target %s needs other containers stopped before it can stop: [%s]",
+		target.Name, strings.Join(missingShutdownDependencies, "], ["))}
 }
 
 func onSteadyStateCanResolve(target *apicontainer.Container, run *apicontainer.Container) bool {
55 changes: 46 additions & 9 deletions agent/engine/task_manager.go
@@ -87,7 +87,7 @@ type containerTransition struct {
 	nextState      apicontainerstatus.ContainerStatus
 	actionRequired bool
 	blockedOn      *apicontainer.DependsOn
-	reason         error
+	reason         dependencygraph.DependencyError
 }
 
 // resourceTransition defines the struct for a resource to transition.
@@ -1088,6 +1088,9 @@ func (mtask *managedTask) startContainerTransitions(transitionFunc containerTran
 	for _, cont := range mtask.Containers {
 		transition := mtask.containerNextState(cont)
 		if transition.reason != nil {
+			if transition.reason.IsTerminal() {
+				mtask.handleTerminalDependencyError(cont, transition.reason)
+			}
 			// container can't be transitioned
 			reasons = append(reasons, transition.reason)
 			if transition.blockedOn != nil {
@@ -1128,6 +1131,31 @@ func (mtask *managedTask) startContainerTransitions(transitionFunc containerTran
 	return anyCanTransition, blocked, transitions, reasons
 }
 
+func (mtask *managedTask) handleTerminalDependencyError(container *apicontainer.Container, error dependencygraph.DependencyError) {
+	logger.Error("Terminal error detected during transition; marking container as stopped", logger.Fields{
+		field.Container: container.Name,
+		field.Error:     error.Error(),
+	})
+	container.SetDesiredStatus(apicontainerstatus.ContainerStopped)
+	exitCode := 143
+	container.SetKnownExitCode(&exitCode)
+	// Change container status to STOPPED with exit code 143. This exit code is what docker reports when
+	// a container receives SIGTERM. In this case it's technically not true that we send SIGTERM because the
+	// container didn't even start, but we have to report an error and 143 seems the most appropriate.
+	go func(cont *apicontainer.Container) {
+		mtask.dockerMessages <- dockerContainerChange{
+			container: cont,
+			event: dockerapi.DockerContainerChangeEvent{
+				Status: apicontainerstatus.ContainerStopped,
+				DockerContainerMetadata: dockerapi.DockerContainerMetadata{
+					Error:    dockerapi.CannotStartContainerError{FromError: error},
+					ExitCode: &exitCode,
+				},
+			},
+		}
+	}(container)
+}
+
 // startResourceTransitions steps through each resource in the task and calls
 // the passed transition function when a transition should occur
 func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransitionFunc) (bool, map[string]string) {
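The synthetic exit code follows Docker's 128+signal convention for signal-killed containers; a quick sanity check of the arithmetic, assuming Linux signal numbering:

```go
package main

import (
	"fmt"
	"syscall"
)

func main() {
	// Docker reports 128+signal when a container dies from a signal;
	// SIGTERM is 15 on Linux, hence the synthetic exit code 143.
	fmt.Println(128 + int(syscall.SIGTERM)) // 143
}
```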
@@ -1370,10 +1398,6 @@ func (mtask *managedTask) resourceNextState(resource taskresource.TaskResource)
 }
 
 func (mtask *managedTask) handleContainersUnableToTransitionState() {
-	logger.Critical("Task in a bad state; it's not steady state but no containers want to transition", logger.Fields{
-		field.TaskID: mtask.GetID(),
-	})
-
 	if mtask.GetDesiredStatus().Terminal() {
 		// Ack, really bad. We want it to stop but the containers don't think
 		// that's possible. let's just break out and hope for the best!
@@ -1384,10 +1408,23 @@ func (mtask *managedTask) handleContainersUnableToTransitionState() {
 		mtask.emitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason)
 		// TODO we should probably panic here
 	} else {
-		logger.Critical("Moving task to stopped due to bad state", logger.Fields{
-			field.TaskID: mtask.GetID(),
-		})
-		mtask.handleDesiredStatusChange(apitaskstatus.TaskStopped, 0)
+		// If we end up here, it means containers are not able to transition anymore; maybe because of dependencies that
+		// are unable to start. Therefore, if there are essential containers that haven't started yet, we need to
+		// stop the task since they are not going to start.
+		stopTask := false
+		for _, c := range mtask.Containers {
+			if c.IsEssential() && !c.IsKnownSteadyState() {
+				stopTask = true
+				break
+			}
+		}
+
+		if stopTask {
+			logger.Critical("Task in a bad state; it's not steady state but no containers want to transition", logger.Fields{
+				field.TaskID: mtask.GetID(),
+			})
+			mtask.handleDesiredStatusChange(apitaskstatus.TaskStopped, 0)
+		}
 	}
 }
 
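Restated outside the task manager, the new gate stops the task only when an essential container can no longer reach its steady state; a sketch using the container accessors from the hunk above, with the import path assumed from this repository's layout:

```go
import apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"

// shouldStopTask mirrors the loop above: a stuck non-essential container no
// longer takes the whole task down, but a blocked essential container that
// will never reach its steady state still does.
func shouldStopTask(containers []*apicontainer.Container) bool {
	for _, c := range containers {
		if c.IsEssential() && !c.IsKnownSteadyState() {
			return true
		}
	}
	return false
}
```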
(The diff for the third changed file did not load on this page.)