Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Add support for deadletter and expiry settings
Browse files Browse the repository at this point in the history
Default settings are set to broker defaults, except broker full
policy, which is set to FAIL. More settings will be exposed at a later
point.

Add systemtests for deadletter and expiry behavior, modify the AMQP
client to be able to reject messages so that they end up in the dead
letter queue.

Make more tests run in multiple Kubernetes environments by retrieving
a node ip and using nodeport to expose service.

Issue #4469
  • Loading branch information
lulf committed Jun 18, 2020
1 parent b9fe693 commit 4320bed
Show file tree
Hide file tree
Showing 11 changed files with 504 additions and 53 deletions.
169 changes: 155 additions & 14 deletions pkg/state/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (b *BrokerState) Initialize(nextResync time.Time) error {

b.reconnectCount = b.commandClient.ReconnectCount()
totalEntities := 0
entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity, BrokerDivertEntity}
entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity, BrokerDivertEntity, BrokerAddressSettingEntity}
for _, t := range entityTypes {
list, err := b.readEntities(t)
if err != nil {
Expand Down Expand Up @@ -216,6 +216,46 @@ func (b *BrokerState) readEntities(t BrokerEntityType) (map[string]BrokerEntity,
default:
return nil, fmt.Errorf("unexpected value with type %T", v)
}
case BrokerAddressSettingEntity:
entities := make(map[string]BrokerEntity, 0)
for name, _ := range b.entities[BrokerQueueEntity] {
message, err := newManagementMessage("broker", "getAddressSettingsAsJSON", "", name)
if err != nil {
return nil, err
}

result, err := doRequest(b.commandClient, message)
if err != nil {
return nil, err
}
if !success(result) {
return nil, fmt.Errorf("error reading address setting: %+v", result.Value)
}

switch v := result.Value.(type) {
case string:
var entry []string
err := json.Unmarshal([]byte(v), &entry)
if err != nil {
return nil, err
}

for _, e := range entry {
var setting BrokerAddressSetting
err := json.Unmarshal([]byte(e), &setting)
if err != nil {
return nil, err
}

setting.Name = name
entities[name] = &setting
}
default:
return nil, fmt.Errorf("unexpected value with type %T", v)
}
}
log.Printf("[broker %s] Found address settings: %+v", b.host, entities)
return entities, nil
default:
return nil, fmt.Errorf("Unsupported entity type %s", t)
}
Expand Down Expand Up @@ -286,23 +326,33 @@ func (b *BrokerState) DeleteEntities(ctx context.Context, entities []BrokerEntit
if !b.initialized {
return NotInitializedError
}
g, _ := errgroup.WithContext(ctx)
completed := make(chan BrokerEntity, len(entities))
for _, entity := range entities {
e := entity
if _, ok := b.entities[e.Type()][e.GetName()]; ok {
g.Go(func() error {
err := e.Delete(b.commandClient)
if err != nil {
return err
var err error
for order := maxOrder - 1; order >= 0; order-- {
g, _ := errgroup.WithContext(ctx)
for _, entity := range entities {
e := entity
if e.Order() == order {
if _, ok := b.entities[e.Type()][e.GetName()]; ok {
g.Go(func() error {
err := e.Delete(b.commandClient)
if err != nil {
return err
}

completed <- e
return nil
})
}
completed <- e
return nil
})
}
}

err = g.Wait()
if err != nil {
break
}
}

err := g.Wait()
close(completed)
if isConnectionError(err) {
b.Reset()
Expand Down Expand Up @@ -417,7 +467,7 @@ func (b *BrokerQueue) Create(client amqpcommand.Client) error {
}

func (b *BrokerQueue) Delete(client amqpcommand.Client) error {
message, err := newManagementMessage("broker", "destroyQueue", "", b.Name, true, true)
message, err := newManagementMessage("broker", "destroyQueue", "", b.Name, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -555,3 +605,94 @@ func (b *BrokerDivert) Delete(client amqpcommand.Client) error {
log.Printf("Divert %s destroyed successfully on %s", b.Name, client.Addr())
return nil
}

/**
* Broker address settings
*/
func (b *BrokerAddressSetting) Type() BrokerEntityType {
return BrokerAddressSettingEntity
}

func (b *BrokerAddressSetting) GetName() string {
return b.Name
}

func (b *BrokerAddressSetting) Order() int {
return 0
}

func (b *BrokerAddressSetting) Equals(e BrokerEntity) bool {
if b.Type() != e.Type() {
return false
}
other := e.(*BrokerAddressSetting)
return b.Name == other.GetName()
// TODO: Compare more fields when we support updates
}

func (b *BrokerAddressSetting) Create(client amqpcommand.Client) error {
log.Printf("[Broker %s] creating address setting: '%s'", client.Addr(), b.Name)

message, err := newManagementMessage("broker", "addAddressSettings", "",
b.Name,
b.DeadLetterAddress,
b.ExpiryAddress,
b.ExpiryDelay,
b.LastValueQueue,
b.DeliveryAttempts,
b.MaxSizeBytes,
b.PageSizeBytes,
b.PageMaxCacheSize,
b.RedeliveryDelay,
b.RedeliveryMultiplier,
b.MaxRedeliveryDelay,
b.RedistributionDelay,
b.SendToDLAOnNoRoute,
b.AddressFullMessagePolicy,
b.SlowConsumerThreshold,
b.SlowConsumerCheckPeriod,
b.SlowConsumerPolicy,
b.AutoCreateJmsQueues,
b.AutoDeleteJmsQueues,
b.AutoCreateJmsTopics,
b.AutoDeleteJmsTopics,
b.AutoCreateQueues,
b.AutoDeleteQueues,
b.AutoCreateAddresses,
b.AutoDeleteAddresses)

if err != nil {
return err
}
log.Printf("Creating address setting %s on %s: %+v", b.Name, client.Addr(), message)
response, err := doRequest(client, message)
if err != nil {
return err
}
if !success(response) {
return fmt.Errorf("error creating address setting %s: %+v", b.Name, response.Value)
}
log.Printf("Address setting %s created successfully on %s", b.Name, client.Addr())
return nil
}

func (b *BrokerAddressSetting) Delete(client amqpcommand.Client) error {
message, err := newManagementMessage("broker", "removeAddressSettings", "", b.Name)
if err != nil {
return err
}

log.Printf("Removing address setting %s on %s", b.Name, client.Addr())

response, err := doRequest(client, message)
if err != nil {
return err
}

if !success(response) {
return fmt.Errorf("error removing address setting %s: %+v", b.Name, response.Value)
}

log.Printf("Address setting %s destroyed successfully on %s", b.Name, client.Addr())
return nil
}
45 changes: 45 additions & 0 deletions pkg/state/broker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,51 @@ type BrokerDivert struct {
TransformerClassName string `json:"transformerClassName,omitempty"`
}

type BrokerAddressSetting struct {
Name string `json:"name"`
DeadLetterAddress string `json:"DLA,omitempty"`
ExpiryAddress string `json:"expiryAddress,omitempty"`
ExpiryDelay int64 `json:"expiryDelay,omitempty"`
LastValueQueue bool `json:"lastValueQueue,omitempty"`
DeliveryAttempts int32 `json:"deliveryAttempts,omitempty"`
MaxSizeBytes int64 `json:"maxSizeBytes,omitempty"`
PageSizeBytes int32 `json:"pageSizeBytes,omitempty"`
PageMaxCacheSize int32 `json:"pageMaxCacheSize,omitempty"`
RedeliveryDelay int64 `json:"redeliveryDelay,omitempty"`
RedeliveryMultiplier float64 `json:"redeliveryMultiplier,omitempty"`
MaxRedeliveryDelay int64 `json:"maxRedeliveryDelay,omitempty"`
RedistributionDelay int64 `json:"redistributionDelay,omitempty"`
SendToDLAOnNoRoute bool `json:"sendToDLAOnNoRoute,omitempty"`
AddressFullMessagePolicy AddressFullPolicy `json:"addressFullMessagePolicy,omitempty"`
SlowConsumerThreshold int64 `json:"slowConsumerThreshold,omitempty"`
SlowConsumerCheckPeriod int64 `json:"slowConsumerCheckPeriod,omitempty"`
SlowConsumerPolicy SlowConsumerPolicy `json:"slowConsumerPolicy,omitempty"`
AutoCreateJmsQueues bool `json:"autoCreateJmsQueues,omitempty"`
AutoDeleteJmsQueues bool `json:"autoDeleteJmsQueues,omitempty"`
AutoCreateJmsTopics bool `json:"autoCreateJmsTopics,omitempty"`
AutoDeleteJmsTopics bool `json:"autoDeleteJmsTopics,omitempty"`
AutoCreateQueues bool `json:"autoCreateQueues,omitempty"`
AutoDeleteQueues bool `json:"autoDeleteQueues,omitempty"`
AutoCreateAddresses bool `json:"autoCreateAddresses,omitempty"`
AutoDeleteAddresses bool `json:"autoDeleteAddresses,omitempty"`
}

type AddressFullPolicy string

const (
AddressFullPolicyDrop AddressFullPolicy = "DROP"
AddressFullPolicyPage AddressFullPolicy = "PAGE"
AddressFullPolicyFail AddressFullPolicy = "FAIL"
AddressFullPolicyBlock AddressFullPolicy = "BLOCK"
)

type SlowConsumerPolicy string

const (
SlowConsumerPolicyKill SlowConsumerPolicy = "KILL"
SlowConsumerPolicyNotify SlowConsumerPolicy = "NOTIFY"
)

type RoutingType string

const (
Expand Down
30 changes: 30 additions & 0 deletions pkg/state/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,17 @@ func (i *infraClient) buildBrokerAddressEntities(endpoint *v1beta2.MessagingEndp
PurgeOnNoConsumers: false,
AutoCreateAddress: false,
})

settings := createDefaultAddressSettings(fullAddress)
if address.Spec.Queue.DeadLetterAddress != "" {
settings.DeadLetterAddress = qualifiedAddress(tenantId, address.Spec.Queue.DeadLetterAddress)
}

if address.Spec.Queue.ExpiryAddress != "" {
settings.ExpiryAddress = qualifiedAddress(tenantId, address.Spec.Queue.ExpiryAddress)
}

brokerEntities[host] = append(brokerEntities[host], settings)
}
} else if address.Spec.DeadLetter != nil {
for _, brokerState := range i.brokers {
Expand Down Expand Up @@ -1233,6 +1244,25 @@ func (i *infraClient) collectRequests(c chan *request) []*request {
}
}

func createDefaultAddressSettings(address string) *BrokerAddressSetting {
return &BrokerAddressSetting{
Name: address,
ExpiryDelay: -1,
DeliveryAttempts: 10,
MaxSizeBytes: -1,
PageSizeBytes: 10485760,
PageMaxCacheSize: 5,
RedeliveryDelay: 0,
RedeliveryMultiplier: 1.0,
MaxRedeliveryDelay: 10000,
RedistributionDelay: -1,
AddressFullMessagePolicy: AddressFullPolicyFail,
SlowConsumerThreshold: -1,
SlowConsumerCheckPeriod: -1,
SlowConsumerPolicy: SlowConsumerPolicyKill,
}
}

func autoLinkName(tenantId string, address *v1beta2.MessagingAddress, host string, direction string) string {
return fmt.Sprintf("autoLink-%s-%s-%s-%s", tenantId, address.Name, host, direction)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
Expand Down Expand Up @@ -122,13 +123,17 @@ public Future<List<Message>> recvMessages(Source source, String linkName, Predic
}

public ReceiverStatus recvMessages(Source source, Predicate<Message> done, Optional<String> linkName) {
return recvMessages(source, done, linkName, protonDelivery -> protonDelivery.disposition(Accepted.getInstance(), true));
}

public ReceiverStatus recvMessages(Source source, Predicate<Message> done, Optional<String> linkName, DeliveryHandler deliveryHandler) {
CompletableFuture<List<Message>> resultPromise = new CompletableFuture<>();

Vertx vertx = VertxFactory.create();
clients.add(vertx);
String containerId = "systemtest-receiver-" + source.getAddress();
CompletableFuture<Void> connectPromise = new CompletableFuture<>();
Receiver receiver = new Receiver(options, done, new LinkOptions(source, new Target(), linkName), connectPromise, resultPromise, containerId);
Receiver receiver = new Receiver(options, done, new LinkOptions(source, new Target(), linkName), connectPromise, resultPromise, containerId, deliveryHandler);
vertx.deployVerticle(receiver);
try {
connectPromise.get(2, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2020, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.enmasse.systemtest.amqp;

import io.vertx.proton.ProtonDelivery;

public interface DeliveryHandler {
void handle(ProtonDelivery protonDelivery);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ public class Receiver extends ClientHandlerBase<List<Message>> {
private final List<Message> messages = new ArrayList<>();
private final AtomicInteger messageCount = new AtomicInteger();
private final Predicate<Message> done;
private final DeliveryHandler deliveryHandler;

public Receiver(AmqpConnectOptions clientOptions, Predicate<Message> done, LinkOptions linkOptions, CompletableFuture<Void> connectPromise, CompletableFuture<List<Message>> resultPromise, String containerId) {
public Receiver(AmqpConnectOptions clientOptions, Predicate<Message> done, LinkOptions linkOptions, CompletableFuture<Void> connectPromise, CompletableFuture<List<Message>> resultPromise, String containerId, DeliveryHandler deliveryHandler) {
super(clientOptions, linkOptions, connectPromise, resultPromise, containerId);
this.done = done;
this.deliveryHandler = deliveryHandler;
}

@Override
Expand All @@ -43,10 +45,12 @@ private void connectionOpened(ProtonConnection conn, String linkName, Source sou
receiver = conn.createReceiver(source.getAddress(), new ProtonLinkOptions().setLinkName(linkName));
receiver.setSource(source);
receiver.setPrefetch(0);
receiver.setAutoAccept(false);
receiver.handler((protonDelivery, message) -> {
log.info("Got message, count is {}", messageCount.get());
messages.add(message);
messageCount.incrementAndGet();
protonDelivery.disposition(Accepted.getInstance(), true);
deliveryHandler.handle(protonDelivery);
if (done.test(message)) {
resultPromise.complete(messages);
conn.close();
Expand Down
Loading

0 comments on commit 4320bed

Please sign in to comment.