Skip to content

Commit

Permalink
Removing bridge API for reactor. (#12827)
Browse files Browse the repository at this point in the history
Co-authored-by: Hemant Tanwar <[email protected]>
  • Loading branch information
hemanttanwar and Hemant Tanwar authored Jul 6, 2020
1 parent 2fc46dc commit 714c26b
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@
* other terminal scenarios. See {@link #receiveMessages()} for more information.</p>
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all}
*
* <p><strong>Receive a maximum number of messages or until max a Duration</strong></p>
* <p>This receives at most 15 messages, or until a duration of 30 seconds elapses. Whichever occurs first.</p>
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#int-duration}
*
* <p><strong>Receive messages in {@link ReceiveMode#RECEIVE_AND_DELETE} mode from Service Bus resource</strong></p>
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode}
*
Expand Down Expand Up @@ -886,30 +882,6 @@ public Flux<ServiceBusReceivedMessageContext> receiveMessages() {
}
}

/**
* Receives a bounded stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. This stream
* receives either {@code maxNumberOfMessages} are received or the {@code maxWaitTime} has elapsed.
*
* @param maxNumberOfMessages Maximum number of messages to receive.
* @param maxWaitTime Maximum time to wait.
*
* @return A bounded {@link Flux} of messages.
* @throws NullPointerException if {@code maxWaitTime} is null.
* @throws IllegalArgumentException if {@code maxNumberOfMessages} is less than 1. {@code maxWaitTime} is zero
* or a negative duration.
*/
public Flux<ServiceBusReceivedMessageContext> receiveMessages(int maxNumberOfMessages, Duration maxWaitTime) {
if (maxNumberOfMessages < 1) {
return fluxError(logger, new IllegalArgumentException("'maxNumberOfMessages' cannot be less than 1."));
} else if (maxWaitTime == null) {
return fluxError(logger, new NullPointerException("'maxWaitTime' cannot be null."));
} else if (maxWaitTime.isNegative() || maxWaitTime.isZero()) {
return fluxError(logger, new NullPointerException("'maxWaitTime' cannot be negative or zero."));
}

return receiveMessages().take(maxNumberOfMessages).take(maxWaitTime);
}

/**
* Receives a deferred {@link ServiceBusReceivedMessage message}. Deferred messages can only be received by using
* sequence number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import reactor.core.publisher.Mono;

import java.util.Map;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -140,35 +139,6 @@ public void receiveAll() {
// END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all
}

/**
* Receives messages up to a time or duration.
*/
public void receiveMaxTimeDuration() {
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("fake-string")
.receiver()
.queueName("<< QUEUE NAME >>")
.buildAsyncClient();

// BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#int-duration
Disposable subscription = receiver.receiveMessages(15, Duration.ofSeconds(30))
.flatMap(context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Received message id: %s%n", message.getMessageId());
System.out.printf("Contents of message as string: %s%n", new String(message.getBody(), UTF_8));
return receiver.complete(message.getLockToken());
}).subscribe(aVoid -> System.out.println("Processed message."),
error -> System.out.println("Error occurred: " + error));

// subscribe is a non-blocking call and program flow will continue before messages are fetched.
// END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#int-duration

// When program ends, or you're done receiving all messages.
subscription.dispose();

receiver.close();
}

/**
* Demonstrates how to create a session receiver for a single, first available session.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,44 +933,6 @@ void setAndGetSessionState(MessagingEntityType entityType) {
messagesPending.decrementAndGet();
}

@MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider")
@ParameterizedTest
void receivesByNumber(MessagingEntityType entityType) {
// Arrange
setSenderAndReceiver(entityType, TestUtils.USE_CASE_RECEIVE_BY_NUMBER, false);

final String messageId = UUID.randomUUID().toString();
final int number = 10;
final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(number, messageId, CONTENTS_BYTES);

sendMessage(messages).block(Duration.ofSeconds(10));

// Act & Assert
StepVerifier.create(receiveAndDeleteReceiver.receiveMessages(messages.size(), Duration.ofSeconds(15))
.doOnNext(next -> messagesPending.decrementAndGet()))
.expectNextCount(number)
.verifyComplete();
}

@MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider")
@ParameterizedTest
void receivesByTime(MessagingEntityType entityType) {
// Arrange

setSenderAndReceiver(entityType, TestUtils.USE_CASE_RECEIVE_BY_TIME, false);
final String messageId = UUID.randomUUID().toString();
final int number = 10;
final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(number, messageId, CONTENTS_BYTES);

sendMessage(messages).block(Duration.ofSeconds(15));

// Act & Assert
StepVerifier.create(receiveAndDeleteReceiver.receiveMessages(number + 10, Duration.ofSeconds(15))
.doOnNext(next -> messagesPending.decrementAndGet()))
.expectNextCount(number)
.verifyComplete();
}

/**
* Verifies that we can receive a message from dead letter queue.
*/
Expand Down

0 comments on commit 714c26b

Please sign in to comment.