diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java index 7a2018e0ac724..afdaaa7653d74 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -24,18 +25,40 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Received message " + message.getBody().toString()); }; + final CountDownLatch countdownLatch = new CountDownLatch(1); + // Consumer that handles any errors that occur when receiving messages Consumer errorHandler = errorContext -> { - System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); if (errorContext.getException() instanceof ServiceBusException) { - ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); - System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), - serviceBusException.getReason()); + final ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + final ServiceBusFailureReason reason = serviceBusException.getReason(); + + if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED + || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND + || reason == ServiceBusFailureReason.UNAUTHORIZED) { + System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s\n", + reason, serviceBusException.getMessage()); + countdownLatch.countDown(); + } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { + System.out.printf("Message lock lost for message: %s", errorContext.getException().toString()); + } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { + try { + // choosing an arbitrary amount of time to wait. + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + System.out.printf("Error source %s, reason %s, message: %s\n", serviceBusException.getErrorSource(), + reason, errorContext.getException().getMessage()); + } + } else { + System.out.printf("Exception: %s\n", errorContext.getException().toString()); } }; // Create an instance of the processor through the ServiceBusClientBuilder - ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() + final ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString("<< connection-string >>") .processor() .queueName("<< queue name >>") @@ -46,16 +69,13 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Starting the processor"); processorClient.start(); - TimeUnit.SECONDS.sleep(10); - System.out.println("Stopping the processor"); - processorClient.stop(); - - TimeUnit.SECONDS.sleep(10); - System.out.println("Resuming the processor"); - processorClient.start(); + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Closing processor due to fatal error"); + } else { + System.out.println("Closing processor"); + } - TimeUnit.SECONDS.sleep(10); - System.out.println("Closing the processor"); processorClient.close(); } }