Skip to content

Commit

Permalink
Add support for topic and subscriptions
Browse files Browse the repository at this point in the history
* Add support for non-durable and durable topics
* Add systemtest for all supported address types
* Fix bug in state reconnect not being correctly initialized

Issue EnMasseProject#4469
  • Loading branch information
lulf committed Jun 14, 2020
1 parent 213dbd6 commit 277f8ca
Show file tree
Hide file tree
Showing 11 changed files with 663 additions and 32 deletions.
16 changes: 16 additions & 0 deletions api-model/src/main/resources/schema/kube-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@
"github_com_enmasseproject_enmasse_pkg_apis_enmasse_v1beta2_MessagingAddressSpecQueue": {
"type": "object",
"description": "",
"properties": {
"deadLetterQueue": {
"type": "string",
"description": ""
},
"expiryQueue": {
"type": "string",
"description": ""
}
},
"additionalProperties": true,
"javaType": "MessagingAddressSpecQueue",
"javaInterfaces": [
Expand All @@ -239,6 +249,12 @@
"github_com_enmasseproject_enmasse_pkg_apis_enmasse_v1beta2_MessagingAddressSpecSubscription": {
"type": "object",
"description": "",
"properties": {
"topic": {
"type": "string",
"description": ""
}
},
"additionalProperties": true,
"javaType": "MessagingAddressSpecSubscription",
"javaInterfaces": [
Expand Down
8 changes: 2 additions & 6 deletions broker-plugin/plugin/src/main/resources/shared/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ under the License.

<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="manage"/>
<permission type="deleteNonDurableQueue" roles="manage"/>
<permission type="createNonDurableQueue" roles="manage,router"/>
<permission type="deleteNonDurableQueue" roles="manage,router"/>
<permission type="createDurableQueue" roles="manage"/>
<permission type="deleteDurableQueue" roles="manage"/>
<permission type="createAddress" roles="manage"/>
Expand All @@ -99,10 +99,6 @@ under the License.
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<max-delivery-attempts>-1</max-delivery-attempts>
<redelivery-delay>1</redelivery-delay>
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<max-redelivery-delay>10000</max-redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/enmasse/v1beta2/types_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ type MessagingAddressSpecMulticast struct {
}

type MessagingAddressSpecQueue struct {
// Dead letter address (must be address with type deadLetter)
DeadLetterAddress string `json:"deadLetterAddress,omitempty"`
// Expiry queue address (must be address with type deadLetter)
ExpiryAddress string `json:"expiryAddress,omitempty"`
}

type MessagingAddressSpecTopic struct {
}

type MessagingAddressSpecSubscription struct {
// Topic address this subscription should be subscribed to.
Topic string `json:"topic"`
}

type MessagingAddressSpecDeadLetter struct {
Expand Down Expand Up @@ -108,6 +114,7 @@ type MessagingAddressConditionType string

const (
MessagingAddressFoundTenant MessagingAddressConditionType = "FoundTenant"
MessagingAddressValidated MessagingAddressConditionType = "Validated"
MessagingAddressScheduled MessagingAddressConditionType = "Scheduled"
MessagingAddressCreated MessagingAddressConditionType = "Created"
MessagingAddressReady MessagingAddressConditionType = "Ready"
Expand Down
130 changes: 124 additions & 6 deletions pkg/controller/messagingaddress/messagingaddress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ func Add(mgr manager.Manager) error {
}

/**
* TODO - Add support for topic addresses
* TODO - Add support for scheduling based on broker load
* TODO - Add support for subscription addresses
* TODO - Add support for deadLetter addresses
* TODO - Add support for per-address limits based on MessagingAddressPlan
* TODO - Add support for migrating queues to different brokers based on scheduling decision
*/
Expand Down Expand Up @@ -150,6 +147,7 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc

// Initialize phase and conditions and type
var foundTenant *v1beta2.MessagingAddressCondition
var validated *v1beta2.MessagingAddressCondition
var scheduled *v1beta2.MessagingAddressCondition
var created *v1beta2.MessagingAddressCondition
var ready *v1beta2.MessagingAddressCondition
Expand All @@ -171,6 +169,7 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
address.Status.Type = v1beta2.MessagingAddressTypeDeadLetter
}
foundTenant = address.Status.GetMessagingAddressCondition(v1beta2.MessagingAddressFoundTenant)
validated = address.Status.GetMessagingAddressCondition(v1beta2.MessagingAddressValidated)
scheduled = address.Status.GetMessagingAddressCondition(v1beta2.MessagingAddressScheduled)
created = address.Status.GetMessagingAddressCondition(v1beta2.MessagingAddressCreated)
ready = address.Status.GetMessagingAddressCondition(v1beta2.MessagingAddressReady)
Expand Down Expand Up @@ -207,8 +206,40 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
logger.Info("[Finalizer] Error looking up infra")
return reconcile.Result{}, err
}

if address.Spec.Topic != nil {
// For topics, make sure no subscriptions referencing this topic exists.
foundSub, err := matchAnyAddress(ctx, r.client, address.Namespace, func(a *v1beta2.MessagingAddress) bool {
return a.Spec.Subscription != nil && a.Spec.Subscription.Topic == address.GetAddress()
})
if err != nil {
return reconcile.Result{}, err
}
if foundSub {
err := fmt.Errorf("subscriptions referencing this topic address exists")
address.Status.Message = err.Error()
return reconcile.Result{Requeue: true}, nil
}
} else if address.Spec.DeadLetter != nil {
// For deadLetter addresses, make sure no queues are referencing it.
// For topics, make sure no subscriptions referencing this topic exists.
foundQueue, err := matchAnyAddress(ctx, r.client, address.Namespace, func(a *v1beta2.MessagingAddress) bool {
return a.Spec.Queue != nil && (a.Spec.Queue.DeadLetterAddress == address.GetAddress() || a.Spec.Queue.ExpiryAddress == address.GetAddress())
})
if err != nil {
return reconcile.Result{}, err
}
if foundQueue {
err := fmt.Errorf("queues referencing this deadletter address exists")
address.Status.Message = err.Error()
return reconcile.Result{Requeue: true}, nil
}
}
client := r.clientManager.GetClient(infra)
err = client.DeleteAddress(address)

// TODO: Notify Terminating deadLetter or topics referenced by this queue to speed up reconcile

logger.Info("[Finalizer] Deleted address", "err", err)
return reconcile.Result{}, err
},
Expand All @@ -221,6 +252,10 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
if result.Requeue {
// Update and requeue if changed
if !reflect.DeepEqual(original, address) {
if !reflect.DeepEqual(original.Status, address.Status) {
err := r.client.Status().Update(ctx, address)
return processorResult{Requeue: true}, err
}
err := r.client.Update(ctx, address)
return processorResult{Return: true}, err
}
Expand Down Expand Up @@ -249,13 +284,79 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
return result.Result(), err
}

// Schedule address. Scheduling and creation of addresses are separated and each step is persisted. This is to avoid
// the case where a scheduled address is forgotten if the operator crashes. Once persisted, the operator will
// be able to reconcile the broker state as specified in the address status.
// Perform validation of address
result, err = rc.Process(func(address *v1beta2.MessagingAddress) (processorResult, error) {
if address.Spec.Queue != nil &&
// Ensure any deadletter or expiry address exists
(address.Spec.Queue.DeadLetterAddress != "" || address.Spec.Queue.ExpiryAddress != "") {

deadLetterAddress := address.Spec.Queue.DeadLetterAddress
if deadLetterAddress != "" {
found, err := matchAnyAddress(ctx, r.client, address.Namespace, func(a *v1beta2.MessagingAddress) bool {
return a.Spec.DeadLetter != nil && a.GetAddress() == deadLetterAddress
})
if err != nil {
return processorResult{}, err
}
if !found {
err := fmt.Errorf("unable to find deadLetterAddress %s", deadLetterAddress)
validated.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
return processorResult{RequeueAfter: 10 * time.Second}, nil
}
}

expiryAddress := address.Spec.Queue.ExpiryAddress
if expiryAddress != "" {
found, err := matchAnyAddress(ctx, r.client, address.Namespace, func(a *v1beta2.MessagingAddress) bool {
return a.Spec.DeadLetter != nil && a.GetAddress() == expiryAddress
})
if err != nil {
return processorResult{}, err
}
if !found {
err := fmt.Errorf("unable to find expiryAddress %s", expiryAddress)
validated.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
return processorResult{RequeueAfter: 10 * time.Second}, nil
}
}
} else if address.Spec.Subscription != nil {
// Ensure our topic exists
topic := address.Spec.Subscription.Topic
if topic == "" {
err := fmt.Errorf("topic referenced by subscription must be non-empty")
validated.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
return processorResult{RequeueAfter: 10 * time.Second}, nil
}
found, err := matchAnyAddress(ctx, r.client, address.Namespace, func(a *v1beta2.MessagingAddress) bool {
return a.Spec.Topic != nil && a.GetAddress() == topic
})
if err != nil {
return processorResult{}, err
}

if !found {
err := fmt.Errorf("unable to find address for topic %s, required by subscription", topic)
validated.SetStatus(corev1.ConditionFalse, "", err.Error())
address.Status.Message = err.Error()
return processorResult{RequeueAfter: 10 * time.Second}, nil
}

}
validated.SetStatus(corev1.ConditionTrue, "", "")
return processorResult{}, nil
})
if result.ShouldReturn(err) {
return result.Result(), err
}

result, err = rc.Process(func(address *v1beta2.MessagingAddress) (processorResult, error) {
// TODO: Handle changes to partitions etc.
if len(address.Status.Brokers) > 0 {
// We're already scheduled so don't change
scheduled.SetStatus(corev1.ConditionTrue, "", "")
return processorResult{}, nil
}

Expand Down Expand Up @@ -326,6 +427,23 @@ func (r *ReconcileMessagingAddress) Reconcile(request reconcile.Request) (reconc
return result.Result(), err
}

type FilterFunc func(*v1beta2.MessagingAddress) bool

func matchAnyAddress(ctx context.Context, c client.Client, namespace string, filter FilterFunc) (bool, error) {
list := &v1beta2.MessagingAddressList{}
err := c.List(ctx, list, client.InNamespace(namespace))
if err != nil {
return false, err
}

for _, address := range list.Items {
if filter(&address) {
return true, nil
}
}
return false, nil
}

/*
* Automatically handle status update of the resource after running some reconcile logic.
*/
Expand Down
98 changes: 97 additions & 1 deletion pkg/state/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func (b *BrokerState) Initialize(nextResync time.Time) error {

log.Printf("[Broker %s] Initializing...", b.Host)

b.reconnectCount = b.commandClient.ReconnectCount()
totalEntities := 0
entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity}
entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity, BrokerDivertEntity}
for _, t := range entityTypes {
list, err := b.readEntities(t)
if err != nil {
Expand Down Expand Up @@ -157,6 +158,40 @@ func (b *BrokerState) readEntities(t BrokerEntityType) (map[string]BrokerEntity,
default:
return nil, fmt.Errorf("unexpected value with type %T", v)
}
case BrokerDivertEntity:
message, err := newManagementMessage("broker", "getDivertNames", "")
if err != nil {
return nil, err
}

result, err := doRequest(b.commandClient, message)
if err != nil {
return nil, err
}
if !success(result) {
return nil, fmt.Errorf("error reading diverts: %+v", result.Value)
}

switch v := result.Value.(type) {
case string:
entities := make(map[string]BrokerEntity, 0)
var list [][]string
err := json.Unmarshal([]byte(result.Value.(string)), &list)
if err != nil {
return nil, err
}
for _, entry := range list {
for _, name := range entry {
entities[name] = &BrokerDivert{
Name: name,
}
}
}
log.Printf("[broker %s] Found diverts: %+v", b.Host, entities)
return entities, nil
default:
return nil, fmt.Errorf("unexpected value with type %T", v)
}
default:
return nil, fmt.Errorf("Unsupported entity type %s", t)
}
Expand Down Expand Up @@ -430,3 +465,64 @@ func (b *BrokerAddress) Delete(client amqpcommand.Client) error {
log.Printf("Address %s deleted successfully on %s", b.Name, client.Addr())
return nil
}

/**
* Broker Diverts
*/
func (b *BrokerDivert) Type() BrokerEntityType {
return BrokerDivertEntity
}

func (b *BrokerDivert) GetName() string {
return b.Name
}

func (b *BrokerDivert) Order() int {
return 0
}

// Updates not allowed for addresses: they are the same if they have the same type and name.
func (b *BrokerDivert) Equals(other BrokerEntity) bool {
return b.Type() == other.Type() &&
b.Name == other.GetName()
}

func (b *BrokerDivert) Create(client amqpcommand.Client) error {
log.Printf("[Broker %s] creating divert: '%s'", client.Addr(), b.Name)

message, err := newManagementMessage("broker", "createDivert", "", b.Name, b.RoutingName, b.Address, b.ForwardingAddress, b.Exclusive, b.FilterString, nil)
if err != nil {
return err
}
log.Printf("Creating divert %s on %s: %+v", b.Name, client.Addr(), message)
response, err := doRequest(client, message)
if err != nil {
return err
}
if !success(response) {
return fmt.Errorf("error creating divert %s: %+v", b.Name, response.Value)
}
log.Printf("Divert %s created successfully on %s", b.Name, client.Addr())
return nil
}

func (b *BrokerDivert) Delete(client amqpcommand.Client) error {
message, err := newManagementMessage("broker", "destroyDivert", "", b.Name)
if err != nil {
return err
}

log.Printf("Destroying divert %s on %s", b.Name, client.Addr())

response, err := doRequest(client, message)
if err != nil {
return err
}

if !success(response) {
return fmt.Errorf("error destroying divert %s: %+v", b.Name, response.Value)
}

log.Printf("Divert %s destroyed successfully on %s", b.Name, client.Addr())
return nil
}
Loading

0 comments on commit 277f8ca

Please sign in to comment.