Skip to content

Commit

Permalink
Migrate Subscription reconciler to use messaging.* v1 resources (knat…
Browse files Browse the repository at this point in the history
…ive#3930)

* migrate subscription to v1 messaging

* stash changes

* review comments
  • Loading branch information
Nicolas Lopez authored and matzew committed Aug 27, 2020
1 parent af07231 commit 9326a01
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 425 deletions.
12 changes: 6 additions & 6 deletions pkg/reconciler/subscription/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -25,11 +25,11 @@ import (
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"

messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/injection/clients/dynamicclient"
)
Expand Down Expand Up @@ -67,7 +67,7 @@ func NewController(
// populated.
controller.EnsureTypeMeta(
r.tracker.OnChanged,
messagingv1beta1.SchemeGroupVersion.WithKind("Channel"),
messagingv1.SchemeGroupVersion.WithKind("Channel"),
),
))

Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/subscription/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -25,8 +25,8 @@ import (
// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/subscription/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
)

Expand Down
84 changes: 42 additions & 42 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 The Knative Authors
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,13 +35,13 @@ import (
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/messaging"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/apis/messaging/v1beta1"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/subscription"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1"
eventingduck "knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/logging"
)
Expand All @@ -58,8 +58,7 @@ const (
)

var (
v1beta1ChannelGVK = v1beta1.SchemeGroupVersion.WithKind("Channel")
v1ChannelGVK = v1.SchemeGroupVersion.WithKind("Channel")
v1ChannelGVK = v1.SchemeGroupVersion.WithKind("Channel")
)

func newChannelWarnEvent(messageFmt string, args ...interface{}) pkgreconciler.Event {
Expand All @@ -85,7 +84,7 @@ var _ subscriptionreconciler.Interface = (*Reconciler)(nil)
var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil)

// ReconcileKind implements Interface.ReconcileKind.
func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// Find the channel for this subscription.
channel, err := r.getChannel(ctx, subscription)
if err != nil {
Expand Down Expand Up @@ -116,7 +115,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1beta1.Su
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
channel, err := r.getChannel(ctx, subscription)
if err != nil {
// If the channel was deleted (i.e., error == notFound), just return nil so that
Expand All @@ -133,8 +132,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, subscription *v1beta1.Sub
return nil
}

func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) pkgreconciler.Event {
ss, err := r.getSubStatus(sub, channel)
func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) pkgreconciler.Event {
ss, err := r.getSubStatus(ctx, sub, channel)
if err != nil {
logging.FromContext(ctx).Warn("Failed to get subscription status.", zap.Error(err))
sub.Status.MarkChannelUnknown(subscriptionNotMarkedReadyByChannel, "Failed to get subscription status: %s", err)
Expand All @@ -153,7 +152,7 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann
return nil
}

func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) pkgreconciler.Event {
func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) pkgreconciler.Event {
// Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile
// the Channel with this information.
if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil {
Expand All @@ -174,7 +173,7 @@ func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1alph
return nil
}

func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// Everything that was supposed to be resolved was, so flip the status bit on that.
subscription.Status.MarkReferencesResolvedUnknown("Resolving", "Subscription resolution interrupted.")

Expand All @@ -195,7 +194,7 @@ func (r *Reconciler) resolveSubscriptionURIs(ctx context.Context, subscription *
return nil
}

func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// Resolve Subscriber.
subscriber := subscription.Spec.Subscriber.DeepCopy()
if !isNilOrEmptyDestination(subscriber) {
Expand All @@ -222,7 +221,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1beta
return nil
}

func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// Resolve Reply.
reply := subscription.Spec.Reply.DeepCopy()
if !isNilOrEmptyDestination(reply) {
Expand All @@ -249,7 +248,7 @@ func (r *Reconciler) resolveReply(ctx context.Context, subscription *v1beta1.Sub
return nil
}

func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1beta1.Subscription) pkgreconciler.Event {
func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// Resolve DeadLetterSink.
delivery := subscription.Spec.Delivery.DeepCopy()
if !isNilOrEmptyDeliveryDeadLetterSink(delivery) {
Expand Down Expand Up @@ -277,47 +276,49 @@ func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, subscription *v1
return nil
}

func (r *Reconciler) getSubStatus(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) {
func (r *Reconciler) getSubStatus(ctx context.Context, subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) {
if channel.Annotations != nil {
if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" ||
channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" {
return r.getSubStatusV1Beta1(subscription, channel)
return r.getSubStatusV1(subscription, channel)
}
}
return r.getSubStatusV1Alpha1(subscription, channel)
return r.getSubStatusV1Alpha1(ctx, subscription, channel)
}

func (r *Reconciler) getSubStatusV1Alpha1(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) {
func (r *Reconciler) getSubStatusV1Alpha1(ctx context.Context, subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) {
subscribableStatus := channel.Status.GetSubscribableTypeStatus()

if subscribableStatus == nil {
return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("channel.Status.SubscribableStatus is nil")
return eventingduckv1.SubscriberStatus{}, fmt.Errorf("channel.Status.SubscribableStatus is nil")
}
for _, sub := range subscribableStatus.Subscribers {
if sub.UID == subscription.GetUID() &&
sub.ObservedGeneration == subscription.GetGeneration() {
return sub, nil
subv1 := eventingduckv1.SubscriberStatus{}
sub.ConvertTo(ctx, &subv1)
return subv1, nil
}
}
return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name)
return eventingduckv1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name)
}

func (r *Reconciler) getSubStatusV1Beta1(subscription *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1beta1.SubscriberStatus, error) {
func (r *Reconciler) getSubStatusV1(subscription *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined) (eventingduckv1.SubscriberStatus, error) {
for _, sub := range channel.Status.Subscribers {
if sub.UID == subscription.GetUID() &&
sub.ObservedGeneration == subscription.GetGeneration() {
return eventingduckv1beta1.SubscriberStatus{
return eventingduckv1.SubscriberStatus{
UID: sub.UID,
ObservedGeneration: sub.ObservedGeneration,
Ready: sub.Ready,
Message: sub.Message,
}, nil
}
}
return eventingduckv1beta1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name)
return eventingduckv1.SubscriberStatus{}, fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name)
}

func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1beta1.Subscription, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) {
func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscription, ref corev1.ObjectReference) (runtime.Object, pkgreconciler.Event) {
// Track the channel using the channelableTracker.
// We don't need the explicitly set a channelInformer, as this will dynamically generate one for us.
// This code needs to be called before checking the existence of the `channel`, in order to make sure the
Expand All @@ -342,7 +343,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1beta1.Subs
// and verifies it's a channelable (so that we can operate on it via patches).
// If the Channel is a channels.messaging type (hence, it's only a factory for
// underlying channels), fetch and validate the "backing" channel.
func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription) (*eventingduckv1alpha1.ChannelableCombined, pkgreconciler.Event) {
func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1alpha1.ChannelableCombined, pkgreconciler.Event) {
logging.FromContext(ctx).Info("Getting channel", zap.Any("channel", sub.Spec.Channel))

// 1. Track the channel pointed by subscription.
Expand All @@ -358,8 +359,7 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription)
// Test to see if the channel is Channel.messaging because it is going
// to have a "backing" channel that is what we need to actually operate on
// as well as keep track of.
if (v1beta1ChannelGVK.Group == gvk.Group && v1beta1ChannelGVK.Kind == gvk.Kind) ||
(v1ChannelGVK.Group == gvk.Group && v1ChannelGVK.Kind == gvk.Kind) {
if v1ChannelGVK.Group == gvk.Group && v1ChannelGVK.Kind == gvk.Kind {
// Track changes on Channel.
// Ref: https://github.com/knative/eventing/issues/2641
// NOTE: There is a race condition with using the channelableTracker
Expand All @@ -371,7 +371,7 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription)
// to the cache we intend to use to pull the Channel from. This linkage
// is setup in NewController for r.tracker.
if err := r.tracker.TrackReference(tracker.Reference{
APIVersion: "messaging.knative.dev/v1beta1",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
Namespace: sub.Namespace,
Name: sub.Spec.Channel.Name,
Expand Down Expand Up @@ -412,16 +412,16 @@ func (r *Reconciler) getChannel(ctx context.Context, sub *v1beta1.Subscription)
return ch.DeepCopy(), nil
}

func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1beta1.DeliverySpec) bool {
return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1beta1.DeliverySpec{}) ||
func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1.DeliverySpec) bool {
return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1.DeliverySpec{}) ||
delivery.DeadLetterSink == nil
}

