From 1ee5a4acea1b0209b9975eb1327ef0b3fb3e47c1 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Wed, 5 Aug 2020 14:50:12 -0400 Subject: [PATCH] Migrate parallel reconciler to use messaging.* v1 resources --- pkg/reconciler/parallel/controller.go | 12 +- pkg/reconciler/parallel/controller_test.go | 6 +- pkg/reconciler/parallel/parallel.go | 40 +- pkg/reconciler/parallel/parallel_test.go | 351 +++++++++--------- pkg/reconciler/parallel/resources/channel.go | 8 +- .../parallel/resources/subscription.go | 20 +- 6 files changed, 217 insertions(+), 220 deletions(-) diff --git a/pkg/reconciler/parallel/controller.go b/pkg/reconciler/parallel/controller.go index 58d44db29dc..1dff3b11ef5 100644 --- a/pkg/reconciler/parallel/controller.go +++ b/pkg/reconciler/parallel/controller.go @@ -20,7 +20,7 @@ import ( "context" "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/flows/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -28,10 +28,10 @@ import ( "knative.dev/pkg/logging" eventingclient "knative.dev/eventing/pkg/client/injection/client" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable" - "knative.dev/eventing/pkg/client/injection/informers/flows/v1beta1/parallel" - "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription" - parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1beta1/parallel" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" + "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel" + "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" + parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" ) // NewController initializes the controller and is called by the generated code @@ -60,7 +60,7 @@ func NewController( // Register handler for Subscriptions that are owned by Parallel, so that // we get notified if they change. subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterControllerGK(v1beta1.Kind("Parallel")), + FilterFunc: controller.FilterControllerGK(v1.Kind("Parallel")), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) diff --git a/pkg/reconciler/parallel/controller_test.go b/pkg/reconciler/parallel/controller_test.go index 5e507355571..acf3b8c5258 100644 --- a/pkg/reconciler/parallel/controller_test.go +++ b/pkg/reconciler/parallel/controller_test.go @@ -23,9 +23,9 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable/fake" - _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1beta1/parallel/fake" - _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription/fake" + _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" ) func TestNew(t *testing.T) { diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index adef032fe93..81f9d8ebb51 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -30,14 +30,14 @@ import ( "k8s.io/client-go/dynamic" duckapis "knative.dev/pkg/apis/duck" - duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" - "knative.dev/eventing/pkg/apis/flows/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" - messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1beta1" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" clientset "knative.dev/eventing/pkg/client/clientset/versioned" - parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1beta1/parallel" - listers "knative.dev/eventing/pkg/client/listers/flows/v1beta1" + parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" + listers "knative.dev/eventing/pkg/client/listers/flows/v1" "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler/parallel/resources" @@ -60,7 +60,7 @@ type Reconciler struct { // Check that our Reconciler implements parallelreconciler.Interface var _ parallelreconciler.Interface = (*Reconciler)(nil) -func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1beta1.Parallel) pkgreconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgreconciler.Event { // Reconciling parallel is pretty straightforward, it does the following things: // 1. Create a channel fronting the whole parallel and one filter channel per branch. // 2. For each of the Branches: @@ -79,8 +79,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1beta1.Parallel) pkg return fmt.Errorf("unable to create dynamic client for: %+v", p.Spec.ChannelTemplate) } - var ingressChannel *duckv1beta1.Channelable - channels := make([]*duckv1beta1.Channelable, 0, len(p.Spec.Branches)) + var ingressChannel *duckv1.Channelable + channels := make([]*duckv1.Channelable, 0, len(p.Spec.Branches)) for i := -1; i < len(p.Spec.Branches); i++ { var channelName string if i == -1 { @@ -111,8 +111,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1beta1.Parallel) pkg } p.Status.PropagateChannelStatuses(ingressChannel, channels) - filterSubs := make([]*messagingv1beta1.Subscription, 0, len(p.Spec.Branches)) - subs := make([]*messagingv1beta1.Subscription, 0, len(p.Spec.Branches)) + filterSubs := make([]*messagingv1.Subscription, 0, len(p.Spec.Branches)) + subs := make([]*messagingv1.Subscription, 0, len(p.Spec.Branches)) for i := 0; i < len(p.Spec.Branches); i++ { filterSub, sub, err := r.reconcileBranch(ctx, i, p) if err != nil { @@ -127,7 +127,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1beta1.Parallel) pkg return nil } -func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, p *v1beta1.Parallel, channelObjRef corev1.ObjectReference) (*duckv1beta1.Channelable, error) { +func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, p *v1.Parallel, channelObjRef corev1.ObjectReference) (*duckv1.Channelable, error) { c, err := r.trackAndFetchChannel(ctx, p, channelObjRef) if err != nil { if apierrs.IsNotFound(err) { @@ -144,7 +144,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf } logging.FromContext(ctx).Debug("Created Channel", zap.Any("channel", newChannel)) // Convert to Channel duck so that we can treat all Channels the same. - channelable := &duckv1beta1.Channelable{} + channelable := &duckv1.Channelable{} err = duckapis.FromUnstructured(created, channelable) if err != nil { logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("channel", created), zap.Error(err)) @@ -156,7 +156,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf return nil, err } logging.FromContext(ctx).Debug("Found Channel", zap.Any("channel", channelObjRef)) - channelable, ok := c.(*duckv1beta1.Channelable) + channelable, ok := c.(*duckv1.Channelable) if !ok { logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("channel", channelObjRef), zap.Error(err)) return nil, fmt.Errorf("Failed to convert to Channelable Object: %+v", c) @@ -164,7 +164,7 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf return channelable, nil } -func (r *Reconciler) reconcileBranch(ctx context.Context, branchNumber int, p *v1beta1.Parallel) (*messagingv1beta1.Subscription, *messagingv1beta1.Subscription, error) { +func (r *Reconciler) reconcileBranch(ctx context.Context, branchNumber int, p *v1.Parallel) (*messagingv1.Subscription, *messagingv1.Subscription, error) { filterExpected := resources.NewFilterSubscription(branchNumber, p) filterSubName := resources.ParallelFilterSubscriptionName(p.Name, branchNumber) @@ -184,14 +184,14 @@ func (r *Reconciler) reconcileBranch(ctx context.Context, branchNumber int, p *v return filterSub, sub, nil } -func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int, expected *messagingv1beta1.Subscription, subName, ns string) (*messagingv1beta1.Subscription, error) { +func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int, expected *messagingv1.Subscription, subName, ns string) (*messagingv1.Subscription, error) { sub, err := r.subscriptionLister.Subscriptions(ns).Get(subName) // If the resource doesn't exist, we'll create it. if apierrs.IsNotFound(err) { sub = expected logging.FromContext(ctx).Info(fmt.Sprintf("Creating subscription: %+v", sub)) - newSub, err := r.eventingClientSet.MessagingV1beta1().Subscriptions(sub.Namespace).Create(sub) + newSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Create(sub) if err != nil { // TODO: Send events here, or elsewhere? //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Parallel's subscription failed: %v", err) @@ -206,12 +206,12 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int } else if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { // Given that spec.channel is immutable, we cannot just update the subscription. We delete // it instead, and re-create it. - err = r.eventingClientSet.MessagingV1beta1().Subscriptions(sub.Namespace).Delete(sub.Name, &metav1.DeleteOptions{}) + err = r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Delete(sub.Name, &metav1.DeleteOptions{}) if err != nil { logging.FromContext(ctx).Info("Cannot delete subscription", zap.Error(err)) return nil, err } - newSub, err := r.eventingClientSet.MessagingV1beta1().Subscriptions(sub.Namespace).Create(expected) + newSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Create(expected) if err != nil { logging.FromContext(ctx).Info("Cannot create subscription", zap.Error(err)) return nil, err @@ -221,7 +221,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, branchNumber int return sub, nil } -func (r *Reconciler) trackAndFetchChannel(ctx context.Context, p *v1beta1.Parallel, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) { +func (r *Reconciler) trackAndFetchChannel(ctx context.Context, p *v1.Parallel, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) { // Track the channel using the channelableTracker. // We don't need the explicitly set a channelInformer, as this will dynamically generate one for us. // This code needs to be called before checking the existence of the `channel`, in order to make sure the diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 482538ff3ae..e88a734be53 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -24,7 +24,7 @@ import ( fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" - "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1beta1/parallel" + "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,24 +33,22 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" - eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" - "knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" - duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" - "knative.dev/eventing/pkg/apis/flows/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" - "knative.dev/eventing/pkg/reconciler/parallel/resources" - rt "knative.dev/eventing/pkg/reconciler/testing/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" - . "knative.dev/eventing/pkg/reconciler/testing/v1beta1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/reconciler/parallel/resources" + . "knative.dev/eventing/pkg/reconciler/testing/v1" ) const ( @@ -70,16 +68,15 @@ var ( func init() { // Add types to scheme - _ = v1beta1.AddToScheme(scheme.Scheme) - _ = duckv1beta1.AddToScheme(scheme.Scheme) + _ = v1.AddToScheme(scheme.Scheme) _ = duckv1.AddToScheme(scheme.Scheme) } func TestAllBranches(t *testing.T) { pKey := testNS + "/" + parallelName - imc := &messagingv1beta1.ChannelTemplateSpec{ + imc := &messagingv1.ChannelTemplateSpec{ TypeMeta: metav1.TypeMeta{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", }, Spec: &runtime.RawExtension{Raw: []byte("{}")}, @@ -97,7 +94,7 @@ func TestAllBranches(t *testing.T) { }, { // TODO: there is a bug in the controller, it will query for "" // Name: "trigger key not found ", // Objects: []runtime.Object{ - // rt.NewTrigger(triggerName, testNS), + // NewTrigger(triggerName, testNS), // }, // Key: "foo/incomplete", // WantErr: true, @@ -108,44 +105,44 @@ func TestAllBranches(t *testing.T) { Name: "deleting", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelDeleted)}, + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelDeleted)}, WantErr: false, }, { Name: "single branch, no filter", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelGeneration(parallelGeneration), - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelGeneration(parallelGeneration), + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))}, WantErr: false, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelGeneration(parallelGeneration), - rt.WithFlowsParallelStatusObservedGeneration(parallelGeneration), - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{{Subscriber: createSubscriber(0)}}), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelGeneration(parallelGeneration), + WithFlowsParallelStatusObservedGeneration(parallelGeneration), + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -155,33 +152,33 @@ func TestAllBranches(t *testing.T) { Name: "single branch, with filter", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))}, WantErr: false, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -191,33 +188,33 @@ func TestAllBranches(t *testing.T) { Name: "single branch, with filter, with delivery", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0), Delivery: createDelivery(subscriberGVK, "dlc", testNS)}, }))}, WantErr: false, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Filter: createFilter(0), Subscriber: createSubscriber(0), Delivery: createDelivery(subscriberGVK, "dlc", testNS)}, }))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0), Delivery: createDelivery(subscriberGVK, "dlc", testNS)}}), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0), Delivery: createDelivery(subscriberGVK, "dlc", testNS)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -227,37 +224,37 @@ func TestAllBranches(t *testing.T) { Name: "single branch, no filter, with global reply", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))}, WantErr: false, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, - }), rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)))), + }), WithFlowsParallelReply(createReplyChannel(replyChannelName)))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }), - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -267,37 +264,37 @@ func TestAllBranches(t *testing.T) { Name: "single branch with reply, no filter, with case and global reply", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0), Reply: createBranchReplyChannel(0)}, }))}, WantErr: false, WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0), Reply: createBranchReplyChannel(0)}, - }), rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)))), + }), WithFlowsParallelReply(createReplyChannel(replyChannelName)))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0), Reply: createBranchReplyChannel(0)}, }), - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -307,10 +304,10 @@ func TestAllBranches(t *testing.T) { Name: "two branches, no filters", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))}, @@ -319,35 +316,35 @@ func TestAllBranches(t *testing.T) { createChannel(parallelName), createBranchChannel(parallelName, 0), createBranchChannel(parallelName, 1), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))), - resources.NewFilterSubscription(1, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(1, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, })))}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{ + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -363,11 +360,11 @@ func TestAllBranches(t *testing.T) { Name: "two branches with global reply", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))}, @@ -376,37 +373,37 @@ func TestAllBranches(t *testing.T) { createChannel(parallelName), createBranchChannel(parallelName, 0), createBranchChannel(parallelName, 1), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, - }), rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)))), - resources.NewFilterSubscription(1, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + }), WithFlowsParallelReply(createReplyChannel(replyChannelName)))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(1, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, - }), rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)))), + }), WithFlowsParallelReply(createReplyChannel(replyChannelName)))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelReply(createReplyChannel(replyChannelName)), - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelReply(createReplyChannel(replyChannelName)), + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, {Subscriber: createSubscriber(1)}, }), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{ + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -423,15 +420,15 @@ func TestAllBranches(t *testing.T) { Name: "single branch, no filter, update subscription", Key: pKey, Objects: []runtime.Object{ - rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(1)}, })), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(0)}, })))}, WantErr: false, @@ -441,23 +438,23 @@ func TestAllBranches(t *testing.T) { WantCreates: []runtime.Object{ createChannel(parallelName), createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(1)}, }))), - resources.NewSubscription(0, rt.NewFlowsParallel(parallelName, testNS, rt.WithFlowsParallelChannelTemplateSpec(imc), rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{ + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ {Subscriber: createSubscriber(1)}, }))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: rt.NewFlowsParallel(parallelName, testNS, - rt.WithInitFlowsParallelConditions, - rt.WithFlowsParallelChannelTemplateSpec(imc), - rt.WithFlowsParallelBranches([]v1beta1.ParallelBranch{{Subscriber: createSubscriber(1)}}), - rt.WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - rt.WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - rt.WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - rt.WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - rt.WithFlowsParallelBranchStatuses([]v1beta1.ParallelBranchStatus{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(1)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -485,7 +482,7 @@ func TestAllBranches(t *testing.T) { func createBranchReplyChannel(caseNumber int) *duckv1.Destination { return &duckv1.Destination{ Ref: &duckv1.KReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", Name: fmt.Sprintf("%s-case-%d", replyChannelName, caseNumber), Namespace: testNS, @@ -496,7 +493,7 @@ func createBranchReplyChannel(caseNumber int) *duckv1.Destination { func createReplyChannel(channelName string) *duckv1.Destination { return &duckv1.Destination{ Ref: &duckv1.KReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", Name: channelName, Namespace: testNS, @@ -507,7 +504,7 @@ func createReplyChannel(channelName string) *duckv1.Destination { func createChannel(parallelName string) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ - "apiVersion": "messaging.knative.dev/v1beta1", + "apiVersion": "messaging.knative.dev/v1", "kind": "InMemoryChannel", "metadata": map[string]interface{}{ "creationTimestamp": nil, @@ -515,7 +512,7 @@ func createChannel(parallelName string) *unstructured.Unstructured { "name": resources.ParallelChannelName(parallelName), "ownerReferences": []interface{}{ map[string]interface{}{ - "apiVersion": "flows.knative.dev/v1beta1", + "apiVersion": "flows.knative.dev/v1", "blockOwnerDeletion": true, "controller": true, "kind": "Parallel", @@ -532,7 +529,7 @@ func createChannel(parallelName string) *unstructured.Unstructured { func createBranchChannel(parallelName string, caseNumber int) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ - "apiVersion": "messaging.knative.dev/v1beta1", + "apiVersion": "messaging.knative.dev/v1", "kind": "InMemoryChannel", "metadata": map[string]interface{}{ "creationTimestamp": nil, @@ -540,7 +537,7 @@ func createBranchChannel(parallelName string, caseNumber int) *unstructured.Unst "name": resources.ParallelBranchChannelName(parallelName, caseNumber), "ownerReferences": []interface{}{ map[string]interface{}{ - "apiVersion": "flows.knative.dev/v1beta1", + "apiVersion": "flows.knative.dev/v1", "blockOwnerDeletion": true, "controller": true, "kind": "Parallel", @@ -554,10 +551,10 @@ func createBranchChannel(parallelName string, caseNumber int) *unstructured.Unst } } -func createParallelBranchChannelStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1beta1.ParallelChannelStatus { - return v1beta1.ParallelChannelStatus{ +func createParallelBranchChannelStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelChannelStatus { + return v1.ParallelChannelStatus{ Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", Name: resources.ParallelBranchChannelName(parallelName, caseNumber), Namespace: testNS, @@ -571,10 +568,10 @@ func createParallelBranchChannelStatus(parallelName string, caseNumber int, stat } } -func createParallelChannelStatus(parallelName string, status corev1.ConditionStatus) v1beta1.ParallelChannelStatus { - return v1beta1.ParallelChannelStatus{ +func createParallelChannelStatus(parallelName string, status corev1.ConditionStatus) v1.ParallelChannelStatus { + return v1.ParallelChannelStatus{ Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "InMemoryChannel", Name: resources.ParallelChannelName(parallelName), Namespace: testNS, @@ -588,10 +585,10 @@ func createParallelChannelStatus(parallelName string, status corev1.ConditionSta } } -func createParallelFilterSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1beta1.ParallelSubscriptionStatus { - return v1beta1.ParallelSubscriptionStatus{ +func createParallelFilterSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { + return v1.ParallelSubscriptionStatus{ Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "Subscription", Name: resources.ParallelFilterSubscriptionName(parallelName, caseNumber), Namespace: testNS, @@ -599,10 +596,10 @@ func createParallelFilterSubscriptionStatus(parallelName string, caseNumber int, } } -func createParallelSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1beta1.ParallelSubscriptionStatus { - return v1beta1.ParallelSubscriptionStatus{ +func createParallelSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { + return v1.ParallelSubscriptionStatus{ Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", Kind: "Subscription", Name: resources.ParallelSubscriptionName(parallelName, caseNumber), Namespace: testNS, @@ -632,8 +629,8 @@ func apiVersion(gvk metav1.GroupVersionKind) string { return groupVersion } -func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventingduckv1beta1.DeliverySpec { - return &eventingduckv1beta1.DeliverySpec{ +func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventingduckv1.DeliverySpec { + return &eventingduckv1.DeliverySpec{ DeadLetterSink: &duckv1.Destination{ Ref: &duckv1.KReference{ APIVersion: apiVersion(gvk), diff --git a/pkg/reconciler/parallel/resources/channel.go b/pkg/reconciler/parallel/resources/channel.go index 49d019af790..2ee567486a5 100644 --- a/pkg/reconciler/parallel/resources/channel.go +++ b/pkg/reconciler/parallel/resources/channel.go @@ -24,8 +24,8 @@ import ( "knative.dev/pkg/kmeta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/flows/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" ) // ParallelChannelName creates a name for the Channel fronting parallel. @@ -40,9 +40,9 @@ func ParallelBranchChannelName(parallelName string, branchNumber int) string { // NewChannel returns an unstructured.Unstructured based on the ChannelTemplateSpec // for a given parallel. -func NewChannel(name string, p *v1beta1.Parallel) (*unstructured.Unstructured, error) { +func NewChannel(name string, p *v1.Parallel) (*unstructured.Unstructured, error) { // Set the name of the resource we're creating as well as the namespace, etc. - template := messagingv1beta1.ChannelTemplateSpecInternal{ + template := messagingv1.ChannelTemplateSpecInternal{ TypeMeta: metav1.TypeMeta{ Kind: p.Spec.ChannelTemplate.Kind, APIVersion: p.Spec.ChannelTemplate.APIVersion, diff --git a/pkg/reconciler/parallel/resources/subscription.go b/pkg/reconciler/parallel/resources/subscription.go index 6ef6e74c5b8..5caf0abf505 100644 --- a/pkg/reconciler/parallel/resources/subscription.go +++ b/pkg/reconciler/parallel/resources/subscription.go @@ -24,8 +24,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/flows/v1beta1" - messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" ) func ParallelFilterSubscriptionName(parallelName string, branchNumber int) string { @@ -36,11 +36,11 @@ func ParallelSubscriptionName(parallelName string, branchNumber int) string { return fmt.Sprintf("%s-kn-parallel-%d", parallelName, branchNumber) } -func NewFilterSubscription(branchNumber int, p *v1beta1.Parallel) *messagingv1beta1.Subscription { - r := &messagingv1beta1.Subscription{ +func NewFilterSubscription(branchNumber int, p *v1.Parallel) *messagingv1.Subscription { + r := &messagingv1.Subscription{ TypeMeta: metav1.TypeMeta{ Kind: "Subscription", - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", }, ObjectMeta: metav1.ObjectMeta{ Namespace: p.Namespace, @@ -50,7 +50,7 @@ func NewFilterSubscription(branchNumber int, p *v1beta1.Parallel) *messagingv1be *kmeta.NewControllerRef(p), }, }, - Spec: messagingv1beta1.SubscriptionSpec{ + Spec: messagingv1.SubscriptionSpec{ Channel: corev1.ObjectReference{ APIVersion: p.Spec.ChannelTemplate.APIVersion, Kind: p.Spec.ChannelTemplate.Kind, @@ -75,11 +75,11 @@ func NewFilterSubscription(branchNumber int, p *v1beta1.Parallel) *messagingv1be return r } -func NewSubscription(branchNumber int, p *v1beta1.Parallel) *messagingv1beta1.Subscription { - r := &messagingv1beta1.Subscription{ +func NewSubscription(branchNumber int, p *v1.Parallel) *messagingv1.Subscription { + r := &messagingv1.Subscription{ TypeMeta: metav1.TypeMeta{ Kind: "Subscription", - APIVersion: "messaging.knative.dev/v1beta1", + APIVersion: "messaging.knative.dev/v1", }, ObjectMeta: metav1.ObjectMeta{ Namespace: p.Namespace, @@ -89,7 +89,7 @@ func NewSubscription(branchNumber int, p *v1beta1.Parallel) *messagingv1beta1.Su *kmeta.NewControllerRef(p), }, }, - Spec: messagingv1beta1.SubscriptionSpec{ + Spec: messagingv1.SubscriptionSpec{ Channel: corev1.ObjectReference{ APIVersion: p.Spec.ChannelTemplate.APIVersion, Kind: p.Spec.ChannelTemplate.Kind,