Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(#3393): Update to CronJob batch/v1 #3402

Merged
merged 2 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions e2e/common/cli/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/openshift"
console "github.com/openshift/api/console/v1"
consolev1 "github.com/openshift/api/console/v1"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -98,11 +98,11 @@ func TestConsoleCliDownload(t *testing.T) {
ocp, err := openshift.IsOpenShift(TestClient())
assert.Nil(t, err)

ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), "console.openshift.io/v1", reflect.TypeOf(console.ConsoleCLIDownload{}).Name())
ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), "console.openshift.io/v1", reflect.TypeOf(consolev1.ConsoleCLIDownload{}).Name())
assert.Nil(t, err)

if !ocp || !ok {
t.Skip("This test requires ConsoleCliDownload object which is available on OpenShift 4 only.")
t.Skip("This test requires ConsoleCliDownload object which is available on OpenShift 4+ only.")
return
}

Expand Down
12 changes: 12 additions & 0 deletions e2e/common/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,29 @@ limitations under the License.
package common

import (
"reflect"
"testing"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"

. "github.com/apache/camel-k/e2e/support"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/kubernetes"
)

func TestRunCronExample(t *testing.T) {
ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), batchv1.SchemeGroupVersion.Group, reflect.TypeOf(batchv1.CronJob{}).Name())
assert.Nil(t, err)

if !ok {
t.Skip("This test requires CronJob batch/v1 API installed.")
return
}

WithNewTestNamespace(t, func(ns string) {
Expect(Kamel("install", "-n", ns).Execute()).To(Succeed())

Expand Down
10 changes: 5 additions & 5 deletions e2e/support/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
coordination "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -599,12 +599,12 @@ func RouteStatus(ns string, name string) func() string {
}
}

func IntegrationCronJob(ns string, name string) func() *v1beta1.CronJob {
return func() *v1beta1.CronJob {
lst := v1beta1.CronJobList{
func IntegrationCronJob(ns string, name string) func() *batchv1.CronJob {
return func() *batchv1.CronJob {
lst := batchv1.CronJobList{
TypeMeta: metav1.TypeMeta{
Kind: "CronJob",
APIVersion: v1beta1.SchemeGroupVersion.String(),
APIVersion: batchv1.SchemeGroupVersion.String(),
},
}
err := TestClient().List(TestContext, &lst,
Expand Down
27 changes: 19 additions & 8 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"math/rand"
"os"
"reflect"
"runtime"
"strconv"
"strings"
Expand All @@ -32,10 +33,10 @@ import (

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
coordination "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
Expand Down Expand Up @@ -193,6 +194,22 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
exitOnError(err, "cannot create Integration label selector")
selector := labels.NewSelector().Add(*hasIntegrationLabel)

selectors := cache.SelectorsByObject{
&corev1.Pod{}: {Label: selector},
&appsv1.Deployment{}: {Label: selector},
&batchv1.Job{}: {Label: selector},
&servingv1.Service{}: {Label: selector},
}

if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
selectors[&batchv1.CronJob{}] = struct {
Label labels.Selector
Field fields.Selector
}{
Label: selector,
}
}

mgr, err := manager.New(c.GetConfig(), manager.Options{
Namespace: watchNamespace,
EventBroadcaster: broadcaster,
Expand All @@ -205,13 +222,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)),
NewCache: cache.BuilderWithOptions(
cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {Label: selector},
&appsv1.Deployment{}: {Label: selector},
&batchv1beta1.CronJob{}: {Label: selector},
&batchv1.Job{}: {Label: selector},
&servingv1.Service{}: {Label: selector},
},
SelectorsByObject: selectors,
},
),
})
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -205,8 +205,6 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error {
})).
// Watch for the owned Deployments
Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})).
// Watch for the owned CronJobs
Owns(&batchv1beta1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})).
// Watch for the Integration Pods
Watches(&source.Kind{Type: &corev1.Pod{}},
handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request {
Expand All @@ -225,6 +223,11 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error {
}
}))

if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
// Watch for the owned CronJobs
b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
}

// Watch for the owned Knative Services conditionally
if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strconv"

