commitTransaction(ServiceBusTransactionContext transactionCont
* // This mono creates a transaction and caches the output value, so we can associate operations with the
* // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
* // the operation.
- * Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
+ * Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
* .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
* error -> Duration.ZERO,
* () -> Duration.ZERO);
*
- * transactionContext.flatMap(transaction -> {
+ * // Dispose of the disposable to cancel the operation.
+ * Disposable disposable = transactionContext.flatMap(transaction -> {
* // Process messages and associate operations with the transaction.
* Mono<Void> operations = Mono.when(
- * receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
- * receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
- * receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
+ * asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
+ * asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
+ * asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
*
* // Finally, either commit or rollback the transaction once all the operations are associated with it.
- * return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction));
+ * return operations.then(asyncReceiver.commitTransaction(transaction));
+ * }).subscribe(unused -> {
+ * }, error -> {
+ * System.err.println("Error occurred processing transaction: " + error);
+ * }, () -> {
+ * System.out.println("Completed transaction");
* });
*
*
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
index 6e9d4296030a1..ab1fe454f6783 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
@@ -35,15 +35,23 @@
* Create an instance of receiver
*
*
- * // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .receiver()
- * .queueName(queueName)
+ * .disableAutoComplete()
+ * .topicName(topicName)
+ * .subscriptionName(subscriptionName)
* .buildClient();
*
+ * receiver.receiveMessages(3, Duration.ofSeconds(5))
+ * .forEach(message -> {
+ * System.out.println("Message: " + message.getBody());
+ * });
+ *
* // Use the receiver and finally close it.
* receiver.close();
*
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClient.java
index 931c279256516..3e99182656299 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClient.java
@@ -34,9 +34,11 @@
* // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
* // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
* // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .ruleManager()
* .topicName(topicName)
* .subscriptionName(subscriptionName)
@@ -49,17 +51,27 @@
*
* RuleFilter trueRuleFilter = new TrueRuleFilter();
* CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
+ *
+ * // `subscribe` is a non-blocking call. After setting up the create rule operation, it will move onto the next
+ * // line of code to execute.
+ * // Consider using Mono.usingWhen to scope the creation, usage, and cleanup of the rule manager.
* ruleManager.createRule("new-rule", options).subscribe(
- * unused -> { },
+ * unused -> {
+ * },
* err -> System.err.println("Error occurred when create a rule, err: " + err),
* () -> System.out.println("Create complete.")
* );
+ *
+ * // Finally dispose of the rule manager when done using it.
+ * ruleManager.close();
*
*
*
* Fetch all rules.
*
*
+ * // `subscribe` is a non-blocking call. After setting up the list rules operation, it will move onto the next
+ * // line of code to execute.
* ruleManager.listRules().subscribe(ruleProperties -> System.out.println(ruleProperties.getName()));
*
*
@@ -67,6 +79,8 @@
* Delete a rule.
*
*
+ * // `subscribe` is a non-blocking call. After setting up the delete rule operation, it will move onto the next
+ * // line of code to execute.
* ruleManager.deleteRule("exist-rule").subscribe(
* unused -> { },
* err -> System.err.println("Error occurred when delete rule, err: " + err),
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClient.java
index c1f730b38c4d1..880a6683633f0 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClient.java
@@ -22,9 +22,22 @@
* Create a rule to a Service Bus subscription
*
*
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * ServiceBusRuleManagerClient ruleManager = new ServiceBusClientBuilder()
+ * .credential(fullyQualifiedNamespace, credential)
+ * .ruleManager()
+ * .topicName(topicName)
+ * .subscriptionName(subscriptionName)
+ * .buildClient();
+ *
* RuleFilter trueRuleFilter = new TrueRuleFilter();
* CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
* ruleManager.createRule("new-rule", options);
+ *
+ * // Dispose of the ruleManager when finished using it.
+ * ruleManager.close();
*
*
*/
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
index 00a038e8c667d..ccc5e2e8168ea 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java
@@ -56,52 +56,35 @@
* Create an instance of sender
*
*
- * // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- * ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
+ * .credential(fullyQualifiedNamespace, credential)
* .sender()
* .queueName(queueName)
* .buildAsyncClient();
- *
- *
*
- * Create an instance of sender using default credential
- *
- *
- * // The required parameter is a way to authenticate with Service Bus using credentials.
- * // The connectionString provides a way to authenticate with Service Bus.
- * ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- * .credential("<<fully-qualified-namespace>>",
- * new DefaultAzureCredentialBuilder().build())
- * .sender()
- * .queueName("<< QUEUE NAME >>")
- * .buildAsyncClient();
+ * // Use the sender and finally close it.
+ * asyncSender.close();
*
- *
+ *
*
* Send messages to a Service Bus resource
*
*
- * // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- * ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
- * .sender()
- * .queueName(queueName)
- * .buildAsyncClient();
- *
- * // Creating a batch without options set, will allow for automatic routing of events to any partition.
- * sender.createMessageBatch().flatMap(batch -> {
- * batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))));
- * batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
- * return sender.sendMessages(batch);
+ * // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
+ * // operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
+ * asyncSender.createMessageBatch().flatMap(batch -> {
+ * batch.tryAddMessage(new ServiceBusMessage("test-1"));
+ * batch.tryAddMessage(new ServiceBusMessage("test-2"));
+ * return asyncSender.sendMessages(batch);
* }).subscribe(unused -> {
- * },
- * error -> System.err.println("Error occurred while sending batch:" + error),
- * () -> System.out.println("Send complete."));
+ * }, error -> {
+ * System.err.println("Error occurred while sending batch:" + error);
+ * }, () -> {
+ * System.out.println("Send complete.");
+ * });
*
*
*
@@ -114,36 +97,68 @@
* // In this case, all the batches created with these options are limited to 256 bytes.
* CreateMessageBatchOptions options = new CreateMessageBatchOptions()
* .setMaximumSizeInBytes(256);
- * AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>(
- * sender.createMessageBatch(options).block());
+ * AtomicReference<ServiceBusMessageBatch> currentBatch = new AtomicReference<>();
*
- * // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
- * telemetryMessages.flatMap(message -> {
+ * // Sends the current batch if it is not null and not empty. If the current batch is null, sets it.
+ * // Returns the batch to work with.
+ * Mono<ServiceBusMessageBatch> sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
* ServiceBusMessageBatch batch = currentBatch.get();
- * if (batch.tryAddMessage(message)) {
- * return Mono.empty();
- * }
*
- * return Mono.when(
- * sender.sendMessages(batch),
- * sender.createMessageBatch(options).map(newBatch -> {
- * currentBatch.set(newBatch);
+ * if (batch == null) {
+ * return asyncSender.createMessageBatch(options);
+ * }
*
- * // Add the message that did not fit in the previous batch.
- * if (!newBatch.tryAddMessage(message)) {
- * throw Exceptions.propagate(new IllegalArgumentException(
- * "Message was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
- * }
+ * if (batch.getCount() > 0) {
+ * return asyncSender.sendMessages(batch).then(
+ * asyncSender.createMessageBatch(options)
+ * .handle((ServiceBusMessageBatch newBatch, SynchronousSink<ServiceBusMessageBatch> sink) -> {
+ * // Expect that the batch we just sent is the current one. If it is not, there's a race
+ * // condition accessing currentBatch reference.
+ * if (!currentBatch.compareAndSet(batch, newBatch)) {
+ * sink.error(new IllegalStateException(
+ * "Expected that the object in currentBatch was batch. But it is not."));
+ * } else {
+ * sink.next(newBatch);
+ * }
+ * }));
+ * } else {
+ * return Mono.just(batch);
+ * }
+ * });
*
- * return newBatch;
- * }));
- * }).then()
- * .doFinally(signal -> {
- * ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
- * if (batch != null && batch.getCount() > 0) {
- * sender.sendMessages(batch).block();
+ * // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
+ * Flux<Void> sendMessagesOperation = telemetryMessages.flatMap(message -> {
+ * return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
+ * if (batch.tryAddMessage(message)) {
+ * return Mono.empty();
+ * } else {
+ * return sendBatchAndGetCurrentBatchOperation
+ * .handle((ServiceBusMessageBatch newBatch, SynchronousSink<Void> sink) -> {
+ * if (!newBatch.tryAddMessage(message)) {
+ * sink.error(new IllegalArgumentException(
+ * "Message is too large to fit in an empty batch."));
+ * } else {
+ * sink.complete();
+ * }
+ * });
* }
* });
+ * });
+ *
+ * // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
+ * // operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
+ * Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
+ * .subscribe(batch -> {
+ * System.out.println("Last batch should be empty: " + batch.getCount());
+ * }, error -> {
+ * System.err.println("Error sending telemetry messages: " + error);
+ * }, () -> {
+ * System.out.println("Completed.");
+ *
+ * // Clean up sender when done using it. Publishers should be long-lived objects.
+ * asyncSender.close();
+ * });
+ *
*
*
*
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
index 0f3c8ecfd740d..5ff6e7be7824e 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderClient.java
@@ -18,39 +18,56 @@
* Create an instance of sender
*
*
- * // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .sender()
* .queueName(queueName)
* .buildClient();
+ *
+ * sender.sendMessage(new ServiceBusMessage("Foo bar"));
*
*
*
* Send messages to a Service Bus resource
*
*
- * List<ServiceBusMessage> messages = Arrays.asList(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))),
- * new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
- *
- * CreateMessageBatchOptions options = new CreateMessageBatchOptions().setMaximumSizeInBytes(10 * 1024);
+ * List<ServiceBusMessage> messages = Arrays.asList(
+ * new ServiceBusMessage("test-1"),
+ * new ServiceBusMessage("test-2"));
*
* // Creating a batch without options set.
- * ServiceBusMessageBatch batch = sender.createMessageBatch(options);
+ * ServiceBusMessageBatch batch = sender.createMessageBatch();
* for (ServiceBusMessage message : messages) {
* if (batch.tryAddMessage(message)) {
* continue;
* }
*
+ * // The batch is full. Send the current batch and create a new one.
* sender.sendMessages(batch);
+ *
+ * batch = sender.createMessageBatch();
+ *
+ * // Add the message we couldn't before.
+ * if (!batch.tryAddMessage(message)) {
+ * throw new IllegalArgumentException("Message is too large for an empty batch.");
+ * }
* }
+ *
+ * // Send the final batch if there are any messages in it.
+ * if (batch.getCount() > 0) {
+ * sender.sendMessages(batch);
+ * }
+ *
+ * // Finally dispose of the sender.
+ * sender.close();
*
*
*
* Send messages using a size-limited {@link ServiceBusMessageBatch}
- *
+ *
*
* List<ServiceBusMessage> telemetryMessages = Arrays.asList(firstMessage, secondMessage, thirdMessage);
*
@@ -74,8 +91,16 @@
* }
* }
* }
+ *
+ * // Send the final batch if there are any messages in it.
+ * if (currentBatch.getCount() > 0) {
+ * sender.sendMessages(currentBatch);
+ * }
+ *
+ * // Dispose of the sender
+ * sender.close();
*
- *
+ *
*
* @see ServiceBusClientBuilder#sender()
* @see ServiceBusSenderAsyncClient To communicate with a Service Bus resource using an asynchronous client.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
index aa744ce51e3cf..c8fa4eba6ca33 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
@@ -31,29 +31,40 @@
* Use {@link #acceptSession(String)} to acquire the lock of a session if you know the session id.
*
*
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .sessionReceiver()
- * .queueName(queueName)
+ * .disableAutoComplete()
+ * .queueName(sessionEnabledQueueName)
* .buildAsyncClient();
*
- * // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
+ * // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
* // successfully locked.
- * // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
+ * // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` completes.
* // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
* Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
- * sessionReceiver.acceptSession("<< my-session-id >>"),
- * receiver -> receiver.receiveMessages(),
- * receiver -> Mono.fromRunnable(() -> receiver.close()));
+ * sessionReceiver.acceptSession("<<my-session-id>>"),
+ * receiver -> {
+ * // Receive messages from <<my-session-id>> session.
+ * return receiver.receiveMessages();
+ * },
+ * receiver -> Mono.fromRunnable(() -> {
+ * // Dispose of
+ * receiver.close();
+ * sessionReceiver.close();
+ * }));
*
* // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
* // is non-blocking and kicks off the operation.
* Disposable subscription = sessionMessages.subscribe(
* message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
* message.getSequenceNumber(), message.getBody()),
- * error -> System.err.print(error));
+ * error -> System.err.print(error),
+ * () -> System.out.println("Completed receiving from session."));
*
*
*
@@ -62,28 +73,32 @@
* id.
*
*
- * // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .sessionReceiver()
- * .queueName(queueName)
+ * .disableAutoComplete()
+ * .queueName(sessionEnabledQueueName)
* .buildAsyncClient();
*
- * // acceptNextSession() completes successfully with a receiver when it acquires the next available session.
- * // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
- * // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
- * Flux<ServiceBusReceivedMessage> sessionMessages = Flux.usingWhen(
- * sessionReceiver.acceptNextSession(),
- * receiver -> receiver.receiveMessages(),
- * receiver -> Mono.fromRunnable(() -> receiver.close()));
+ * // Creates a client to receive messages from the first available session. It waits until
+ * // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
+ * // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
+ * Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
*
- * // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
- * // is non-blocking and kicks off the operation.
- * Disposable subscription = sessionMessages.subscribe(
- * message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
- * message.getSequenceNumber(), message.getBody()),
- * error -> System.err.print(error));
+ * Disposable disposable = Flux.usingWhen(receiverMono,
+ * receiver -> receiver.receiveMessages(),
+ * receiver -> Mono.fromRunnable(() -> {
+ * // Dispose of the receiver and sessionReceiver when done receiving messages.
+ * receiver.close();
+ * sessionReceiver.close();
+ * }))
+ * .subscribe(message -> {
+ * System.out.println("Received message: " + message.getBody());
+ * });
*
*
*/
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
index ff94643f82485..6ebb89bedd679 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
@@ -20,14 +20,17 @@
* Use {@link #acceptSession(String)} to acquire the lock of a session if you know the session id.
*
*
- * // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, credential)
* .sessionReceiver()
- * .queueName(sessionQueueName)
+ * .queueName(sessionEnabledQueueName)
+ * .disableAutoComplete()
* .buildClient();
- * ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<< my-session-id >>");
+ * ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<<my-session-id>>");
*
* // Use the receiver and finally close it along with the sessionReceiver.
* receiver.close();
@@ -40,19 +43,35 @@
* id.
*
*
- * // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
- * // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ * TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ * // 'disableAutoComplete' indicates that users will explicitly settle their message.
* ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
- * .connectionString(
- * "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}")
+ * .credential(fullyQualifiedNamespace, credential)
* .sessionReceiver()
- * .queueName("<< QUEUE NAME >>")
+ * .disableAutoComplete()
+ * .queueName(sessionEnabledQueueName)
* .buildClient();
+ *
+ * // Creates a client to receive messages from the first available session. It waits until
+ * // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
+ * // throws a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
* ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();
*
* // Use the receiver and finally close it along with the sessionReceiver.
- * receiver.close();
- * sessionReceiver.close();
+ * try {
+ * IterableStream<ServiceBusReceivedMessage> receivedMessages =
+ * receiver.receiveMessages(10, Duration.ofSeconds(30));
+ *
+ * for (ServiceBusReceivedMessage message : receivedMessages) {
+ * System.out.println("Body: " + message);
+ * }
+ * } finally {
+ * receiver.close();
+ * sessionReceiver.close();
+ * }
+ *
*
*
*/
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusTransactionContext.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusTransactionContext.java
index 3e263b6afd7e5..421009d5b0369 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusTransactionContext.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusTransactionContext.java
@@ -21,20 +21,26 @@
* // This mono creates a transaction and caches the output value, so we can associate operations with the
* // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
* // the operation.
- * Mono<ServiceBusTransactionContext> transactionContext = receiver.createTransaction()
+ * Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
* .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
* error -> Duration.ZERO,
* () -> Duration.ZERO);
*
- * transactionContext.flatMap(transaction -> {
+ * // Dispose of the disposable to cancel the operation.
+ * Disposable disposable = transactionContext.flatMap(transaction -> {
* // Process messages and associate operations with the transaction.
* Mono<Void> operations = Mono.when(
- * receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
- * receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
- * receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
+ * asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
+ * asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
+ * asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
*
* // Finally, either commit or rollback the transaction once all the operations are associated with it.
- * return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction));
+ * return operations.then(asyncReceiver.commitTransaction(transaction));
+ * }).subscribe(unused -> {
+ * }, error -> {
+ * System.err.println("Error occurred processing transaction: " + error);
+ * }, () -> {
+ * System.out.println("Completed transaction");
* });
*
*
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java
index 3140825742c7b..90078d5a16a7b 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java
@@ -63,13 +63,15 @@
* Create the sync client using a connection string
*
*
- * // Retrieve 'connectionString' from your configuration.
- *
* HttpLogOptions logOptions = new HttpLogOptions()
* .setLogLevel(HttpLogDetailLevel.HEADERS);
*
+ * // DefaultAzureCredential creates a credential based on the environment it is executed in.
+ * TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+ *
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder()
- * .connectionString(connectionString)
+ * .credential(fullyQualifiedNamespace, tokenCredential)
* .httpLogOptions(logOptions)
* .buildClient();
*
@@ -81,9 +83,9 @@
* // DefaultAzureCredential creates a credential based on the environment it is executed in.
* TokenCredential credential = new DefaultAzureCredentialBuilder().build();
*
+ * // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
* ServiceBusAdministrationAsyncClient client = new ServiceBusAdministrationClientBuilder()
- * .connectionString("<< Service Bus NAMESPACE connection string>>")
- * .credential("<< my-sb-namespace.servicebus.windows.net >>", credential)
+ * .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
* .buildAsyncClient();
*
*
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java
deleted file mode 100644
index 8084db52096f5..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java
+++ /dev/null
@@ -1,335 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.core.credential.AzureNamedKeyCredential;
-import com.azure.core.credential.TokenCredential;
-import com.azure.core.util.IterableStream;
-import com.azure.identity.DefaultAzureCredentialBuilder;
-import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
-import com.azure.messaging.servicebus.models.SubQueue;
-import reactor.core.Disposable;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.Consumer;
-
-/**
- * Class containing code snippets that will be injected to README.md.
- */
-public class ReadmeSamples {
- /**
- * Code sample for creating an asynchronous Service Bus sender.
- */
- public void createAsynchronousServiceBusSender() {
- // BEGIN: readme-sample-createAsynchronousServiceBusSender
- ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .sender()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
- // END: readme-sample-createAsynchronousServiceBusSender
- }
-
- /**
- * Code sample for creating an asynchronous Service Bus receiver.
- */
- public void createAsynchronousServiceBusReceiver() {
- // BEGIN: readme-sample-createAsynchronousServiceBusReceiver
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .receiver()
- .topicName("<< TOPIC NAME >>")
- .subscriptionName("<< SUBSCRIPTION NAME >>")
- .buildAsyncClient();
- // END: readme-sample-createAsynchronousServiceBusReceiver
- }
-
- /**
- * Code sample for creating an asynchronous Service Bus receiver using {@link DefaultAzureCredentialBuilder}.
- */
- public void createAsynchronousServiceBusReceiverWithAzureIdentity() {
- // BEGIN: readme-sample-createAsynchronousServiceBusReceiverWithAzureIdentity
- TokenCredential credential = new DefaultAzureCredentialBuilder()
- .build();
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .credential("<>", credential)
- .receiver()
- .queueName("<>")
- .buildAsyncClient();
- // END: readme-sample-createAsynchronousServiceBusReceiverWithAzureIdentity
- }
-
- /**
- * Code sample for creating an asynchronous Service Bus receiver using {@link AzureNamedKeyCredential}.
- */
- public void createAsynchronousServiceBusReceiverWithAzureNamedKeyCredential() {
- // BEGIN: readme-sample-createAsynchronousServiceBusReceiverWithAzureNamedKeyCredential
- AzureNamedKeyCredential azureNamedKeyCredential =
- new AzureNamedKeyCredential("<>", "<>");
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .fullyQualifiedNamespace("<>")
- .credential(azureNamedKeyCredential)
- .receiver()
- .queueName("<>")
- .buildAsyncClient();
- // END: readme-sample-createAsynchronousServiceBusReceiverWithAzureNamedKeyCredential
- }
-
- /**
- * Sends messages to a queue.
- */
- public void sendMessage() {
- // BEGIN: readme-sample-sendMessage
- ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .sender()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
- List messages = Arrays.asList(
- new ServiceBusMessage("Hello world").setMessageId("1"),
- new ServiceBusMessage("Bonjour").setMessageId("2"));
-
- sender.sendMessages(messages);
-
- // When you are done using the sender, dispose of it.
- sender.close();
- // END: readme-sample-sendMessage
- }
-
- /**
- * Receives messages from a topic and subscription.
- */
- public void receiveMessages() {
- ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .receiver()
- .topicName("<< TOPIC NAME >>")
- .subscriptionName("<< SUBSCRIPTION NAME >>")
- .buildClient();
-
- // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
- // happens first.
- IterableStream messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
- messages.forEach(message -> {
-
- System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());
- });
-
- // When you are done using the receiver, dispose of it.
- receiver.close();
- }
-
- /**
- * Receives messages asynchronously.
- */
- public void receiveMessagesAsync() {
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .receiver()
- .queueName("<< QUEUE NAME >>")
- .buildAsyncClient();
-
- // receive() operation continuously fetches messages until the subscription is disposed.
- // The stream is infinite, and completes when the subscription or receiver is closed.
- Disposable subscription = receiver.receiveMessages().subscribe(message -> {
-
- System.out.printf("Id: %s%n", message.getMessageId());
- System.out.printf("Contents: %s%n", message.getBody().toString());
- }, error -> System.err.println("Error occurred while receiving messages: " + error),
- () -> System.out.println("Finished receiving messages."));
-
- // Continue application processing. When you are finished receiving messages, dispose of the subscription.
- subscription.dispose();
-
- // When you are done using the receiver, dispose of it.
- receiver.close();
- }
-
- /**
- * Complete a message.
- */
- public void completeMessage() {
-
- ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .receiver()
- .topicName("<< TOPIC NAME >>")
- .subscriptionName("<< SUBSCRIPTION NAME >>")
- .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
- .buildClient();
-
- // This fetches a batch of 10 messages or until the default operation timeout has elapsed.
- receiver.receiveMessages(10).forEach(message -> {
- // Process message and then complete it.
- System.out.println("Completing message " + message.getLockToken());
-
- receiver.complete(message);
- });
- }
-
- /**
- * Create a session message.
- */
- public void createSessionMessage() {
- ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .sender()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
-
- // BEGIN: readme-sample-createSessionMessage
- // Setting sessionId publishes that message to a specific session, in this case, "greeting".
- ServiceBusMessage message = new ServiceBusMessage("Hello world")
- .setSessionId("greetings");
-
- sender.sendMessage(message);
- // END: readme-sample-createSessionMessage
- }
-
- /**
- * Create session receiver for "greetings"
- */
- public void namedSessionReceiver() {
- // Creates a session-enabled receiver that gets messages from the session "greetings".
- ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .sessionReceiver()
- .queueName("<< QUEUE NAME >>")
- .buildAsyncClient();
- Mono receiverAsyncClient = sessionReceiver.acceptSession("greetings");
- }
-
- /**
- * Create session receiver for the first available session.
- */
- public void unnamedSessionReceiver() {
- // Creates a session-enabled receiver that gets messages from the first available session.
- ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .sessionReceiver()
- .queueName("<< QUEUE NAME >>")
- .buildAsyncClient();
- Mono receiverAsyncClient = sessionReceiver.acceptNextSession();
- }
-
- /**
- * Code sample for creating a synchronous Service Bus receiver to read message from dead-letter queue.
- */
- public void createSynchronousServiceBusDeadLetterQueueReceiver() {
- // BEGIN: readme-sample-createSynchronousServiceBusDeadLetterQueueReceiver
- ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
- .topicName("<< TOPIC NAME >>")
- .subscriptionName("<< SUBSCRIPTION NAME >>")
- .subQueue(SubQueue.DEAD_LETTER_QUEUE)
- .buildClient();
- // END: readme-sample-createSynchronousServiceBusDeadLetterQueueReceiver
- }
-
- /**
- * Code sample for creating a Service Bus Processor Client to receive in PeekLock mode.
- */
- public void createServiceBusProcessorClientInPeekLockMode() {
- // BEGIN: readme-sample-createServiceBusProcessorClientInPeekLockMode
- // Sample code that processes a single message which is received in PeekLock mode.
- Consumer processMessage = context -> {
- final ServiceBusReceivedMessage message = context.getMessage();
- // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
- // handling message reaches desired state such that it doesn't require Service Bus to redeliver
- // the same message, then context.complete() should be called otherwise context.abandon().
- final boolean success = Math.random() < 0.5;
- if (success) {
- try {
- context.complete();
- } catch (Exception completionError) {
- System.out.printf("Completion of the message %s failed\n", message.getMessageId());
- completionError.printStackTrace();
- }
- } else {
- try {
- context.abandon();
- } catch (Exception abandonError) {
- System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
- abandonError.printStackTrace();
- }
- }
- };
-
- // Sample code that gets called if there's an error
- Consumer processError = errorContext -> {
- System.err.println("Error occurred while receiving message: " + errorContext.getException());
- };
-
- // create the processor client via the builder and its sub-builder
- ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .processor()
- .queueName("<< QUEUE NAME >>")
- .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
- .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
- .processMessage(processMessage)
- .processError(processError)
- .disableAutoComplete()
- .buildProcessorClient();
-
- // Starts the processor in the background and returns immediately
- processorClient.start();
- // END: readme-sample-createServiceBusProcessorClientInPeekLockMode
- }
-
- /**
- * Code sample for creating a Service Bus Processor Client to receive in ReceiveAndDelete mode.
- */
- public void createServiceBusProcessorClientInReceiveAndDelete() {
- // BEGIN: readme-sample-createServiceBusProcessorClientInReceiveAndDeleteMode
- // Sample code that processes a single message which is received in ReceiveAndDelete mode.
- Consumer processMessage = context -> {
- final ServiceBusReceivedMessage message = context.getMessage();
- System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
- message.getSequenceNumber(), message.getBody());
- };
-
- // Sample code that gets called if there's an error
- Consumer processError = errorContext -> {
- System.err.println("Error occurred while receiving message: " + errorContext.getException());
- };
-
- // create the processor client via the builder and its sub-builder
- ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
- .processor()
- .queueName("<< QUEUE NAME >>")
- .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
- .processMessage(processMessage)
- .processError(processError)
- .disableAutoComplete()
- .buildProcessorClient();
-
- // Starts the processor in the background and returns immediately
- processorClient.start();
- // END: readme-sample-createServiceBusProcessorClientInReceiveAndDeleteMode
- }
-
- public void connectionSharingAcrossClients() {
- // BEGIN: readme-sample-connectionSharingAcrossClients
- // Create shared builder.
- ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
- // Create receiver and sender which will share the connection.
- ServiceBusReceiverClient receiver = sharedConnectionBuilder
- .receiver()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
- ServiceBusSenderClient sender = sharedConnectionBuilder
- .sender()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
- // END: readme-sample-connectionSharingAcrossClients
- }
-
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusAdministrationClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusAdministrationClientJavaDocCodeSamples.java
index 486c4397de1eb..71f0430a54caf 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusAdministrationClientJavaDocCodeSamples.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusAdministrationClientJavaDocCodeSamples.java
@@ -21,7 +21,11 @@
* @see ServiceBusAdministrationClient
*/
public class ServiceBusAdministrationClientJavaDocCodeSamples {
- private static String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
+ /**
+ * Fully qualified namespace is the host name of the Service Bus resource. It can be found by navigating to the
+ * Service Bus namespace and looking in the "Essentials" panel.
+ */
+ private final String fullyQualifiedNamespace = System.getenv("AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME");
/**
* Creates {@link ServiceBusAdministrationClient} with a connection string.
@@ -29,13 +33,15 @@ public class ServiceBusAdministrationClientJavaDocCodeSamples {
@Test
public void instantiate() {
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationclient.instantiation
- // Retrieve 'connectionString' from your configuration.
-
HttpLogOptions logOptions = new HttpLogOptions()
.setLogLevel(HttpLogDetailLevel.HEADERS);
+ // DefaultAzureCredential creates a credential based on the environment it is executed in.
+ TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, tokenCredential)
.httpLogOptions(logOptions)
.buildClient();
// END: com.azure.messaging.servicebus.administration.servicebusadministrationclient.instantiation
@@ -51,12 +57,11 @@ public void instantiateAsync() {
// DefaultAzureCredential creates a credential based on the environment it is executed in.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusAdministrationAsyncClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString("<< Service Bus NAMESPACE connection string>>")
- .credential("<< my-sb-namespace.servicebus.windows.net >>", credential)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();
// END: com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.instantiation
-
}
/**
@@ -66,7 +71,7 @@ public void instantiateAsync() {
public void createQueue() {
// Retrieve 'connectionString' from your configuration.
ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationclient.createqueue#string
@@ -82,7 +87,7 @@ public void createQueue() {
@Test
public void createQueueAsync() {
ServiceBusAdministrationAsyncClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.createqueue#string
@@ -103,7 +108,7 @@ public void createQueueAsync() {
@Test
public void updateSubscription() {
ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationclient.updatesubscription#subscriptionproperties
@@ -132,7 +137,7 @@ public void updateSubscription() {
@Test
public void updateSubscriptionAsync() {
ServiceBusAdministrationAsyncClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.updatesubscription#subscriptionproperties
@@ -168,7 +173,7 @@ public void updateSubscriptionAsync() {
@Test
public void listQueues() {
ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationclient.listQueues
@@ -185,7 +190,7 @@ public void listQueues() {
@Test
public void listQueuesAsync() {
ServiceBusAdministrationAsyncClient client = new ServiceBusAdministrationClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.buildAsyncClient();
// BEGIN: com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.listQueues
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusClientBuilderJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusClientBuilderJavaDocCodeSamples.java
deleted file mode 100644
index 8f592d5856235..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusClientBuilderJavaDocCodeSamples.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-
-public class ServiceBusClientBuilderJavaDocCodeSamples {
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
- String topicName = System.getenv("AZURE_SERVICEBUS_SAMPLE_TOPIC_NAME");
- String subscriptionName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SUBSCRIPTION_NAME");
-
- @Test
- public void instantiateReceiverAsync() {
- // BEGIN: com.azure.messaging.servicebus.receiver.async.client.instantiation
- // Retrieve 'connectionString', 'topicName' and 'subscriptionName' from your configuration.
- ServiceBusClientBuilder builder = new ServiceBusClientBuilder()
- .connectionString(connectionString);
- ServiceBusReceiverAsyncClient receiver = builder
- .receiver()
- .disableAutoComplete() // Allows user to take control of settling a message.
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.receiver.async.client.instantiation
- receiver.receiveMessages().blockFirst(Duration.ofSeconds(1));
- }
-
- public void instantiateSessionReceiver() {
- // BEGIN: com.azure.messaging.servicebus.session.receiver.async.client.instantiation
- // Retrieve 'connectionString', 'topicName' and 'subscriptionName' from your configuration.
- ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sessionReceiver()
- .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildAsyncClient();
-
- // Receiving messages from the first available sessions. It waits up to the AmqpRetryOptions.getTryTimeout().
- // If no session is available within that operation timeout, it completes with an error. Otherwise, a receiver
- // is returned when a lock on the session is acquired.
- Mono receiverMono = sessionReceiver.acceptNextSession();
-
- Flux.usingWhen(receiverMono,
- receiver -> receiver.receiveMessages(),
- receiver -> Mono.fromRunnable(receiver::close))
- .subscribe(message -> System.out.println(message.getBody().toString()));
- // END: com.azure.messaging.servicebus.session.receiver.async.client.instantiation
- }
-
- @Test
- public void instantiateSenderSync() {
- // BEGIN: com.azure.messaging.servicebus.sender.sync.client.instantiation
- // Retrieve 'connectionString' and 'queueName' from your configuration.
- ServiceBusClientBuilder builder = new ServiceBusClientBuilder()
- .connectionString(connectionString);
- ServiceBusSenderClient sender = builder
- .sender()
- .queueName(queueName)
- .buildClient();
- // END: com.azure.messaging.servicebus.sender.sync.client.instantiation
- sender.sendMessage(new ServiceBusMessage("payload"));
- }
-
- @Test
- public void instantiateProcessor() {
- // BEGIN: com.azure.messaging.servicebus.processor.client.instantiation
- // Retrieve 'connectionString' and 'queueName' from your configuration.
- ServiceBusClientBuilder builder = new ServiceBusClientBuilder()
- .connectionString(connectionString);
- ServiceBusProcessorClient processor = builder
- .processor()
- .queueName(queueName)
- .processMessage(System.out::println)
- .processError(context -> System.err.println(context.getErrorSource()))
- .buildProcessorClient();
- // END: com.azure.messaging.servicebus.processor.client.instantiation
- processor.start();
- processor.stop();
- }
-
- @Test
- public void connectionSharingAcrossClients() {
- // BEGIN: com.azure.messaging.servicebus.connection.sharing
- // Retrieve 'connectionString' and 'queueName' from your configuration.
- // Create shared builder.
- ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
- .connectionString(connectionString);
- // Create receiver and sender which will share the connection.
- ServiceBusReceiverClient receiver = sharedConnectionBuilder
- .receiver()
- .queueName(queueName)
- .buildClient();
- ServiceBusSenderClient sender = sharedConnectionBuilder
- .sender()
- .queueName(queueName)
- .buildClient();
- // END: com.azure.messaging.servicebus.connection.sharing
-
- sender.sendMessage(new ServiceBusMessage("payload"));
- receiver.receiveMessages(1);
- }
-
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java
index 2aed44f51d17a..a0922f5d2cb50 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java
@@ -3,6 +3,8 @@
package com.azure.messaging.servicebus;
+import com.azure.core.credential.TokenCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.junit.jupiter.api.Test;
@@ -14,8 +16,19 @@
* @see ServiceBusProcessorClient
*/
public class ServiceBusProcessorClientJavaDocCodeSamples {
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_QUEUE_NAME");
+ /**
+ * Fully qualified namespace is the host name of the Service Bus resource. It can be found by navigating to the
+ * Service Bus namespace and looking in the "Essentials" panel.
+ */
+ private final String fullyQualifiedNamespace = System.getenv("AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME");
+ /**
+ * Name of a queue inside the Service Bus namespace.
+ */
+ private final String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
+ /**
+ * Name of a session-enabled queue in the Service Bus namespace.
+ */
+ private final String sessionEnabledQueueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_QUEUE_NAME");
/**
* Creates a non session-enabled {@link ServiceBusProcessorClient} to receive in PeekLock mode.
@@ -51,11 +64,14 @@ public void createServiceBusProcessorClientInPeekLockMode() {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
- // create the processor client via the builder and its sub-builder
+ TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+
+ // Create the processor client via the builder and its sub-builder
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
+ .credential(fullyQualifiedNamespace, tokenCredential)
.processor()
- .queueName("<< QUEUE NAME >>")
+ .queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
@@ -63,8 +79,12 @@ public void createServiceBusProcessorClientInPeekLockMode() {
.disableAutoComplete()
.buildProcessorClient();
- // Starts the processor in the background and returns immediately
+ // Starts the processor in the background. Control returns immediately.
processorClient.start();
+
+ // Stop processor and dispose when done processing messages.
+ processorClient.stop();
+ processorClient.close();
// END: com.azure.messaging.servicebus.servicebusprocessorclient#receive-mode-peek-lock-instantiation
}
@@ -85,19 +105,27 @@ public void createServiceBusProcessorClientInReceiveAndDeleteMode() {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
- // create the processor client via the builder and its sub-builder
+ TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+
+ // Create the processor client via the builder and its sub-builder
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
- .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
+ .credential(fullyQualifiedNamespace, tokenCredential)
.processor()
- .queueName("<< QUEUE NAME >>")
+ .queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
- // Starts the processor in the background and returns immediately
+
+ // Starts the processor in the background. Control returns immediately.
processorClient.start();
+
+ // Stop processor and dispose when done processing messages.
+ processorClient.stop();
+ processorClient.close();
// END: com.azure.messaging.servicebus.servicebusprocessorclient#receive-mode-receive-and-delete-instantiation
}
@@ -126,19 +154,26 @@ public void createSessionEnabledServiceBusProcessorClient() {
}
};
- // Retrieve 'connectionString/queueName' from your configuration.
+ TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+
+ // Create the processor client via the builder and its sub-builder
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
- .queueName(queueName)
+ .queueName(sessionEnabledQueueName)
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
- // Start the processor in the background
+ // Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
+
+ // Stop processor and dispose when done processing messages.
+ sessionProcessor.stop();
+ sessionProcessor.close();
// END: com.azure.messaging.servicebus.servicebusprocessorclient#session-instantiation
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java
deleted file mode 100644
index 52f4530f203df..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java
+++ /dev/null
@@ -1,249 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.core.util.BinaryData;
-import com.azure.identity.DefaultAzureCredentialBuilder;
-import com.azure.messaging.servicebus.models.AbandonOptions;
-import com.azure.messaging.servicebus.models.CompleteOptions;
-import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
-import org.junit.jupiter.api.Test;
-import org.reactivestreams.Subscription;
-import reactor.core.Disposable;
-import reactor.core.publisher.BaseSubscriber;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Code snippets demonstrating various {@link ServiceBusReceiverAsyncClient} scenarios.
- */
-public class ServiceBusReceiverAsyncClientJavaDocCodeSamples {
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // We are reading 'connectionString/queueName' from environment variable.
- // You can configure them as it fits suitable for your application.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // 3. "queueName" will be the name of the Service Bus queue instance you created
- // inside the Service Bus namespace.
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
-
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
-
- .connectionString(connectionString)
- .receiver()
- .queueName(queueName)
- .buildAsyncClient();
-
- @Test
- public void initialization() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
-
- ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .receiver()
- .queueName(queueName)
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation
-
- consumer.close();
- }
-
- public void instantiateWithDefaultCredential() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiateWithDefaultCredential
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .credential("<>",
- new DefaultAzureCredentialBuilder().build())
- .receiver()
- .queueName("<< QUEUE NAME >>")
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiateWithDefaultCredential
-
- receiver.close();
- }
-
- /**
- * Receives message from a queue or topic using receive and delete mode.
- */
- @Test
- public void receiveWithReceiveAndDeleteMode() {
- ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .receiver()
- .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
- .queueName(queueName)
- .buildAsyncClient();
-
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveWithReceiveAndDeleteMode
- // Keep a reference to `subscription`. When the program is finished receiving messages, call
- // subscription.dispose(). This will stop fetching messages from the Service Bus.
- Disposable subscription = receiver.receiveMessages()
- .subscribe(message -> {
- System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
- System.out.printf("Contents of message as string: %s%n", message.getBody().toString());
- }, error -> System.err.print(error));
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveWithReceiveAndDeleteMode
-
- // When program ends, or you're done receiving all messages.
- receiver.close();
- subscription.dispose();
- }
-
- /**
- * Receives message with back pressure.
- */
- @Test
- public void receiveBackpressure() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#basesubscriber
- receiver.receiveMessages().subscribe(new BaseSubscriber() {
- private static final int NUMBER_OF_MESSAGES = 5;
- private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
-
- @Override
- protected void hookOnSubscribe(Subscription subscription) {
- // Tell the Publisher we only want 5 message at a time.
- request(NUMBER_OF_MESSAGES);
- }
-
- @Override
- protected void hookOnNext(ServiceBusReceivedMessage message) {
- // Process the ServiceBusReceivedMessage
- // If the number of messages we have currently received is a multiple of 5, that means we have reached
- // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
- // that the subscriber is ready to get more messages from upstream.
- if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
- request(NUMBER_OF_MESSAGES);
- }
- }
- });
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#basesubscriber
- receiver.close();
- }
-
- /**
- * Receives from all the messages.
- */
- @Test
- public void receiveAll() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#all
- Disposable subscription = receiver.receiveMessages()
- .subscribe(message -> {
- System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
- System.out.printf("Contents of message as string: %s%n", message.getBody());
- },
- error -> System.out.println("Error occurred: " + error),
- () -> System.out.println("Receiving complete."));
-
- // When program ends, or you're done receiving all messages.
- subscription.dispose();
- receiver.close();
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#all
- }
-
- /**
- * Demonstrates how to create a session receiver for a single, first available session.
- */
- @Test
- public void sessionReceiverSingleInstantiation() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sessionReceiver()
- .queueName(queueName)
- .buildAsyncClient();
-
- // acceptNextSession() completes successfully with a receiver when it acquires the next available session.
- // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
- // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
- Flux sessionMessages = Flux.usingWhen(
- sessionReceiver.acceptNextSession(),
- receiver -> receiver.receiveMessages(),
- receiver -> Mono.fromRunnable(() -> receiver.close()));
-
- // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
- // is non-blocking and kicks off the operation.
- Disposable subscription = sessionMessages.subscribe(
- message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
- message.getSequenceNumber(), message.getBody()),
- error -> System.err.print(error));
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession
-
- subscription.dispose();
- sessionReceiver.close();
- }
-
- /**
- * Demonstrates how to create a session receiver for a single know session id.
- */
- @Test
- public void sessionReceiverSessionIdInstantiation() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#sessionId
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sessionReceiver()
- .queueName(queueName)
- .buildAsyncClient();
-
- // acceptSession(String) completes successfully with a receiver when "<< my-session-id >>" session is
- // successfully locked.
- // `Flux.usingWhen` is used so we dispose of the receiver resource after `receiveMessages()` completes.
- // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
- Flux sessionMessages = Flux.usingWhen(
- sessionReceiver.acceptSession("<< my-session-id >>"),
- receiver -> receiver.receiveMessages(),
- receiver -> Mono.fromRunnable(() -> receiver.close()));
-
- // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
- // is non-blocking and kicks off the operation.
- Disposable subscription = sessionMessages.subscribe(
- message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
- message.getSequenceNumber(), message.getBody()),
- error -> System.err.print(error));
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#sessionId
-
- subscription.dispose();
- sessionReceiver.close();
- }
-
- @Test
- public void transactionsSnippet() {
- // Some random sequenceNumber.
- long sequenceNumber = 1000L;
- ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage(BinaryData.fromString("Hello"));
-
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.committransaction#servicebustransactioncontext
- // This mono creates a transaction and caches the output value, so we can associate operations with the
- // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
- // the operation.
- Mono transactionContext = receiver.createTransaction()
- .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
- error -> Duration.ZERO,
- () -> Duration.ZERO);
-
- transactionContext.flatMap(transaction -> {
- // Process messages and associate operations with the transaction.
- Mono operations = Mono.when(
- receiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
- receiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
- receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
-
- // Finally, either commit or rollback the transaction once all the operations are associated with it.
- return operations.flatMap(transactionOperations -> receiver.commitTransaction(transaction));
- });
- // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.committransaction#servicebustransactioncontext
-
- receiver.close();
- }
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java
deleted file mode 100644
index cdbe6eb612934..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.core.util.BinaryData;
-import com.azure.messaging.servicebus.models.AbandonOptions;
-import com.azure.messaging.servicebus.models.CompleteOptions;
-import org.junit.jupiter.api.Test;
-
-/**
- * Code snippets demonstrating various {@link ServiceBusReceiverClient} scenarios.
- */
-public class ServiceBusReceiverClientJavaDocCodeSample {
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // 3. "queueName" will be the name of the Service Bus queue instance you created
- // inside the Service Bus namespace.
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
- String sessionQueueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_QUEUE_NAME");
-
- ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .receiver()
- .queueName(queueName)
- .buildClient();
-
- /**
- * Code snippet for creating an ServiceBusReceiverClient
- */
- @Test
- public void instantiate() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .receiver()
- .queueName(queueName)
- .buildClient();
-
- // Use the receiver and finally close it.
- receiver.close();
- // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation
- }
-
- /**
- * Demonstrates how to create a session receiver for a single, first available session.
- */
- public void sessionReceiverSingleInstantiation() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession
- // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString(
- "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}")
- .sessionReceiver()
- .queueName("<< QUEUE NAME >>")
- .buildClient();
- ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();
-
- // Use the receiver and finally close it along with the sessionReceiver.
- receiver.close();
- sessionReceiver.close();
- // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession
- }
-
- /**
- * Demonstrates how to create a session receiver for a single know session id.
- */
- @Test
- public void sessionReceiverSessionIdInstantiation() {
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId
- // The connectionString/sessionQueueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sessionReceiver()
- .queueName(sessionQueueName)
- .buildClient();
- ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<< my-session-id >>");
-
- // Use the receiver and finally close it along with the sessionReceiver.
- receiver.close();
- sessionReceiver.close();
- // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId
- }
-
- /**
- * Demonstrates how to use a transaction.
- */
- public void transactionsSnippet() {
- // Some random sequenceNumber.
- long sequenceNumber = 1000L;
- ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage((BinaryData) null);
-
- // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.committransaction#servicebustransactioncontext
- ServiceBusTransactionContext transaction = receiver.createTransaction();
-
- // Process messages and associate operations with the transaction.
- ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
- receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
- receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
- receiver.commitTransaction(transaction);
- // END: com.azure.messaging.servicebus.servicebusreceiverclient.committransaction#servicebustransactioncontext
-
- receiver.close();
- }
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSamples.java
new file mode 100644
index 0000000000000..7210853b0d308
--- /dev/null
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSamples.java
@@ -0,0 +1,516 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.servicebus;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.util.BinaryData;
+import com.azure.core.util.IterableStream;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.messaging.servicebus.models.AbandonOptions;
+import com.azure.messaging.servicebus.models.CompleteOptions;
+import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
+import com.azure.messaging.servicebus.models.SubQueue;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscription;
+import reactor.core.Disposable;
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Code snippets demonstrating various {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient}.
+ */
+public class ServiceBusReceiverClientJavaDocCodeSamples {
+ /**
+ * Fully qualified namespace is the host name of the Service Bus resource. It can be found by navigating to the
+ * Service Bus namespace and looking in the "Essentials" panel.
+ */
+ private final String fullyQualifiedNamespace = System.getenv("AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME");
+ /**
+ * Name of a queue inside the Service Bus namespace.
+ */
+ private final String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
+ /**
+ * Name of a topic inside the Service Bus namespace.
+ */
+ private final String topicName = System.getenv("AZURE_SERVICEBUS_SAMPLE_TOPIC_NAME");
+ /**
+ * Name of a subscription associated with the {@link #topicName}.
+ */
+ private final String subscriptionName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SUBSCRIPTION_NAME");
+ /**
+ * Name of a session-enabled queue in the Service Bus namespace.
+ */
+ private final String sessionEnabledQueueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_QUEUE_NAME");
+
+ /**
+ * Code snippet for creating an ServiceBusReceiverClient
+ */
+ @Test
+ public void instantiate() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .receiver()
+ .disableAutoComplete()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildClient();
+
+ receiver.receiveMessages(3, Duration.ofSeconds(5))
+ .forEach(message -> {
+ System.out.println("Message: " + message.getBody());
+ });
+
+ // Use the receiver and finally close it.
+ receiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation
+ }
+
+ /**
+ * Code snippet for creating an ServiceBusReceiverClient
+ */
+ @Test
+ public void instantiateAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .receiver()
+ .disableAutoComplete()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // Use the receiver and finally close it.
+ asyncReceiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation
+ }
+
+ /**
+ * Code sample for creating a synchronous Service Bus receiver to read message from dead-letter queue.
+ */
+ @Test
+ public void instantiateDeadLetterQueue() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation-deadLetterQueue
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .subQueue(SubQueue.DEAD_LETTER_QUEUE)
+ .buildClient();
+
+ // Use the receiver and finally close it.
+ receiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation-deadLetterQueue
+ }
+
+ /**
+ * Receives messages from a topic and subscription.
+ */
+ public void receiveMessages() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.receiveMessages-int-duration
+ TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, tokenCredential)
+ .receiver()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildClient();
+
+ // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
+ // happens first.
+ IterableStream messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
+ messages.forEach(message -> {
+ System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());
+ });
+
+ // When you are done using the receiver, dispose of it.
+ receiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.receiveMessages-int-duration
+ }
+
+ /**
+ * Receives from all the messages.
+ */
+ @Test
+ public void receiveMessagesAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .receiver()
+ .disableAutoComplete()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveMessages
+ // Keep a reference to `subscription`. When the program is finished receiving messages, call
+ // subscription.dispose(). This will stop fetching messages from the Service Bus.
+ // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
+ Disposable subscription = asyncReceiver.receiveMessages()
+ .subscribe(message -> {
+ System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
+ System.out.printf("Contents of message as string: %s%n", message.getBody());
+ },
+ error -> System.out.println("Error occurred: " + error),
+ () -> System.out.println("Receiving complete."));
+
+ // When program ends, or you're done receiving all messages.
+ subscription.dispose();
+ asyncReceiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveMessages
+ }
+
+ /**
+ * Receives message from a queue or topic using receive and delete mode.
+ */
+ @Test
+ public void receiveWithReceiveAndDeleteModeAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveWithReceiveAndDeleteMode
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // Keep a reference to `subscription`. When the program is finished receiving messages, call
+ // subscription.dispose(). This will stop fetching messages from the Service Bus.
+ Disposable subscription = Flux.usingWhen(
+ Mono.fromCallable(() -> {
+ // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
+ // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
+ return new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .receiver()
+ .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
+ .queueName(queueName)
+ .buildAsyncClient();
+ }), receiver -> {
+ return receiver.receiveMessages();
+ }, receiver -> {
+ return Mono.fromRunnable(() -> receiver.close());
+ })
+ .subscribe(message -> {
+ System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
+ System.out.printf("Contents of message as string: %s%n", message.getBody().toString());
+ }, error -> System.err.print(error));
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receiveWithReceiveAndDeleteMode
+
+ subscription.dispose();
+ }
+
+ /**
+ * Receives message with back pressure.
+ */
+ @Test
+ public void receiveMessagesBackpressureAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .receiver()
+ .disableAutoComplete()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#basesubscriber
+ // This is a non-blocking call. The program will move to the next line of code after setting up the operation.
+ asyncReceiver.receiveMessages().subscribe(new BaseSubscriber() {
+ private static final int NUMBER_OF_MESSAGES = 5;
+ private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
+
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ // Tell the Publisher we only want 5 message at a time.
+ request(NUMBER_OF_MESSAGES);
+ }
+
+ @Override
+ protected void hookOnNext(ServiceBusReceivedMessage message) {
+ // Process the ServiceBusReceivedMessage
+ // If the number of messages we have currently received is a multiple of 5, that means we have reached
+ // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
+ // that the subscriber is ready to get more messages from upstream.
+ if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
+ request(NUMBER_OF_MESSAGES);
+ }
+ }
+ });
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.receive#basesubscriber
+
+ // When completed receiving messages, close the receiver.
+ asyncReceiver.close();
+ }
+
+ /**
+ * Demonstrates how to create a session receiver for a single, first available session.
+ */
+ public void sessionReceiverSingleInstantiation() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .sessionReceiver()
+ .disableAutoComplete()
+ .queueName(sessionEnabledQueueName)
+ .buildClient();
+
+ // Creates a client to receive messages from the first available session. It waits until
+ // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
+ // throws a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
+ ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();
+
+ // Use the receiver and finally close it along with the sessionReceiver.
+ try {
+ IterableStream receivedMessages =
+ receiver.receiveMessages(10, Duration.ofSeconds(30));
+
+ for (ServiceBusReceivedMessage message : receivedMessages) {
+ System.out.println("Body: " + message);
+ }
+ } finally {
+ receiver.close();
+ sessionReceiver.close();
+ }
+
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession
+ }
+
+ /**
+ * Demonstrates how to create a session receiver for a single, first available session for
+ * {@link ServiceBusReceiverAsyncClient}.
+ */
+ public void sessionReceiverSingleInstantiationAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .sessionReceiver()
+ .disableAutoComplete()
+ .queueName(sessionEnabledQueueName)
+ .buildAsyncClient();
+
+ // Creates a client to receive messages from the first available session. It waits until
+ // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
+ // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
+ Mono receiverMono = sessionReceiver.acceptNextSession();
+
+ Disposable disposable = Flux.usingWhen(receiverMono,
+ receiver -> receiver.receiveMessages(),
+ receiver -> Mono.fromRunnable(() -> {
+ // Dispose of the receiver and sessionReceiver when done receiving messages.
+ receiver.close();
+ sessionReceiver.close();
+ }))
+ .subscribe(message -> {
+ System.out.println("Received message: " + message.getBody());
+ });
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#nextsession
+
+ // Users can dispose of the subscription to cancel receive operation.
+ disposable.dispose();
+ }
+
+ /**
+ * Demonstrates how to create a session receiver for a single know session id.
+ */
+ @Test
+ public void sessionReceiverSessionIdInstantiation() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .sessionReceiver()
+ .queueName(sessionEnabledQueueName)
+ .disableAutoComplete()
+ .buildClient();
+ ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<>");
+
+ // Use the receiver and finally close it along with the sessionReceiver.
+ receiver.close();
+ sessionReceiver.close();
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId
+ }
+
+ /**
+ * Demonstrates how to create a session receiver for a single know session id.
+ */
+ @Test
+ public void sessionReceiverSessionIdInstantiationAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#sessionId
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ // 'disableAutoComplete' indicates that users will explicitly settle their message.
+ ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .sessionReceiver()
+ .disableAutoComplete()
+ .queueName(sessionEnabledQueueName)
+ .buildAsyncClient();
+
+ // acceptSession(String) completes successfully with a receiver when "<>" session is
+ // successfully locked.
+ // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` completes.
+ // `Mono.usingWhen` can also be used if the resource closure only returns a single item.
+ Flux sessionMessages = Flux.usingWhen(
+ sessionReceiver.acceptSession("<>"),
+ receiver -> {
+ // Receive messages from <> session.
+ return receiver.receiveMessages();
+ },
+ receiver -> Mono.fromRunnable(() -> {
+ // Dispose of
+ receiver.close();
+ sessionReceiver.close();
+ }));
+
+ // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
+ // is non-blocking and kicks off the operation.
+ Disposable subscription = sessionMessages.subscribe(
+ message -> System.out.printf("Received Sequence #: %s. Contents: %s%n",
+ message.getSequenceNumber(), message.getBody()),
+ error -> System.err.print(error),
+ () -> System.out.println("Completed receiving from session."));
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.instantiation#sessionId
+
+ subscription.dispose();
+ sessionReceiver.close();
+ }
+
+ /**
+ * Demonstrates how to use a transaction.
+ */
+ @Test
+ public void transactionsSnippet() {
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .receiver()
+ .disableAutoComplete()
+ .queueName(queueName)
+ .buildClient();
+
+ // Some random sequenceNumber.
+ long sequenceNumber = 1000L;
+ ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage(BinaryData.fromString("Hello"));
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.committransaction#servicebustransactioncontext
+ ServiceBusTransactionContext transaction = receiver.createTransaction();
+
+ // Process messages and associate operations with the transaction.
+ ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
+ receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
+ receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
+ receiver.commitTransaction(transaction);
+ // END: com.azure.messaging.servicebus.servicebusreceiverclient.committransaction#servicebustransactioncontext
+
+ // Close receiver when finished using it.
+ receiver.close();
+ }
+
+ /**
+ * Demonstrates transactions using async client.
+ */
+ @Test
+ public void transactionsSnippetAsync() {
+ ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .receiver()
+ .disableAutoComplete()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // Some random sequenceNumber.
+ long sequenceNumber = 1000L;
+ ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage(BinaryData.fromString("Hello"));
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusreceiverasyncclient.committransaction#servicebustransactioncontext
+ // This mono creates a transaction and caches the output value, so we can associate operations with the
+ // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
+ // the operation.
+ Mono transactionContext = asyncReceiver.createTransaction()
+ .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
+ error -> Duration.ZERO,
+ () -> Duration.ZERO);
+
+ // Dispose of the disposable to cancel the operation.
+ Disposable disposable = transactionContext.flatMap(transaction -> {
+ // Process messages and associate operations with the transaction.
+ Mono operations = Mono.when(
+ asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
+ asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
+ asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
+
+ // Finally, either commit or rollback the transaction once all the operations are associated with it.
+ return operations.then(asyncReceiver.commitTransaction(transaction));
+ }).subscribe(unused -> {
+ }, error -> {
+ System.err.println("Error occurred processing transaction: " + error);
+ }, () -> {
+ System.out.println("Completed transaction");
+ });
+ // END: com.azure.messaging.servicebus.servicebusreceiverasyncclient.committransaction#servicebustransactioncontext
+ // Close receiver when finished using it.
+ asyncReceiver.close();
+
+ if (!disposable.isDisposed()) {
+ disposable.dispose();
+ }
+ }
+
+ @Test
+ public void connectionSharingAcrossClients() {
+ // BEGIN: com.azure.messaging.servicebus.connection.sharing
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // Retrieve 'connectionString' and 'queueName' from your configuration.
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential);
+
+ // Create receiver and sender which will share the connection.
+ ServiceBusReceiverClient receiver = sharedConnectionBuilder
+ .receiver()
+ .queueName(queueName)
+ .buildClient();
+ ServiceBusSenderClient sender = sharedConnectionBuilder
+ .sender()
+ .queueName(queueName)
+ .buildClient();
+
+ // Use the clients and finally close them.
+ try {
+ sender.sendMessage(new ServiceBusMessage("payload"));
+ receiver.receiveMessages(1);
+ } finally {
+ sender.close();
+ receiver.close();
+ }
+ // END: com.azure.messaging.servicebus.connection.sharing
+ }
+}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClientJavaDocSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClientJavaDocSample.java
deleted file mode 100644
index d0b21933706c3..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerAsyncClientJavaDocSample.java
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
-import com.azure.messaging.servicebus.administration.models.RuleFilter;
-import com.azure.messaging.servicebus.administration.models.TrueRuleFilter;
-import org.junit.jupiter.api.Test;
-
-/**
- * Code snippets demonstrating various {@link ServiceBusRuleManagerAsyncClient} scenarios.
- */
-public class ServiceBusRuleManagerAsyncClientJavaDocSample {
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // We are reading 'connectionString/topicName/subscriptionName' from environment variable.
- // You can configure them as it fits suitable for your application.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // 3. "topicName" will be the name of the Service Bus queue instance you created
- // inside the Service Bus namespace.
- // 4. "subscriptionName" will be the name of the Service Bus subscription instance you created inside
- // the Service Bus topic.
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String topicName = System.getenv("AZURE_SERVICEBUS_TOPIC_NAME");
- String subscriptionName = System.getenv("AZURE_SERVICEBUS_SUBSCRIPTION_NAME");
-
- ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .ruleManager()
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildAsyncClient();
-
- /**
- * Code snippet for creating a ServiceBusRuleManagerAsyncClient.
- */
- @Test
- public void initialization() {
- // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
-
- ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .ruleManager()
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation
-
- ruleManager.close();
- }
-
- /**
- * Demonstrates how to create a rule for a Service Bus subscription.
- */
- @Test
- public void createRule() {
- ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .ruleManager()
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildAsyncClient();
-
- // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule
- RuleFilter trueRuleFilter = new TrueRuleFilter();
- CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
- ruleManager.createRule("new-rule", options).subscribe(
- unused -> { },
- err -> System.err.println("Error occurred when create a rule, err: " + err),
- () -> System.out.println("Create complete.")
- );
- // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule
-
- ruleManager.close();
- }
-
- /**
- * Demonstrates how to fetch all rules under a Service Bus subscription.
- */
- @Test
- public void getRules() {
- // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules
- ruleManager.listRules().subscribe(ruleProperties -> System.out.println(ruleProperties.getName()));
- // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules
- }
-
- /**
- * Demonstrates how to delete a rule.
- */
- @Test
- public void deleteRule() {
- // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule
- ruleManager.deleteRule("exist-rule").subscribe(
- unused -> { },
- err -> System.err.println("Error occurred when delete rule, err: " + err),
- () -> System.out.println("Delete complete.")
- );
- // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule
- }
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSample.java
deleted file mode 100644
index 05e9800ea812c..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSample.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
-import com.azure.messaging.servicebus.administration.models.RuleFilter;
-import com.azure.messaging.servicebus.administration.models.TrueRuleFilter;
-import org.junit.jupiter.api.Test;
-
-/**
- * Code snippets demonstrating various {@link ServiceBusRuleManagerClient} scenarios.
- */
-public class ServiceBusRuleManagerClientJavaDocSample {
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // We are reading 'connectionString/topicName/subscriptionName' from environment variable.
- // You can configure them as it fits suitable for your application.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // 3. "topicName" will be the name of the Service Bus queue instance you created
- // inside the Service Bus namespace.
- // 4. "subscriptionName" will be the name of the Service Bus subscription instance you created inside
- // the Service Bus topic.
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String topicName = System.getenv("AZURE_SERVICEBUS_TOPIC_NAME");
- String subscriptionName = System.getenv("AZURE_SERVICEBUS_SUBSCRIPTION_NAME");
-
- ServiceBusRuleManagerClient ruleManager = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .ruleManager()
- .topicName(topicName)
- .subscriptionName(subscriptionName)
- .buildClient();
-
- /**
- * Demonstrates how to create a rule for a Service Bus subscription.
- */
- @Test
- public void createRule() {
- // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerclient.createRule
- RuleFilter trueRuleFilter = new TrueRuleFilter();
- CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
- ruleManager.createRule("new-rule", options);
- // END: com.azure.messaging.servicebus.servicebusrulemanagerclient.createRule
-
- ruleManager.close();
- }
-
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSamples.java
new file mode 100644
index 0000000000000..3997c547b01c3
--- /dev/null
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusRuleManagerClientJavaDocSamples.java
@@ -0,0 +1,152 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.servicebus;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
+import com.azure.messaging.servicebus.administration.models.RuleFilter;
+import com.azure.messaging.servicebus.administration.models.TrueRuleFilter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Code snippets demonstrating various {@link ServiceBusRuleManagerClient} scenarios.
+ */
+public class ServiceBusRuleManagerClientJavaDocSamples {
+ /**
+ * Fully qualified namespace is the host name of the Service Bus resource. It can be found by navigating to the
+ * Service Bus namespace and looking in the "Essentials" panel.
+ */
+ private final String fullyQualifiedNamespace = System.getenv("AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME");
+ private final String topicName = System.getenv("AZURE_SERVICEBUS_TOPIC_NAME");
+ private final String subscriptionName = System.getenv("AZURE_SERVICEBUS_SUBSCRIPTION_NAME");
+
+ @Test
+ public void initializationAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation
+ // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
+ // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
+ // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .ruleManager()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildAsyncClient();
+ // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation
+
+ ruleManager.close();
+ }
+
+ /**
+ * Demonstrates how to create a rule for a Service Bus subscription.
+ */
+ @Test
+ public void createRule() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerclient.createRule
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusRuleManagerClient ruleManager = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .ruleManager()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildClient();
+
+ RuleFilter trueRuleFilter = new TrueRuleFilter();
+ CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
+ ruleManager.createRule("new-rule", options);
+
+ // Dispose of the ruleManager when finished using it.
+ ruleManager.close();
+ // END: com.azure.messaging.servicebus.servicebusrulemanagerclient.createRule
+ }
+
+ /**
+ * Demonstrates how to create a rule for a Service Bus subscription.
+ */
+ @Test
+ public void createRuleAsync() {
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .ruleManager()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule
+ RuleFilter trueRuleFilter = new TrueRuleFilter();
+ CreateRuleOptions options = new CreateRuleOptions(trueRuleFilter);
+
+ // `subscribe` is a non-blocking call. After setting up the create rule operation, it will move onto the next
+ // line of code to execute.
+ // Consider using Mono.usingWhen to scope the creation, usage, and cleanup of the rule manager.
+ ruleManager.createRule("new-rule", options).subscribe(
+ unused -> {
+ },
+ err -> System.err.println("Error occurred when create a rule, err: " + err),
+ () -> System.out.println("Create complete.")
+ );
+
+ // Finally dispose of the rule manager when done using it.
+ ruleManager.close();
+ // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule
+ }
+
+ /**
+ * Demonstrates how to fetch all rules under a Service Bus subscription.
+ */
+ @Test
+ public void getRulesAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .ruleManager()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules
+ // `subscribe` is a non-blocking call. After setting up the list rules operation, it will move onto the next
+ // line of code to execute.
+ ruleManager.listRules().subscribe(ruleProperties -> System.out.println(ruleProperties.getName()));
+ // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules
+
+ // Finally dispose of the rule manager when done using it.
+ ruleManager.close();
+ }
+
+ /**
+ * Demonstrates how to delete a rule.
+ */
+ @Test
+ public void deleteRuleAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .ruleManager()
+ .topicName(topicName)
+ .subscriptionName(subscriptionName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule
+ // `subscribe` is a non-blocking call. After setting up the delete rule operation, it will move onto the next
+ // line of code to execute.
+ ruleManager.deleteRule("exist-rule").subscribe(
+ unused -> { },
+ err -> System.err.println("Error occurred when delete rule, err: " + err),
+ () -> System.out.println("Delete complete.")
+ );
+ // END: com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule
+
+ // Finally dispose of the rule manager when done using it.
+ ruleManager.close();
+ }
+}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java
deleted file mode 100644
index 21a82642b0660..0000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.core.util.BinaryData;
-import com.azure.identity.DefaultAzureCredentialBuilder;
-import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
-import org.junit.jupiter.api.Test;
-import reactor.core.Exceptions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-/**
- * Contains code snippets when generating javadocs through doclets for {@link ServiceBusSenderAsyncClient}.
- */
-public class ServiceBusSenderAsyncClientJavaDocCodeSamples {
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // 3. "queueName" will be the name of the Service Bus queue instance you created
- // inside the Service Bus namespace.
- String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
-
- ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- .connectionString(System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"))
- .sender()
- .queueName(System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"))
- .buildAsyncClient();
-
- /**
- * Code snippet demonstrating how to create an {@link ServiceBusSenderAsyncClient}.
- */
- @Test
- public void instantiate() {
- // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiation
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sender()
- .queueName(queueName)
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiation
-
- sender.close();
- }
-
- /**
- * Code snippet demonstrating how to create an {@link ServiceBusSenderAsyncClient}.
- */
- public void instantiateWithDefaultCredential() {
- // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiateWithDefaultCredential
- // The required parameter is a way to authenticate with Service Bus using credentials.
- // The connectionString provides a way to authenticate with Service Bus.
- ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- .credential("<>",
- new DefaultAzureCredentialBuilder().build())
- .sender()
- .queueName("<< QUEUE NAME >>")
- .buildAsyncClient();
- // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiateWithDefaultCredential
-
- sender.close();
- }
-
- /**
- * Code snippet demonstrating how to send a batch to Service Bus queue or topic.
- */
- @Test
- public void sendBatch() {
- // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- ServiceBusSenderAsyncClient sender = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sender()
- .queueName(queueName)
- .buildAsyncClient();
-
- // Creating a batch without options set, will allow for automatic routing of events to any partition.
- sender.createMessageBatch().flatMap(batch -> {
- batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))));
- batch.tryAddMessage(new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
- return sender.sendMessages(batch);
- }).subscribe(unused -> {
- },
- error -> System.err.println("Error occurred while sending batch:" + error),
- () -> System.out.println("Send complete."));
- // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch
-
- sender.close();
- }
-
- /**
- * Code snippet demonstrating how to create a size-limited {@link ServiceBusMessageBatch} and send it.
- */
- @Test
- public void batchSizeLimited() {
-
- ServiceBusMessage firstMessage = new ServiceBusMessage(BinaryData.fromBytes("92".getBytes(UTF_8)));
- firstMessage.getApplicationProperties().put("telemetry", "latency");
- ServiceBusMessage secondMessage = new ServiceBusMessage(BinaryData.fromBytes("98".getBytes(UTF_8)));
- secondMessage.getApplicationProperties().put("telemetry", "cpu-temperature");
-
- // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch#CreateMessageBatchOptionsLimitedSize
- Flux telemetryMessages = Flux.just(firstMessage, secondMessage);
-
- // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
- // In this case, all the batches created with these options are limited to 256 bytes.
- CreateMessageBatchOptions options = new CreateMessageBatchOptions()
- .setMaximumSizeInBytes(256);
- AtomicReference currentBatch = new AtomicReference<>(
- sender.createMessageBatch(options).block());
-
- // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
- telemetryMessages.flatMap(message -> {
- ServiceBusMessageBatch batch = currentBatch.get();
- if (batch.tryAddMessage(message)) {
- return Mono.empty();
- }
-
- return Mono.when(
- sender.sendMessages(batch),
- sender.createMessageBatch(options).map(newBatch -> {
- currentBatch.set(newBatch);
-
- // Add the message that did not fit in the previous batch.
- if (!newBatch.tryAddMessage(message)) {
- throw Exceptions.propagate(new IllegalArgumentException(
- "Message was too large to fit in an empty batch. Max size: " + newBatch.getMaxSizeInBytes()));
- }
-
- return newBatch;
- }));
- }).then()
- .doFinally(signal -> {
- ServiceBusMessageBatch batch = currentBatch.getAndSet(null);
- if (batch != null && batch.getCount() > 0) {
- sender.sendMessages(batch).block();
- }
- });
- // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch#CreateMessageBatchOptionsLimitedSize
- }
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java
index eb6ab9c834996..08cd2d2a6e544 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java
@@ -3,33 +3,37 @@
package com.azure.messaging.servicebus;
+import com.azure.core.credential.TokenCredential;
import com.azure.core.util.BinaryData;
+import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
import org.junit.jupiter.api.Test;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SynchronousSink;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
- * Contains code snippets when generating javadocs through doclets for {@link ServiceBusSenderClient}.
+ * Contains code snippets when generating javadocs through doclets for {@link ServiceBusSenderClient} and
+ * {@link ServiceBusSenderAsyncClient}.
*/
public class ServiceBusSenderClientJavaDocCodeSamples {
-
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // We are reading 'connectionString/queueName' from environment variable.
- // You can configure them as it fits suitable for your application.
- // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
- // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net"
- // inside the Service Bus namespace.
- private String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");
- private String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
- private ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- .connectionString(connectionString)
- .sender()
- .queueName(queueName)
- .buildClient();
+ /**
+ * Fully qualified namespace is the host name of the Service Bus resource. It can be found by navigating to the
+ * Service Bus namespace and looking in the "Essentials" panel.
+ */
+ private final String fullyQualifiedNamespace = System.getenv("AZURE_SERVICEBUS_FULLY_QUALIFIED_DOMAIN_NAME");
+ private final String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME");
+ /**
+ * Name of a session-enabled queue in the Service Bus namespace.
+ */
+ private final String sessionEnabledQueueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_SESSION_QUEUE_NAME");
/**
* Code snippet demonstrating how to create an {@link ServiceBusSenderClient}.
@@ -37,19 +41,41 @@ public class ServiceBusSenderClientJavaDocCodeSamples {
@Test
public void instantiate() {
// BEGIN: com.azure.messaging.servicebus.servicebussenderclient.instantiation
- // The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
- // The connectionString/queueName must be set by the application. The 'connectionString' format is shown below.
- // "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
- .connectionString(connectionString)
+ .credential(fullyQualifiedNamespace, credential)
.sender()
.queueName(queueName)
.buildClient();
+
+ sender.sendMessage(new ServiceBusMessage("Foo bar"));
// END: com.azure.messaging.servicebus.servicebussenderclient.instantiation
sender.close();
}
+ /**
+ * Code snippet demonstrating how to create an {@link ServiceBusSenderAsyncClient}.
+ */
+ @Test
+ public void instantiateAsync() {
+ // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiation
+ TokenCredential credential = new DefaultAzureCredentialBuilder().build();
+
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, credential)
+ .sender()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // Use the sender and finally close it.
+ asyncSender.close();
+ // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.instantiation
+ }
+
/**
* Code snippet demonstrating how to send a batch to Service Bus queue or topic.
*
@@ -57,25 +83,74 @@ public void instantiate() {
*/
@Test
public void sendBatch() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .sender()
+ .queueName(queueName)
+ .buildClient();
// BEGIN: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch
- List messages = Arrays.asList(new ServiceBusMessage(BinaryData.fromBytes("test-1".getBytes(UTF_8))),
- new ServiceBusMessage(BinaryData.fromBytes("test-2".getBytes(UTF_8))));
-
- CreateMessageBatchOptions options = new CreateMessageBatchOptions().setMaximumSizeInBytes(10 * 1024);
+ List messages = Arrays.asList(
+ new ServiceBusMessage("test-1"),
+ new ServiceBusMessage("test-2"));
// Creating a batch without options set.
- ServiceBusMessageBatch batch = sender.createMessageBatch(options);
+ ServiceBusMessageBatch batch = sender.createMessageBatch();
for (ServiceBusMessage message : messages) {
if (batch.tryAddMessage(message)) {
continue;
}
+ // The batch is full. Send the current batch and create a new one.
+ sender.sendMessages(batch);
+
+ batch = sender.createMessageBatch();
+
+ // Add the message we couldn't before.
+ if (!batch.tryAddMessage(message)) {
+ throw new IllegalArgumentException("Message is too large for an empty batch.");
+ }
+ }
+
+ // Send the final batch if there are any messages in it.
+ if (batch.getCount() > 0) {
sender.sendMessages(batch);
}
- // END: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch
+ // Finally dispose of the sender.
sender.close();
+ // END: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch
+ }
+
+ /**
+ * Code snippet demonstrating how to send a batch to Service Bus queue or topic.
+ */
+ @Test
+ public void sendBatchAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .sender()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch
+ // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
+ // operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
+ asyncSender.createMessageBatch().flatMap(batch -> {
+ batch.tryAddMessage(new ServiceBusMessage("test-1"));
+ batch.tryAddMessage(new ServiceBusMessage("test-2"));
+ return asyncSender.sendMessages(batch);
+ }).subscribe(unused -> {
+ }, error -> {
+ System.err.println("Error occurred while sending batch:" + error);
+ }, () -> {
+ System.out.println("Send complete.");
+ });
+ // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch
+
+ asyncSender.close();
}
/**
@@ -85,15 +160,21 @@ public void sendBatch() {
*/
@Test
public void batchSizeLimited() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .sender()
+ .queueName(queueName)
+ .buildClient();
- ServiceBusMessage firstMessage = new ServiceBusMessage(BinaryData.fromBytes("message-1".getBytes(UTF_8)));
+ ServiceBusMessage firstMessage = new ServiceBusMessage("message-1");
firstMessage.getApplicationProperties().put("telemetry", "latency");
- ServiceBusMessage secondMessage = new ServiceBusMessage(BinaryData.fromBytes("message-2".getBytes(UTF_8)));
+ ServiceBusMessage secondMessage = new ServiceBusMessage("message-2");
secondMessage.getApplicationProperties().put("telemetry", "cpu-temperature");
- ServiceBusMessage thirdMessage = new ServiceBusMessage(BinaryData.fromBytes("message-3".getBytes(UTF_8)));
+ ServiceBusMessage thirdMessage = new ServiceBusMessage("message-3");
thirdMessage.getApplicationProperties().put("telemetry", "fps");
- // BEGIN: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch#CreateMessageBatchOptions-int
+ // BEGIN: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch#CreateMessageBatchOptions
List telemetryMessages = Arrays.asList(firstMessage, secondMessage, thirdMessage);
// Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
@@ -116,6 +197,129 @@ public void batchSizeLimited() {
}
}
}
- // END: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch#CreateMessageBatchOptions-int
+
+ // Send the final batch if there are any messages in it.
+ if (currentBatch.getCount() > 0) {
+ sender.sendMessages(currentBatch);
+ }
+
+ // Dispose of the sender
+ sender.close();
+ // END: com.azure.messaging.servicebus.servicebussenderclient.createMessageBatch#CreateMessageBatchOptions
+ }
+
+ /**
+ * Code snippet demonstrating how to create a size-limited {@link ServiceBusMessageBatch} and send it.
+ */
+ @Test
+ public void batchSizeLimitedAsync() {
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderAsyncClient asyncSender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .sender()
+ .queueName(queueName)
+ .buildAsyncClient();
+
+ ServiceBusMessage firstMessage = new ServiceBusMessage(BinaryData.fromBytes("92".getBytes(UTF_8)));
+ firstMessage.getApplicationProperties().put("telemetry", "latency");
+ ServiceBusMessage secondMessage = new ServiceBusMessage(BinaryData.fromBytes("98".getBytes(UTF_8)));
+ secondMessage.getApplicationProperties().put("telemetry", "cpu-temperature");
+
+ // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch#CreateMessageBatchOptionsLimitedSize
+ Flux telemetryMessages = Flux.just(firstMessage, secondMessage);
+
+ // Setting `setMaximumSizeInBytes` when creating a batch, limits the size of that batch.
+ // In this case, all the batches created with these options are limited to 256 bytes.
+ CreateMessageBatchOptions options = new CreateMessageBatchOptions()
+ .setMaximumSizeInBytes(256);
+ AtomicReference currentBatch = new AtomicReference<>();
+
+ // Sends the current batch if it is not null and not empty. If the current batch is null, sets it.
+ // Returns the batch to work with.
+ Mono sendBatchAndGetCurrentBatchOperation = Mono.defer(() -> {
+ ServiceBusMessageBatch batch = currentBatch.get();
+
+ if (batch == null) {
+ return asyncSender.createMessageBatch(options);
+ }
+
+ if (batch.getCount() > 0) {
+ return asyncSender.sendMessages(batch).then(
+ asyncSender.createMessageBatch(options)
+ .handle((ServiceBusMessageBatch newBatch, SynchronousSink sink) -> {
+ // Expect that the batch we just sent is the current one. If it is not, there's a race
+ // condition accessing currentBatch reference.
+ if (!currentBatch.compareAndSet(batch, newBatch)) {
+ sink.error(new IllegalStateException(
+ "Expected that the object in currentBatch was batch. But it is not."));
+ } else {
+ sink.next(newBatch);
+ }
+ }));
+ } else {
+ return Mono.just(batch);
+ }
+ });
+
+ // The sample Flux contains two messages, but it could be an infinite stream of telemetry messages.
+ Flux sendMessagesOperation = telemetryMessages.flatMap(message -> {
+ return sendBatchAndGetCurrentBatchOperation.flatMap(batch -> {
+ if (batch.tryAddMessage(message)) {
+ return Mono.empty();
+ } else {
+ return sendBatchAndGetCurrentBatchOperation
+ .handle((ServiceBusMessageBatch newBatch, SynchronousSink sink) -> {
+ if (!newBatch.tryAddMessage(message)) {
+ sink.error(new IllegalArgumentException(
+ "Message is too large to fit in an empty batch."));
+ } else {
+ sink.complete();
+ }
+ });
+ }
+ });
+ });
+
+ // `subscribe` is a non-blocking call. The program will move onto the next line of code when it starts the
+ // operation. Users should use the callbacks on `subscribe` to understand the status of the send operation.
+ Disposable disposable = sendMessagesOperation.then(sendBatchAndGetCurrentBatchOperation)
+ .subscribe(batch -> {
+ System.out.println("Last batch should be empty: " + batch.getCount());
+ }, error -> {
+ System.err.println("Error sending telemetry messages: " + error);
+ }, () -> {
+ System.out.println("Completed.");
+
+ // Clean up sender when done using it. Publishers should be long-lived objects.
+ asyncSender.close();
+ });
+
+ // END: com.azure.messaging.servicebus.servicebusasyncsenderclient.createMessageBatch#CreateMessageBatchOptionsLimitedSize
+ // Dispose of subscription to cancel operations.
+ disposable.dispose();
+ }
+
+ /**
+ * Create a session message.
+ */
+ @Test
+ public void sendSessionMessage() {
+ // BEGIN: com.azure.messaging.servicebus.servicebussenderclient.sendMessage-session
+ // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
+ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
+ .credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
+ .sender()
+ .queueName(sessionEnabledQueueName)
+ .buildClient();
+
+ // Setting sessionId publishes that message to a specific session, in this case, "greeting".
+ ServiceBusMessage message = new ServiceBusMessage("Hello world")
+ .setSessionId("greetings");
+
+ sender.sendMessage(message);
+
+ // Dispose of the sender.
+ sender.close();
+ // END: com.azure.messaging.servicebus.servicebussenderclient.sendMessage-session
}
}