From b31390134d2643fa16577b10bbecf1fb90d2bcae Mon Sep 17 00:00:00 2001 From: Connie Yau Date: Wed, 6 Oct 2021 12:58:14 -0700 Subject: [PATCH] Update log messages in core AMQP (#24578) * Fixing log messages in ReactorReceiver. * Fix UnsupportedOperationException. * Fix log messages in RequestResponseChannel. * Add connectionId to all ReactorExecutor messages. * Remove duplicated logging in ReactorConnection. --- .../implementation/AmqpChannelProcessor.java | 4 ++-- .../implementation/ReactorConnection.java | 2 -- .../amqp/implementation/ReactorExecutor.java | 18 +++++++++-------- .../amqp/implementation/ReactorReceiver.java | 8 ++++---- .../RequestResponseChannel.java | 20 +++++++++---------- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 0ef34bf9672ab..560fba1933885 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -257,8 +257,8 @@ public void subscribe(CoreSubscriber actual) { } subscribers.add(subscriber); - logger.verbose("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total " - + "subscribers = {}", fullyQualifiedNamespace, entityPath, subscriber, subscribers.size()); + logger.verbose("namespace[{}] entityPath[{}] subscribers[{}]: Added a subscriber.", + fullyQualifiedNamespace, entityPath, subscribers.size()); if (!isRetryPending.get()) { requestUpstream(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index acd900de38455..afacff4ae7377 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -565,12 +565,10 @@ private synchronized Connection getOrCreateConnection() throws IOException { // remaining work after OperationTimeout has elapsed and closes afterwards. reactorProvider.getReactorDispatcher().getShutdownSignal() .flatMap(signal -> { - logger.info("Shutdown signal received from reactor provider."); reactorExceptionHandler.onConnectionShutdown(signal); return executorCloseMono; }) .onErrorResume(error -> { - logger.info("Error received from reactor provider.", error); reactorExceptionHandler.onConnectionError(error); return executorCloseMono; }) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java index 51752e30d3655..6a7d80476ea12 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java @@ -27,7 +27,7 @@ * Schedules the proton-j reactor to continuously run work. */ class ReactorExecutor implements AsyncCloseable { - private static final String LOG_MESSAGE = "connectionId[{}], message[{}]"; + private static final String LOG_MESSAGE = "connectionId[{}] message[{}]"; private final ClientLogger logger = new ClientLogger(ReactorExecutor.class); private final AtomicBoolean hasStarted = new AtomicBoolean(); @@ -79,7 +79,8 @@ void start() { private void run() { // If this hasn't been disposed of, and we're trying to run work items on it, log a warning and return. if (!isDisposed.get() && !hasStarted.get()) { - logger.warning("Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked."); + logger.warning(LOG_MESSAGE, connectionId, + "Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked."); return; } @@ -136,7 +137,7 @@ private void run() { } finally { if (!rescheduledReactor) { if (hasStarted.getAndSet(false)) { - logger.verbose("Scheduling reactor to complete pending tasks."); + logger.verbose(LOG_MESSAGE, connectionId, "Scheduling reactor to complete pending tasks."); scheduleCompletePendingTasks(); } else { final String reason = @@ -158,7 +159,8 @@ private void scheduleCompletePendingTasks() { logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor."); try { if (reactor.process()) { - logger.verbose("Had more tasks to process on reactor but it is shutting down."); + logger.verbose(LOG_MESSAGE, connectionId, + "Had more tasks to process on reactor but it is shutting down."); } reactor.stop(); @@ -181,17 +183,17 @@ private void scheduleCompletePendingTasks() { try { this.scheduler.schedule(work, timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { - logger.warning("Scheduler was already closed. Manually releasing reactor."); + logger.warning(LOG_MESSAGE, connectionId, "Scheduler was already closed. Manually releasing reactor."); work.run(); } } private void close(String reason, boolean initiatedByClient) { - logger.verbose("Completing close and disposing scheduler. {}", reason); + logger.verbose(LOG_MESSAGE, connectionId, "Completing close and disposing scheduler. {}", reason); scheduler.dispose(); isClosedMono.emitEmpty((signalType, emitResult) -> { - logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType, - emitResult); + logger.verbose("connectionId[{}] signalType[{}] emitResult[{}]: Unable to emit close event on reactor", + connectionId, signalType, emitResult); return false; }); exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, initiatedByClient, reason)); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 2f8659444b18f..086a395fc8e77 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -83,12 +83,12 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece final Integer credits = supplier.get(); if (credits != null && credits > 0) { - logger.info("connectionId[{}] linkName[{}] adding credits[{}]", - handler.getConnectionId(), getLinkName(), creditsLeft, credits); + logger.info("connectionId[{}] linkName[{}] credits[{}] Adding credits.", + handler.getConnectionId(), getLinkName(), credits); receiver.flow(credits); } else { - logger.verbose("connectionId[{}] linkName[{}] There are no credits to add.", - handler.getConnectionId(), getLinkName(), creditsLeft, credits); + logger.verbose("connectionId[{}] linkName[{}] credits[{}] There are no credits to add.", + handler.getConnectionId(), getLinkName(), credits); } sink.success(message); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java index ddac0dc3f8250..be296b85d981e 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java @@ -169,8 +169,8 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio receiveLinkHandler.getDeliveredMessages() .map(this::decodeDelivery) .subscribe(message -> { - logger.verbose("connectionId[{}], linkName[{}]: Settling message: {}", connectionId, linkName, - message.getCorrelationId()); + logger.verbose("connectionId[{}], linkName[{}] messageId[{}]: Settling message.", connectionId, + linkName, message.getCorrelationId()); settleMessage(message); }), @@ -214,7 +214,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio }); } catch (IOException | RejectedExecutionException e) { throw logger.logExceptionAsError(new RuntimeException(String.format( - "connectionId[%s], linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e)); + "connectionId[%s] linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e)); } } @@ -235,7 +235,7 @@ public Mono closeAsync() { return Mono.fromRunnable(() -> { logger.info("connectionId[{}] linkName[{}] Timed out waiting for RequestResponseChannel to complete" + " closing. Manually closing.", - connectionId, linkName, error); + connectionId, linkName); onTerminalState("SendLinkHandler"); onTerminalState("ReceiveLinkHandler"); @@ -319,7 +319,7 @@ public Mono sendWithAck(final Message message, DeliveryState deliverySt return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage) .then(Mono.create(sink -> { try { - logger.verbose("connectionId[{}], linkName[{}]: Scheduling on dispatcher. MessageId[{}]", + logger.verbose("connectionId[{}], linkName[{}] messageId[{}]: Scheduling on dispatcher. ", connectionId, linkName, messageId); unconfirmedSends.putIfAbsent(messageId, sink); @@ -381,9 +381,9 @@ private void settleMessage(Message message) { final MonoSink sink = unconfirmedSends.remove(correlationId); if (sink == null) { - int size = unconfirmedSends.size(); - logger.warning("connectionId[{}] linkName[{}] Received delivery without pending messageId[{}]. size[{}]", - connectionId, linkName, id, size); + logger.warning( + "connectionId[{}] linkName[{}] messageId[{}] Received delivery without pending message.", + connectionId, linkName, id); return; } @@ -411,12 +411,12 @@ private void handleError(Throwable error, String message) { private void onTerminalState(String handlerName) { if (pendingLinkTerminations.get() <= 0) { - logger.verbose("connectionId[{}] linkName[{}]: Already disposed send/receive links."); + logger.verbose("connectionId[{}] linkName[{}] Already disposed send/receive links."); return; } final int remaining = pendingLinkTerminations.decrementAndGet(); - logger.verbose("connectionId[{}] linkName[{}]: {} disposed. Remaining: {}", + logger.verbose("connectionId[{}] linkName[{}] {} disposed. Remaining: {}", connectionId, linkName, handlerName, remaining); if (remaining == 0) {