diff --git a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java index f9776c7f..c51c6e38 100644 --- a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java +++ b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java @@ -21,17 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -104,21 +94,22 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement private final AtomicBoolean manuallyPerformReconnect = new AtomicBoolean(false); private Runnable channelCloser; - @Nullable private Throwable lastReceivedDittoProtocolError = null; + @Nullable + private Throwable lastReceivedDittoProtocolError = null; private CountDownLatch lastReceivedDittoProtocolErrorLatch = new CountDownLatch(1); /** * Constructs a new {@code WsMessagingProvider}. * - * @param adaptableBus the bus to publish all messages to. + * @param adaptableBus the bus to publish all messages to. * @param messagingConfiguration the specific configuration to apply. * @param authenticationProvider provider for the authentication method with which to open the websocket. - * @param callbackExecutor the executor service to run callbacks with. + * @param callbackExecutor the executor service to run callbacks with. */ private WebSocketMessagingProvider(final AdaptableBus adaptableBus, - final MessagingConfiguration messagingConfiguration, - final AuthenticationProvider authenticationProvider, - final ExecutorService callbackExecutor) { + final MessagingConfiguration messagingConfiguration, + final AuthenticationProvider authenticationProvider, + final ExecutorService callbackExecutor) { this.adaptableBus = adaptableBus; this.messagingConfiguration = messagingConfiguration; this.authenticationProvider = authenticationProvider; @@ -129,7 +120,8 @@ private WebSocketMessagingProvider(final AdaptableBus adaptableBus, subscriptionMessages = new ConcurrentHashMap<>(); webSocket = new AtomicReference<>(); - channelCloser = () -> {}; + channelCloser = () -> { + }; disconnectionHandler = new DisconnectedContext.DisconnectionHandler() { @Override @@ -171,14 +163,14 @@ private static ScheduledExecutorService createConnectExecutor(final String sessi * * @param messagingConfiguration configuration of messaging. * @param authenticationProvider provides authentication. - * @param defaultExecutor the executor for messages. - * @param scheduledExecutor the scheduled executor for scheduling tasks. + * @param defaultExecutor the executor for messages. + * @param scheduledExecutor the scheduled executor for scheduling tasks. * @return the provider. */ public static WebSocketMessagingProvider newInstance(final MessagingConfiguration messagingConfiguration, - final AuthenticationProvider authenticationProvider, - final ExecutorService defaultExecutor, - final ScheduledExecutorService scheduledExecutor) { + final AuthenticationProvider authenticationProvider, + final ExecutorService defaultExecutor, + final ScheduledExecutorService scheduledExecutor) { checkNotNull(messagingConfiguration, "messagingConfiguration"); checkNotNull(authenticationProvider, "authenticationProvider"); checkNotNull(defaultExecutor, "defaultExecutor"); @@ -364,15 +356,32 @@ public void onConnected(final WebSocket websocket, final Map: Subscribing again for messages from backend after reconnection", sessionId); - subscriptionMessages.values().forEach(this::emit); + CompletableFuture isReconnecting = new CompletableFuture<>(); + Runnable checkTask = () -> { + if (!reconnecting.get()) { + isReconnecting.complete(true); // Complete the future if flag is true + } + }; + ScheduledFuture future = connectExecutor.scheduleAtFixedRate(checkTask, 0, 20, TimeUnit.MILLISECONDS); + try { + if (Boolean.TRUE.equals(isReconnecting.get(160, TimeUnit.MILLISECONDS))) { // Ensures 4 retries of the scheduleAtFixedRate method. + future.cancel(true); + LOGGER.debug("Reconnecting is completed -> emitting subscriptionMessages: {}", subscriptionMessages); + subscriptionMessages.values().forEach(this::emit); + } + } catch (TimeoutException | InterruptedException | ExecutionException e) { + isReconnecting.complete(false); + future.cancel(true); + LOGGER.error("Reconnecting failed: {}", e.getMessage()); + } } }); } @Override public void onDisconnected(final WebSocket websocket, final WebSocketFrame serverCloseFrame, - final WebSocketFrame clientCloseFrame, - final boolean closedByServer) { + final WebSocketFrame clientCloseFrame, + final boolean closedByServer) { callbackExecutor.execute(() -> { if (closedByServer) { @@ -390,8 +399,7 @@ public void onDisconnected(final WebSocket websocket, final WebSocketFrame serve sessionId, messagingConfiguration.getEndpointUri()); awaitLastReceivedDittoProtocolError(); handleReconnectionIfEnabled(DisconnectedContext.Source.CLIENT, lastReceivedDittoProtocolError); - } - else { + } else { // only when close() was called we should end here LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by user", sessionId, messagingConfiguration.getEndpointUri()); @@ -427,15 +435,15 @@ public void onError(final WebSocket websocket, final WebSocketException cause) { } private CompletionStage connectWithPotentialRetries(final String actionName, - final Supplier webSocket, - final CompletableFuture future, - final boolean retry) { + final Supplier webSocket, + final CompletableFuture future, + final boolean retry) { try { final Predicate isRecoverable = retry ? WebSocketMessagingProvider::isRecoverable : exception -> false; return Retry.retryTo(actionName, - () -> initiateConnection(webSocket.get())) + () -> initiateConnection(webSocket.get())) .inClientSession(sessionId) .withExecutors(connectExecutor, callbackExecutor) .notifyOnError(messagingConfiguration.getConnectionErrorHandler().orElse(null)) @@ -448,7 +456,7 @@ private CompletionStage connectWithPotentialRetries(final String acti } private void handleReconnectionIfEnabled(final DisconnectedContext.Source disconnectionSource, - @Nullable final Throwable throwableSupplier) { + @Nullable final Throwable throwableSupplier) { final Optional> disconnectedListener = messagingConfiguration.getDisconnectedListener(); @@ -492,7 +500,7 @@ private void doReconnect() { private void reconnectWithRetries() { this.connectWithPotentialRetries("reconnect WebSocket", this::recreateWebSocket, new CompletableFuture<>(), - messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get()) + messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get()) .thenAccept(reconnectedWebSocket -> { setWebSocket(reconnectedWebSocket); reconnecting.set(false);