Set OwnerReference on Plugin resources (#845)
When creating Pod or DaemonSet resources for running plugins, we didn't
mark them as owned by the Sonobuoy aggregator pod. This meant that if
the aggregator pod was deleted, the resources it had created were
orphaned and had to be deleted manually. This change sets the
OwnerReference on all such resources to the Sonobuoy aggregator pod so
that when the aggregator is deleted, all of its child resources are
deleted as well.

Signed-off-by: Bridget McErlean <[email protected]>
zubron authored Aug 16, 2019
1 parent 2bec2b1 commit 031d29d
Showing 12 changed files with 315 additions and 52 deletions.
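
For context on the mechanism this commit relies on: Kubernetes garbage collection cascades through metadata.ownerReferences, so a dependent object whose owner reference carries the aggregator pod's UID is cleaned up automatically once that pod is deleted. A minimal sketch of the pattern (the helper name is illustrative, not taken from the diff):

	import (
		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// ownedByAggregator builds ObjectMeta for a resource owned by the
	// aggregator pod, so deleting the pod garbage-collects the resource too.
	func ownedByAggregator(name, namespace string, owner *corev1.Pod) metav1.ObjectMeta {
		return metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "v1", // core API group, where Pods live
					Kind:       "Pod",
					Name:       owner.GetName(),
					UID:        owner.GetUID(), // must match the live owner's UID
				},
			},
		}
	}

Owner references cannot point across namespaces, which is fine here: the plugin resources are created in the same namespace as the aggregator pod.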
2 changes: 1 addition & 1 deletion pkg/client/retrieve.go
@@ -60,7 +60,7 @@ func (c *SonobuoyClient) RetrieveResults(cfg *RetrieveConfig) (io.Reader, <-chan
 	}
 
 	// Determine sonobuoy pod name
-	podName, err := pluginaggregation.GetStatusPodName(client, cfg.Namespace)
+	podName, err := pluginaggregation.GetAggregatorPodName(client, cfg.Namespace)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to get the name of the aggregator pod to fetch results from")
 	}
2 changes: 1 addition & 1 deletion pkg/discovery/discovery.go
@@ -358,7 +358,7 @@ func setStatus(client kubernetes.Interface, namespace string, status *pluginaggr
 	}
 
 	// Determine sonobuoy pod name
-	podName, err := pluginaggregation.GetStatusPodName(client, namespace)
+	podName, err := pluginaggregation.GetAggregatorPodName(client, namespace)
 	if err != nil {
 		return errors.Wrap(err, "failed to get the name of the aggregator pod to set the status on")
 	}
12 changes: 9 additions & 3 deletions pkg/plugin/aggregation/run.go
@@ -165,9 +165,15 @@ func Run(client kubernetes.Interface, plugins []plugin.Interface, cfg plugin.Agg
 		certs[p.GetName()] = cert
 	}
 
+	// Get a reference to the aggregator pod to set up owner references correctly for each started plugin
+	aggregatorPod, err := GetAggregatorPod(client, namespace)
+	if err != nil {
+		return errors.Wrapf(err, "couldn't get aggregator pod")
+	}
+
 	for _, p := range plugins {
 		logrus.WithField("plugin", p.GetName()).Info("Running plugin")
-		go aggr.RunAndMonitorPlugin(ctx, p, client, nodes.Items, cfg.AdvertiseAddress, certs[p.GetName()])
+		go aggr.RunAndMonitorPlugin(ctx, p, client, nodes.Items, cfg.AdvertiseAddress, certs[p.GetName()], aggregatorPod)
 	}
 
 	// Give the plugins a chance to cleanup before a hard timeout occurs
@@ -209,11 +215,11 @@ func Cleanup(client kubernetes.Interface, plugins []plugin.Interface) {
 
 // RunAndMonitorPlugin will start a plugin then monitor it for errors starting/running.
 // Errors detected will be handled by saving an error result in the aggregator.Results.
-func (a *Aggregator) RunAndMonitorPlugin(ctx context.Context, p plugin.Interface, client kubernetes.Interface, nodes []corev1.Node, address string, cert *tls.Certificate) {
+func (a *Aggregator) RunAndMonitorPlugin(ctx context.Context, p plugin.Interface, client kubernetes.Interface, nodes []corev1.Node, address string, cert *tls.Certificate, aggregatorPod *corev1.Pod) {
 	monitorCh := make(chan *plugin.Result, 1)
 	pCtx, cancel := context.WithCancel(ctx)
 
-	if err := p.Run(client, address, cert); err != nil {
+	if err := p.Run(client, address, cert, aggregatorPod); err != nil {
 		err := errors.Wrapf(err, "error running plugin %v", p.GetName())
 		logrus.Error(err)
 		monitorCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{"error": err.Error()}, "")
4 changes: 2 additions & 2 deletions pkg/plugin/aggregation/run_test.go
@@ -153,7 +153,7 @@ func TestRunAndMonitorPlugin(t *testing.T) {
 		}
 
 		go func() {
-			a.RunAndMonitorPlugin(ctx, tc.plugin, fclient, nil, "testname", testCert)
+			a.RunAndMonitorPlugin(ctx, tc.plugin, fclient, nil, "testname", testCert, &corev1.Pod{})
 			doneCh <- struct{}{}
 		}()
 
@@ -207,7 +207,7 @@ type MockCleanupPlugin struct {
 	cleanedUp bool
 }
 
-func (cp *MockCleanupPlugin) Run(_ kubernetes.Interface, _ string, _ *tls.Certificate) error {
+func (cp *MockCleanupPlugin) Run(_ kubernetes.Interface, _ string, _ *tls.Certificate, _ *corev1.Pod) error {
 	return nil
 }
2 changes: 1 addition & 1 deletion pkg/plugin/aggregation/status.go
@@ -97,7 +97,7 @@ func GetStatus(client kubernetes.Interface, namespace string) (*Status, error) {
 	}
 
 	// Determine sonobuoy pod name
-	podName, err := GetStatusPodName(client, namespace)
+	podName, err := GetAggregatorPodName(client, namespace)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to get the name of the aggregator pod to get the status from")
 	}
46 changes: 37 additions & 9 deletions pkg/plugin/aggregation/update.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
@@ -36,6 +37,13 @@ const (
 	DefaultStatusPodName = "sonobuoy"
 )
 
+// NoPodWithLabelError represents an error encountered when a pod with a given label can't be found
+type NoPodWithLabelError string
+
+func (n NoPodWithLabelError) Error() string {
+	return string(n)
+}
+
 // node and name uniquely identify a single plugin result
 type key struct {
 	node, name string
@@ -118,7 +126,7 @@ func (u *updater) Annotate(results map[string]*plugin.Result) error {
 	}
 
 	// Determine sonobuoy pod name
-	podName, err := GetStatusPodName(u.client, u.namespace)
+	podName, err := GetAggregatorPodName(u.client, u.namespace)
 	if err != nil {
 		return errors.Wrap(err, "failed to get name of the aggregator pod to annotate")
 	}
@@ -166,25 +174,45 @@ func GetPatch(annotation string) map[string]interface{} {
 	}
 }
 
-// GetStatusPodName gets the sonobuoy master pod name based on its label.
-func GetStatusPodName(client kubernetes.Interface, namespace string) (string, error) {
+// GetAggregatorPod gets the sonobuoy aggregator pod based on its label.
+// It returns NoPodWithLabelError in the case where a pod with sonobuoy aggregator label could not be found.
+func GetAggregatorPod(client kubernetes.Interface, namespace string) (*v1.Pod, error) {
 	listOptions := metav1.ListOptions{
 		LabelSelector: StatusPodLabel,
 	}
 
 	podList, err := client.CoreV1().Pods(namespace).List(listOptions)
 	if err != nil {
-		return "", errors.Wrap(err, "unable to list pods with label %q")
+		return nil, errors.Wrapf(err, "unable to list pods with label %q", StatusPodLabel)
 	}
 
 	switch {
 	case len(podList.Items) == 0:
-		logrus.Warningf("No pods found with label %q in namespace %s, using default pod name %q", StatusPodLabel, namespace, DefaultStatusPodName)
-		return DefaultStatusPodName, nil
+		logrus.Warningf("no pods found with label %q in namespace %s", StatusPodLabel, namespace)
+		return nil, NoPodWithLabelError(fmt.Sprintf("no pods found with label %q in namespace %s", StatusPodLabel, namespace))
+
 	case len(podList.Items) > 1:
-		logrus.Warningf("Found more than one pod with label %q. Using %q", StatusPodLabel, podList.Items[0].GetName())
-		return podList.Items[0].GetName(), nil
+		logrus.Warningf("Found more than one pod with label %q. Using pod with name %q", StatusPodLabel, podList.Items[0].GetName())
+		return &podList.Items[0], nil
 	default:
-		return podList.Items[0].GetName(), nil
+		return &podList.Items[0], nil
 	}
 }
+
+// GetAggregatorPodName gets the sonobuoy aggregator pod name. It returns the default pod name
+// if the pod cannot be found.
+func GetAggregatorPodName(client kubernetes.Interface, namespace string) (string, error) {
+	ap, err := GetAggregatorPod(client, namespace)
+
+	if err != nil {
+		switch err.(type) {
+		case NoPodWithLabelError:
+			logrus.Warningf("Aggregator pod not found, using default pod name %q: %v", DefaultStatusPodName, err)
+			return DefaultStatusPodName, nil
+		default:
+			return "", errors.Wrap(err, "failed to get aggregator pod")
+		}
+	}
+
+	return ap.GetName(), nil
+}
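
Because NoPodWithLabelError is an exported concrete type, callers can tell "no aggregator pod exists" apart from a genuine API failure, exactly as GetAggregatorPodName does above. A hypothetical external caller (not part of this commit; useOwner is a made-up helper) could branch the same way:

	pod, err := pluginaggregation.GetAggregatorPod(client, namespace)
	switch err.(type) {
	case nil:
		useOwner(pod) // hypothetical: pod is safe to use as an owner reference target
	case pluginaggregation.NoPodWithLabelError:
		// no aggregator pod is running; fall back to default behaviour
	default:
		// transient or permission error from the API server; propagate it
	}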
112 changes: 98 additions & 14 deletions pkg/plugin/aggregation/update_test.go
@@ -55,7 +55,102 @@ func TestCreateUpdater(t *testing.T) {
 	}
 }
 
-func TestGetStatusPodName(t *testing.T) {
+func TestGetAggregatorPod(t *testing.T) {
+	createPodWithRunLabel := func(name string) corev1.Pod {
+		return corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:   name,
+				Labels: map[string]string{"run": "sonobuoy-master"},
+			},
+		}
+	}
+
+	testPods := []corev1.Pod{
+		createPodWithRunLabel("sonobuoy-run-pod-1"),
+		createPodWithRunLabel("sonobuoy-run-pod-2"),
+	}
+
+	checkNoError := func(err error) error {
+		if err != nil {
+			return errors.Wrap(err, "expected no error")
+		}
+		return nil
+	}
+
+	checkErrorFromServer := func(err error) error {
+		if err == nil {
+			return errors.New("expected error but got nil")
+		}
+		return nil
+	}
+
+	checkNoPodWithLabelError := func(err error) error {
+		if _, ok := err.(NoPodWithLabelError); !ok {
+			return errors.Wrap(err, "expected error to have type NoPodWithLabelError")
+		}
+		return nil
+	}
+
+	testCases := []struct {
+		desc          string
+		podsOnServer  corev1.PodList
+		errFromServer error
+		checkError    func(err error) error
+		expectedPod   *corev1.Pod
+	}{
+		{
+			desc:          "Error retrieving pods from server results in no pod and an error being returned",
+			podsOnServer:  corev1.PodList{},
+			errFromServer: errors.New("could not retrieve pods"),
+			checkError:    checkErrorFromServer,
+			expectedPod:   nil,
+		},
+		{
+			desc:         "No pods results in no pod and no error",
+			podsOnServer: corev1.PodList{},
+			checkError:   checkNoPodWithLabelError,
+			expectedPod:  nil,
+		},
+		{
+			desc:         "Only one pod results in that pod being returned",
+			podsOnServer: corev1.PodList{Items: []corev1.Pod{testPods[0]}},
+			checkError:   checkNoError,
+			expectedPod:  &testPods[0],
+		},
+		{
+			desc:         "More that one pod results in the first pod being returned",
+			podsOnServer: corev1.PodList{Items: testPods},
+			checkError:   checkNoError,
+			expectedPod:  &testPods[0],
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			fclient := fake.NewSimpleClientset()
+			fclient.PrependReactor("*", "*", func(action k8stesting.Action) (handled bool, ret kuberuntime.Object, err error) {
+				return true, &tc.podsOnServer, tc.errFromServer
+			})
+
+			pod, err := GetAggregatorPod(fclient, "sonobuoy")
+			if checkErr := tc.checkError(err); checkErr != nil {
+				t.Errorf("error check failed: %v", checkErr)
+			}
+			if tc.expectedPod == nil {
+				if pod != nil {
+					t.Errorf("Expected no pod to be found but found pod %q", pod.GetName())
+				}
+			} else {
+				if pod.GetName() != tc.expectedPod.GetName() {
+					t.Errorf("Incorrect pod returned, expected %q but got %q", tc.expectedPod.GetName(), pod.GetName())
+				}
+			}
+
+		})
+	}
+}
+
+func TestGetAggregatorPodName(t *testing.T) {
 	createPodWithRunLabel := func(name string) corev1.Pod {
 		return corev1.Pod{
 			ObjectMeta: metav1.ObjectMeta{
@@ -84,7 +179,7 @@ func TestGetStatusPodName(t *testing.T) {
 			expectedPodName: "sonobuoy",
 		},
 		{
-			desc: "Only one pod results in that pod name being used",
+			desc: "A returned pod results in that pod name being used",
 			podsOnServer: corev1.PodList{
 				Items: []corev1.Pod{
 					createPodWithRunLabel("sonobuoy-run-pod"),
@@ -93,17 +188,6 @@ func TestGetStatusPodName(t *testing.T) {
 			errFromServer:   nil,
 			expectedPodName: "sonobuoy-run-pod",
 		},
-		{
-			desc: "More that one pod results in the first pod name being used",
-			podsOnServer: corev1.PodList{
-				Items: []corev1.Pod{
-					createPodWithRunLabel("sonobuoy-run-pod-1"),
-					createPodWithRunLabel("sonobuoy-run-pod-2"),
-				},
-			},
-			errFromServer:   nil,
-			expectedPodName: "sonobuoy-run-pod-1",
-		},
 	}
 
 	for _, tc := range testCases {
@@ -113,7 +197,7 @@ func TestGetStatusPodName(t *testing.T) {
 				return true, &tc.podsOnServer, tc.errFromServer
 			})
 
-			podName, err := GetStatusPodName(fclient, "sonobuoy")
+			podName, err := GetAggregatorPodName(fclient, "sonobuoy")
 			if tc.errFromServer == nil && err != nil {
 				t.Errorf("Unexpected error returned, expected nil but got %q", err)
 			}
14 changes: 11 additions & 3 deletions pkg/plugin/driver/daemonset/daemonset.go
@@ -85,7 +85,7 @@ func getMasterAddress(hostname string) string {
 	return fmt.Sprintf("https://%s/api/v1/results/by-node", hostname)
 }
 
-func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificate) appsv1.DaemonSet {
+func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificate, ownerPod *v1.Pod) appsv1.DaemonSet {
 	ds := appsv1.DaemonSet{}
 	annotations := map[string]string{
 		"sonobuoy-driver": p.GetDriver(),
@@ -106,6 +106,14 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat
 		Namespace:   p.Namespace,
 		Labels:      labels,
 		Annotations: annotations,
+		OwnerReferences: []metav1.OwnerReference{
+			metav1.OwnerReference{
+				APIVersion: "v1",
+				Kind:       "Pod",
+				Name:       ownerPod.GetName(),
+				UID:        ownerPod.GetUID(),
+			},
+		},
 	}
 
 	ds.Spec.Selector = &metav1.LabelSelector{
@@ -151,8 +159,8 @@ func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificat
 }
 
 // Run dispatches worker pods according to the DaemonSet's configuration.
-func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate) error {
-	daemonSet := p.createDaemonSetDefinition(getMasterAddress(hostname), cert)
+func (p *Plugin) Run(kubeclient kubernetes.Interface, hostname string, cert *tls.Certificate, ownerPod *v1.Pod) error {
+	daemonSet := p.createDaemonSetDefinition(getMasterAddress(hostname), cert, ownerPod)
 
 	secret, err := p.MakeTLSSecret(cert)
 	if err != nil {
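
The reference added above sets only APIVersion, Kind, Name, and UID, which is all that cascade deletion requires. OwnerReference also has the optional Controller and BlockOwnerDeletion pointer fields; this change leaves them unset, but for comparison, a controller-style reference would look like this (illustrative only, not part of the commit):

	isController := true
	blockOwnerDeletion := true
	ref := metav1.OwnerReference{
		APIVersion:         "v1",
		Kind:               "Pod",
		Name:               ownerPod.GetName(),
		UID:                ownerPod.GetUID(),
		Controller:         &isController,       // mark the owner as the managing controller
		BlockOwnerDeletion: &blockOwnerDeletion, // foreground deletion waits on this dependent
	}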