func isNilOrEmptyDestination(destination *duckv1.Destination) bool {
return destination == nil || equality.Semantic.DeepEqual(destination, &duckv1.Destination{})
}

func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1beta1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined, isDeleted bool) (bool, error) {
func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1alpha1.ChannelableCombined, isDeleted bool) (bool, error) {
logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub))
if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil {
if isDeleted && apierrors.IsNotFound(patchErr) {
Expand All @@ -434,7 +434,7 @@ func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1beta1.Subsc
}
}

func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) (bool, error) {
func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) (bool, error) {
after := channel.DeepCopy()

if sub.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -467,18 +467,18 @@ func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, ch
return true, nil
}

func (r *Reconciler) updateChannelRemoveSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelRemoveSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
if channel.Annotations != nil {
if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" ||
channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" {
if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" ||
channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" {
r.updateChannelRemoveSubscriptionV1Beta1(ctx, channel, sub)
return
}
}
r.updateChannelRemoveSubscriptionV1Alpha1(ctx, channel, sub)
}

func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
for i, v := range channel.Spec.Subscribers {
if v.UID == sub.UID {
channel.Spec.Subscribers = append(
Expand All @@ -489,7 +489,7 @@ func (r *Reconciler) updateChannelRemoveSubscriptionV1Beta1(ctx context.Context,
}
}

func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
if channel.Spec.Subscribable == nil {
return
}
Expand All @@ -504,7 +504,7 @@ func (r *Reconciler) updateChannelRemoveSubscriptionV1Alpha1(ctx context.Context
}
}

func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
if channel.Annotations != nil {
if channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" ||
channel.Annotations[messaging.SubscribableDuckVersionAnnotation] == "v1" {
Expand All @@ -515,7 +515,7 @@ func (r *Reconciler) updateChannelAddSubscription(ctx context.Context, channel *
r.updateChannelAddSubscriptionV1Alpha1(ctx, channel, sub)
}

func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
if channel.Spec.Subscribable == nil {
channel.Spec.Subscribable = &eventingduckv1alpha1.Subscribable{
Subscribers: []eventingduckv1alpha1.SubscriberSpec{{
Expand Down Expand Up @@ -551,7 +551,7 @@ func (r *Reconciler) updateChannelAddSubscriptionV1Alpha1(ctx context.Context, c
})
}

func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1beta1.Subscription) {
func (r *Reconciler) updateChannelAddSubscriptionV1Beta1(ctx context.Context, channel *eventingduckv1alpha1.ChannelableCombined, sub *v1.Subscription) {
// Look to update subscriber.
for i, v := range channel.Spec.Subscribers {
if v.UID == sub.UID {
Expand Down
Loading

0 comments on commit 9326a01

Please sign in to comment.