Skip to content

Commit

Permalink
Fixing NullPointerException when lastKnownErrorTime is not set. (#10673)
Browse files Browse the repository at this point in the history
* Fixing NullPointerException.

* Add null check.
  • Loading branch information
conniey authored May 1, 2020
1 parent 34195ac commit 06166e6
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES;
import static com.azure.core.amqp.implementation.ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
Expand Down Expand Up @@ -155,7 +157,7 @@ public Flux<AmqpEndpointState> getEndpointStates() {
public Mono<Void> send(Message message) {
final int payloadSize = messageSerializer.getSize(message);
final int allocationSize =
Math.min(payloadSize + ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSize);
Math.min(payloadSize + MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSize);
final byte[] bytes = new byte[allocationSize];

int encodedSize;
Expand Down Expand Up @@ -199,7 +201,7 @@ public Mono<Void> send(List<Message> messageBatch) {

int payloadSize = messageSerializer.getSize(amqpMessage);
int allocationSize =
Math.min(payloadSize + ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSizeTemp);
Math.min(payloadSize + MAX_AMQP_HEADER_SIZE_BYTES, maxMessageSizeTemp);

byte[] messageBytes = new byte[allocationSize];
int messageSizeBytes = amqpMessage.encode(messageBytes, 0, allocationSize);
Expand Down Expand Up @@ -289,9 +291,8 @@ private Mono<Void> send(byte[] bytes, int arrayOffset, int messageFormat) {
return RetryUtil.withRetry(
handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE),
timeout, retry)
.then(Mono.create(sink -> {
send(new RetriableWorkItem(bytes, arrayOffset, messageFormat, sink, timeout));
}));
.then(Mono.create(sink ->
send(new RetriableWorkItem(bytes, arrayOffset, messageFormat, sink, timeout))));
}
}

Expand Down Expand Up @@ -408,6 +409,7 @@ private void processDeliveredMessage(Delivery delivery) {
if (outcome instanceof Accepted) {
synchronized (errorConditionLock) {
lastKnownLinkError = null;
lastKnownErrorReportedAt = null;
retryAttempts.set(0);
}

Expand All @@ -425,6 +427,7 @@ private void processDeliveredMessage(Delivery delivery) {
if (isGeneralSendError(error.getCondition())) {
synchronized (errorConditionLock) {
lastKnownLinkError = exception;
lastKnownErrorReportedAt = Instant.now();
retryAttempt = retryAttempts.incrementAndGet();
}
} else {
Expand All @@ -433,7 +436,7 @@ private void processDeliveredMessage(Delivery delivery) {

final Duration retryInterval = retry.calculateRetryDelay(exception, retryAttempt);

if (retryInterval.compareTo(workItem.getTimeoutTracker().remaining()) > 0) {
if (retryInterval == null || retryInterval.compareTo(workItem.getTimeoutTracker().remaining()) > 0) {
cleanupFailedSend(workItem, exception);
} else {
workItem.setLastKnownException(exception);
Expand Down Expand Up @@ -520,7 +523,7 @@ public void run() {
return;
}

Exception exceptionUsed = lastKnownLinkError;
Exception cause = lastKnownLinkError;
final Exception lastError;
final Instant lastErrorTime;

Expand All @@ -529,22 +532,25 @@ public void run() {
lastErrorTime = lastKnownErrorReportedAt;
}

if (lastError != null) {
// Means that there was a timeout error on the send link before. So we check if the last time we got an
// error it is after the sleep time buffer we allowed. Or if it is after the operation timeout we allotted.
if (lastError != null && lastErrorTime != null) {
final Instant now = Instant.now();
final Instant duration = now.minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS);
final boolean isServerBusy = (lastError instanceof AmqpException) && lastErrorTime.isAfter(duration);
final boolean isLastErrorAfterSleepTime =
lastErrorTime.isAfter(now.minusSeconds(SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS));
final boolean isServerBusy = lastError instanceof AmqpException && isLastErrorAfterSleepTime;
final boolean isLastErrorAfterOperationTimeout = lastErrorTime.isAfter(now.minus(timeout));

final Instant timedOut = now.minusMillis(timeout.toMillis());
exceptionUsed = isServerBusy || lastErrorTime.isAfter(timedOut)
cause = isServerBusy || isLastErrorAfterOperationTimeout
? lastError
: null;
}

// If it is a type of AmqpException, we received this error from the service, otherwise, it is a client-side
// error.
final AmqpException exception;
if (exceptionUsed instanceof AmqpException) {
exception = (AmqpException) exceptionUsed;
if (cause instanceof AmqpException) {
exception = (AmqpException) cause;
} else {
exception = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR,
String.format(Locale.US, "Entity(%s): Send operation timed out", entityPath),
Expand Down

0 comments on commit 06166e6

Please sign in to comment.