Skip to content

Commit

Permalink
Update log messages in core AMQP (#24578)
Browse files Browse the repository at this point in the history
* Fixing log messages in ReactorReceiver.

* Fix UnsupportedOperationException.

* Fix log messages in RequestResponseChannel.

* Add connectionId to all ReactorExecutor messages.

* Remove duplicated logging in ReactorConnection.
  • Loading branch information
conniey committed Oct 8, 2021
1 parent 8fe1c85 commit b313901
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

subscribers.add(subscriber);
logger.verbose("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total "
+ "subscribers = {}", fullyQualifiedNamespace, entityPath, subscriber, subscribers.size());
logger.verbose("namespace[{}] entityPath[{}] subscribers[{}]: Added a subscriber.",
fullyQualifiedNamespace, entityPath, subscribers.size());

if (!isRetryPending.get()) {
requestUpstream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,10 @@ private synchronized Connection getOrCreateConnection() throws IOException {
// remaining work after OperationTimeout has elapsed and closes afterwards.
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executorCloseMono;
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executorCloseMono;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Schedules the proton-j reactor to continuously run work.
*/
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";
private static final String LOG_MESSAGE = "connectionId[{}] message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
private final AtomicBoolean hasStarted = new AtomicBoolean();
Expand Down Expand Up @@ -79,7 +79,8 @@ void start() {
private void run() {
// If this hasn't been disposed of, and we're trying to run work items on it, log a warning and return.
if (!isDisposed.get() && !hasStarted.get()) {
logger.warning("Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
logger.warning(LOG_MESSAGE, connectionId,
"Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
return;
}

Expand Down Expand Up @@ -136,7 +137,7 @@ private void run() {
} finally {
if (!rescheduledReactor) {
if (hasStarted.getAndSet(false)) {
logger.verbose("Scheduling reactor to complete pending tasks.");
logger.verbose(LOG_MESSAGE, connectionId, "Scheduling reactor to complete pending tasks.");
scheduleCompletePendingTasks();
} else {
final String reason =
Expand All @@ -158,7 +159,8 @@ private void scheduleCompletePendingTasks() {
logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
try {
if (reactor.process()) {
logger.verbose("Had more tasks to process on reactor but it is shutting down.");
logger.verbose(LOG_MESSAGE, connectionId,
"Had more tasks to process on reactor but it is shutting down.");
}

reactor.stop();
Expand All @@ -181,17 +183,17 @@ private void scheduleCompletePendingTasks() {
try {
this.scheduler.schedule(work, timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.warning("Scheduler was already closed. Manually releasing reactor.");
logger.warning(LOG_MESSAGE, connectionId, "Scheduler was already closed. Manually releasing reactor.");
work.run();
}
}

private void close(String reason, boolean initiatedByClient) {
logger.verbose("Completing close and disposing scheduler. {}", reason);
logger.verbose(LOG_MESSAGE, connectionId, "Completing close and disposing scheduler. {}", reason);
scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
logger.verbose("connectionId[{}] signalType[{}] emitResult[{}]: Unable to emit close event on reactor",
connectionId, signalType, emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, initiatedByClient, reason));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece
final Integer credits = supplier.get();

if (credits != null && credits > 0) {
logger.info("connectionId[{}] linkName[{}] adding credits[{}]",
handler.getConnectionId(), getLinkName(), creditsLeft, credits);
logger.info("connectionId[{}] linkName[{}] credits[{}] Adding credits.",
handler.getConnectionId(), getLinkName(), credits);
receiver.flow(credits);
} else {
logger.verbose("connectionId[{}] linkName[{}] There are no credits to add.",
handler.getConnectionId(), getLinkName(), creditsLeft, credits);
logger.verbose("connectionId[{}] linkName[{}] credits[{}] There are no credits to add.",
handler.getConnectionId(), getLinkName(), credits);
}

sink.success(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
receiveLinkHandler.getDeliveredMessages()
.map(this::decodeDelivery)
.subscribe(message -> {
logger.verbose("connectionId[{}], linkName[{}]: Settling message: {}", connectionId, linkName,
message.getCorrelationId());
logger.verbose("connectionId[{}], linkName[{}] messageId[{}]: Settling message.", connectionId,
linkName, message.getCorrelationId());

settleMessage(message);
}),
Expand Down Expand Up @@ -214,7 +214,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
});
} catch (IOException | RejectedExecutionException e) {
throw logger.logExceptionAsError(new RuntimeException(String.format(
"connectionId[%s], linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e));
"connectionId[%s] linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e));
}
}

Expand All @@ -235,7 +235,7 @@ public Mono<Void> closeAsync() {
return Mono.fromRunnable(() -> {
logger.info("connectionId[{}] linkName[{}] Timed out waiting for RequestResponseChannel to complete"
+ " closing. Manually closing.",
connectionId, linkName, error);
connectionId, linkName);

onTerminalState("SendLinkHandler");
onTerminalState("ReceiveLinkHandler");
Expand Down Expand Up @@ -319,7 +319,7 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage)
.then(Mono.create(sink -> {
try {
logger.verbose("connectionId[{}], linkName[{}]: Scheduling on dispatcher. MessageId[{}]",
logger.verbose("connectionId[{}], linkName[{}] messageId[{}]: Scheduling on dispatcher. ",
connectionId, linkName, messageId);
unconfirmedSends.putIfAbsent(messageId, sink);

Expand Down Expand Up @@ -381,9 +381,9 @@ private void settleMessage(Message message) {
final MonoSink<Message> sink = unconfirmedSends.remove(correlationId);

if (sink == null) {
int size = unconfirmedSends.size();
logger.warning("connectionId[{}] linkName[{}] Received delivery without pending messageId[{}]. size[{}]",
connectionId, linkName, id, size);
logger.warning(
"connectionId[{}] linkName[{}] messageId[{}] Received delivery without pending message.",
connectionId, linkName, id);
return;
}

Expand Down Expand Up @@ -411,12 +411,12 @@ private void handleError(Throwable error, String message) {

private void onTerminalState(String handlerName) {
if (pendingLinkTerminations.get() <= 0) {
logger.verbose("connectionId[{}] linkName[{}]: Already disposed send/receive links.");
logger.verbose("connectionId[{}] linkName[{}] Already disposed send/receive links.");
return;
}

final int remaining = pendingLinkTerminations.decrementAndGet();
logger.verbose("connectionId[{}] linkName[{}]: {} disposed. Remaining: {}",
logger.verbose("connectionId[{}] linkName[{}] {} disposed. Remaining: {}",
connectionId, linkName, handlerName, remaining);

if (remaining == 0) {
Expand Down

0 comments on commit b313901

Please sign in to comment.