diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 295e8cb232f..87908e43b48 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,11 +25,11 @@ import ( "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" - "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel" - "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription" - subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription" + "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel" + "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" + subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/injection/clients/dynamicclient" ) @@ -67,7 +67,7 @@ func NewController( // populated. controller.EnsureTypeMeta( r.tracker.OnChanged, - messagingv1beta1.SchemeGroupVersion.WithKind("Channel"), + messagingv1.SchemeGroupVersion.WithKind("Channel"), ), )) diff --git a/pkg/reconciler/subscription/controller_test.go b/pkg/reconciler/subscription/controller_test.go index b956c556c6a..adb8c0c0be4 100644 --- a/pkg/reconciler/subscription/controller_test.go +++ b/pkg/reconciler/subscription/controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ import ( // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined/fake" - _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake" - _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" ) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index d9e94ea25fa..c8930c7046c 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -35,13 +35,13 @@ import ( "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/apis/messaging" v1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/apis/messaging/v1beta1" - subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription" - listers "knative.dev/eventing/pkg/client/listers/messaging/v1beta1" + subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" + listers "knative.dev/eventing/pkg/client/listers/messaging/v1" eventingduck "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/logging" ) @@ -58,8 +58,7 @@ const ( ) var ( - v1beta1ChannelGVK = v1beta1.SchemeGroupVersion.WithKind("Channel") - v1ChannelGVK = v1.SchemeGroupVersion.WithKind("Channel") + v1ChannelGVK = v1.SchemeGroupVersion.WithKind("Channel") ) func newChannelWarnEvent(messageFmt string, args ...interface{}) pkgreconciler.Event { @@ -85,7 +84,7 @@ var _ subscriptionreconciler.Interface = (*Reconciler)(nil) var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. -func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Find the channel for this subscription. channel, err := r.getChannel(ctx, subscription) if err != nil { @@ -116,7 +115,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1beta1.Su return nil } -func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { channel, err := r.getChannel(ctx, subscription) if err != nil { // If the channel was deleted (i.e., error == notFound), just return nil so that @@ -133,8 +132,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1beta1.Sub return nil } -func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) pkgreconciler.Event { - ss, err := r.getSubStatus(sub, channel) +func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) pkgreconciler.Event { + ss, err := r.getSubStatus(ctx, sub, channel) if err != nil { logging.FromContext(ctx).Warn("Failed to get subscription status.", zap.Error(err)) sub.Status.MarkChannelUnknown(subscriptionNotMarkedReadyByChannel, "Failed to get subscription status: %s", err) @@ -153,7 +152,7 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann return nil } -func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) pkgreconciler.Event { +func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) pkgreconciler.Event { // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { @@ -174,7 +173,7 @@ func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alph return nil } -func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Everything that was supposed to be resolved was, so flip the status bit on that. subscription.Status.MarkReferencesResolvedUnknown("Resolving", "Subscription resolution interrupted.") @@ -195,7 +194,7 @@ func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription * return nil } -func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Resolve Subscriber. subscriber := subscription.Spec.Subscriber.DeepCopy() if !isNilOrEmptyDestination(subscriber) { @@ -222,7 +221,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1beta return nil } -func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Resolve Reply. reply := subscription.Spec.Reply.DeepCopy() if !isNilOrEmptyDestination(reply) { @@ -249,7 +248,7 @@ func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1beta1.Sub return nil } -func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event { +func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Resolve DeadLetterSink. delivery := subscription.Spec.Delivery.DeepCopy() if !isNilOrEmptyDeliveryDeadLetterSink(delivery) { @@ -277,36 +276,38 @@ func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1 return nil } -func (r *Reconciler) getSubStatus(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) { +func (r *Reconciler) getSubStatus(ctx context.Context, subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) { if channel.Annotations != nil { if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" || channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" { - return r.getSubStatusV1Beta1(subscription, channel) + return r.getSubStatusV1(subscription, channel) } } - return r.getSubStatusV1Alpha1(subscription, channel) + return r.getSubStatusV1Alpha1(ctx, subscription, channel) } -func (r *Reconciler) getSubStatusV1Alpha1(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) { +func (r *Reconciler) getSubStatusV1Alpha1(ctx context.Context, subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) { subscribableStatus := channel.Status.GetSubscribableTypeStatus() if subscribableStatus == nil { - return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("channel.Status.SubscribableStatus is nil") + return eventingduckv1.SubscriberStatus{}, fmt.Errorf("channel.Status.SubscribableStatus is nil") } for _, sub := range subscribableStatus.Subscribers { if sub.UID == subscription.GetUID() && sub.ObservedGeneration == subscription.GetGeneration() { - return sub, nil + subv1 := eventingduckv1.SubscriberStatus{} + sub.ConvertTo(ctx, &subv1) + return subv1, nil } } - return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name) + return eventingduckv1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name) } -func (r *Reconciler) getSubStatusV1Beta1(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) { +func (r *Reconciler) getSubStatusV1(subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) { for _, sub := range channel.Status.Subscribers { if sub.UID == subscription.GetUID() && sub.ObservedGeneration == subscription.GetGeneration() { - return eventingduckv1beta1.SubscriberStatus{ + return eventingduckv1.SubscriberStatus{ UID: sub.UID, ObservedGeneration: sub.ObservedGeneration, Ready: sub.Ready, @@ -314,10 +315,10 @@ func (r *Reconciler) getSubStatusV1Beta1(subscription *v1beta1.Subscription, cha }, nil } } - return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name) + return eventingduckv1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name) } -func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1beta1.Subscription, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) { +func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscription, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) { // Track the channel using the channelableTracker. // We don't need the explicitly set a channelInformer, as this will dynamically generate one for us. // This code needs to be called before checking the existence of the `channel`, in order to make sure the @@ -342,7 +343,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1beta1.Subs // and verifies it's a channelable (so that we can operate on it via patches). // If the Channel is a channels.messaging type (hence, it's only a factory for // underlying channels), fetch and validate the "backing" channel. -func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription) (*eventingduckv1alpha1.ChannelableCombined, pkgreconciler.Event) { +func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1alpha1.ChannelableCombined, pkgreconciler.Event) { logging.FromContext(ctx).Info("Getting channel", zap.Any("channel", sub.Spec.Channel)) // 1. Track the channel pointed by subscription. @@ -358,8 +359,7 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription) // Test to see if the channel is Channel.messaging because it is going // to have a "backing" channel that is what we need to actually operate on // as well as keep track of. - if (v1beta1ChannelGVK.Group == gvk.Group && v1beta1ChannelGVK.Kind == gvk.Kind) || - (v1ChannelGVK.Group == gvk.Group && v1ChannelGVK.Kind == gvk.Kind) { + if v1ChannelGVK.Group == gvk.Group && v1ChannelGVK.Kind == gvk.Kind { // Track changes on Channel. // Ref: https://github.com/knative/eventing/issues/2641 // NOTE: There is a race condition with using the channelableTracker @@ -371,7 +371,7 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription) // to the cache we intend to use to pull the Channel from. This linkage // is setup in NewController for r.tracker. if err := r.tracker.TrackReference(tracker.Reference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "Channel", Namespace: sub.Namespace, Name: sub.Spec.Channel.Name, @@ -412,8 +412,8 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription) return ch.DeepCopy(), nil } -func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1beta1.DeliverySpec) bool { - return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1beta1.DeliverySpec{}) || +func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1.DeliverySpec) bool { + return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1.DeliverySpec{}) || delivery.DeadLetterSink == nil } @@ -421,7 +421,7 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool { return destination == nil || equality.Semantic.DeepEqual(destination, &duckv1.Destination{}) } -func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined, isDeleted bool) (bool, error) { +func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined, isDeleted bool) (bool, error) { logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub)) if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil { if isDeleted && apierrors.IsNotFound(patchErr) { @@ -434,7 +434,7 @@ func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1beta1.Subsc } } -func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) (bool, error) { +func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) (bool, error) { after := channel.DeepCopy() if sub.DeletionTimestamp.IsZero() { @@ -467,10 +467,10 @@ func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, ch return true, nil } -func (r *Reconciler) updateChannelRemoveSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelRemoveSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { if channel.Annotations != nil { - if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" || - channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" { + if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" || + channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" { r.updateChannelRemoveSubscriptionV1Beta1(ctx, channel, sub) return } @@ -478,7 +478,7 @@ func (r *Reconciler) updateChannelRemoveSubscription(ctx context.Context, channe r.updateChannelRemoveSubscriptionV1Alpha1(ctx, channel, sub) } -func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { for i, v := range channel.Spec.Subscribers { if v.UID == sub.UID { channel.Spec.Subscribers = append( @@ -489,7 +489,7 @@ func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context, } } -func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { if channel.Spec.Subscribable == nil { return } @@ -504,7 +504,7 @@ func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context } } -func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { if channel.Annotations != nil { if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" || channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" { @@ -515,7 +515,7 @@ func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel * r.updateChannelAddSubscriptionV1Alpha1(ctx, channel, sub) } -func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { if channel.Spec.Subscribable == nil { channel.Spec.Subscribable = &eventingduckv1alpha1.Subscribable{ Subscribers: []eventingduckv1alpha1.SubscriberSpec{{ @@ -551,7 +551,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c }) } -func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) { +func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) { // Look to update subscriber. for i, v := range channel.Spec.Subscribers { if v.UID == sub.UID { diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index b785e2b9203..d79aeb517cb 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Veroute.on 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,23 +22,23 @@ import ( "fmt" "testing" - eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/pkg/injection/clients/dynamicclient" corev1 "k8s.io/api/core/v1" - apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined" - "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription" + "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/utils" "knative.dev/pkg/apis" @@ -50,9 +50,9 @@ import ( logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/resolver" - _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake" - _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/inmemorychannel/fake" - . "knative.dev/eventing/pkg/reconciler/testing/v1beta1" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake" + . "knative.dev/eventing/pkg/reconciler/testing/v1" . "knative.dev/pkg/reconciler/testing" ) @@ -104,33 +104,20 @@ var ( Kind: "Service", } - imcV1Beta1GVK = metav1.GroupVersionKind{ + imcV1GVK = metav1.GroupVersionKind{ Group: "messaging.knative.dev", - Version: "v1beta1", + Version: "v1", Kind: "InMemoryChannel", } - channelableV1Alpha1GVK = metav1.GroupVersionKind{ - Group: "duck.knative.dev", - Version: "v1alpha1", - Kind: "Channelable", - } - - channelV1Beta1GVK = metav1.GroupVersionKind{ + channelV1GVK = metav1.GroupVersionKind{ Group: "messaging.knative.dev", - Version: "v1beta1", + Version: "v1", Kind: "Channel", } - channelableV1Alpha1KRef = duckv1.KReference{ - APIVersion: "duck.knative.dev/v1alpha1", - Kind: "Channelable", - Namespace: testNS, - Name: channelName, - } - - imcV1Beta1KRef = duckv1.KReference{ - APIVersion: "messaging.knative.dev/v1beta1", + imcV1KRef = duckv1.KReference{ + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", Namespace: testNS, Name: channelName, @@ -139,11 +126,11 @@ var ( func init() { // Add types to scheme - _ = eventingv1beta1.AddToScheme(scheme.Scheme) + _ = eventingv1.AddToScheme(scheme.Scheme) _ = duckv1alpha1.AddToScheme(scheme.Scheme) _ = eventingduckv1alpha1.AddToScheme(scheme.Scheme) - _ = apiextensionsv1beta1.AddToScheme(scheme.Scheme) - _ = messagingv1beta1.AddToScheme(scheme.Scheme) + _ = apiextensionsv1.AddToScheme(scheme.Scheme) + _ = messagingv1.AddToScheme(scheme.Scheme) } func TestAllCases(t *testing.T) { @@ -161,9 +148,9 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), MarkReferencesResolved, @@ -211,9 +198,9 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), @@ -227,7 +214,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS), @@ -240,7 +227,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -255,7 +242,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS), @@ -272,7 +259,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -287,7 +274,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS), @@ -304,7 +291,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -319,7 +306,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewInMemoryChannel(channelName, testNS, @@ -335,7 +322,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -350,9 +337,9 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS, WithUnstructuredAddressable(subscriberDNS)), @@ -364,18 +351,18 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + subscriptionName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), - Eventf(corev1.EventTypeWarning, "ReplyResolveFailed", "Failed to resolve spec.reply: failed to get ref &ObjectReference{Kind:InMemoryChannel,Namespace:testnamespace,Name:reply,UID:,APIVersion:messaging.knative.dev/v1beta1,ResourceVersion:,FieldPath:,}: inmemorychannels.messaging.knative.dev %q not found", replyName), + Eventf(corev1.EventTypeWarning, "ReplyResolveFailed", "Failed to resolve spec.reply: failed to get ref &ObjectReference{Kind:InMemoryChannel,Namespace:testnamespace,Name:reply,UID:,APIVersion:messaging.knative.dev/v1,ResourceVersion:,FieldPath:,}: inmemorychannels.messaging.knative.dev %q not found", replyName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), - WithSubscriptionReferencesNotResolved(replyResolveFailed, fmt.Sprintf("Failed to resolve spec.reply: failed to get ref &ObjectReference{Kind:InMemoryChannel,Namespace:testnamespace,Name:reply,UID:,APIVersion:messaging.knative.dev/v1beta1,ResourceVersion:,FieldPath:,}: inmemorychannels.messaging.knative.dev %q not found", replyName)), + WithSubscriptionReferencesNotResolved(replyResolveFailed, fmt.Sprintf("Failed to resolve spec.reply: failed to get ref &ObjectReference{Kind:InMemoryChannel,Namespace:testnamespace,Name:reply,UID:,APIVersion:messaging.knative.dev/v1,ResourceVersion:,FieldPath:,}: inmemorychannels.messaging.knative.dev %q not found", replyName)), ), }}, WantPatches: []clientgotesting.PatchActionImpl{ @@ -386,7 +373,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionReply(nonAddressableGVK, replyName, testNS), // reply will be a nonAddressableGVK for this test ), @@ -407,7 +394,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), WithSubscriptionReply(nonAddressableGVK, replyName, testNS), @@ -420,11 +407,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc, valid channel+subscriber", + Name: "v1 imc, valid channel+subscriber", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS, @@ -445,7 +432,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -462,52 +449,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - // Use v1alpha1 duck shaped Channelable to make sure the patch uses the v1alpha1 subscribable - Name: "v1alpha1 channelable, valid channel+subscriber", - Objects: []runtime.Object{ - NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - ), - NewUnstructured(subscriberGVK, subscriberName, testNS, - WithUnstructuredAddressable(subscriberDNS), - ), - NewChannelable(channelName, testNS, - WithChannelableReadySubscriber(subscriptionUID), - ), - }, - Key: testNS + "/" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitSubscriptionConditions, - MarkReferencesResolved, - MarkAddedToChannel, - - WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), - ), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), - patchFinalizers(testNS, subscriptionName), - }, - }, { - Name: "v1beta1 imc, valid channel+subscriber+missing delivery", + Name: "v1 imc, valid channel+subscriber+missing delivery", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), ), @@ -529,7 +475,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), // The first reconciliation will initialize the status conditions. @@ -546,7 +492,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), ), @@ -571,7 +517,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), // The first reconciliation will initialize the status conditions. @@ -589,107 +535,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - // Use v1beta1 Channel, v1alpha Channelable as backing channel to make sure that backing channel gets - // patched properly using v1alpha1 - Name: "v1beta1 channel+v1alpha1 channelable backing channel+subscriber", - Objects: []runtime.Object{ - NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - ), - NewUnstructured(subscriberGVK, subscriberName, testNS, - WithUnstructuredAddressable(subscriberDNS), - ), - NewChannel(channelName, testNS, - WithInitChannelConditions, - WithBackingChannelObjRef(&channelableV1Alpha1KRef), - WithBackingChannelReady, - WithChannelAddress("example.com"), - ), - NewChannelable(channelName, testNS, - WithChannelableReadySubscriber(subscriptionUID), - ), - }, - Key: testNS + "/" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitSubscriptionConditions, - MarkReferencesResolved, - MarkAddedToChannel, - - WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), - ), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), - patchFinalizers(testNS, subscriptionName), - }, - }, { - // Use v1beta1 Channel, v1alpha Channelable as backing channel to make sure that backing channel gets - // patched properly using v1alpha1 - Name: "v1beta1 channel no annotation+v1alpha1 channelable backing channel+subscriber", - Objects: []runtime.Object{ - NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - ), - NewUnstructured(subscriberGVK, subscriberName, testNS, - WithUnstructuredAddressable(subscriberDNS), - ), - NewChannel(channelName, testNS, - WithInitChannelConditions, - WithBackingChannelObjRef(&channelableV1Alpha1KRef), - WithBackingChannelReady, - WithChannelAddress("example.com"), - WithNoAnnotations, - ), - NewChannelable(channelName, testNS, - WithChannelableReadySubscriber(subscriptionUID), - ), - }, - Key: testNS + "/" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitSubscriptionConditions, - MarkReferencesResolved, - MarkAddedToChannel, - - WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), - ), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), - patchFinalizers(testNS, subscriptionName), - }}, {}, { - Name: "v1beta1 channel+v1beta1 imc backing channel+subscriber", + Name: "v1 channel+v1 imc backing channel+subscriber", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), + WithSubscriptionChannel(channelV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS, @@ -697,7 +547,7 @@ func TestAllCases(t *testing.T) { ), NewChannel(channelName, testNS, WithInitChannelConditions, - WithBackingChannelObjRef(&imcV1Beta1KRef), + WithBackingChannelObjRef(&imcV1KRef), WithBackingChannelReady, WithChannelAddress("example.com"), ), @@ -717,7 +567,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), + WithSubscriptionChannel(channelV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -734,11 +584,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 channel+backing v1beta1 imc channel not ready+subscriber", + Name: "v1 channel+backing v1 imc channel not ready+subscriber", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), + WithSubscriptionChannel(channelV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS, @@ -746,7 +596,7 @@ func TestAllCases(t *testing.T) { ), NewChannel(channelName, testNS, WithInitChannelConditions, - WithBackingChannelObjRef(&imcV1Beta1KRef), + WithBackingChannelObjRef(&imcV1KRef), WithBackingChannelReady, ), NewInMemoryChannel(channelName, testNS, @@ -764,7 +614,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelV1Beta1GVK, channelName), + WithSubscriptionChannel(channelV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -775,12 +625,12 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc channel+reply", + Name: "v1 imc channel+reply", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionReply(imcV1GVK, replyName, testNS), ), NewInMemoryChannel(channelName, testNS, WithInitInMemoryChannelConditions, @@ -801,8 +651,8 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionReply(imcV1GVK, replyName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, MarkReferencesResolved, @@ -817,12 +667,12 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc+reply - not deprecated", + Name: "v1 imc+reply - not deprecated", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionReply(imcV1GVK, replyName, testNS), ), NewInMemoryChannel(channelName, testNS, WithInitInMemoryChannelConditions, @@ -843,8 +693,8 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionReply(imcV1GVK, replyName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, MarkReferencesResolved, @@ -859,13 +709,13 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc+subscriber+reply", + Name: "v1 imc+subscriber+reply", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), ), NewUnstructured(subscriberGVK, subscriberName, testNS, WithUnstructuredAddressable(subscriberDNS), @@ -889,9 +739,9 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, MarkReferencesResolved, @@ -907,12 +757,12 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc+valid remove reply", + Name: "v1 imc+valid remove reply", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionGeneration(subscriptionGeneration), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), @@ -940,7 +790,7 @@ func TestAllCases(t *testing.T) { Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionGeneration(subscriptionGeneration), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), @@ -955,14 +805,14 @@ func TestAllCases(t *testing.T) { }), }, }, { - Name: "v1beta1 imc+remove subscriber", + Name: "v1 imc+remove subscriber", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionGeneration(subscriptionGeneration), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithInitSubscriptionConditions, - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), MarkSubscriptionReady, WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), WithSubscriptionPhysicalSubscriptionReply(replyURI), @@ -990,8 +840,8 @@ func TestAllCases(t *testing.T) { Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionGeneration(subscriptionGeneration), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, WithSubscriptionPhysicalSubscriptionReply(replyURI), @@ -1005,11 +855,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc+subscriber as service", + Name: "v1 imc+subscriber as service", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), ), NewInMemoryChannel(channelName, testNS, @@ -1028,7 +878,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -1044,17 +894,17 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, subscriptionName), }, }, { - Name: "v1beta1 imc+two subscribers for a channel", + Name: "v1 imc+two subscribers for a channel", Objects: []runtime.Object{ NewSubscription("a-"+subscriptionName, testNS, WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), ), // an already rec'ed subscription NewSubscription("b-"+subscriptionName, testNS, WithSubscriptionUID("b-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, @@ -1077,7 +927,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription("a-"+subscriptionName, testNS, WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, @@ -1093,11 +943,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, "a-"+subscriptionName), }, }, { - Name: "v1beta1 imc+two subscribers for a channel - update delivery", + Name: "v1 imc+two subscribers for a channel - update delivery", Objects: []runtime.Object{ NewSubscription("a-"+subscriptionName, testNS, WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), WithSubscriptionDeadLetterSinkURI(dlcURI), @@ -1108,7 +958,7 @@ func TestAllCases(t *testing.T) { // an already rec'ed subscription NewSubscription("b-"+subscriptionName, testNS, WithSubscriptionUID("b-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, @@ -1134,7 +984,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription("a-"+subscriptionName, testNS, WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), WithSubscriptionDeliveryRef(subscriberGVK, dlcName, testNS), // The first reconciliation will initialize the status conditions. @@ -1152,109 +1002,11 @@ func TestAllCases(t *testing.T) { patchFinalizers(testNS, "a-"+subscriptionName), }, }, { - Name: "v1alpha1 channelable+two subscribers for a channel", - Objects: []runtime.Object{ - NewSubscription("a-"+subscriptionName, testNS, - WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - ), - // an already rec'ed subscription - NewSubscription("b-"+subscriptionName, testNS, - WithSubscriptionUID("b-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - WithInitSubscriptionConditions, - MarkSubscriptionReady, - WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), - ), - NewChannelable(channelName, testNS, - WithChannelableSubscribers([]eventingduckv1alpha1.SubscriberSpec{{UID: "b-" + subscriptionUID}}), - WithChannelableReadySubscriber("a-"+subscriptionUID), - WithChannelableReadySubscriber("b-"+subscriptionUID), - ), - NewService(serviceName, testNS), - }, - Key: testNS + "/" + "a-" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSubscription("a-"+subscriptionName, testNS, - WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitSubscriptionConditions, - MarkReferencesResolved, - MarkAddedToChannel, - WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), - ), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: "b-" + subscriptionUID}, - {UID: "a-" + subscriptionUID, SubscriberURI: serviceURIWithPath}, - }), - patchFinalizers(testNS, "a-"+subscriptionName), - }, - }, { - Name: "v1alpha1 channelable+two subscribers for a channel - updated", - Objects: []runtime.Object{ - NewSubscription("a-"+subscriptionName, testNS, - WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - ), - // an already rec'ed subscription - NewSubscription("b-"+subscriptionName, testNS, - WithSubscriptionUID("b-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - WithInitSubscriptionConditions, - MarkSubscriptionReady, - WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), - ), - NewChannelable(channelName, testNS, - WithChannelableSubscribers([]eventingduckv1alpha1.SubscriberSpec{{UID: "b-" + subscriptionUID}, {UID: "a-" + subscriptionUID}}), - WithChannelableReadySubscriber("a-"+subscriptionUID), - WithChannelableReadySubscriber("b-"+subscriptionUID), - ), - NewService(serviceName, testNS), - }, - Key: testNS + "/" + "a-" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", "a-"+subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channelName), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSubscription("a-"+subscriptionName, testNS, - WithSubscriptionUID("a-"+subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(serviceGVK, serviceName, testNS), - // The first reconciliation will initialize the status conditions. - WithInitSubscriptionConditions, - MarkReferencesResolved, - MarkAddedToChannel, - WithSubscriptionPhysicalSubscriptionSubscriber(serviceURIWithPath), - ), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, []eventingduckv1alpha1.SubscriberSpec{ - {UID: "b-" + subscriptionUID}, - {UID: "a-" + subscriptionUID, SubscriberURI: serviceURIWithPath}, - }), - patchFinalizers(testNS, "a-"+subscriptionName), - }, - }, { - Name: "v1beta1 imc+deleted - channel patch succeeded", + Name: "v1 imc+deleted - channel patch succeeded", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, @@ -1283,44 +1035,12 @@ func TestAllCases(t *testing.T) { patchSubscribers(testNS, channelName, nil), patchRemoveFinalizers(testNS, subscriptionName), }, - }, { - Name: "v1alpha1 channelable+deleted - channel patch succeeded", - Objects: []runtime.Object{ - NewSubscription(subscriptionName, testNS, - WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(channelableV1Alpha1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithInitSubscriptionConditions, - MarkSubscriptionReady, - WithSubscriptionFinalizers(finalizerName), - WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), - WithSubscriptionDeleted, - ), - NewUnstructured(subscriberGVK, subscriberName, testNS, - WithUnstructuredAddressable(subscriberDNS), - ), - NewChannelable(channelName, testNS, - WithChannelableSubscribers([]eventingduckv1alpha1.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), - ), - }, - Key: testNS + "/" + subscriptionName, - WantErr: false, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), - Eventf(corev1.EventTypeNormal, "SubscriberRemoved", "Subscription was removed from channel \"origin\""), - }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribersV1Alpha1(testNS, channelName, nil), - patchRemoveFinalizers(testNS, subscriptionName), - }, }, { Name: "subscription not deleted - channel patch fails", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, @@ -1350,7 +1070,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithInitSubscriptionConditions, MarkSubscriptionReady, MarkNotAddedToChannel("PhysicalChannelSyncFailed", "Failed to sync physical Channel: inducing failure for patch inmemorychannels"), @@ -1368,9 +1088,9 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), - WithSubscriptionChannel(imcV1Beta1GVK, channelName), + WithSubscriptionChannel(imcV1GVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), - WithSubscriptionReply(imcV1Beta1GVK, replyName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, MarkSubscriptionReady, WithSubscriptionFinalizers(finalizerName), diff --git a/pkg/reconciler/testing/v1/service.go b/pkg/reconciler/testing/v1/service.go new file mode 100644 index 00000000000..1eb8817d1dc --- /dev/null +++ b/pkg/reconciler/testing/v1/service.go @@ -0,0 +1,62 @@ +/* +Copyright 2020 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ServiceOption enables further configuration of a Service. +type ServiceOption func(*corev1.Service) + +// NewService creates a Service with ServiceOptions +func NewService(name, namespace string, so ...ServiceOption) *corev1.Service { + s := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{}, + } + for _, opt := range so { + opt(s) + } + return s +} + +func WithServiceOwnerReferences(ownerReferences []metav1.OwnerReference) ServiceOption { + return func(s *corev1.Service) { + s.OwnerReferences = ownerReferences + } +} + +func WithServiceLabels(labels map[string]string) ServiceOption { + return func(s *corev1.Service) { + s.ObjectMeta.Labels = labels + s.Spec.Selector = labels + } +} + +func WithServicePorts(ports []corev1.ServicePort) ServiceOption { + return func(s *corev1.Service) { + s.Spec.Ports = ports + } +} + +func WithServiceAnnotations(annotations map[string]string) ServiceOption { + return func(s *corev1.Service) { + s.ObjectMeta.Annotations = annotations + } +} diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go new file mode 100644 index 00000000000..d3f4415e26c --- /dev/null +++ b/pkg/reconciler/testing/v1/subscription.go @@ -0,0 +1,233 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "errors" + "time" + + "k8s.io/apimachinery/pkg/types" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +// SubscriptionOption enables further configuration of a Subscription. +type SubscriptionOption func(*v1.Subscription) + +// NewSubscription creates a Subscription with SubscriptionOptions +func NewSubscription(name, namespace string, so ...SubscriptionOption) *v1.Subscription { + s := &v1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range so { + opt(s) + } + s.SetDefaults(context.Background()) + return s +} + +// NewSubscriptionWithoutNamespace creates a Subscription with SubscriptionOptions but without a specific namespace +func NewSubscriptionWithoutNamespace(name string, so ...SubscriptionOption) *v1.Subscription { + s := &v1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + for _, opt := range so { + opt(s) + } + s.SetDefaults(context.Background()) + return s +} + +func WithSubscriptionUID(uid types.UID) SubscriptionOption { + return func(s *v1.Subscription) { + s.UID = uid + } +} + +func WithSubscriptionGeneration(gen int64) SubscriptionOption { + return func(s *v1.Subscription) { + s.Generation = gen + } +} + +func WithSubscriptionStatusObservedGeneration(gen int64) SubscriptionOption { + return func(s *v1.Subscription) { + s.Status.ObservedGeneration = gen + } +} + +func WithSubscriptionGenerateName(generateName string) SubscriptionOption { + return func(c *v1.Subscription) { + c.ObjectMeta.GenerateName = generateName + } +} + +// WithInitSubscriptionConditions initializes the Subscriptions's conditions. +func WithInitSubscriptionConditions(s *v1.Subscription) { + s.Status.InitializeConditions() +} + +func WithSubscriptionReady(s *v1.Subscription) { + s.Status = *eventingv1.TestHelper.ReadySubscriptionStatus() +} + +// TODO: this can be a runtime object +func WithSubscriptionDeleted(s *v1.Subscription) { + t := metav1.NewTime(time.Unix(1e9, 0)) + s.ObjectMeta.SetDeletionTimestamp(&t) +} + +func WithSubscriptionOwnerReferences(ownerReferences []metav1.OwnerReference) SubscriptionOption { + return func(c *v1.Subscription) { + c.ObjectMeta.OwnerReferences = ownerReferences + } +} + +func WithSubscriptionLabels(labels map[string]string) SubscriptionOption { + return func(c *v1.Subscription) { + c.ObjectMeta.Labels = labels + } +} + +func WithSubscriptionChannel(gvk metav1.GroupVersionKind, name string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Channel = corev1.ObjectReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + } + } +} + +func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +} + +func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Delivery = &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + }, + } + } +} + +func WithSubscriptionPhysicalSubscriptionSubscriber(uri *apis.URL) SubscriptionOption { + return func(s *v1.Subscription) { + if uri == nil { + panic(errors.New("nil URI")) + } + s.Status.PhysicalSubscription.SubscriberURI = uri + } +} + +func WithSubscriptionPhysicalSubscriptionReply(uri *apis.URL) SubscriptionOption { + return func(s *v1.Subscription) { + if uri == nil { + panic(errors.New("nil URI")) + } + s.Status.PhysicalSubscription.ReplyURI = uri + } +} + +func WithSubscriptionDeadLetterSinkURI(uri *apis.URL) SubscriptionOption { + return func(s *v1.Subscription) { + if uri == nil { + panic(errors.New("nil URI")) + } + s.Status.PhysicalSubscription.DeadLetterSinkURI = uri + } +} + +func WithSubscriptionFinalizers(finalizers ...string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Finalizers = finalizers + } +} + +func MarkSubscriptionReady(s *v1.Subscription) { + s.Status.MarkChannelReady() + s.Status.MarkReferencesResolved() + s.Status.MarkAddedToChannel() +} + +func MarkAddedToChannel(s *v1.Subscription) { + s.Status.MarkAddedToChannel() +} + +func MarkNotAddedToChannel(reason, msg string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Status.MarkNotAddedToChannel(reason, msg) + } +} + +func MarkReferencesResolved(s *v1.Subscription) { + s.Status.MarkReferencesResolved() +} + +func WithSubscriptionReferencesNotResolved(reason, msg string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Status.MarkReferencesNotResolved(reason, msg) + } +} + +func WithSubscriptionReferencesResolvedUnknown(reason, msg string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Status.MarkReferencesResolvedUnknown(reason, msg) + } +} + +func WithSubscriptionReply(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Reply = &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +}