Skip to content

Commit

Permalink
use context to cancel in-flight requests
Browse files Browse the repository at this point in the history
  • Loading branch information
linki committed Apr 2, 2020
1 parent 406f4bc commit 485dfa1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 35 deletions.
26 changes: 13 additions & 13 deletions chaoskube/chaoskube.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(client kubernetes.Interface, labels, annotations, namespaces, namespace
// described by channel next. It returns when the given context is canceled.
func (c *Chaoskube) Run(ctx context.Context, next <-chan time.Time) {
for {
if err := c.TerminateVictims(); err != nil {
if err := c.TerminateVictims(ctx); err != nil {
c.Logger.WithField("err", err).Error("failed to terminate victim")
metrics.ErrorsTotal.Inc()
}
Expand All @@ -142,7 +142,7 @@ func (c *Chaoskube) Run(ctx context.Context, next <-chan time.Time) {

// TerminateVictims picks and deletes a victim.
// It respects the configured excluded weekdays, times of day and days of a year filters.
func (c *Chaoskube) TerminateVictims() error {
func (c *Chaoskube) TerminateVictims(ctx context.Context) error {
now := c.Now().In(c.Timezone)

for _, wd := range c.ExcludedWeekdays {
Expand All @@ -166,7 +166,7 @@ func (c *Chaoskube) TerminateVictims() error {
}
}

victims, err := c.Victims()
victims, err := c.Victims(ctx)
if err == errPodNotFound {
c.Logger.Debug(msgVictimNotFound)
return nil
Expand All @@ -177,16 +177,16 @@ func (c *Chaoskube) TerminateVictims() error {

var result *multierror.Error
for _, victim := range victims {
err = c.DeletePod(victim)
err = c.DeletePod(ctx, victim)
result = multierror.Append(result, err)
}

return result.ErrorOrNil()
}

// Victims returns up to N pods as configured by MaxKill flag
func (c *Chaoskube) Victims() ([]v1.Pod, error) {
pods, err := c.Candidates()
func (c *Chaoskube) Victims(ctx context.Context) ([]v1.Pod, error) {
pods, err := c.Candidates(ctx)
if err != nil {
return []v1.Pod{}, err
}
Expand All @@ -203,10 +203,10 @@ func (c *Chaoskube) Victims() ([]v1.Pod, error) {

// Candidates returns the list of pods that are available for termination.
// It returns all pods that match the configured label, annotation and namespace selectors.
func (c *Chaoskube) Candidates() ([]v1.Pod, error) {
func (c *Chaoskube) Candidates(ctx context.Context) ([]v1.Pod, error) {
listOptions := metav1.ListOptions{LabelSelector: c.Labels.String()}

podList, err := c.Client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), listOptions)
podList, err := c.Client.CoreV1().Pods(v1.NamespaceAll).List(ctx, listOptions)
if err != nil {
return nil, err
}
Expand All @@ -216,7 +216,7 @@ func (c *Chaoskube) Candidates() ([]v1.Pod, error) {
return nil, err
}

pods, err = filterPodsByNamespaceLabels(pods, c.NamespaceLabels, c.Client)
pods, err = filterPodsByNamespaceLabels(ctx, pods, c.NamespaceLabels, c.Client)
if err != nil {
return nil, err
}
Expand All @@ -233,7 +233,7 @@ func (c *Chaoskube) Candidates() ([]v1.Pod, error) {

// DeletePod deletes the given pod with the selected terminator.
// It will not delete the pod if dry-run mode is enabled.
func (c *Chaoskube) DeletePod(victim v1.Pod) error {
func (c *Chaoskube) DeletePod(ctx context.Context, victim v1.Pod) error {
c.Logger.WithFields(log.Fields{
"namespace": victim.Namespace,
"name": victim.Name,
Expand All @@ -245,7 +245,7 @@ func (c *Chaoskube) DeletePod(victim v1.Pod) error {
}

start := time.Now()
err := c.Terminator.Terminate(victim)
err := c.Terminator.Terminate(ctx, victim)
metrics.TerminationDurationSeconds.Observe(time.Since(start).Seconds())
if err != nil {
return err
Expand Down Expand Up @@ -324,7 +324,7 @@ func filterByNamespaces(pods []v1.Pod, namespaces labels.Selector) ([]v1.Pod, er
}

// filterPodsByNamespaceLabels filters a list of pods by a given label selector on their namespace.
func filterPodsByNamespaceLabels(pods []v1.Pod, labels labels.Selector, client kubernetes.Interface) ([]v1.Pod, error) {
func filterPodsByNamespaceLabels(ctx context.Context, pods []v1.Pod, labels labels.Selector, client kubernetes.Interface) ([]v1.Pod, error) {
// empty filter returns original list
if labels.Empty() {
return pods, nil
Expand All @@ -333,7 +333,7 @@ func filterPodsByNamespaceLabels(pods []v1.Pod, labels labels.Selector, client k
// find all namespaces matching the label selector
listOptions := metav1.ListOptions{LabelSelector: labels.String()}

namespaces, err := client.CoreV1().Namespaces().List(context.TODO(), listOptions)
namespaces, err := client.CoreV1().Namespaces().List(ctx, listOptions)
if err != nil {
return nil, err
}
Expand Down
30 changes: 15 additions & 15 deletions chaoskube/chaoskube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (suite *Suite) TestNoVictimReturnsError() {
1,
)

_, err := chaoskube.Victims()
_, err := chaoskube.Victims(context.Background())
suite.Equal(err, errPodNotFound)
suite.EqualError(err, "pod not found")
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func (suite *Suite) TestDeletePod() {

victim := util.NewPod("default", "foo", v1.PodRunning)

err := chaoskube.DeletePod(victim)
err := chaoskube.DeletePod(context.Background(), victim)
suite.Require().NoError(err)

suite.AssertLog(logOutput, log.InfoLevel, "terminating pod", log.Fields{"namespace": "default", "name": "foo"})
Expand Down Expand Up @@ -441,7 +441,7 @@ func (suite *Suite) TestDeletePodNotFound() {

victim := util.NewPod("default", "foo", v1.PodRunning)

err := chaoskube.DeletePod(victim)
err := chaoskube.DeletePod(context.Background(), victim)
suite.EqualError(err, `pods "foo" not found`)
}

Expand Down Expand Up @@ -670,10 +670,10 @@ func (suite *Suite) TestTerminateVictim() {
)
chaoskube.Now = tt.now

err := chaoskube.TerminateVictims()
err := chaoskube.TerminateVictims(context.Background())
suite.Require().NoError(err)

pods, err := chaoskube.Candidates()
pods, err := chaoskube.Candidates(context.Background())
suite.Require().NoError(err)

suite.Len(pods, tt.remainingPodCount)
Expand All @@ -699,7 +699,7 @@ func (suite *Suite) TestTerminateNoVictimLogsInfo() {
1,
)

err := chaoskube.TerminateVictims()
err := chaoskube.TerminateVictims(context.Background())
suite.Require().NoError(err)

suite.AssertLog(logOutput, log.DebugLevel, msgVictimNotFound, log.Fields{})
Expand All @@ -708,14 +708,14 @@ func (suite *Suite) TestTerminateNoVictimLogsInfo() {
// helper functions

func (suite *Suite) assertCandidates(chaoskube *Chaoskube, expected []map[string]string) {
pods, err := chaoskube.Candidates()
pods, err := chaoskube.Candidates(context.Background())
suite.Require().NoError(err)

suite.AssertPods(pods, expected)
}

func (suite *Suite) assertVictims(chaoskube *Chaoskube, expected []map[string]string) {
victims, err := chaoskube.Victims()
victims, err := chaoskube.Victims(context.Background())
suite.Require().NoError(err)

for i, victim := range victims {
Expand Down Expand Up @@ -753,7 +753,7 @@ func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations lab
util.NewNamespace("default"),
util.NewNamespace("testing"),
} {
_, err := chaoskube.Client.CoreV1().Namespaces().Create(context.TODO(), &namespace, metav1.CreateOptions{})
_, err := chaoskube.Client.CoreV1().Namespaces().Create(context.Background(), &namespace, metav1.CreateOptions{})
suite.Require().NoError(err)
}

Expand All @@ -764,7 +764,7 @@ func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations lab
}

for _, pod := range pods {
_, err := chaoskube.Client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
_, err := chaoskube.Client.CoreV1().Pods(pod.Namespace).Create(context.Background(), &pod, metav1.CreateOptions{})
suite.Require().NoError(err)
}

Expand All @@ -774,10 +774,10 @@ func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations lab
func (suite *Suite) createPods(client kubernetes.Interface, podsInfo []podInfo) {
for _, p := range podsInfo {
namespace := util.NewNamespace(p.Namespace)
_, err := client.CoreV1().Namespaces().Create(context.TODO(), &namespace, metav1.CreateOptions{})
_, err := client.CoreV1().Namespaces().Create(context.Background(), &namespace, metav1.CreateOptions{})
suite.Require().NoError(err)
pod := util.NewPod(p.Namespace, p.Name, v1.PodRunning)
_, err = client.CoreV1().Pods(p.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
_, err = client.CoreV1().Pods(p.Namespace).Create(context.Background(), &pod, metav1.CreateOptions{})
suite.Require().NoError(err)
}
}
Expand Down Expand Up @@ -916,11 +916,11 @@ func (suite *Suite) TestMinimumAge() {
for _, p := range tt.pods {
pod := util.NewPod(p.namespace, p.name, v1.PodRunning)
pod.ObjectMeta.CreationTimestamp = metav1.Time{Time: p.creationTime}
_, err := chaoskube.Client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
_, err := chaoskube.Client.CoreV1().Pods(pod.Namespace).Create(context.Background(), &pod, metav1.CreateOptions{})
suite.Require().NoError(err)
}

pods, err := chaoskube.Candidates()
pods, err := chaoskube.Candidates(context.Background())
suite.Require().NoError(err)

suite.Len(pods, tt.candidates)
Expand Down Expand Up @@ -999,7 +999,7 @@ func (suite *Suite) TestNotifierCall() {
)

victim := util.NewPod("default", "foo", v1.PodRunning)
err := chaoskube.DeletePod(victim)
err := chaoskube.DeletePod(context.Background(), victim)

suite.Require().NoError(err)
suite.assertNotified(testNotifier)
Expand Down
4 changes: 2 additions & 2 deletions terminator/delete_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func NewDeletePodTerminator(client kubernetes.Interface, logger log.FieldLogger,
}

// Terminate sends a request to Kubernetes to delete the pod.
func (t *DeletePodTerminator) Terminate(victim v1.Pod) error {
func (t *DeletePodTerminator) Terminate(ctx context.Context, victim v1.Pod) error {
t.logger.WithFields(log.Fields{
"namespace": victim.Namespace,
"name": victim.Name,
}).Debug("calling deletePod endpoint")

return t.client.CoreV1().Pods(victim.Namespace).Delete(context.TODO(), victim.Name, deleteOptions(t.gracePeriod))
return t.client.CoreV1().Pods(victim.Namespace).Delete(ctx, victim.Name, deleteOptions(t.gracePeriod))
}

func deleteOptions(gracePeriod time.Duration) metav1.DeleteOptions {
Expand Down
6 changes: 3 additions & 3 deletions terminator/delete_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ func (suite *DeletePodTerminatorSuite) TestTerminate() {
}

for _, pod := range pods {
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
_, err := client.CoreV1().Pods(pod.Namespace).Create(context.Background(), &pod, metav1.CreateOptions{})
suite.Require().NoError(err)
}

victim := util.NewPod("default", "foo", v1.PodRunning)

err := terminator.Terminate(victim)
err := terminator.Terminate(context.Background(), victim)
suite.Require().NoError(err)

suite.AssertLog(logOutput, log.DebugLevel, "calling deletePod endpoint", log.Fields{"namespace": "default", "name": "foo"})

remainingPods, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
remainingPods, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
suite.Require().NoError(err)

suite.AssertPods(remainingPods.Items, []map[string]string{
Expand Down
6 changes: 4 additions & 2 deletions terminator/terminator.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package terminator

import (
"k8s.io/api/core/v1"
"context"

v1 "k8s.io/api/core/v1"
)

// Terminator is the interface for implementations of pod terminators.
type Terminator interface {
// Terminate terminates the given pod.
Terminate(victim v1.Pod) error
Terminate(ctx context.Context, victim v1.Pod) error
}

0 comments on commit 485dfa1

Please sign in to comment.