Skip to content

Commit

Permalink
fix: Remove need for get pods from Emissary (#8133)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Mar 13, 2022
1 parent 537dd3b commit af26ff7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 34 deletions.
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ const (
EnvVarContainerName = "ARGO_CONTAINER_NAME"
// EnvVarDeadline is the deadline for the pod
EnvVarDeadline = "ARGO_DEADLINE"
// EnvVarTerminationGracePeriodSeconds is pod.spec.terminationGracePeriodSeconds
EnvVarTerminationGracePeriodSeconds = "ARGO_TERMINATION_GRACE_PERIOD_SECONDS"
// EnvVarIncludeScriptOutput capture the stdout and stderr
EnvVarIncludeScriptOutput = "ARGO_INCLUDE_SCRIPT_OUTPUT"
// EnvVarTemplate is the template
Expand Down
3 changes: 3 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
c.Command = append([]string{"/var/run/argo/argoexec", "emissary", "--"}, c.Command...)
}
c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo)
if x := pod.Spec.TerminationGracePeriodSeconds; x != nil && c.Name == common.WaitContainerName {
c.Env = append(c.Env, apiv1.EnvVar{Name: common.EnvVarTerminationGracePeriodSeconds, Value: fmt.Sprint(*x)})
}
pod.Spec.Containers[i] = c
}

Expand Down
42 changes: 8 additions & 34 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"path"
"path/filepath"
"runtime/debug"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -109,14 +110,6 @@ type ContainerRuntimeExecutor interface {
ListContainerNames(ctx context.Context) ([]string, error)
}

func errWithHelp(err error) error {
return fmt.Errorf("unable to get pods, you can check https://argoproj.github.io/argo-workflows/faq/: %w", err)
}

func isErrUnknownGetPods(err error) bool {
return strings.Contains(err.Error(), "unknown (get pods)")
}

// NewExecutor instantiates a new workflow executor
func NewExecutor(
clientset kubernetes.Interface,
Expand Down Expand Up @@ -624,24 +617,6 @@ func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact)
return driver, err
}

// getPod is a wrapper around the pod interface to get the current pod from kube API server
func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) {
podsIf := we.ClientSet.CoreV1().Pods(we.Namespace)
var pod *apiv1.Pod
err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
var err error
pod, err = podsIf.Get(ctx, we.PodName, metav1.GetOptions{})
if err != nil && isErrUnknownGetPods(err) {
return !errorsutil.IsTransientErr(err), errWithHelp(err)
}
return !errorsutil.IsTransientErr(err), err
})
if err != nil {
return nil, argoerrs.InternalWrapError(err)
}
return pod, nil
}

// GetConfigMapKey retrieves a configmap value and memoizes the result
func (we *WorkflowExecutor) GetConfigMapKey(ctx context.Context, name, key string) (string, error) {
namespace := we.Namespace
Expand Down Expand Up @@ -700,13 +675,12 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key
}

// GetTerminationGracePeriodDuration returns the terminationGracePeriodSeconds of podSpec in Time.Duration format
func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Context) (time.Duration, error) {
pod, err := we.getPod(ctx)
if err != nil || pod.Spec.TerminationGracePeriodSeconds == nil {
return time.Duration(0), err
func getTerminationGracePeriodDuration() time.Duration {
x, _ := strconv.ParseInt(os.Getenv(common.EnvVarTerminationGracePeriodSeconds), 10, 64)
if x > 0 {
return time.Duration(x) * time.Second
}
terminationGracePeriodDuration := time.Second * time.Duration(*pod.Spec.TerminationGracePeriodSeconds)
return terminationGracePeriodDuration, nil
return 30 * time.Second
}

// CaptureScriptResult will add the stdout of a script template as output result
Expand Down Expand Up @@ -1089,7 +1063,7 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames

func (we *WorkflowExecutor) killContainers(ctx context.Context, containerNames []string) {
log.Infof("Killing containers")
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
terminationGracePeriodDuration := getTerminationGracePeriodDuration()
if err := we.RuntimeExecutor.Kill(ctx, containerNames, terminationGracePeriodDuration); err != nil {
log.Warnf("Failed to kill %q: %v", containerNames, err)
}
Expand All @@ -1111,7 +1085,7 @@ func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error {
if len(sidecarNames) == 0 {
return nil // exit early as GetTerminationGracePeriodDuration performs `get pod`
}
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
terminationGracePeriodDuration := getTerminationGracePeriodDuration()
return we.RuntimeExecutor.Kill(ctx, sidecarNames, terminationGracePeriodDuration)
}

Expand Down

0 comments on commit af26ff7

Please sign in to comment.