appsv1 "k8s.io/api/apps/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -185,9 +185,9 @@ func (action *monitorAction) newController(ctx context.Context, env *trait.Envir
integration: integration,
}
case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable):
obj = getUpdatedController(env, &batchv1beta1.CronJob{})
obj = getUpdatedController(env, &batchv1.CronJob{})
controller = &cronJobController{
obj: obj.(*batchv1beta1.CronJob),
obj: obj.(*batchv1.CronJob),
integration: integration,
client: action.client,
context: ctx,
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/integration/monitor_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,7 +32,7 @@ import (
)

type cronJobController struct {
obj *batchv1beta1.CronJob
obj *batchv1.CronJob
integration *v1.Integration
client client.Client
context context.Context
Expand Down
4 changes: 2 additions & 2 deletions pkg/trait/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"path"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -298,7 +298,7 @@ func (t *containerTrait) configureContainer(e *Environment) error {
}

// CronJob
if err := e.Resources.VisitCronJobE(func(cron *v1beta1.CronJob) error {
if err := e.Resources.VisitCronJobE(func(cron *batchv1.CronJob) error {
for _, envVar := range e.EnvVars {
envvar.SetVar(&container.Env, envVar)
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/trait/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"strings"

batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -173,7 +172,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) {
}

if t.ConcurrencyPolicy == "" {
t.ConcurrencyPolicy = string(batchv1beta1.ForbidConcurrent)
t.ConcurrencyPolicy = string(batchv1.ForbidConcurrent)
}

if (t.Schedule == "" && t.Components == "") && t.Fallback == nil {
Expand Down Expand Up @@ -265,7 +264,7 @@ func (t *cronTrait) Apply(e *Environment) error {
return nil
}

func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob {
func (t *cronTrait) getCronJobFor(e *Environment) *batchv1.CronJob {
annotations := make(map[string]string)
if e.Integration.Annotations != nil {
for k, v := range filterTransferableAnnotations(e.Integration.Annotations) {
Expand All @@ -283,10 +282,10 @@ func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob {
backoffLimit = *t.BackoffLimit
}

cronjob := batchv1beta1.CronJob{
cronjob := batchv1.CronJob{
TypeMeta: metav1.TypeMeta{
Kind: "CronJob",
APIVersion: batchv1beta1.SchemeGroupVersion.String(),
APIVersion: batchv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: e.Integration.Name,
Expand All @@ -296,11 +295,11 @@ func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob {
},
Annotations: e.Integration.Annotations,
},
Spec: batchv1beta1.CronJobSpec{
Spec: batchv1.CronJobSpec{
Schedule: t.Schedule,
ConcurrencyPolicy: batchv1beta1.ConcurrencyPolicy(t.ConcurrencyPolicy),
ConcurrencyPolicy: batchv1.ConcurrencyPolicy(t.ConcurrencyPolicy),
StartingDeadlineSeconds: t.StartingDeadlineSeconds,
JobTemplate: batchv1beta1.JobTemplateSpec{
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &activeDeadline,
BackoffLimit: &backoffLimit,
Expand Down
6 changes: 3 additions & 3 deletions pkg/trait/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
passert "github.com/magiconair/properties/assert"
"github.com/stretchr/testify/assert"

batchv1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -428,7 +428,7 @@ func TestCronWithActiveDeadline(t *testing.T) {
assert.Nil(t, ct.Fallback)
assert.Contains(t, environment.Interceptors, "cron")

cronJob := environment.Resources.GetCronJob(func(job *batchv1beta1.CronJob) bool { return true })
cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true })
assert.NotNil(t, cronJob)

assert.NotNil(t, cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds)
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestCronWithBackoffLimit(t *testing.T) {
assert.Nil(t, ct.Fallback)
assert.Contains(t, environment.Interceptors, "cron")

cronJob := environment.Resources.GetCronJob(func(job *batchv1beta1.CronJob) bool { return true })
cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true })
assert.NotNil(t, cronJob)

assert.NotNil(t, cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds)
Expand Down
4 changes: 2 additions & 2 deletions pkg/trait/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -119,7 +119,7 @@ func (t *mountTrait) Apply(e *Environment) error {
}

// CronJob
if err := e.Resources.VisitCronJobE(func(cron *v1beta1.CronJob) error {
if err := e.Resources.VisitCronJobE(func(cron *batchv1.CronJob) error {
volumes = &cron.Spec.JobTemplate.Spec.Template.Spec.Volumes
visited = true
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/trait/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -71,7 +71,7 @@ func (t *podTrait) Apply(e *Environment) error {
}
switch strategy {
case ControllerStrategyCronJob:
e.Resources.VisitCronJob(func(c *v1beta1.CronJob) {
e.Resources.VisitCronJob(func(c *batchv1.CronJob) {
if c.Name == e.Integration.Name {
if patchedPodSpec, err = t.applyChangesTo(&c.Spec.JobTemplate.Spec.Template.Spec, changes); err == nil {
c.Spec.JobTemplate.Spec.Template.Spec = *patchedPodSpec
Expand Down
8 changes: 4 additions & 4 deletions pkg/trait/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
serving "knative.dev/serving/pkg/apis/serving/v1"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -93,12 +93,12 @@ func createNominalKnativeServiceTraitTest() (*Environment, *serving.Service) {
return environment, knativeService
}

func createNominalCronJobTraitTest() (*Environment, *v1beta1.CronJob) {
cronJob := &v1beta1.CronJob{
func createNominalCronJobTraitTest() (*Environment, *batchv1.CronJob) {
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "integration-name",
},
Spec: v1beta1.CronJobSpec{},
Spec: batchv1.CronJobSpec{},
}

environment := &Environment{
Expand Down
4 changes: 2 additions & 2 deletions pkg/trait/trait_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pkg/errors"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -326,7 +326,7 @@ func (e *Environment) GetIntegrationPodSpec() *corev1.PodSpec {
}

// Cronjob
cronJob := e.Resources.GetCronJob(func(c *v1beta1.CronJob) bool {
cronJob := e.Resources.GetCronJob(func(c *batchv1.CronJob) bool {
return c.Name == e.Integration.Name
})
if cronJob != nil {
Expand Down
Loading