Skip to content

Commit

Permalink
fix: job metrics owner ref when using custom job kubeconfig/ns (argop…
Browse files Browse the repository at this point in the history
…roj#3425)

* fix: job metrics owner ref when using custom job kubeconfig/ns

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* fix: job metrics owner ref when using custom job kubeconfig/ns

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* revert: go mod change

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

---------

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
Signed-off-by: Zach Aller <[email protected]>
Co-authored-by: Zach Aller <[email protected]>
  • Loading branch information
gdsoumya and zachaller authored Mar 15, 2024
1 parent 383f719 commit 0f52c93
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 35 deletions.
29 changes: 25 additions & 4 deletions analysis/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

"github.com/argoproj/argo-rollouts/metric"
jobProvider "github.com/argoproj/argo-rollouts/metricproviders/job"
"github.com/aws/smithy-go/ptr"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"

Expand All @@ -32,6 +35,10 @@ import (
timeutil "github.com/argoproj/argo-rollouts/utils/time"
)

var (
analysisRunGVK = v1alpha1.SchemeGroupVersion.WithKind("AnalysisRun")
)

// Controller is the controller implementation for Analysis resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
Expand Down Expand Up @@ -187,18 +194,32 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
return c.persistAnalysisRunStatus(run, newRun.Status)
}

func (c *Controller) jobParentNamespace(obj any) string {
func (c *Controller) jobParentReference(obj any) (*v1.OwnerReference, string) {
job, ok := obj.(*batchv1.Job)
if !ok {
return ""
return nil, ""
}
// if it has owner reference, return it as is
ownerRef := v1.GetControllerOf(job)
// else if it's missing owner reference check if analysis run uid is set and
// if it is there use labels/annotations to create owner reference
if ownerRef == nil && job.Labels[jobProvider.AnalysisRunUIDLabelKey] != "" {
ownerRef = &v1.OwnerReference{
APIVersion: analysisRunGVK.GroupVersion().String(),
Kind: analysisRunGVK.Kind,
Name: job.Annotations[jobProvider.AnalysisRunNameAnnotationKey],
UID: types.UID(job.Labels[jobProvider.AnalysisRunUIDLabelKey]),
BlockOwnerDeletion: ptr.Bool(true),
Controller: ptr.Bool(true),
}
}
ns := job.GetNamespace()
if job.Annotations != nil {
if job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey] != "" {
ns = job.Annotations[jobProvider.AnalysisRunNamespaceAnnotationKey]
}
}
return ns
return ownerRef, ns
}

