From a03ef19c591f2e135ec66a037f8a03c219251631 Mon Sep 17 00:00:00 2001 From: Alexander Kalankhodzhaev Date: Fri, 11 Feb 2022 13:42:19 +0300 Subject: [PATCH] WsProvider refactoring (#9) --- build.gradle | 2 +- .../transport/ProviderInterface.java | 8 +- .../transport/ProviderStatus.java | 8 + .../transport/ws/WsProvider.java | 194 +++++++++++------- .../transport/ws/WsProviderProxyTest.java | 12 +- .../transport/ws/WsProviderTest.java | 62 ++++-- 6 files changed, 185 insertions(+), 101 deletions(-) create mode 100644 transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java diff --git a/build.gradle b/build.gradle index 1c70b5c5..db9e5fa4 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { allprojects { group = 'com.strategyobject.substrateclient' - version = '0.0.2-SNAPSHOT' + version = '0.0.3-SNAPSHOT' repositories { mavenLocal() diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java index 921eee4b..521fc2e8 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java @@ -14,6 +14,12 @@ public interface ProviderInterface { */ boolean hasSubscriptions(); + + /** + * @return Current status + */ + ProviderStatus getStatus(); + /** * Whether the node is connected or not * @@ -30,7 +36,7 @@ public interface ProviderInterface { /** * Manually disconnect from the connection, clearing auto-connect logic */ - void disconnect(); + CompletableFuture disconnect(); /** * Subscribe to provider events diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java new file mode 100644 index 00000000..8be076d5 --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java @@ -0,0 +1,8 @@ +package com.strategyobject.substrateclient.transport; + +public enum ProviderStatus { + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index a38b349e..e6d925b1 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -6,22 +6,21 @@ import com.strategyobject.substrateclient.common.eventemitter.EventListener; import com.strategyobject.substrateclient.transport.ProviderInterface; import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; +import com.strategyobject.substrateclient.transport.ProviderStatus; import com.strategyobject.substrateclient.transport.SubscriptionHandler; import com.strategyobject.substrateclient.transport.coder.JsonRpcResponse; import com.strategyobject.substrateclient.transport.coder.JsonRpcResponseSingle; import com.strategyobject.substrateclient.transport.coder.JsonRpcResponseSubscription; import com.strategyobject.substrateclient.transport.coder.RpcCoder; import lombok.*; +import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.framing.CloseFrame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @Getter @@ -40,9 +39,9 @@ public WsStateSubscription(BiConsumer callBack, } } -@AllArgsConstructor @Getter @Setter +@AllArgsConstructor class WsStateAwaiting { private CompletableFuture callback; private String method; @@ -50,14 +49,14 @@ class WsStateAwaiting { private SubscriptionHandler subscription; } +@Slf4j public class WsProvider implements ProviderInterface, AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(WsProvider.class); private static final int RESUBSCRIBE_TIMEOUT = 20; private static final Map ALIASES = new HashMap<>(); - private static final ScheduledExecutorService timedOutHandlerCleaner = Executors - .newScheduledThreadPool(1); + private static final ScheduledExecutorService timedOutHandlerCleaner; static { + timedOutHandlerCleaner = Executors.newScheduledThreadPool(1); ALIASES.put("chain_finalisedHead", "chain_finalizedHead"); ALIASES.put("chain_subscribeFinalisedHeads", "chain_subscribeFinalizedHeads"); ALIASES.put("chain_unsubscribeFinalisedHeads", "chain_unsubscribeFinalizedHeads"); @@ -71,10 +70,12 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private final Map subscriptions = new ConcurrentHashMap<>(); private final Map waitingForId = new ConcurrentHashMap<>(); private final int heartbeatInterval; - private final AtomicReference webSocket = new AtomicReference<>(null); private final long responseTimeoutInMs; - private int autoConnectMs; - private volatile boolean isConnected = false; + private volatile int autoConnectMs; + private volatile WebSocketClient webSocket = null; + private volatile CompletableFuture whenConnected = null; + private volatile CompletableFuture whenDisconnected = null; + private volatile ProviderStatus status = ProviderStatus.DISCONNECTED; WsProvider(@NonNull URI endpoint, int autoConnectMs, @@ -93,10 +94,6 @@ public class WsProvider implements ProviderInterface, AutoCloseable { this.headers = headers; this.heartbeatInterval = heartbeatInterval; this.responseTimeoutInMs = responseTimeoutInMs; - - if (autoConnectMs > 0) { - this.connect(); - } } public static Builder builder() { @@ -113,6 +110,11 @@ public boolean hasSubscriptions() { return true; } + @Override + public ProviderStatus getStatus() { + return this.status; + } + /** * {@inheritDoc} * @@ -120,7 +122,7 @@ public boolean hasSubscriptions() { */ @Override public boolean isConnected() { - return this.isConnected; + return this.status == ProviderStatus.CONNECTED; } /** @@ -128,31 +130,45 @@ public boolean isConnected() { *

The {@link com.strategyobject.substrateclient.transport.ws.WsProvider} connects automatically by default, * however if you decided otherwise, you may connect manually using this method. */ - public CompletableFuture connect() { + public synchronized CompletableFuture connect() { + val currentStatus = this.status; + Preconditions.checkState( + currentStatus == ProviderStatus.DISCONNECTED || currentStatus == ProviderStatus.CONNECTING, + "WebSocket is already connected"); + + var inProgress = this.whenConnected; + if (inProgress != null) { + return inProgress; + } + + this.status = ProviderStatus.CONNECTING; + val whenConnected = new CompletableFuture(); + this.whenConnected = whenConnected; try { - Preconditions.checkState( - this.webSocket.compareAndSet( - null, - WebSocket.builder() - .setServerUri(this.endpoint) - .setHttpHeaders(this.headers) - .onClose(this::onSocketClose) - .onError(this::onSocketError) - .onMessage(this::onSocketMessage) - .onOpen(this::onSocketOpen) - .build())); - - val webSocket = this.webSocket.get(); - webSocket.setConnectionLostTimeout(this.heartbeatInterval); - - this.eventEmitter.once(ProviderInterfaceEmitted.CONNECTED, _x -> whenConnected.complete(null)); - webSocket.connect(); + val ws = WebSocket.builder() + .setServerUri(this.endpoint) + .setHttpHeaders(this.headers) + .onClose(this::onSocketClose) + .onError(this::onSocketError) + .onMessage(this::onSocketMessage) + .onOpen(this::onSocketOpen) + .build(); + ws.setConnectionLostTimeout(this.heartbeatInterval); + + this.webSocket = ws; + this.eventEmitter.once(ProviderInterfaceEmitted.CONNECTED, _x -> { + whenConnected.complete(null); + this.whenConnected = null; + }); + ws.connect(); } catch (Exception ex) { - logger.error("Connect error", ex); - this.emit(ProviderInterfaceEmitted.ERROR, ex); + log.error("Connect error", ex); whenConnected.completeExceptionally(ex); + this.emit(ProviderInterfaceEmitted.ERROR, ex); + this.whenConnected = null; + this.status = ProviderStatus.DISCONNECTED; } return whenConnected; @@ -162,25 +178,32 @@ public CompletableFuture connect() { * {@inheritDoc} */ @Override - public void disconnect() { - this.isConnected = false; - // switch off autoConnect, we are in manual mode now - this.autoConnectMs = 0; + public synchronized CompletableFuture disconnect() { + val currentStatus = this.status; + var inProgress = this.whenDisconnected; - try { - this.webSocket.updateAndGet(ws -> { - if (ws != null) { - ws.close(CloseFrame.NORMAL); - } + Preconditions.checkState( + currentStatus == ProviderStatus.CONNECTED || + (currentStatus == ProviderStatus.DISCONNECTING && inProgress != null), + "WebSocket is not connected"); - return null; - }); + if (inProgress != null) { + return inProgress; + } - } catch (Exception ex) { - logger.error("Error disconnecting", ex); - this.emit(ProviderInterfaceEmitted.ERROR, ex); - throw ex; + val whenDisconnected = new CompletableFuture(); + this.whenDisconnected = whenDisconnected; + + // switch off autoConnect, we are in manual mode now + this.autoConnectMs = 0; + this.status = ProviderStatus.DISCONNECTING; + + val ws = this.webSocket; + if (ws != null) { + ws.close(CloseFrame.NORMAL); } + + return whenDisconnected; } /** @@ -200,23 +223,24 @@ public Runnable on(ProviderInterfaceEmitted type, EventListener sub) { private CompletableFuture send(String method, List params, SubscriptionHandler subscription) { + val ws = this.webSocket; Preconditions.checkState( - this.webSocket.get() != null && this.isConnected, + ws != null && this.isConnected(), "WebSocket is not connected"); val jsonRpcRequest = this.coder.encodeObject(method, params); val json = RpcCoder.encodeJson(jsonRpcRequest); val id = jsonRpcRequest.getId(); - - logger.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); + log.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); val whenResponseReceived = new CompletableFuture(); this.handlers.put(id, new WsStateAwaiting<>(whenResponseReceived, method, params, subscription)); - return CompletableFuture.runAsync(() -> this.webSocket.get().send(json)) + return CompletableFuture.runAsync(() -> ws.send(json)) .whenCompleteAsync((_res, ex) -> { if (ex != null) { this.handlers.remove(id); + whenResponseReceived.completeExceptionally(ex); } else { scheduleCleanupIfNoResponseWithinTimeout(id); } @@ -281,12 +305,12 @@ public CompletableFuture unsubscribe(String type, String method, String // a slight complication in solving - since we cannot rely on the sent id, but rather // need to find the actual subscription id to map it if (this.subscriptions.get(subscription) == null) { - logger.info("Unable to find active subscription={}", subscription); + log.info("Unable to find active subscription={}", subscription); whenUnsubscribed.complete(false); } else { this.subscriptions.remove(subscription); - if (this.isConnected() && this.webSocket.get() != null) { + if (this.isConnected() && this.webSocket != null) { return this.send(method, Collections.singletonList(id), null); } @@ -317,44 +341,55 @@ private void emit(ProviderInterfaceEmitted type, Object... args) { this.eventEmitter.emit(type, args); } - private void onSocketClose(int code, String reason) { + private synchronized void onSocketClose(int code, String reason) { + val currentStatus = this.status; + if (currentStatus == ProviderStatus.CONNECTED || currentStatus == ProviderStatus.CONNECTING) { + this.status = ProviderStatus.DISCONNECTING; + } + if (Strings.isNullOrEmpty(reason)) { reason = ErrorCodes.getWSErrorString(code); } + val ws = this.webSocket; val errorMessage = String.format( "Disconnected from %s code: '%s' reason: '%s'", - this.webSocket.get() == null ? this.endpoint : this.webSocket.get().getURI(), + ws == null ? this.endpoint : ws.getURI(), code, reason); if (this.autoConnectMs > 0) { - logger.error(errorMessage); + log.error(errorMessage); } - this.isConnected = false; - this.webSocket.updateAndGet(_ws -> null); - this.emit(ProviderInterfaceEmitted.DISCONNECTED); - // reject all hanging requests val wsClosedException = new WsClosedException(errorMessage); this.handlers.values().forEach(x -> x.getCallback().completeExceptionally(wsClosedException)); this.handlers.clear(); this.waitingForId.clear(); + this.webSocket = null; + this.status = ProviderStatus.DISCONNECTED; + this.emit(ProviderInterfaceEmitted.DISCONNECTED); + val whenDisconnected = this.whenDisconnected; + if (whenDisconnected != null) { + whenDisconnected.complete(null); + this.whenDisconnected = null; + } + if (this.autoConnectMs > 0) { - logger.info("Trying to reconnect to {}", this.endpoint); + log.info("Trying to reconnect to {}", this.endpoint); this.connect(); } } private void onSocketError(Exception ex) { - logger.error("WebSocket error", ex); + log.error("WebSocket error", ex); this.emit(ProviderInterfaceEmitted.ERROR, ex); } private void onSocketMessage(String message) { - logger.debug("Received {}", message); + log.debug("Received {}", message); JsonRpcResponse response = RpcCoder.decodeJson(message); if (Strings.isNullOrEmpty(response.getMethod())) { @@ -369,7 +404,7 @@ private void onSocketMessageResult(JsonRpcResponseSingle response) { val id = response.getId(); val handler = (WsStateAwaiting) this.handlers.get(id); if (handler == null) { - logger.error("Unable to find handler for id={}", id); + log.error("Unable to find handler for id={}", id); return; } @@ -407,13 +442,13 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { val method = ALIASES.getOrDefault(response.getMethod(), response.getMethod()); val subId = method + "::" + response.getParams().getSubscription(); - logger.debug("Handling: response =', {}, 'subscription =', {}", response, subId); + log.debug("Handling: response =', {}, 'subscription =', {}", response, subId); val handler = this.subscriptions.get(subId); if (handler == null) { // store the JSON, we could have out-of-order subid coming in this.waitingForId.put(subId, response); - logger.info("Unable to find handler for subscription={}", subId); + log.info("Unable to find handler for subscription={}", subId); return; } @@ -428,17 +463,24 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { } } - public void onSocketOpen() { - logger.info("Connected to: {}", this.webSocket.get().getURI()); + public synchronized void onSocketOpen() { + log.info("Connected to: {}", this.webSocket.getURI()); - this.isConnected = true; + this.status = ProviderStatus.CONNECTED; this.emit(ProviderInterfaceEmitted.CONNECTED); this.resubscribe(); } @Override public void close() { - this.disconnect(); + try { + val currentStatus = this.status; + if (currentStatus == ProviderStatus.CONNECTED || currentStatus == ProviderStatus.DISCONNECTING) { + this.disconnect(); + } + } catch (Exception ex) { + log.error("Error while automatic closing", ex); + } } private void resubscribe() { @@ -462,7 +504,7 @@ private void resubscribe() { subscription.getParams(), subscription.getCallBack()); } catch (Exception ex) { - logger.error("Resubscribe error {}", subscription, ex); + log.error("Resubscribe error {}", subscription, ex); return null; } }) @@ -470,7 +512,7 @@ private void resubscribe() { .toArray(CompletableFuture[]::new) ).get(RESUBSCRIBE_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ex) { - logger.error("Resubscribe error", ex); + log.error("Resubscribe error", ex); } } @@ -478,7 +520,7 @@ public static class Builder { private URI endpoint; private int autoConnectMs = 2500; private Map headers = null; - private int heartbeatInterval = 60; + private int heartbeatInterval = 30; private long responseTimeoutInMs = 20000; Builder() { diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java index c95fe73c..c4582b3e 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java @@ -32,19 +32,19 @@ public class WsProviderProxyTest { .withNetwork(network) .withNetworkAliases("toxiproxy"); private static final int HEARTBEAT_INTERVAL = 5; - private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 2; + private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 3; final ToxiproxyContainer.ContainerProxy proxy = toxiproxy.getProxy(substrate, 9944); @Test + @SneakyThrows void canReconnect() { try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) .setHeartbeatsInterval(HEARTBEAT_INTERVAL) .build()) { - await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) - .until(wsProvider::isConnected); + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); proxy.setConnectionCut(true); await() @@ -68,7 +68,9 @@ void canAutoConnectWhenServerAvailable() { .disableHeartbeats() .build()) { - Thread.sleep(WAIT_TIMEOUT * 1000); + assertThrows( + TimeoutException.class, + () -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); assertFalse(wsProvider.isConnected()); proxy.setConnectionCut(false); diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java index 5ed7f217..dcf25c27 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java @@ -4,6 +4,7 @@ import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; +import com.strategyobject.substrateclient.transport.ProviderStatus; import lombok.SneakyThrows; import lombok.val; import org.junit.jupiter.api.Test; @@ -38,29 +39,15 @@ void canConnect() { } @Test - void connectFailsWhenConnected() { + void connectReturnsSameFutureWhenCalledMultiple() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) .build()) { - val executionException = assertThrows( - ExecutionException.class, - () -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); - assertTrue(executionException.getCause() instanceof IllegalStateException); - } - } + val connectA = wsProvider.connect(); + val connectB = wsProvider.connect(); - @Test - void canAutoConnect() { - try (val wsProvider = WsProvider.builder() - .setEndpoint(substrate.getWsAddress()) - .setAutoConnectDelay(5000) - .build()) { - - assertDoesNotThrow( - () -> await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) - .until(wsProvider::isConnected)); + assertEquals(connectA, connectB); } } @@ -113,6 +100,23 @@ void canDisconnect() { } } + @Test + @SneakyThrows + void disconnectReturnsSameFutureWhenCalledMultiple() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(substrate.getWsAddress()) + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + val disconnectA = wsProvider.disconnect(); + val disconnectB = wsProvider.disconnect(); + + assertEquals(disconnectA, disconnectB); + } + } + @Test @SneakyThrows void notifiesWhenDisconnected() { @@ -221,7 +225,29 @@ void supportsSubscriptions() { .setEndpoint(substrate.getWsAddress()) .disableAutoConnect() .build()) { + assertTrue(wsProvider.hasSubscriptions()); } } + + @Test + @SneakyThrows + void canReconnectManually() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(substrate.getWsAddress()) + .disableAutoConnect() + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + wsProvider.disconnect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertFalse(wsProvider.isConnected()); + assertEquals(ProviderStatus.DISCONNECTED, wsProvider.getStatus()); + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + assertEquals(ProviderStatus.CONNECTED, wsProvider.getStatus()); + } + } } \ No newline at end of file