Skip to content

Commit

Permalink
Add getters to ServiceBusProcessorClient to get names of SB entity (#…
Browse files Browse the repository at this point in the history
…27697)

* Add getter methods to ServiceBusProcessorClient to get names of SB entity

* update changelog

* update javadocs and unit tests
  • Loading branch information
srnagar authored Mar 16, 2022
1 parent 0ddca11 commit bf64d42
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 9 deletions.
4 changes: 3 additions & 1 deletion sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
### Features Added
- Added support for sending/receiving messages with `Duration`, `OffsetDateTime` and `URI` in `applicationProperties`.
- Updated the receiver to retry to obtain a new connection if the RequestResponseChannel in the current connection is disposed.

- Added getter methods to `ServiceBusProcessorClient` to get the queue, topic and subscription names associated with
the processor.

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,8 @@ public ServiceBusSessionProcessorClientBuilder disableAutoComplete() {
*/
public ServiceBusProcessorClient buildProcessorClient() {
return new ServiceBusProcessorClient(sessionReceiverClientBuilder,
sessionReceiverClientBuilder.queueName, sessionReceiverClientBuilder.topicName,
sessionReceiverClientBuilder.subscriptionName,
Objects.requireNonNull(processMessage, "'processMessage' cannot be null"),
Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions);
}
Expand Down Expand Up @@ -1686,6 +1688,8 @@ public ServiceBusProcessorClientBuilder disableAutoComplete() {
*/
public ServiceBusProcessorClient buildProcessorClient() {
return new ServiceBusProcessorClient(serviceBusReceiverClientBuilder,
serviceBusReceiverClientBuilder.queueName, serviceBusReceiverClientBuilder.topicName,
serviceBusReceiverClientBuilder.subscriptionName,
Objects.requireNonNull(processMessage, "'processMessage' cannot be null"),
Objects.requireNonNull(processError, "'processError' cannot be null"), processorClientOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,24 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
private final AtomicBoolean isRunning = new AtomicBoolean();
private final TracerProvider tracerProvider;
private final String queueName;
private final String topicName;
private final String subscriptionName;
private ScheduledExecutorService scheduledExecutor;

/**
* Constructor to create a sessions-enabled processor.
*
* @param sessionReceiverBuilder The session processor builder to create new instances of async clients.
* @param queueName The name of the queue this processor is associated with.
* @param topicName The name of the topic this processor is associated with.
* @param subscriptionName The name of the subscription this processor is associated with.
* @param processMessage The message processing callback.
* @param processError The error handler.
* @param processorOptions Options to configure this instance of the processor.
*/
ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder,
String queueName, String topicName, String subscriptionName,
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<ServiceBusErrorContext> processError,
ServiceBusProcessorClientOptions processorOptions) {
Expand All @@ -157,17 +164,24 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
this.receiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
}

/**
* Constructor to create a processor.
*
* @param receiverBuilder The processor builder to create new instances of async clients.
* @param queueName The name of the queue this processor is associated with.
* @param topicName The name of the topic this processor is associated with.
* @param subscriptionName The name of the subscription this processor is associated with.
* @param processMessage The message processing callback.
* @param processError The error handler.
* @param processorOptions Options to configure this instance of the processor.
*/
ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder,
String queueName, String topicName, String subscriptionName,
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<ServiceBusErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
Expand All @@ -177,6 +191,9 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.asyncClient.set(receiverBuilder.buildAsyncClient());
this.sessionReceiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
this.queueName = queueName;
this.topicName = topicName;
this.subscriptionName = subscriptionName;
}

/**
Expand Down Expand Up @@ -254,6 +271,36 @@ public synchronized boolean isRunning() {
return isRunning.get();
}

/**
* Returns the queue name associated with this instance of {@link ServiceBusProcessorClient}.
*
* @return the queue name associated with this instance of {@link ServiceBusProcessorClient} or {@code null} if
* the processor instance is for a topic and subscription.
*/
public String getQueueName() {
return this.queueName;
}

/**
* Returns the topic name associated with this instance of {@link ServiceBusProcessorClient}.
*
* @return the topic name associated with this instance of {@link ServiceBusProcessorClient} or {@code null} if
* the processor instance is for a queue.
*/
public String getTopicName() {
return this.topicName;
}

/**
* Returns the subscription name associated with this instance of {@link ServiceBusProcessorClient}.
*
* @return the subscription name associated with this instance of {@link ServiceBusProcessorClient} or {@code null}
* if the processor instance is for a queue.
*/
public String getSubscriptionName() {
return this.subscriptionName;
}

private synchronized void receiveMessages() {
if (receiverSubscriptions.size() > 0) {
// For the case of start -> stop -> start again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void testReceivingMessagesWithProcessor() throws InterruptedException {

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(5);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
Expand Down Expand Up @@ -110,7 +111,8 @@ public void testReceivingMultiSessionMessagesWithProcessor() throws InterruptedE

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(numberOfMessages);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
int expectedMessageId = messageId.getAndIncrement();
assertEquals(String.valueOf(expectedMessageId), messageContext.getMessage().getMessageId());
Expand Down Expand Up @@ -143,7 +145,8 @@ public void testStartStopResume() throws InterruptedException {
countDownLatch.set(new CountDownLatch(2));

AtomicBoolean assertionFailed = new AtomicBoolean();
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
try {
assertEquals(String.valueOf(messageId.getAndIncrement()),
Expand Down Expand Up @@ -225,7 +228,8 @@ public void testErrorRecovery() throws InterruptedException {
countDownLatch.set(new CountDownLatch(4));
AtomicBoolean assertionFailed = new AtomicBoolean();
StringBuffer messageIdNotMatched = new StringBuffer();
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
try {
assertEquals(String.valueOf(messageId.getAndIncrement() % 2),
Expand Down Expand Up @@ -280,7 +284,8 @@ public void testUserMessageHandlerError() throws InterruptedException {

final AtomicInteger messageId = new AtomicInteger();
final CountDownLatch countDownLatch = new CountDownLatch(numberOfEvents);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
throw new IllegalStateException(); // throw error from user handler
Expand Down Expand Up @@ -331,7 +336,8 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(5);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
throw new IllegalStateException(); // throw error from user handler
Expand Down Expand Up @@ -393,7 +399,8 @@ public void testProcessorWithTracingEnabled() throws InterruptedException {

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
Expand Down Expand Up @@ -448,7 +455,8 @@ public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws Interrup

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(numberOfTimes);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, "queue",
null, null,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
Expand Down

0 comments on commit bf64d42

Please sign in to comment.