func (c *Controller) enqueueJobIfCompleted(obj any) {
Expand All @@ -209,7 +230,7 @@ func (c *Controller) enqueueJobIfCompleted(obj any) {
for _, condition := range job.Status.Conditions {
switch condition.Type {
case batchv1.JobFailed, batchv1.JobComplete:
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis, c.jobParentNamespace)
controllerutil.EnqueueParentObject(job, register.AnalysisRunKind, c.enqueueAnalysis, c.jobParentReference)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newCommand() *cobra.Command {
instanceIDTweakListFunc := func(options *metav1.ListOptions) {
options.LabelSelector = instanceIDSelector.String()
}
jobKubeClient, err := metricproviders.GetAnalysisJobClientset(kubeClient)
jobKubeClient, _, err := metricproviders.GetAnalysisJobClientset(kubeClient)
checkError(err)
jobNs := metricproviders.GetAnalysisJobNamespace()
if jobNs == "" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.30.3
github.com/aws/smithy-go v1.20.1
github.com/blang/semver v3.5.1+incompatible
github.com/bombsimon/logrusr/v4 v4.1.0
github.com/evanphx/json-patch/v5 v5.9.0
Expand Down Expand Up @@ -88,7 +89,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
45 changes: 31 additions & 14 deletions metricproviders/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@ var (
)

type JobProvider struct {
kubeclientset kubernetes.Interface
jobLister batchlisters.JobLister
logCtx log.Entry
jobNamespace string
kubeclientset kubernetes.Interface
jobLister batchlisters.JobLister
logCtx log.Entry
jobNamespace string
customJobKubeconfig bool
}

func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister, jobNS string) *JobProvider {
func NewJobProvider(logCtx log.Entry, kubeclientset kubernetes.Interface, jobLister batchlisters.JobLister, jobNS string, customJobKubeconfig bool) *JobProvider {
return &JobProvider{
kubeclientset: kubeclientset,
logCtx: logCtx,
jobLister: jobLister,
jobNamespace: jobNS,
kubeclientset: kubeclientset,
logCtx: logCtx,
jobLister: jobLister,
jobNamespace: jobNS,
customJobKubeconfig: customJobKubeconfig,
}
}

Expand Down Expand Up @@ -85,7 +87,7 @@ func getJobIDSuffix(run *v1alpha1.AnalysisRun, metricName string) int {
return int(res.Count + res.Error + 1)
}

func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS string) (*batchv1.Job, error) {
func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS string, customJobKubeconfig bool) (*batchv1.Job, error) {
ns := run.Namespace
if jobNS != "" {
ns = jobNS
Expand All @@ -102,11 +104,17 @@ func newMetricJob(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric, jobNS strin
jobAnnotations[AnalysisRunNameAnnotationKey] = run.Name
jobAnnotations[AnalysisRunNamespaceAnnotationKey] = run.Namespace
jobAnnotations[AnalysisRunMetricAnnotationKey] = metric.Name

ownerRef := []metav1.OwnerReference{*metav1.NewControllerRef(run, analysisRunGVK)}

if ns != run.Namespace || customJobKubeconfig {
ownerRef = nil
}
job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: newJobName(run, metric),
Namespace: ns,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(run, analysisRunGVK)},
OwnerReferences: ownerRef,
Annotations: jobAnnotations,
Labels: jobLabels,
},
Expand All @@ -122,7 +130,7 @@ func (p *JobProvider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1a
StartedAt: &now,
Phase: v1alpha1.AnalysisPhaseRunning,
}
job, err := newMetricJob(run, metric, p.jobNamespace)
job, err := newMetricJob(run, metric, p.jobNamespace, p.customJobKubeconfig)
if err != nil {
p.logCtx.Errorf("job initialization failed: %v", err)
return metricutil.MarkMeasurementError(measurement, err)
Expand All @@ -139,8 +147,17 @@ func (p *JobProvider) Run(run *v1alpha1.AnalysisRun, metric v1alpha1.Metric) v1a
p.logCtx.Errorf("job create (verify) %s failed: %v", job.Name, createErr)
return metricutil.MarkMeasurementError(measurement, createErr)
}
controllerRef := metav1.GetControllerOf(existingJob)
if run.UID != controllerRef.UID {
ownerUID := ""
// if custom kubeconfig or different namespace is used owner ref is absent,
// use run uid label to get owner analysis run uid
if p.customJobKubeconfig || job.Namespace != run.Namespace {
ownerUID = job.Labels[AnalysisRunUIDLabelKey]
} else {
controllerRef := metav1.GetControllerOf(existingJob)
ownerUID = string(controllerRef.UID)
}

if string(run.UID) != ownerUID {
// NOTE: we don't bother to check for semantic equality. UID is good enough
p.logCtx.Errorf("job create (uid check) %s failed: %v", job.Name, createErr)
return metricutil.MarkMeasurementError(measurement, createErr)
Expand Down
4 changes: 2 additions & 2 deletions metricproviders/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newTestJobProvider(objects ...runtime.Object) *JobProvider {
cancel()

jobLister := k8sI.Batch().V1().Jobs().Lister()
return NewJobProvider(*logCtx, kubeclient, jobLister, "")
return NewJobProvider(*logCtx, kubeclient, jobLister, "", false)
}

func newRunWithJobMetric() *v1alpha1.AnalysisRun {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestRunCreateCollision(t *testing.T) {
p := newTestJobProvider()
run := newRunWithJobMetric()

existingJob, err := newMetricJob(run, run.Spec.Metrics[0], p.jobNamespace)
existingJob, err := newMetricJob(run, run.Spec.Metrics[0], p.jobNamespace, p.customJobKubeconfig)
assert.NoError(t, err)
fakeClient := p.kubeclientset.(*k8sfake.Clientset)
fakeClient.Tracker().Add(existingJob)
Expand Down
13 changes: 7 additions & 6 deletions metricproviders/metricproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func (f *ProviderFactory) NewProvider(logCtx log.Entry, metric v1alpha1.Metric)
}
return prometheus.NewPrometheusProvider(api, logCtx, metric)
case job.ProviderType:
kubeClient, err := GetAnalysisJobClientset(f.KubeClient)
kubeClient, customKubeconfig, err := GetAnalysisJobClientset(f.KubeClient)
if err != nil {
return nil, err
}

return job.NewJobProvider(logCtx, kubeClient, f.JobLister, GetAnalysisJobNamespace()), nil
return job.NewJobProvider(logCtx, kubeClient, f.JobLister, GetAnalysisJobNamespace(), customKubeconfig), nil
case kayenta.ProviderType:
c := kayenta.NewHttpClient()
return kayenta.NewKayentaProvider(logCtx, c), nil
Expand Down Expand Up @@ -154,7 +154,7 @@ func Type(metric v1alpha1.Metric) string {
// if the AnalysisJobKubeconfigEnv is set to InclusterKubeconfig, it will return the incluster client
// else if it's set to a kubeconfig file it will return the clientset corresponding to the kubeconfig file.
// If empty it returns the provided defaultClientset
func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.Interface, error) {
func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.Interface, bool, error) {
customJobKubeconfig := os.Getenv(AnalysisJobKubeconfigEnv)
if customJobKubeconfig != "" {
var (
Expand All @@ -167,11 +167,12 @@ func GetAnalysisJobClientset(defaultClientset kubernetes.Interface) (kubernetes.
cfg, err = clientcmd.BuildConfigFromFlags("", customJobKubeconfig)
}
if err != nil {
return nil, err
return nil, true, err
}
return kubernetes.NewForConfig(cfg)
clientSet, err := kubernetes.NewForConfig(cfg)
return clientSet, true, err
}
return defaultClientset, nil
return defaultClientset, false, nil
}

func GetAnalysisJobNamespace() string {
Expand Down
23 changes: 16 additions & 7 deletions utils/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func EnqueueRateLimited(obj any, q workqueue.RateLimitingInterface) {
// It then enqueues that ownerType resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
// This function assumes the parent object is in the same namespace as the child,
// unless the optional parent getter returns a non-empty namespace to use instead.
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), parentNamespaceGetter ...func(any) string) {
func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), parentGetter ...func(any) (*metav1.OwnerReference, string)) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
Expand All @@ -239,16 +239,25 @@ func EnqueueParentObject(obj any, ownerType string, enqueue func(obj any), paren
log.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}

if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
var (
ownerRef *metav1.OwnerReference
namespace string
)

if len(parentGetter) > 0 {
ownerRef, namespace = parentGetter[0](obj)
} else {
ownerRef = metav1.GetControllerOf(object)
}

if ownerRef != nil {
// If this object is not owned by the ownerType, we should not do anything more with it.
if ownerRef.Kind != ownerType {
return
}
namespace := object.GetNamespace()
if len(parentNamespaceGetter) > 0 {
// If the parentNamespaceGetter is provided, use it to get the parent namespace
// only uses the first parentNamespaceGetter func
namespace = parentNamespaceGetter[0](obj)
// if namespace not set by parentGetter use object namespace
if namespace == "" {
namespace = object.GetNamespace()
}
parent := cache.ExplicitKey(namespace + "/" + ownerRef.Name)
log.Infof("Enqueueing parent of %s/%s: %s %s", namespace, object.GetName(), ownerRef.Kind, parent)
Expand Down

0 comments on commit 0f52c93

Please sign in to comment.