Skip to content

Commit

Permalink
Merge pull request #86 from drone-runners/spec-termination-channel
Browse files Browse the repository at this point in the history
bail out from engine's Run method when pipeline is cancelled
  • Loading branch information
marko-gacesa authored Mar 15, 2022
2 parents eacc3d2 + bbdbd50 commit 284196d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
39 changes: 32 additions & 7 deletions engine/engine_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package engine

import (
"context"
"errors"
"io"
"sync"
"time"
Expand All @@ -31,6 +32,8 @@ type Kubernetes struct {
containerStartTimeout time.Duration
}

var errPodStopped = errors.New("pod has been stopped")

// New returns a new engine with the provided kubernetes client
func New(client kubernetes.Interface, containerStartTimeout time.Duration) runtime.Engine {
if containerStartTimeout < time.Second {
Expand Down Expand Up @@ -89,6 +92,8 @@ func (k *Kubernetes) Setup(ctx context.Context, specv runtime.Spec) (err error)
}
log.Trace("created pod")

spec.stop = make(chan struct{})

return nil
}

Expand Down Expand Up @@ -118,6 +123,8 @@ func (k *Kubernetes) Destroy(ctx context.Context, specv runtime.Spec) error {
log.Trace("deleted secret")
}

close(spec.stop)

var isPodDeleted bool

if err := k.client.CoreV1().Pods(spec.PodSpec.Namespace).Delete(context.Background(), spec.PodSpec.Name, metav1.DeleteOptions{}); err != nil {
Expand Down Expand Up @@ -209,6 +216,8 @@ func (k *Kubernetes) Run(ctx context.Context, specv runtime.Spec, stepv runtime.
case <-time.After(k.containerStartTimeout):
err = podwatcher.StartTimeoutContainerError{Container: containerId, Image: containerImage}
log.WithError(err).Error("Engine: Container start timeout")
case <-spec.stop:
return nil, errPodStopped
}
if err != nil {
return
Expand All @@ -219,15 +228,31 @@ func (k *Kubernetes) Run(ctx context.Context, specv runtime.Spec, stepv runtime.
return
}

code, err := watcher.WaitContainerTerminated(containerId)
if err != nil {
return
type containerResult struct {
code int
err error
}

state = &runtime.State{
ExitCode: code,
Exited: true,
OOMKilled: false,
chErrStop := make(chan containerResult)
go func() {
code, err := watcher.WaitContainerTerminated(containerId)
chErrStop <- containerResult{code: code, err: err}
}()

select {
case result := <-chErrStop:
err = result.err
if err != nil {
return
}

state = &runtime.State{
ExitCode: result.code,
Exited: true,
OOMKilled: false,
}
case <-spec.stop:
return nil, errPodStopped
}

return
Expand Down
4 changes: 4 additions & 0 deletions engine/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type (
// the pipeline completes. WARNING this field should only
// be set if you want custom per-pipeline namespaces.
Namespace string `json:"namespace,omitempty"`

// stop channel is created by the engine's Setup method, and closed by the Destroy method.
// It's used to quickly bail out from the Run method if the pipeline is terminated or canceled.
stop chan struct{}
}

// Step defines a pipeline step.
Expand Down

0 comments on commit 284196d

Please sign in to comment.