From 4320bed6851fb7da407800e63ab46f5dc2dc4503 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 16 Jun 2020 04:59:48 +0200 Subject: [PATCH] Add support for deadletter and expiry settings Default settings are set to broker defaults, except broker full policy, which is set to FAIL. More settings will be exposed at a later point. Add systemtests for deadletter and expiry behavior, modify the AMQP client to be able to reject messages so that they end up in the dead letter queue. Make more tests run in multiple Kubernetes environments by retrieving a node ip and using nodeport to expose service. Issue #4469 --- pkg/state/broker/broker.go | 169 ++++++++++++++++-- pkg/state/broker/types.go | 45 +++++ pkg/state/infra.go | 30 ++++ .../enmasse/systemtest/amqp/AmqpClient.java | 7 +- .../systemtest/amqp/DeliveryHandler.java | 11 ++ .../io/enmasse/systemtest/amqp/Receiver.java | 8 +- .../platform/GenericKubernetes.java | 93 ++++++++++ .../systemtest/platform/Kubernetes.java | 49 +++-- .../platform/cluster/MinikubeCluster.java | 3 +- .../sharedinfra/MessagingAddressTest.java | 127 ++++++++++++- .../sharedinfra/MessagingEndpointTest.java | 15 +- 11 files changed, 504 insertions(+), 53 deletions(-) create mode 100644 systemtests/src/main/java/io/enmasse/systemtest/amqp/DeliveryHandler.java create mode 100644 systemtests/src/main/java/io/enmasse/systemtest/platform/GenericKubernetes.java diff --git a/pkg/state/broker/broker.go b/pkg/state/broker/broker.go index f0b14a0922f..14c1295fa90 100644 --- a/pkg/state/broker/broker.go +++ b/pkg/state/broker/broker.go @@ -77,7 +77,7 @@ func (b *BrokerState) Initialize(nextResync time.Time) error { b.reconnectCount = b.commandClient.ReconnectCount() totalEntities := 0 - entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity, BrokerDivertEntity} + entityTypes := []BrokerEntityType{BrokerQueueEntity, BrokerAddressEntity, BrokerDivertEntity, BrokerAddressSettingEntity} for _, t := range entityTypes { list, err := b.readEntities(t) if err != nil { @@ -216,6 +216,46 @@ func (b *BrokerState) readEntities(t BrokerEntityType) (map[string]BrokerEntity, default: return nil, fmt.Errorf("unexpected value with type %T", v) } + case BrokerAddressSettingEntity: + entities := make(map[string]BrokerEntity, 0) + for name, _ := range b.entities[BrokerQueueEntity] { + message, err := newManagementMessage("broker", "getAddressSettingsAsJSON", "", name) + if err != nil { + return nil, err + } + + result, err := doRequest(b.commandClient, message) + if err != nil { + return nil, err + } + if !success(result) { + return nil, fmt.Errorf("error reading address setting: %+v", result.Value) + } + + switch v := result.Value.(type) { + case string: + var entry []string + err := json.Unmarshal([]byte(v), &entry) + if err != nil { + return nil, err + } + + for _, e := range entry { + var setting BrokerAddressSetting + err := json.Unmarshal([]byte(e), &setting) + if err != nil { + return nil, err + } + + setting.Name = name + entities[name] = &setting + } + default: + return nil, fmt.Errorf("unexpected value with type %T", v) + } + } + log.Printf("[broker %s] Found address settings: %+v", b.host, entities) + return entities, nil default: return nil, fmt.Errorf("Unsupported entity type %s", t) } @@ -286,23 +326,33 @@ func (b *BrokerState) DeleteEntities(ctx context.Context, entities []BrokerEntit if !b.initialized { return NotInitializedError } - g, _ := errgroup.WithContext(ctx) completed := make(chan BrokerEntity, len(entities)) - for _, entity := range entities { - e := entity - if _, ok := b.entities[e.Type()][e.GetName()]; ok { - g.Go(func() error { - err := e.Delete(b.commandClient) - if err != nil { - return err + var err error + for order := maxOrder - 1; order >= 0; order-- { + g, _ := errgroup.WithContext(ctx) + for _, entity := range entities { + e := entity + if e.Order() == order { + if _, ok := b.entities[e.Type()][e.GetName()]; ok { + g.Go(func() error { + err := e.Delete(b.commandClient) + if err != nil { + return err + } + + completed <- e + return nil + }) } - completed <- e - return nil - }) + } + } + + err = g.Wait() + if err != nil { + break } } - err := g.Wait() close(completed) if isConnectionError(err) { b.Reset() @@ -417,7 +467,7 @@ func (b *BrokerQueue) Create(client amqpcommand.Client) error { } func (b *BrokerQueue) Delete(client amqpcommand.Client) error { - message, err := newManagementMessage("broker", "destroyQueue", "", b.Name, true, true) + message, err := newManagementMessage("broker", "destroyQueue", "", b.Name, true) if err != nil { return err } @@ -555,3 +605,94 @@ func (b *BrokerDivert) Delete(client amqpcommand.Client) error { log.Printf("Divert %s destroyed successfully on %s", b.Name, client.Addr()) return nil } + +/** + * Broker address settings + */ +func (b *BrokerAddressSetting) Type() BrokerEntityType { + return BrokerAddressSettingEntity +} + +func (b *BrokerAddressSetting) GetName() string { + return b.Name +} + +func (b *BrokerAddressSetting) Order() int { + return 0 +} + +func (b *BrokerAddressSetting) Equals(e BrokerEntity) bool { + if b.Type() != e.Type() { + return false + } + other := e.(*BrokerAddressSetting) + return b.Name == other.GetName() + // TODO: Compare more fields when we support updates +} + +func (b *BrokerAddressSetting) Create(client amqpcommand.Client) error { + log.Printf("[Broker %s] creating address setting: '%s'", client.Addr(), b.Name) + + message, err := newManagementMessage("broker", "addAddressSettings", "", + b.Name, + b.DeadLetterAddress, + b.ExpiryAddress, + b.ExpiryDelay, + b.LastValueQueue, + b.DeliveryAttempts, + b.MaxSizeBytes, + b.PageSizeBytes, + b.PageMaxCacheSize, + b.RedeliveryDelay, + b.RedeliveryMultiplier, + b.MaxRedeliveryDelay, + b.RedistributionDelay, + b.SendToDLAOnNoRoute, + b.AddressFullMessagePolicy, + b.SlowConsumerThreshold, + b.SlowConsumerCheckPeriod, + b.SlowConsumerPolicy, + b.AutoCreateJmsQueues, + b.AutoDeleteJmsQueues, + b.AutoCreateJmsTopics, + b.AutoDeleteJmsTopics, + b.AutoCreateQueues, + b.AutoDeleteQueues, + b.AutoCreateAddresses, + b.AutoDeleteAddresses) + + if err != nil { + return err + } + log.Printf("Creating address setting %s on %s: %+v", b.Name, client.Addr(), message) + response, err := doRequest(client, message) + if err != nil { + return err + } + if !success(response) { + return fmt.Errorf("error creating address setting %s: %+v", b.Name, response.Value) + } + log.Printf("Address setting %s created successfully on %s", b.Name, client.Addr()) + return nil +} + +func (b *BrokerAddressSetting) Delete(client amqpcommand.Client) error { + message, err := newManagementMessage("broker", "removeAddressSettings", "", b.Name) + if err != nil { + return err + } + + log.Printf("Removing address setting %s on %s", b.Name, client.Addr()) + + response, err := doRequest(client, message) + if err != nil { + return err + } + + if !success(response) { + return fmt.Errorf("error removing address setting %s: %+v", b.Name, response.Value) + } + + log.Printf("Address setting %s destroyed successfully on %s", b.Name, client.Addr()) + return nil +} diff --git a/pkg/state/broker/types.go b/pkg/state/broker/types.go index c6d94a666d4..bfbe8ad9476 100644 --- a/pkg/state/broker/types.go +++ b/pkg/state/broker/types.go @@ -65,6 +65,51 @@ type BrokerDivert struct { TransformerClassName string `json:"transformerClassName,omitempty"` } +type BrokerAddressSetting struct { + Name string `json:"name"` + DeadLetterAddress string `json:"DLA,omitempty"` + ExpiryAddress string `json:"expiryAddress,omitempty"` + ExpiryDelay int64 `json:"expiryDelay,omitempty"` + LastValueQueue bool `json:"lastValueQueue,omitempty"` + DeliveryAttempts int32 `json:"deliveryAttempts,omitempty"` + MaxSizeBytes int64 `json:"maxSizeBytes,omitempty"` + PageSizeBytes int32 `json:"pageSizeBytes,omitempty"` + PageMaxCacheSize int32 `json:"pageMaxCacheSize,omitempty"` + RedeliveryDelay int64 `json:"redeliveryDelay,omitempty"` + RedeliveryMultiplier float64 `json:"redeliveryMultiplier,omitempty"` + MaxRedeliveryDelay int64 `json:"maxRedeliveryDelay,omitempty"` + RedistributionDelay int64 `json:"redistributionDelay,omitempty"` + SendToDLAOnNoRoute bool `json:"sendToDLAOnNoRoute,omitempty"` + AddressFullMessagePolicy AddressFullPolicy `json:"addressFullMessagePolicy,omitempty"` + SlowConsumerThreshold int64 `json:"slowConsumerThreshold,omitempty"` + SlowConsumerCheckPeriod int64 `json:"slowConsumerCheckPeriod,omitempty"` + SlowConsumerPolicy SlowConsumerPolicy `json:"slowConsumerPolicy,omitempty"` + AutoCreateJmsQueues bool `json:"autoCreateJmsQueues,omitempty"` + AutoDeleteJmsQueues bool `json:"autoDeleteJmsQueues,omitempty"` + AutoCreateJmsTopics bool `json:"autoCreateJmsTopics,omitempty"` + AutoDeleteJmsTopics bool `json:"autoDeleteJmsTopics,omitempty"` + AutoCreateQueues bool `json:"autoCreateQueues,omitempty"` + AutoDeleteQueues bool `json:"autoDeleteQueues,omitempty"` + AutoCreateAddresses bool `json:"autoCreateAddresses,omitempty"` + AutoDeleteAddresses bool `json:"autoDeleteAddresses,omitempty"` +} + +type AddressFullPolicy string + +const ( + AddressFullPolicyDrop AddressFullPolicy = "DROP" + AddressFullPolicyPage AddressFullPolicy = "PAGE" + AddressFullPolicyFail AddressFullPolicy = "FAIL" + AddressFullPolicyBlock AddressFullPolicy = "BLOCK" +) + +type SlowConsumerPolicy string + +const ( + SlowConsumerPolicyKill SlowConsumerPolicy = "KILL" + SlowConsumerPolicyNotify SlowConsumerPolicy = "NOTIFY" +) + type RoutingType string const ( diff --git a/pkg/state/infra.go b/pkg/state/infra.go index a89a916901e..6b99e2b7d23 100644 --- a/pkg/state/infra.go +++ b/pkg/state/infra.go @@ -1007,6 +1007,17 @@ func (i *infraClient) buildBrokerAddressEntities(endpoint *v1beta2.MessagingEndp PurgeOnNoConsumers: false, AutoCreateAddress: false, }) + + settings := createDefaultAddressSettings(fullAddress) + if address.Spec.Queue.DeadLetterAddress != "" { + settings.DeadLetterAddress = qualifiedAddress(tenantId, address.Spec.Queue.DeadLetterAddress) + } + + if address.Spec.Queue.ExpiryAddress != "" { + settings.ExpiryAddress = qualifiedAddress(tenantId, address.Spec.Queue.ExpiryAddress) + } + + brokerEntities[host] = append(brokerEntities[host], settings) } } else if address.Spec.DeadLetter != nil { for _, brokerState := range i.brokers { @@ -1233,6 +1244,25 @@ func (i *infraClient) collectRequests(c chan *request) []*request { } } +func createDefaultAddressSettings(address string) *BrokerAddressSetting { + return &BrokerAddressSetting{ + Name: address, + ExpiryDelay: -1, + DeliveryAttempts: 10, + MaxSizeBytes: -1, + PageSizeBytes: 10485760, + PageMaxCacheSize: 5, + RedeliveryDelay: 0, + RedeliveryMultiplier: 1.0, + MaxRedeliveryDelay: 10000, + RedistributionDelay: -1, + AddressFullMessagePolicy: AddressFullPolicyFail, + SlowConsumerThreshold: -1, + SlowConsumerCheckPeriod: -1, + SlowConsumerPolicy: SlowConsumerPolicyKill, + } +} + func autoLinkName(tenantId string, address *v1beta2.MessagingAddress, host string, direction string) string { return fmt.Sprintf("autoLink-%s-%s-%s-%s", tenantId, address.Name, host, direction) } diff --git a/systemtests/src/main/java/io/enmasse/systemtest/amqp/AmqpClient.java b/systemtests/src/main/java/io/enmasse/systemtest/amqp/AmqpClient.java index 91c1001af44..f5fc030cab4 100644 --- a/systemtests/src/main/java/io/enmasse/systemtest/amqp/AmqpClient.java +++ b/systemtests/src/main/java/io/enmasse/systemtest/amqp/AmqpClient.java @@ -15,6 +15,7 @@ import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.proton.ProtonConnection; import io.vertx.proton.ProtonDelivery; +import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; @@ -122,13 +123,17 @@ public Future> recvMessages(Source source, String linkName, Predic } public ReceiverStatus recvMessages(Source source, Predicate done, Optional linkName) { + return recvMessages(source, done, linkName, protonDelivery -> protonDelivery.disposition(Accepted.getInstance(), true)); + } + + public ReceiverStatus recvMessages(Source source, Predicate done, Optional linkName, DeliveryHandler deliveryHandler) { CompletableFuture> resultPromise = new CompletableFuture<>(); Vertx vertx = VertxFactory.create(); clients.add(vertx); String containerId = "systemtest-receiver-" + source.getAddress(); CompletableFuture connectPromise = new CompletableFuture<>(); - Receiver receiver = new Receiver(options, done, new LinkOptions(source, new Target(), linkName), connectPromise, resultPromise, containerId); + Receiver receiver = new Receiver(options, done, new LinkOptions(source, new Target(), linkName), connectPromise, resultPromise, containerId, deliveryHandler); vertx.deployVerticle(receiver); try { connectPromise.get(2, TimeUnit.MINUTES); diff --git a/systemtests/src/main/java/io/enmasse/systemtest/amqp/DeliveryHandler.java b/systemtests/src/main/java/io/enmasse/systemtest/amqp/DeliveryHandler.java new file mode 100644 index 00000000000..29b97892a7a --- /dev/null +++ b/systemtests/src/main/java/io/enmasse/systemtest/amqp/DeliveryHandler.java @@ -0,0 +1,11 @@ +/* + * Copyright 2020, EnMasse authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.enmasse.systemtest.amqp; + +import io.vertx.proton.ProtonDelivery; + +public interface DeliveryHandler { + void handle(ProtonDelivery protonDelivery); +} diff --git a/systemtests/src/main/java/io/enmasse/systemtest/amqp/Receiver.java b/systemtests/src/main/java/io/enmasse/systemtest/amqp/Receiver.java index 29550858cdb..71fa77fcbbc 100644 --- a/systemtests/src/main/java/io/enmasse/systemtest/amqp/Receiver.java +++ b/systemtests/src/main/java/io/enmasse/systemtest/amqp/Receiver.java @@ -28,10 +28,12 @@ public class Receiver extends ClientHandlerBase> { private final List messages = new ArrayList<>(); private final AtomicInteger messageCount = new AtomicInteger(); private final Predicate done; + private final DeliveryHandler deliveryHandler; - public Receiver(AmqpConnectOptions clientOptions, Predicate done, LinkOptions linkOptions, CompletableFuture connectPromise, CompletableFuture> resultPromise, String containerId) { + public Receiver(AmqpConnectOptions clientOptions, Predicate done, LinkOptions linkOptions, CompletableFuture connectPromise, CompletableFuture> resultPromise, String containerId, DeliveryHandler deliveryHandler) { super(clientOptions, linkOptions, connectPromise, resultPromise, containerId); this.done = done; + this.deliveryHandler = deliveryHandler; } @Override @@ -43,10 +45,12 @@ private void connectionOpened(ProtonConnection conn, String linkName, Source sou receiver = conn.createReceiver(source.getAddress(), new ProtonLinkOptions().setLinkName(linkName)); receiver.setSource(source); receiver.setPrefetch(0); + receiver.setAutoAccept(false); receiver.handler((protonDelivery, message) -> { + log.info("Got message, count is {}", messageCount.get()); messages.add(message); messageCount.incrementAndGet(); - protonDelivery.disposition(Accepted.getInstance(), true); + deliveryHandler.handle(protonDelivery); if (done.test(message)) { resultPromise.complete(messages); conn.close(); diff --git a/systemtests/src/main/java/io/enmasse/systemtest/platform/GenericKubernetes.java b/systemtests/src/main/java/io/enmasse/systemtest/platform/GenericKubernetes.java new file mode 100644 index 00000000000..832d5906a29 --- /dev/null +++ b/systemtests/src/main/java/io/enmasse/systemtest/platform/GenericKubernetes.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020, EnMasse authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.enmasse.systemtest.platform; + +import io.enmasse.systemtest.Endpoint; +import io.enmasse.systemtest.Environment; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServicePort; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.utils.HttpClientUtils; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; + +import java.util.Collections; + +public class GenericKubernetes extends Kubernetes { + + private static final String OLM_NAMESPACE = "operators"; + + protected GenericKubernetes(Environment environment) { + super(environment, () -> { + Config config = new ConfigBuilder().build(); + OkHttpClient httpClient = HttpClientUtils.createHttpClient(config); + // Workaround https://github.com/square/okhttp/issues/3146 + httpClient = httpClient.newBuilder() + .protocols(Collections.singletonList(Protocol.HTTP_1_1)) + .connectTimeout(environment.getKubernetesApiConnectTimeout()) + .writeTimeout(environment.getKubernetesApiWriteTimeout()) + .readTimeout(environment.getKubernetesApiReadTimeout()) + .build(); + return new DefaultKubernetesClient(httpClient, config); + }); + } + + @Override + public Endpoint getMasterEndpoint() { + return new Endpoint(client.getMasterUrl()); + } + + @Override + public Endpoint getRestEndpoint() { + return new Endpoint(client.getMasterUrl()); + } + + @Override + public Endpoint getKeycloakEndpoint() { + return getExternalEndpoint("standard-authservice"); + } + + @Override + public Endpoint getExternalEndpoint(String name) { + return getExternalEndpoint(name, infraNamespace); + } + + @Override + public Endpoint getExternalEndpoint(String name, String namespace) { + throw new UnsupportedOperationException(); + } + + @Override + public void createExternalEndpoint(String name, String namespace, Service service, ServicePort targetPort) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteExternalEndpoint(String namespace, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public String getHost() { + return getNodeHost(); + } + + @Override + public String getOlmNamespace() { + return OLM_NAMESPACE; + } + + @Override + public String getClusterExternalImageRegistry() { + return null; + } + + @Override + public String getClusterInternalImageRegistry() { + return null; + } +} diff --git a/systemtests/src/main/java/io/enmasse/systemtest/platform/Kubernetes.java b/systemtests/src/main/java/io/enmasse/systemtest/platform/Kubernetes.java index 2d56bebc07d..e2981de1468 100644 --- a/systemtests/src/main/java/io/enmasse/systemtest/platform/Kubernetes.java +++ b/systemtests/src/main/java/io/enmasse/systemtest/platform/Kubernetes.java @@ -35,6 +35,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import io.enmasse.systemtest.platform.cluster.KubernetesCluster; +import io.fabric8.kubernetes.api.model.*; import org.apache.commons.io.output.CloseShieldOutputStream; import org.slf4j.Logger; @@ -93,20 +95,6 @@ import io.enmasse.user.model.v1.User; import io.enmasse.user.model.v1.UserCrd; import io.enmasse.user.model.v1.UserList; -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapList; -import io.fabric8.kubernetes.api.model.Container; -import io.fabric8.kubernetes.api.model.DoneablePod; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.Namespace; -import io.fabric8.kubernetes.api.model.NamespaceBuilder; -import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.api.model.Service; -import io.fabric8.kubernetes.api.model.ServiceAccount; -import io.fabric8.kubernetes.api.model.ServiceAccountBuilder; -import io.fabric8.kubernetes.api.model.ServicePort; import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.ReplicaSet; @@ -157,8 +145,10 @@ public static Kubernetes getInstance() { throw new RuntimeException(ex); } Environment env = Environment.getInstance(); - if (cluster.toString().equals(MinikubeCluster.IDENTIFIER) || cluster.toString().equals(KubernetesCluster.IDENTIFIER)) { + if (cluster.toString().equals(MinikubeCluster.IDENTIFIER)) { instance = new Minikube(env); + } else if (cluster.toString().equals(KubernetesCluster.IDENTIFIER)) { + instance = new GenericKubernetes(env); } else { instance = new OpenShift(env); } @@ -187,7 +177,9 @@ public static Kubernetes getInstance() { /** * Retrieve host or ip address of Kubernetes node. */ - public abstract String getHost(); + public String getHost() { + return "localhost"; + } public List listRoutes(String namespace, Map labels) { throw new UnsupportedOperationException(); @@ -1236,6 +1228,31 @@ public FileVisitResult visitFile(final Path file, final BasicFileAttributes attr } + /** + * Return the external IP for the first node found in the cluster. + */ + public String getNodeHost() { + List addresses = client.nodes().list().getItems().stream() + .peek(n -> CustomLogger.getLogger().info("Found node: {}", n)) + .flatMap(n -> n.getStatus().getAddresses().stream() + .peek(a -> CustomLogger.getLogger().info("Found address: {}", a)) + .filter(a -> a.getType().equals("InternalIP") || a.getType().equals("ExternalIP"))) + .collect(Collectors.toList()); + if (addresses.isEmpty()) { + return null; + } + + // Return public ip if exists + for (NodeAddress address : addresses) { + if (address.getType().equals("ExternalIP")) { + return address.getAddress(); + } + } + + // Fall back to first internal ip + return addresses.get(0).getAddress(); + } + @FunctionalInterface public interface AfterInput { void afterInput(final OutputStream remoteInput) throws IOException; diff --git a/systemtests/src/main/java/io/enmasse/systemtest/platform/cluster/MinikubeCluster.java b/systemtests/src/main/java/io/enmasse/systemtest/platform/cluster/MinikubeCluster.java index c7a4d8a12a5..b6a1c79f925 100644 --- a/systemtests/src/main/java/io/enmasse/systemtest/platform/cluster/MinikubeCluster.java +++ b/systemtests/src/main/java/io/enmasse/systemtest/platform/cluster/MinikubeCluster.java @@ -5,6 +5,7 @@ package io.enmasse.systemtest.platform.cluster; import io.enmasse.systemtest.executor.Exec; +import io.enmasse.systemtest.executor.ExecutionResultData; import java.util.Arrays; @@ -14,7 +15,7 @@ public class MinikubeCluster implements KubeCluster { @Override public boolean isAvailable() { - return Exec.isExecutableOnPath(IDENTIFIER); + return Exec.isExecutableOnPath(IDENTIFIER) && Exec.execute(IDENTIFIER, "status").getRetCode(); } @Override diff --git a/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingAddressTest.java b/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingAddressTest.java index 357668a3e5a..b17f81aaafc 100644 --- a/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingAddressTest.java +++ b/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingAddressTest.java @@ -10,28 +10,45 @@ import io.enmasse.api.model.MessagingEndpointPort; import io.enmasse.api.model.MessagingTenant; import io.enmasse.systemtest.Endpoint; +import io.enmasse.systemtest.amqp.AmqpClient; +import io.enmasse.systemtest.amqp.AmqpConnectOptions; +import io.enmasse.systemtest.amqp.DeliveryHandler; +import io.enmasse.systemtest.amqp.QueueTerminusFactory; import io.enmasse.systemtest.annotations.DefaultMessagingInfrastructure; import io.enmasse.systemtest.annotations.DefaultMessagingTenant; import io.enmasse.systemtest.annotations.ExternalClients; import io.enmasse.systemtest.bases.TestBase; import io.enmasse.systemtest.bases.isolated.ITestIsolatedSharedInfra; +import io.enmasse.systemtest.condition.Kubernetes; +import io.enmasse.systemtest.condition.OpenShift; import io.enmasse.systemtest.messagingclients.ClientArgument; import io.enmasse.systemtest.messagingclients.ExternalMessagingClient; -import io.enmasse.systemtest.messagingclients.proton.java.ProtonJMSClientReceiver; -import io.enmasse.systemtest.messagingclients.proton.java.ProtonJMSClientSender; import io.enmasse.systemtest.messagingclients.rhea.RheaClientReceiver; import io.enmasse.systemtest.messagingclients.rhea.RheaClientSender; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.proton.ProtonClientOptions; +import io.vertx.proton.ProtonDelivery; +import io.vertx.proton.ProtonQoS; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static io.enmasse.systemtest.TestTag.ISOLATED_SHARED_INFRA; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -112,6 +129,86 @@ public void testQueue() throws Exception { doTestSendReceive("queue1", "queue1"); } + @Test + public void testDeadLetterExpiry() throws Exception { + infraResourceManager.createResource(new MessagingAddressBuilder() + .editOrNewMetadata() + .withName("dlq1") + .withNamespace(tenant.getMetadata().getNamespace()) + .endMetadata() + .editOrNewSpec() + .editOrNewDeadLetter() + .endDeadLetter() + .endSpec() + .build()); + infraResourceManager.createResource(new MessagingAddressBuilder() + .editOrNewMetadata() + .withName("queue1") + .withNamespace(tenant.getMetadata().getNamespace()) + .endMetadata() + .editOrNewSpec() + .editOrNewQueue() + .withExpiryAddress("dlq1") + .endQueue() + .endSpec() + .build()); + + doTestSendReceive(false, Collections.singletonMap(ClientArgument.MSG_TTL, "100"), null, "queue1", "dlq1"); + } + + @Test + public void testDeadLetterConsume() throws Exception { + MessagingEndpoint ingress = new MessagingEndpointBuilder() + .editOrNewMetadata() + .withName("dlq") + .withNamespace(tenant.getMetadata().getNamespace()) + .endMetadata() + .editOrNewSpec() + .editOrNewNodePort() + .endNodePort() + .withHost(kubernetes.getNodeHost()) + .addToProtocols("AMQP") + .endSpec() + .build(); + infraResourceManager.createResource(ingress); + infraResourceManager.createResource(new MessagingAddressBuilder() + .editOrNewMetadata() + .withName("dlq1") + .withNamespace(tenant.getMetadata().getNamespace()) + .endMetadata() + .editOrNewSpec() + .editOrNewDeadLetter() + .endDeadLetter() + .endSpec() + .build()); + infraResourceManager.createResource(new MessagingAddressBuilder() + .editOrNewMetadata() + .withName("queue1") + .withNamespace(tenant.getMetadata().getNamespace()) + .endMetadata() + .editOrNewSpec() + .editOrNewQueue() + .withDeadLetterAddress("dlq1") + .endQueue() + .endSpec() + .build()); + + + AmqpClient client = infraResourceManager.getAmqpClientFactory().createClient(new AmqpConnectOptions() + .setSaslMechanism("ANONYMOUS") + .setQos(ProtonQoS.AT_LEAST_ONCE) + .setEndpoint(new Endpoint(ingress.getStatus().getHost(), ingress.getStatus().getPorts().get(0).getPort())) + .setProtonClientOptions(new ProtonClientOptions()) + .setTerminusFactory(new QueueTerminusFactory())); + + assertEquals(1, client.sendMessages("queue1", Collections.singletonList("todeadletter")).get(1, TimeUnit.MINUTES)); + Source source = new Source(); + source.setAddress("queue1"); + AtomicInteger redeliveries = new AtomicInteger(0); + assertEquals(1, client.recvMessages(source, message -> redeliveries.incrementAndGet() >= 1, Optional.empty(), protonDelivery -> protonDelivery.disposition(new Rejected(), true)).getResult().get(1, TimeUnit.MINUTES).size()); + assertEquals(1, client.recvMessages("dlq1", 1).get(1, TimeUnit.MINUTES).size()); + } + @Test public void testTopic() throws Exception { infraResourceManager.createResource(new MessagingAddressBuilder() @@ -167,7 +264,7 @@ public void testSubscription() throws Exception { /** * Send 10 messages on sender address, and receive 10 messages on each receiver address. */ - void doTestSendReceive(boolean waitReceivers, String senderAddress, String ... receiverAddresses) throws Exception { + void doTestSendReceive(boolean waitReceivers, Map extraSenderArgs, Map extraReceiverArgs, String senderAddress, String ... receiverAddresses) throws Exception { int expectedMsgCount = 10; Endpoint e = new Endpoint(endpoint.getStatus().getHost(), getPort("AMQP", endpoint)); @@ -180,15 +277,29 @@ void doTestSendReceive(boolean waitReceivers, String senderAddress, String ... r .withAdditionalArgument(ClientArgument.CONN_AUTH_MECHANISM, "ANONYMOUS") .withTimeout(60); + if (extraSenderArgs != null) { + for (Map.Entry arg : extraSenderArgs.entrySet()) { + senderClient.withAdditionalArgument(arg.getKey(), arg.getValue()); + } + } + List receiverClients = new ArrayList<>(); for (String receiverAddress : receiverAddresses) { - receiverClients.add(new ExternalMessagingClient(false) + ExternalMessagingClient receiverClient = new ExternalMessagingClient(false) .withClientEngine(new RheaClientReceiver()) .withMessagingRoute(e) .withAddress(receiverAddress) .withCount(expectedMsgCount) .withAdditionalArgument(ClientArgument.CONN_AUTH_MECHANISM, "ANONYMOUS") - .withTimeout(60)); + .withTimeout(60); + + if (extraReceiverArgs != null) { + for (Map.Entry arg : extraReceiverArgs.entrySet()) { + receiverClient.withAdditionalArgument(arg.getKey(), arg.getValue()); + } + } + + receiverClients.add(receiverClient); } List> receiverResults = new ArrayList<>(); @@ -216,6 +327,10 @@ void doTestSendReceive(boolean waitReceivers, String senderAddress, String ... r } } + void doTestSendReceive(boolean waitReceivers, String senderAddress, String ... receiverAddresses) throws Exception { + doTestSendReceive(waitReceivers, null, null, senderAddress, receiverAddresses); + } + void doTestSendReceive(String senderAddress, String ... receiverAddresses) throws Exception { doTestSendReceive(false, senderAddress, receiverAddresses); } diff --git a/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingEndpointTest.java b/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingEndpointTest.java index ce65e416f97..ef042ac70c6 100644 --- a/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingEndpointTest.java +++ b/systemtests/src/test/java/io/enmasse/systemtest/sharedinfra/MessagingEndpointTest.java @@ -62,18 +62,7 @@ public class MessagingEndpointTest extends TestBase implements ITestIsolatedSharedInfra { @Test - @Kubernetes(type = ClusterType.MINIKUBE) - public void testNodePortEndpointMinikube() throws Exception { - testNodePortEndpoint(); - } - - @Test - @OpenShift(type = ClusterType.CRC) - public void testNodePortEndpointCRC() throws Exception { - testNodePortEndpoint(); - } - - private void testNodePortEndpoint() throws Exception { + public void testNodePortEndpoint() throws Exception { MessagingTenant tenant = infraResourceManager.getDefaultMessagingTenant(); MessagingEndpoint endpoint = new MessagingEndpointBuilder() .editOrNewMetadata() @@ -81,7 +70,7 @@ private void testNodePortEndpoint() throws Exception { .withName("app") .endMetadata() .editOrNewSpec() - .withHost(kubernetes.getHost()) + .withHost(kubernetes.getNodeHost()) .addToProtocols("AMQP") .editOrNewNodePort() .endNodePort()