diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 1e69fc173cce4..48eab44e6b72a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -5,7 +5,9 @@ ### Features Added - Added support for sending/receiving messages with `Duration`, `OffsetDateTime` and `URI` in `applicationProperties`. - Updated the receiver to retry to obtain a new connection if the RequestResponseChannel in the current connection is disposed. - +- Added getter methods to `ServiceBusProcessorClient` to get the queue, topic and subscription names associated with + the processor. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index e79f9ba736b0c..7ae64e0db5a66 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -1191,6 +1191,8 @@ public ServiceBusSessionProcessorClientBuilder disableAutoComplete() { */ public ServiceBusProcessorClient buildProcessorClient() { return new ServiceBusProcessorClient(sessionReceiverClientBuilder, + sessionReceiverClientBuilder.queueName, sessionReceiverClientBuilder.topicName, + sessionReceiverClientBuilder.subscriptionName, Objects.requireNonNull(processMessage, "'processMessage' cannot be null"), Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions); } @@ -1686,6 +1688,8 @@ public ServiceBusProcessorClientBuilder disableAutoComplete() { */ public ServiceBusProcessorClient buildProcessorClient() { return new ServiceBusProcessorClient(serviceBusReceiverClientBuilder, + serviceBusReceiverClientBuilder.queueName, serviceBusReceiverClientBuilder.topicName, + serviceBusReceiverClientBuilder.subscriptionName, Objects.requireNonNull(processMessage, "'processMessage' cannot be null"), Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 45363374cb395..a5e376b7ab204 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -135,17 +135,24 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private final AtomicReference asyncClient = new AtomicReference<>(); private final AtomicBoolean isRunning = new AtomicBoolean(); private final TracerProvider tracerProvider; + private final String queueName; + private final String topicName; + private final String subscriptionName; private ScheduledExecutorService scheduledExecutor; /** * Constructor to create a sessions-enabled processor. * * @param sessionReceiverBuilder The session processor builder to create new instances of async clients. + * @param queueName The name of the queue this processor is associated with. + * @param topicName The name of the topic this processor is associated with. + * @param subscriptionName The name of the subscription this processor is associated with. * @param processMessage The message processing callback. * @param processError The error handler. * @param processorOptions Options to configure this instance of the processor. */ ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder, + String queueName, String topicName, String subscriptionName, Consumer processMessage, Consumer processError, ServiceBusProcessorClientOptions processorOptions) { @@ -157,17 +164,24 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor()); this.receiverBuilder = null; this.tracerProvider = processorOptions.getTracerProvider(); + this.queueName = queueName; + this.topicName = topicName; + this.subscriptionName = subscriptionName; } /** * Constructor to create a processor. * * @param receiverBuilder The processor builder to create new instances of async clients. + * @param queueName The name of the queue this processor is associated with. + * @param topicName The name of the topic this processor is associated with. + * @param subscriptionName The name of the subscription this processor is associated with. * @param processMessage The message processing callback. * @param processError The error handler. * @param processorOptions Options to configure this instance of the processor. */ ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder, + String queueName, String topicName, String subscriptionName, Consumer processMessage, Consumer processError, ServiceBusProcessorClientOptions processorOptions) { this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null"); @@ -177,6 +191,9 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.asyncClient.set(receiverBuilder.buildAsyncClient()); this.sessionReceiverBuilder = null; this.tracerProvider = processorOptions.getTracerProvider(); + this.queueName = queueName; + this.topicName = topicName; + this.subscriptionName = subscriptionName; } /** @@ -254,6 +271,36 @@ public synchronized boolean isRunning() { return isRunning.get(); } + /** + * Returns the queue name associated with this instance of {@link ServiceBusProcessorClient}. + * + * @return the queue name associated with this instance of {@link ServiceBusProcessorClient} or {@code null} if + * the processor instance is for a topic and subscription. + */ + public String getQueueName() { + return this.queueName; + } + + /** + * Returns the topic name associated with this instance of {@link ServiceBusProcessorClient}. + * + * @return the topic name associated with this instance of {@link ServiceBusProcessorClient} or {@code null} if + * the processor instance is for a queue. + */ + public String getTopicName() { + return this.topicName; + } + + /** + * Returns the subscription name associated with this instance of {@link ServiceBusProcessorClient}. + * + * @return the subscription name associated with this instance of {@link ServiceBusProcessorClient} or {@code null} + * if the processor instance is for a queue. + */ + public String getSubscriptionName() { + return this.subscriptionName; + } + private synchronized void receiveMessages() { if (receiverSubscriptions.size() > 0) { // For the case of start -> stop -> start again diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index f4613a83aaab9..7308bba9b746e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -71,7 +71,8 @@ public void testReceivingMessagesWithProcessor() throws InterruptedException { AtomicInteger messageId = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(5); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); countDownLatch.countDown(); @@ -110,7 +111,8 @@ public void testReceivingMultiSessionMessagesWithProcessor() throws InterruptedE AtomicInteger messageId = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(numberOfMessages); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { int expectedMessageId = messageId.getAndIncrement(); assertEquals(String.valueOf(expectedMessageId), messageContext.getMessage().getMessageId()); @@ -143,7 +145,8 @@ public void testStartStopResume() throws InterruptedException { countDownLatch.set(new CountDownLatch(2)); AtomicBoolean assertionFailed = new AtomicBoolean(); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { try { assertEquals(String.valueOf(messageId.getAndIncrement()), @@ -225,7 +228,8 @@ public void testErrorRecovery() throws InterruptedException { countDownLatch.set(new CountDownLatch(4)); AtomicBoolean assertionFailed = new AtomicBoolean(); StringBuffer messageIdNotMatched = new StringBuffer(); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { try { assertEquals(String.valueOf(messageId.getAndIncrement() % 2), @@ -280,7 +284,8 @@ public void testUserMessageHandlerError() throws InterruptedException { final AtomicInteger messageId = new AtomicInteger(); final CountDownLatch countDownLatch = new CountDownLatch(numberOfEvents); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); throw new IllegalStateException(); // throw error from user handler @@ -331,7 +336,8 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru AtomicInteger messageId = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(5); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); throw new IllegalStateException(); // throw error from user handler @@ -393,7 +399,8 @@ public void testProcessorWithTracingEnabled() throws InterruptedException { AtomicInteger messageId = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); countDownLatch.countDown(); @@ -448,7 +455,8 @@ public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws Interrup AtomicInteger messageId = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes); - ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue", + null, null, messageContext -> { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); countDownLatch.countDown();