Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop steps after they are done #2681

Merged
merged 10 commits into from
Nov 1, 2023
16 changes: 16 additions & 0 deletions pipeline/backend/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID stri
return rc, nil
}

func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)

containerName := toContainerName(step)

if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
return err
}

if err := e.client.ContainerRemove(ctx, containerName, removeOpts); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
return err
}

return nil
}

func (e *docker) DestroyWorkflow(_ context.Context, conf *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")

Expand Down
27 changes: 24 additions & 3 deletions pipeline/backend/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,29 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
// return rc, nil
}

func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
podName, err := dnsName(step.Name)
if err != nil {
return err
}

log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping pod: %s", podName)

gracePeriodSeconds := int64(0) // immediately
dpb := metav1.DeletePropagationBackground

deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &dpb,
}

if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(ctx, podName, deleteOpts); err != nil && !errors.IsNotFound(err) {
return err
}

return nil
}

// Destroy the pipeline environment.
func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
anbraten marked this conversation as resolved.
Show resolved Hide resolved
log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives")
Expand All @@ -349,9 +372,7 @@ func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID s
}
log.Trace().Msgf("Deleting pod: %s", stepName)
if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(noContext, stepName, deleteOpts); err != nil {
if errors.IsNotFound(err) {
log.Trace().Err(err).Msgf("Unable to delete pod %s", stepName)
} else {
if !errors.IsNotFound(err) {
return err
}
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (
return e.output, nil
}

func (e *local) DestroyStep(_ context.Context, _ *types.Step, _ string) error {
// WaitStep already waits for the command to finish, so there is nothing to do here.
return nil
}

// DestroyWorkflow the pipeline environment.
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
Expand Down
15 changes: 9 additions & 6 deletions pipeline/backend/types/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,25 @@ type Engine interface {
// IsAvailable check if the backend is available.
IsAvailable(ctx context.Context) bool

// Load the backend engine.
// Load loads the backend engine.
Load(ctx context.Context) error

// SetupWorkflow the workflow environment.
// SetupWorkflow sets up the workflow environment.
SetupWorkflow(ctx context.Context, conf *Config, taskUUID string) error

// StartStep start the workflow step.
// StartStep starts the workflow step.
StartStep(ctx context.Context, step *Step, taskUUID string) error

// WaitStep for the workflow step to complete and returns
// WaitStep waits for the workflow step to complete and returns
// the completion results.
WaitStep(ctx context.Context, step *Step, taskUUID string) (*State, error)

// TailStep the workflow step logs.
// TailStep tails the workflow step logs.
TailStep(ctx context.Context, step *Step, taskUUID string) (io.ReadCloser, error)

// DestroyWorkflow the workflow environment.
// DestroyStep destroys the workflow step.
DestroyStep(ctx context.Context, step *Step, taskUUID string) error

// DestroyWorkflow destroys the workflow environment.
DestroyWorkflow(ctx context.Context, conf *Config, taskUUID string) error
}
4 changes: 4 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
return nil, err
}

if err := r.engine.DestroyStep(r.ctx, step, r.taskUUID); err != nil {
return nil, err
}

if waitState.OOMKilled {
return waitState, &OomError{
Name: step.Name,
Expand Down