From 2bb5ae1dfd284a3ff9f6508d8d23dc1b4efa274d Mon Sep 17 00:00:00 2001 From: Kalaiyarasi Date: Fri, 31 Jan 2020 12:07:19 +0530 Subject: [PATCH 1/2] Implement retry for WebSocket client Resolve https://github.com/ballerina-platform/ballerina-lang/issues/19717 --- ...iterBasedCompletionForCompleteSource1.json | 11 + .../ifWhileConditionContextCompletion4.json | 11 + .../function/matchStatementSuggestions4.json | 11 + .../function/workerDeclarationContext4.json | 11 + .../completion/object/objectTest13.json | 11 + .../packageimport/packageImport1.json | 11 + .../toplevel/globalVarDefPackageContent.json | 11 + .../statementWithMissingSemiColon2.json | 11 + .../statementWithMissingSemiColon3.json | 11 + .../topLevelPackageContentAccess.json | 26 +++ .../src/http/websocket/websocket_client.bal | 24 ++- .../http/websocket/WebSocketConstants.java | 22 +- .../http/websocket/WebSocketException.java | 17 +- .../net/http/websocket/WebSocketUtil.java | 191 +++++++++++++++--- .../http/websocket/client/InitEndpoint.java | 142 +++++++++---- .../http/websocket/client/RetryContext.java | 140 +++++++++++++ .../WebSocketClientConnectorListener.java | 28 ++- .../WebSocketClientHandshakeListener.java | 22 +- ...SocketClientHandshakeListenerForRetry.java | 165 +++++++++++++++ .../server/WebSocketServerService.java | 3 +- .../service/websocket/ClientErrorsTest.java | 9 + .../service/websocket/RetryClientTest.java | 134 ++++++++++++ .../websocket/SimpleProxyTestCommons.java | 2 +- .../websocket/WebSocketTestCommons.java | 2 +- .../server/WebSocketRemoteServer.java | 6 +- .../src/test/resources/testng.xml | 1 + .../src/wsservices/27_client_exceptions.bal | 9 +- .../src/wsservices/30_retry_client.bal | 62 ++++++ .../31_retry_client_with_configurations.bal | 53 +++++ 29 files changed, 1051 insertions(+), 106 deletions(-) create mode 100644 stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/RetryContext.java create mode 100644 stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListenerForRetry.java create mode 100644 tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/RetryClientTest.java create mode 100644 tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/30_retry_client.bal create mode 100644 tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/31_retry_client_with_configurations.bal diff --git a/language-server/modules/langserver-core/src/test/resources/completion/function/delimiterBasedCompletionForCompleteSource1.json b/language-server/modules/langserver-core/src/test/resources/completion/function/delimiterBasedCompletionForCompleteSource1.json index 237df4a214d4..1536812bb70a 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/function/delimiterBasedCompletionForCompleteSource1.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/function/delimiterBasedCompletionForCompleteSource1.json @@ -3588,6 +3588,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WsConnectionClosureError", "kind": "Event", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/function/ifWhileConditionContextCompletion4.json b/language-server/modules/langserver-core/src/test/resources/completion/function/ifWhileConditionContextCompletion4.json index 4cf9333231b7..481cfde1e082 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/function/ifWhileConditionContextCompletion4.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/function/ifWhileConditionContextCompletion4.json @@ -1109,6 +1109,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WebSocketConnector", "kind": "Interface", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/function/matchStatementSuggestions4.json b/language-server/modules/langserver-core/src/test/resources/completion/function/matchStatementSuggestions4.json index 9e658ac6fd41..919de34baabc 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/function/matchStatementSuggestions4.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/function/matchStatementSuggestions4.json @@ -3588,6 +3588,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WsConnectionClosureError", "kind": "Event", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/function/workerDeclarationContext4.json b/language-server/modules/langserver-core/src/test/resources/completion/function/workerDeclarationContext4.json index 77dc6b4da5a7..17342f6ec501 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/function/workerDeclarationContext4.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/function/workerDeclarationContext4.json @@ -1109,6 +1109,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation":{ + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WebSocketConnector", "kind": "Interface", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/object/objectTest13.json b/language-server/modules/langserver-core/src/test/resources/completion/object/objectTest13.json index ba6eac02f7bb..aa5754f07a07 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/object/objectTest13.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/object/objectTest13.json @@ -1109,6 +1109,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "221", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WebSocketConnector", "kind": "Interface", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/packageimport/packageImport1.json b/language-server/modules/langserver-core/src/test/resources/completion/packageimport/packageImport1.json index 4aad0cf4b256..d088b05ca9a2 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/packageimport/packageImport1.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/packageimport/packageImport1.json @@ -3588,6 +3588,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WsConnectionClosureError", "kind": "Event", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/globalVarDefPackageContent.json b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/globalVarDefPackageContent.json index 4a6deb1744b5..1887a402652b 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/globalVarDefPackageContent.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/globalVarDefPackageContent.json @@ -3588,6 +3588,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WsConnectionClosureError", "kind": "Event", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon2.json b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon2.json index 7694e0d25376..08ab456ddbbe 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon2.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon2.json @@ -1109,6 +1109,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WebSocketConnector", "kind": "Interface", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon3.json b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon3.json index f560db61d9be..af4d34cee414 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon3.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/statementWithMissingSemiColon3.json @@ -3588,6 +3588,17 @@ "insertText": "WebSocketClientConfiguration", "insertTextFormat": "Snippet" }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Retry configurations for WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet" + }, { "label": "WsConnectionClosureError", "kind": "Event", diff --git a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/topLevelPackageContentAccess.json b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/topLevelPackageContentAccess.json index 0ff9def95102..606e8358a457 100644 --- a/language-server/modules/langserver-core/src/test/resources/completion/toplevel/topLevelPackageContentAccess.json +++ b/language-server/modules/langserver-core/src/test/resources/completion/toplevel/topLevelPackageContentAccess.json @@ -1366,6 +1366,32 @@ } ] }, + { + "label": "WebSocketRetryConfig", + "kind": "Struct", + "detail": "Record", + "documentation": { + "left": "Configurations for reconnecting to the WebSocket.\n" + }, + "sortText": "180", + "insertText": "WebSocketRetryConfig", + "insertTextFormat": "Snippet", + "additionalTextEdits": [ + { + "range": { + "start": { + "line": 0, + "character": 0 + }, + "end": { + "line": 0, + "character": 0 + } + }, + "newText": "import ballerina/http;\n" + } + ] + }, { "label": "AuthnFilter", "kind": "Interface", diff --git a/stdlib/http/src/main/ballerina/src/http/websocket/websocket_client.bal b/stdlib/http/src/main/ballerina/src/http/websocket/websocket_client.bal index b7502b6906c5..6894dd097b9f 100644 --- a/stdlib/http/src/main/ballerina/src/http/websocket/websocket_client.bal +++ b/stdlib/http/src/main/ballerina/src/http/websocket/websocket_client.bal @@ -177,8 +177,12 @@ public type WebSocketClient client object { # `WebSocketClient`needs to be called once to start receiving messages. # + secureSocket - SSL/TLS related options # + maxFrameSize - The maximum payload size of a WebSocket frame in bytes. -# If this is not set or is negative or zero the default frame size of 65536 will be used. -# + webSocketCompressionEnabled - Enable support for compression in WebSocket +# If this is not set, is negative, or is zero, the default frame size of 65536 will be used. +# + webSocketCompressionEnabled - Enable support for compression in the WebSocket. +# + handShakeTimeoutInSeconds - Time (in seconds) that a connection waits to get the response of +# the webSocket handshake. If the timeout exceeds, then the connection is terminated with +# an error.If the value < 0, then the value sets to the default value(300). +# + retryConfig - Retry related configurations. public type WebSocketClientConfiguration record {| service? callbackService = (); string[] subProtocols = []; @@ -188,6 +192,22 @@ public type WebSocketClientConfiguration record {| ClientSecureSocket? secureSocket = (); int maxFrameSize = 0; boolean webSocketCompressionEnabled = true; + int handShakeTimeoutInSeconds = 300; + WebSocketRetryConfig retryConfig?; +|}; + +# Retry configurations for WebSocket. +# +# + maxCount - The maximum number of retry attempts. If the count is zero, the client will retry indefinitely. +# + intervalInMillis - The number of milliseconds to delay before attempting to reconnect. +# + backOffFactor - The rate of increase of the reconnect delay. Allows reconnect attempts to back off when problems +# persist. +# + maxWaitIntervalInMillis - Maximum time of the retry interval in milliseconds. +public type WebSocketRetryConfig record {| + int maxCount = 0; + int intervalInMillis = 1000; + float backOffFactor = 1.0; + int maxWaitIntervalInMillis = 30000; |}; function externWSInitEndpoint(WebSocketClient wsClient) = @java:Method { diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketConstants.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketConstants.java index de13990b4a7e..baaadd2cea6d 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketConstants.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketConstants.java @@ -18,8 +18,8 @@ package org.ballerinalang.net.http.websocket; - -import static org.ballerinalang.jvm.util.BLangConstants.BALLERINA_PACKAGE_PREFIX; +import org.ballerinalang.jvm.types.BPackage; +import org.ballerinalang.jvm.util.BLangConstants; /** * Constants of WebSocket. @@ -28,7 +28,7 @@ public class WebSocketConstants { public static final String BALLERINA_ORG = "ballerina"; public static final String PACKAGE_HTTP = "http"; - public static final String FULL_PACKAGE_HTTP = BALLERINA_PACKAGE_PREFIX + PACKAGE_HTTP; + public static final String FULL_PACKAGE_HTTP = BLangConstants.BALLERINA_PACKAGE_PREFIX + PACKAGE_HTTP; public static final String SEPARATOR = ":"; public static final String LISTENER = "Listener"; public static final String WEBSOCKET_CONNECTOR = "WebSocketConnector"; @@ -38,9 +38,11 @@ public class WebSocketConstants { public static final String WEBSOCKET_CLIENT_SERVICE = "WebSocketClientService"; public static final String WSS_SCHEME = "wss"; public static final String WEBSOCKET_CALLER_NAME = PACKAGE_HTTP + SEPARATOR + WEBSOCKET_CALLER; - public static final String FULL_WEBSOCKET_CALLER_NAME = BALLERINA_PACKAGE_PREFIX + WEBSOCKET_CALLER_NAME; + public static final String FULL_WEBSOCKET_CALLER_NAME = BLangConstants.BALLERINA_PACKAGE_PREFIX + + WEBSOCKET_CALLER_NAME; public static final String WEBSOCKET_CLIENT_NAME = PACKAGE_HTTP + SEPARATOR + WEBSOCKET_CLIENT; - public static final String FULL_WEBSOCKET_CLIENT_NAME = BALLERINA_PACKAGE_PREFIX + WEBSOCKET_CLIENT_NAME; + public static final String FULL_WEBSOCKET_CLIENT_NAME = BLangConstants.BALLERINA_PACKAGE_PREFIX + + WEBSOCKET_CLIENT_NAME; public static final String WEBSOCKET_ANNOTATION_CONFIGURATION = "WebSocketServiceConfig"; @@ -49,6 +51,7 @@ public class WebSocketConstants { public static final String ANNOTATION_ATTR_IDLE_TIMEOUT = "idleTimeoutInSeconds"; public static final String ANNOTATION_ATTR_MAX_FRAME_SIZE = "maxFrameSize"; + public static final String RESOURCE_NAME_ON_OPEN = "onOpen"; public static final String RESOURCE_NAME_ON_TEXT = "onText"; public static final String RESOURCE_NAME_ON_BINARY = "onBinary"; @@ -76,6 +79,13 @@ public class WebSocketConstants { public static final String CLIENT_READY_ON_CONNECT = "readyOnConnect"; public static final String WEBSOCKET_UPGRADE_SERVICE_CONFIG = "upgradeService"; + public static final String RETRY_CONFIG = "retryConfig"; + public static final String COUNT_DOWN_LATCH = "countDownLatch"; + public static final String CLIENT_LISTENER = "clientListener"; + public static final String CLIENT_CONNECTOR = "clientConnector"; + public static final String LOG_MESSAGE = "{} {}"; + public static final String ERROR_MESSAGE = "Error occurred: "; + public static final String COMPRESSION_ENABLED_CONFIG = "webSocketCompressionEnabled"; // WebSocketListener field names @@ -98,6 +108,8 @@ public class WebSocketConstants { public static final int STATUS_CODE_FOR_NO_STATUS_CODE_PRESENT = 1005; public static final int DEFAULT_MAX_FRAME_SIZE = 65536; + public static final BPackage PROTOCOL_HTTP_PKG_ID = new BPackage(BLangConstants.BALLERINA_BUILTIN_PKG_PREFIX, + "http"); // Warning suppression public static final String UNCHECKED = "unchecked"; diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketException.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketException.java index 64b28de45e0c..bc3c1a6fa0a3 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketException.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketException.java @@ -18,7 +18,10 @@ package org.ballerinalang.net.http.websocket; +import org.ballerinalang.jvm.BallerinaValues; import org.ballerinalang.jvm.values.ErrorValue; +import org.ballerinalang.jvm.values.MapValue; +import org.ballerinalang.net.http.HttpConstants; import static org.ballerinalang.net.http.websocket.WebSocketConstants.ErrorCode; @@ -39,16 +42,26 @@ public WebSocketException(String message) { } public WebSocketException(ErrorCode errorCode, String message) { - super(errorCode.errorCode(), WebSocketUtil.createDetailRecord(message)); + super(errorCode.errorCode(), createDetailRecord(message)); this.message = message; } public WebSocketException(ErrorCode errorCode, String message, ErrorValue cause) { - super(errorCode.errorCode(), WebSocketUtil.createDetailRecord(message, cause)); + super(errorCode.errorCode(), createDetailRecord(message, cause)); this.message = message; } public String detailMessage() { return message; } + + private static MapValue createDetailRecord(String errMsg) { + return createDetailRecord(errMsg, null); + } + + private static MapValue createDetailRecord(String errMsg, ErrorValue cause) { + MapValue detail = BallerinaValues.createRecordValue(HttpConstants.PROTOCOL_HTTP_PKG_ID, + WebSocketConstants.WEBSOCKET_ERROR_DETAILS); + return BallerinaValues.createRecord(detail, errMsg, cause); + } } diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketUtil.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketUtil.java index a7aa3a563c2a..bc0b0e70f99b 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketUtil.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/WebSocketUtil.java @@ -33,6 +33,10 @@ import org.ballerinalang.jvm.values.connector.NonBlockingCallback; import org.ballerinalang.net.http.HttpConstants; import org.ballerinalang.net.http.HttpErrorType; +import org.ballerinalang.net.http.websocket.client.RetryContext; +import org.ballerinalang.net.http.websocket.client.WebSocketClientConnectorListener; +import org.ballerinalang.net.http.websocket.client.WebSocketClientHandshakeListener; +import org.ballerinalang.net.http.websocket.client.WebSocketClientHandshakeListenerForRetry; import org.ballerinalang.net.http.websocket.observability.WebSocketObservabilityUtil; import org.ballerinalang.net.http.websocket.server.WebSocketConnectionInfo; import org.ballerinalang.net.http.websocket.server.WebSocketConnectionManager; @@ -40,22 +44,26 @@ import org.ballerinalang.stdlib.io.utils.IOConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; +import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; -import static org.ballerinalang.net.http.websocket.WebSocketConstants.ErrorCode; -import static org.ballerinalang.stdlib.io.utils.IOConstants.IO_PACKAGE_ID; - /** * Utility class for WebSocket. */ public class WebSocketUtil { private static final Logger logger = LoggerFactory.getLogger(WebSocketUtil.class); + private static final String CLIENT_ENDPOINT_CONFIG = "config"; public static ObjectValue createAndPopulateWebSocketCaller(WebSocketConnection webSocketConnection, WebSocketServerService wsService, @@ -111,7 +119,7 @@ public static void readFirstFrame(WebSocketConnection webSocketConnection, Objec /** * Closes the connection with the unexpected failure status code. * - * @param webSocketConnection the websocket connection to be closed. + * @param webSocketConnection - the webSocket connection to be closed. */ public static void closeDuringUnexpectedCondition(WebSocketConnection webSocketConnection) { webSocketConnection.terminateConnection(1011, "Unexpected condition"); @@ -137,16 +145,16 @@ public static int findMaxFrameSize(MapValue configs) { } - public static int findIdleTimeoutInSeconds(MapValue configs) { - long timeout = configs.getIntValue(WebSocketConstants.ANNOTATION_ATTR_IDLE_TIMEOUT); - if (timeout <= 0) { - return 0; + public static int findTimeoutInSeconds(MapValue config, String key, int defaultValue) { + long timeout = config.getIntValue(key); + if (timeout < 0) { + return defaultValue; } try { return Math.toIntExact(timeout); } catch (ArithmeticException e) { - logger.warn("The value set for idleTimeoutInSeconds needs to be less than" + Integer.MAX_VALUE + - ". The idleTimeoutInSeconds value is set to " + Integer.MAX_VALUE); + logger.warn("The value set for {} needs to be less than {} .The {} value is set to {} ", key, + Integer.MAX_VALUE, key, Integer.MAX_VALUE); return Integer.MAX_VALUE; } } @@ -155,7 +163,7 @@ public static String[] findNegotiableSubProtocols(MapValue confi return configs.getArrayValue(WebSocketConstants.ANNOTATION_ATTR_SUB_PROTOCOLS).getStringArray(); } - public static String getErrorMessage(Throwable err) { + static String getErrorMessage(Throwable err) { if (err.getMessage() == null) { return "Unexpected error occurred"; } @@ -165,19 +173,19 @@ public static String getErrorMessage(Throwable err) { /** * Creates the appropriate ballerina errors using for the given throwable. * - * @param throwable the throwable to be represented in Ballerina. + * @param throwable - the throwable to be represented in Ballerina. * @return the relevant WebSocketException with proper error code. */ public static WebSocketException createErrorByType(Throwable throwable) { - ErrorCode errorCode = ErrorCode.WsGenericError; + WebSocketConstants.ErrorCode errorCode = WebSocketConstants.ErrorCode.WsGenericError; ErrorValue cause = null; String message = getErrorMessage(throwable); if (throwable instanceof CorruptedWebSocketFrameException) { WebSocketCloseStatus status = ((CorruptedWebSocketFrameException) throwable).closeStatus(); if (status == WebSocketCloseStatus.MESSAGE_TOO_BIG) { - errorCode = ErrorCode.WsPayloadTooBigError; + errorCode = WebSocketConstants.ErrorCode.WsPayloadTooBigError; } else { - errorCode = ErrorCode.WsProtocolError; + errorCode = WebSocketConstants.ErrorCode.WsProtocolError; } } else if (throwable instanceof SSLException) { cause = createErrorCause(throwable.getMessage(), HttpErrorType.SSL_ERROR.getReason(), @@ -185,29 +193,29 @@ public static WebSocketException createErrorByType(Throwable throwable) { message = "SSL/TLS Error"; } else if (throwable instanceof IllegalStateException) { if (throwable.getMessage().contains("frame continuation")) { - errorCode = ErrorCode.WsInvalidContinuationFrameError; + errorCode = WebSocketConstants.ErrorCode.WsInvalidContinuationFrameError; } else if (throwable.getMessage().toLowerCase(Locale.ENGLISH).contains("close frame")) { - errorCode = ErrorCode.WsConnectionClosureError; + errorCode = WebSocketConstants.ErrorCode.WsConnectionClosureError; } } else if (throwable instanceof IllegalAccessException && throwable.getMessage().equals(WebSocketConstants.THE_WEBSOCKET_CONNECTION_HAS_NOT_BEEN_MADE)) { - errorCode = ErrorCode.WsConnectionError; + errorCode = WebSocketConstants.ErrorCode.WsConnectionError; } else if (throwable instanceof TooLongFrameException) { - errorCode = ErrorCode.WsPayloadTooBigError; + errorCode = WebSocketConstants.ErrorCode.WsPayloadTooBigError; } else if (throwable instanceof CodecException) { - errorCode = ErrorCode.WsProtocolError; + errorCode = WebSocketConstants.ErrorCode.WsProtocolError; } else if (throwable instanceof WebSocketHandshakeException) { - errorCode = ErrorCode.WsInvalidHandshakeError; + errorCode = WebSocketConstants.ErrorCode.WsInvalidHandshakeError; } else if (throwable instanceof IOException) { - errorCode = ErrorCode.WsConnectionError; + errorCode = WebSocketConstants.ErrorCode.WsConnectionError; cause = createErrorCause(throwable.getMessage(), IOConstants.ErrorCode.GenericError.errorCode(), - IO_PACKAGE_ID); + IOConstants.IO_PACKAGE_ID); message = "IO Error"; } return new WebSocketException(errorCode, message, cause); } - public static ErrorValue createErrorCause(String message, String reason, BPackage packageName) { + private static ErrorValue createErrorCause(String message, String reason, BPackage packageName) { MapValue detailRecordType = BallerinaValues.createRecordValue( packageName, WebSocketConstants.WEBSOCKET_ERROR_DETAILS); @@ -215,14 +223,137 @@ public static ErrorValue createErrorCause(String message, String reason, BPackag return BallerinaErrors.createError(reason, detailRecord); } - public static MapValue createDetailRecord(String errMsg) { - return createDetailRecord(errMsg, null); + /** + * Establish connection with the endpoint. + * + * @param webSocketClient - the WebSocket client. + * @param wsService - the WebSocket service. + */ + public static void establishWebSocketConnection(ObjectValue webSocketClient, WebSocketService wsService) { + WebSocketClientConnectorListener clientConnectorListener = (WebSocketClientConnectorListener) webSocketClient. + getNativeData(WebSocketConstants.CLIENT_LISTENER); + WebSocketClientConnector clientConnector = (WebSocketClientConnector) webSocketClient. + getNativeData(WebSocketConstants.CLIENT_CONNECTOR); + boolean readyOnConnect = webSocketClient.getMapValue(CLIENT_ENDPOINT_CONFIG).getBooleanValue( + WebSocketConstants.CLIENT_READY_ON_CONNECT); + ClientHandshakeFuture handshakeFuture = clientConnector.connect(); + handshakeFuture.setWebSocketConnectorListener(clientConnectorListener); + CountDownLatch countDownLatch = new CountDownLatch(1); + if (WebSocketUtil.hasRetryConfig(webSocketClient)) { + handshakeFuture.setClientHandshakeListener(new WebSocketClientHandshakeListenerForRetry(webSocketClient, + wsService, clientConnectorListener, readyOnConnect, countDownLatch, + (RetryContext) webSocketClient.getNativeData(WebSocketConstants.RETRY_CONFIG))); + } else { + handshakeFuture.setClientHandshakeListener(new WebSocketClientHandshakeListener(webSocketClient, wsService, + clientConnectorListener, readyOnConnect, countDownLatch)); + } + // Set the countDown latch for every handshake + waitForHandshake(webSocketClient, countDownLatch); + } + + /** + * Check whether the client's config has the retryConfig property. + * + * @param webSocketClient - the WebSocket client. + * @return If the client's config has the retry config, then return true. + */ + public static boolean hasRetryConfig(ObjectValue webSocketClient) { + return webSocketClient.getMapValue(CLIENT_ENDPOINT_CONFIG). + getMapValue(WebSocketConstants.RETRY_CONFIG) != null; + } + + private static void waitForHandshake(ObjectValue webSocketClient, CountDownLatch countDownLatch) { + @SuppressWarnings(WebSocketConstants.UNCHECKED) + long timeout = WebSocketUtil.findTimeoutInSeconds((MapValue) webSocketClient.getMapValue( + CLIENT_ENDPOINT_CONFIG), "handShakeTimeoutInSeconds", 300); + try { + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + throw new WebSocketException(WebSocketConstants.ErrorCode.WsGenericError, + "Waiting for WebSocket handshake has not been successful", WebSocketUtil.createErrorCause( + "Connection timeout", IOConstants.ErrorCode.ConnectionTimedOut.errorCode(), + IOConstants.IO_PACKAGE_ID)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WebSocketException(WebSocketConstants.ERROR_MESSAGE + e.getMessage()); + } + } + + /** + * Reconnect when the WebSocket connection is lost. + * + * @param connectionInfo - information about the connection. + * @return If attempts reconnection, then return true. + */ + public static boolean reconnect(WebSocketConnectionInfo connectionInfo) { + ObjectValue webSocketClient = connectionInfo.getWebSocketEndpoint(); + RetryContext retryConnectorConfig = (RetryContext) webSocketClient.getNativeData(WebSocketConstants. + RETRY_CONFIG); + int interval = retryConnectorConfig.getInterval(); + int maxInterval = retryConnectorConfig.getMaxInterval(); + int maxAttempts = retryConnectorConfig.getMaxAttempts(); + int noOfReconnectAttempts = retryConnectorConfig.getReconnectAttempts(); + double backOfFactor = retryConnectorConfig.getBackOfFactor(); + WebSocketService wsService = connectionInfo.getService(); + Date date = new Date(); + SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss"); + if (noOfReconnectAttempts < maxAttempts || maxAttempts == 0) { + retryConnectorConfig.setReconnectAttempts(noOfReconnectAttempts + 1); + String time = formatter.format(date.getTime()); + logger.debug(WebSocketConstants.LOG_MESSAGE, time, "reconnecting..."); + createDelay(calculateWaitingTime(interval, maxInterval, backOfFactor, noOfReconnectAttempts)); + establishWebSocketConnection(webSocketClient, wsService); + return true; + } + logger.debug(WebSocketConstants.LOG_MESSAGE, "Maximum retry attempts but couldn't connect to the server: ", + webSocketClient.getStringValue(WebSocketConstants.CLIENT_URL_CONFIG)); + return false; + } + + /** + * Set the time to wait before attempting to reconnect. + * + * @param interval - interval to wait before trying to reconnect. + */ + private static void createDelay(int interval) { + CountDownLatch countDownLatch = new CountDownLatch(1); + try { + if (!countDownLatch.await(interval, TimeUnit.MILLISECONDS)) { + countDownLatch.countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WebSocketException(WebSocketConstants.ERROR_MESSAGE + e.getMessage()); + } + } + + /** + * Calculate the waiting time. + * + * @param interval- interval to wait before trying to reconnect. + * @param maxInterval - maximum interval to wait before trying to reconnect. + * @param backOfFactor - the rate of increase of the reconnect delay. + * @param reconnectAttempts - the number of reconnecting attempts. + * @return The time to wait before attempting to reconnect. + */ + private static int calculateWaitingTime(int interval, int maxInterval, double backOfFactor, + int reconnectAttempts) { + interval = (int) (interval * Math.pow(backOfFactor, reconnectAttempts)); + if (interval > maxInterval) { + interval = maxInterval; + } + return interval; } - public static MapValue createDetailRecord(String errMsg, ErrorValue cause) { - MapValue detail = BallerinaValues.createRecordValue(HttpConstants.PROTOCOL_HTTP_PKG_ID, - WebSocketConstants.WEBSOCKET_ERROR_DETAILS); - return BallerinaValues.createRecord(detail, errMsg, cause); + public static WebSocketConnectionInfo getWebSocketOpenConnectionInfo(WebSocketConnection webSocketConnection, + ObjectValue webSocketConnector, + ObjectValue webSocketClient, + WebSocketService wsService) { + WebSocketConnectionInfo connectionInfo = new WebSocketConnectionInfo( + wsService, webSocketConnection, webSocketClient); + webSocketConnector.addNativeData(WebSocketConstants.NATIVE_DATA_WEBSOCKET_CONNECTION_INFO, connectionInfo); + webSocketClient.set(WebSocketConstants.CLIENT_CONNECTOR_FIELD, webSocketConnector); + return connectionInfo; } private WebSocketUtil() { diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java index a494d71296d6..c131466ec3a9 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java @@ -29,9 +29,9 @@ import org.ballerinalang.net.http.websocket.WebSocketException; import org.ballerinalang.net.http.websocket.WebSocketService; import org.ballerinalang.net.http.websocket.WebSocketUtil; -import org.ballerinalang.stdlib.io.utils.IOConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.wso2.transport.http.netty.contract.HttpWsConnectorFactory; -import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture; import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector; import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig; @@ -39,10 +39,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.ballerinalang.net.http.websocket.WebSocketConstants.WSS_SCHEME; -import static org.ballerinalang.stdlib.io.utils.IOConstants.IO_PACKAGE_ID; /** * Initialize the WebSocket Client. @@ -51,58 +47,61 @@ */ public class InitEndpoint { + private static final Logger logger = LoggerFactory.getLogger(InitEndpoint.class); + private static final String INTERVAL_IN_MILLIS = "intervalInMillis"; + private static final String MAX_WAIT_INTERVAL = "maxWaitIntervalInMillis"; + private static final String MAX_COUNT = "maxCount"; + private static final String BACK_OF_FACTOR = "backOffFactor"; + public static void initEndpoint(ObjectValue webSocketClient) { @SuppressWarnings(WebSocketConstants.UNCHECKED) MapValue clientEndpointConfig = (MapValue) webSocketClient.getMapValue( HttpConstants.CLIENT_ENDPOINT_CONFIG); Strand strand = Scheduler.getStrand(); - String remoteUrl = webSocketClient.getStringValue(WebSocketConstants.CLIENT_URL_CONFIG); + WebSocketService wsService = validateAndCreateWebSocketService(clientEndpointConfig, strand); + HttpWsConnectorFactory connectorFactory = HttpUtil.createHttpWsConnectionFactory(); + WebSocketClientConnectorConfig clientConnectorConfig = new WebSocketClientConnectorConfig(remoteUrl); String scheme = URI.create(remoteUrl).getScheme(); - Object clientService = clientEndpointConfig.get(WebSocketConstants.CLIENT_SERVICE_CONFIG); - WebSocketService wsService; - if (clientService != null) { - BType param = ((ObjectValue) clientService).getType().getAttachedFunctions()[0].getParameterType()[0]; - if (param == null || !WebSocketConstants.WEBSOCKET_CLIENT_NAME.equals( - param.toString())) { - throw new WebSocketException("The callback service should be a WebSocket Client Service"); - } - wsService = new WebSocketService((ObjectValue) clientService, strand.scheduler); + populateClientConnectorConfig(clientEndpointConfig, clientConnectorConfig, scheme); + // Create the client connector. + WebSocketClientConnector clientConnector = connectorFactory.createWsClientConnector(clientConnectorConfig); + WebSocketClientConnectorListener clientConnectorListener = new WebSocketClientConnectorListener(); + // Add the client connector as a native data when the client is not a failover client + // because when using one URL, there is no need to create the client connector again. + webSocketClient.addNativeData(WebSocketConstants.CLIENT_CONNECTOR, clientConnector); + webSocketClient.addNativeData(WebSocketConstants.CLIENT_LISTENER, clientConnectorListener); + if (WebSocketUtil.hasRetryConfig(webSocketClient)) { + @SuppressWarnings(WebSocketConstants.UNCHECKED) + MapValue retryConfig = (MapValue) clientEndpointConfig.getMapValue( + WebSocketConstants.RETRY_CONFIG); + RetryContext retryConnectorConfig = new RetryContext(); + populateRetryConnectorConfig(retryConfig, retryConnectorConfig); + webSocketClient.addNativeData(WebSocketConstants.RETRY_CONFIG, retryConnectorConfig); + CountDownLatch countDownLatch = new CountDownLatch(1); + webSocketClient.addNativeData(WebSocketConstants.COUNT_DOWN_LATCH, countDownLatch); + WebSocketUtil.establishWebSocketConnection(webSocketClient, wsService); + // Set the count Down latch for initial connection + waitForHandshake(countDownLatch); } else { - wsService = new WebSocketService(strand.scheduler); + WebSocketUtil.establishWebSocketConnection(webSocketClient, wsService); } - WebSocketClientConnectorConfig clientConnectorConfig = new WebSocketClientConnectorConfig(remoteUrl); - populateClientConnectorConfig(clientEndpointConfig, clientConnectorConfig, scheme); - HttpWsConnectorFactory connectorFactory = HttpUtil.createHttpWsConnectionFactory(); - WebSocketClientConnector clientConnector = connectorFactory.createWsClientConnector( - clientConnectorConfig); - WebSocketClientConnectorListener clientConnectorListener = new WebSocketClientConnectorListener(); - boolean readyOnConnect = clientEndpointConfig.getBooleanValue(WebSocketConstants.CLIENT_READY_ON_CONNECT); - ClientHandshakeFuture handshakeFuture = clientConnector.connect(); - handshakeFuture.setWebSocketConnectorListener(clientConnectorListener); - CountDownLatch countDownLatch = new CountDownLatch(1); - handshakeFuture.setClientHandshakeListener( - new WebSocketClientHandshakeListener(webSocketClient, wsService, clientConnectorListener, - readyOnConnect, countDownLatch)); + } + + private static void waitForHandshake(CountDownLatch countDownLatch) { try { - // Wait for 5 minutes before timeout - if (!countDownLatch.await(60 * 5L, TimeUnit.SECONDS)) { - throw new WebSocketException(WebSocketConstants.ErrorCode.WsGenericError, - "Waiting for WebSocket handshake has not been successful", - WebSocketUtil.createErrorCause( - "Connection timeout", - IOConstants.ErrorCode.ConnectionTimedOut.errorCode(), - IO_PACKAGE_ID)); - } + // Wait to call countDown() + countDownLatch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new WebSocketException("Error occurred: " + e.getMessage()); + throw new WebSocketException(WebSocketConstants.ERROR_MESSAGE + e.getMessage()); } } private static void populateClientConnectorConfig(MapValue clientEndpointConfig, - WebSocketClientConnectorConfig clientConnectorConfig, String scheme) { + WebSocketClientConnectorConfig clientConnectorConfig, + String scheme) { clientConnectorConfig.setAutoRead(false); // Frames are read sequentially in ballerina. clientConnectorConfig.setSubProtocols(WebSocketUtil.findNegotiableSubProtocols(clientEndpointConfig)); @SuppressWarnings(WebSocketConstants.UNCHECKED) @@ -112,7 +111,8 @@ private static void populateClientConnectorConfig(MapValue clien clientConnectorConfig.addHeaders(getCustomHeaders(headerValues)); } - long idleTimeoutInSeconds = WebSocketUtil.findIdleTimeoutInSeconds(clientEndpointConfig); + long idleTimeoutInSeconds = WebSocketUtil.findTimeoutInSeconds(clientEndpointConfig, + WebSocketConstants.ANNOTATION_ATTR_IDLE_TIMEOUT, 0); if (idleTimeoutInSeconds > 0) { clientConnectorConfig.setIdleTimeoutInMillis((int) (idleTimeoutInSeconds * 1000)); } @@ -122,7 +122,7 @@ private static void populateClientConnectorConfig(MapValue clien MapValue secureSocket = clientEndpointConfig.getMapValue(HttpConstants.ENDPOINT_CONFIG_SECURE_SOCKET); if (secureSocket != null) { HttpUtil.populateSSLConfiguration(clientConnectorConfig, secureSocket); - } else if (scheme.equals(WSS_SCHEME)) { + } else if (scheme.equals(WebSocketConstants.WSS_SCHEME)) { clientConnectorConfig.useJavaDefaults(); } clientConnectorConfig.setWebSocketCompressionEnabled( @@ -137,6 +137,62 @@ private static Map getCustomHeaders(MapValue hea return customHeaders; } + /** + * Populate the retry config. + * + * @param retryConfig - the retry config. + * @param retryConnectorConfig - the retry connector config. + */ + private static void populateRetryConnectorConfig(MapValue retryConfig, + RetryContext retryConnectorConfig) { + retryConnectorConfig.setInterval(getIntValue(retryConfig, INTERVAL_IN_MILLIS, 1000)); + retryConnectorConfig.setBackOfFactor(getDoubleValue(retryConfig)); + retryConnectorConfig.setMaxInterval(getIntValue(retryConfig, MAX_WAIT_INTERVAL, 30000)); + retryConnectorConfig.setMaxAttempts(getIntValue(retryConfig, MAX_COUNT, 0)); + } + + /** + * Validate and create the webSocket service. + * + * @param clientEndpointConfig - a client endpoint config. + * @param strand - which holds the observer context being started + * @return webSocketService + */ + private static WebSocketService validateAndCreateWebSocketService(MapValue clientEndpointConfig, + Strand strand) { + Object clientService = clientEndpointConfig.get(WebSocketConstants.CLIENT_SERVICE_CONFIG); + if (clientService != null) { + BType param = ((ObjectValue) clientService).getType().getAttachedFunctions()[0].getParameterType()[0]; + if (param == null || !WebSocketConstants.WEBSOCKET_CLIENT_NAME.equals(param.toString())) { + throw new WebSocketException("The callback service should be a WebSocket Client Service"); + } + return new WebSocketService((ObjectValue) clientService, strand.scheduler); + } else { + return new WebSocketService(strand.scheduler); + } + } + + private static int getIntValue(MapValue configs, String key, int defaultValue) { + int value = Math.toIntExact(configs.getIntValue(key)); + if (value < 0) { + logger.warn("The value set for `{}` needs to be great than than -1. The `{}` value is set to {}", key, key, + defaultValue); + value = defaultValue; + } + return value; + } + + private static Double getDoubleValue(MapValue configs) { + double value = Math.toRadians(configs.getFloatValue(BACK_OF_FACTOR)); + if (value < 1) { + logger.warn("The value set for `backOffFactor` needs to be great than than 1. The `backOffFactor`" + + " value is set to {}", 1.0); + value = 1.0; + } + return value; + } + + private InitEndpoint() { } } diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/RetryContext.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/RetryContext.java new file mode 100644 index 000000000000..fdfc938113aa --- /dev/null +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/RetryContext.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.net.http.websocket.client; + +/** + * Represents a retry config. + * + * @since 1.2.0 + */ +public class RetryContext { + + private int interval = 0; + private Double backOfFactor = 0.0; + private int maxInterval = 0; + private int maxAttempts = 0; + private int reconnectAttempts = 0; + private boolean firstConnectionMadeSuccessfully = false; + + /** + * Gets the `interval`. + * + * @return interval + */ + public int getInterval() { + return interval; + } + + /** + * Assigns the interval of the `RetryContext` to the `interval` variable. + * + * @param interval - the initial index. + */ + public final void setInterval(int interval) { + this.interval = interval; + } + + /** + * Gets the `backOfFactor`. + * + * @return backOfFactor + */ + public Double getBackOfFactor() { + return backOfFactor; + } + + /** + * Assigns the` backOfFactor` of the `RetryContext` to the `backOfFactor` variable. + * + * @param backOfFactor - the rate of increase of the reconnect delay. + */ + void setBackOfFactor(Double backOfFactor) { + this.backOfFactor = backOfFactor; + } + + /** + * Gets the `maxInterval`. + * + * @return maximum interval + */ + public int getMaxInterval() { + return maxInterval; + } + + /** + * Assigns the `maxInterval` of the `RetryContext` to the `maxInterval` variable. + * + * @param maxInterval - the maximum time of the retry interval. + */ + void setMaxInterval(int maxInterval) { + this.maxInterval = maxInterval; + } + + /** + * Gets the `maxAttempts`. + * + * @return no of maximum attempts + */ + public int getMaxAttempts() { + return maxAttempts; + } + + /** + * Assigns the `maxAttempts` of the `RetryContext` to the `maxAttempts` variable. + * + * @param maxAttempts - the maximum number of retry attempts. + */ + void setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + } + + /** + * Gets the `reconnectAttempts`. + * + * @return no of reconnect attempts + */ + public int getReconnectAttempts() { + return reconnectAttempts; + } + + /** + * Assigns the `reconnectAttempts` of the `RetryContext` to the `reconnectAttempts` variable. + * + * @param reconnectAttempts - the no of reconnect attempts. + */ + public void setReconnectAttempts(int reconnectAttempts) { + this.reconnectAttempts = reconnectAttempts; + } + + /** + * Gets the `firstConnectionMadeSuccessfully`. + * + * @return firstConnectionMadeSuccessfully + */ + boolean isFirstConnectionMadeSuccessfully() { + return firstConnectionMadeSuccessfully; + } + + /** + * Assigns the connection state of the `RetryContext` to the `firstConnectionMadeSuccessfully` variable. + */ + void setFirstConnectionMadeSuccessfully() { + this.firstConnectionMadeSuccessfully = true; + } +} diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientConnectorListener.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientConnectorListener.java index 47492964c4e1..2c7121177ee5 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientConnectorListener.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientConnectorListener.java @@ -18,10 +18,14 @@ package org.ballerinalang.net.http.websocket.client; +import org.ballerinalang.jvm.values.ObjectValue; +import org.ballerinalang.net.http.websocket.WebSocketConstants; import org.ballerinalang.net.http.websocket.WebSocketResourceDispatcher; import org.ballerinalang.net.http.websocket.WebSocketUtil; import org.ballerinalang.net.http.websocket.observability.WebSocketObservabilityUtil; import org.ballerinalang.net.http.websocket.server.WebSocketConnectionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.wso2.transport.http.netty.contract.websocket.WebSocketBinaryMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketCloseMessage; import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; @@ -30,13 +34,16 @@ import org.wso2.transport.http.netty.contract.websocket.WebSocketHandshaker; import org.wso2.transport.http.netty.contract.websocket.WebSocketTextMessage; +import java.io.IOException; + /** * Ballerina Connector listener for WebSocket. * * @since 0.93 */ public class WebSocketClientConnectorListener implements WebSocketConnectorListener { - private WebSocketConnectionInfo connectionInfo; + private WebSocketConnectionInfo connectionInfo = null; + private static final Logger logger = LoggerFactory.getLogger(WebSocketClientConnectorListener.class); public void setConnectionInfo(WebSocketConnectionInfo connectionInfo) { this.connectionInfo = connectionInfo; @@ -64,11 +71,30 @@ public void onMessage(WebSocketControlMessage webSocketControlMessage) { @Override public void onMessage(WebSocketCloseMessage webSocketCloseMessage) { + ObjectValue webSocketClient = connectionInfo.getWebSocketEndpoint(); + int statusCode = webSocketCloseMessage.getCloseCode(); + if (WebSocketUtil.hasRetryConfig(webSocketClient)) { + if (statusCode == WebSocketConstants.STATUS_CODE_ABNORMAL_CLOSURE && + WebSocketUtil.reconnect(connectionInfo)) { + return; + } else { + if (statusCode != WebSocketConstants.STATUS_CODE_ABNORMAL_CLOSURE) { + logger.debug(WebSocketConstants.LOG_MESSAGE, "Reconnect attempt not made because of " + + "close initiated by the server: ", webSocketClient.getStringValue(WebSocketConstants. + CLIENT_URL_CONFIG)); + } + } + } WebSocketResourceDispatcher.dispatchOnClose(connectionInfo, webSocketCloseMessage); } @Override public void onError(WebSocketConnection webSocketConnection, Throwable throwable) { + ObjectValue webSocketClient = connectionInfo.getWebSocketEndpoint(); + if (WebSocketUtil.hasRetryConfig(webSocketClient) && throwable instanceof IOException && + WebSocketUtil.reconnect(connectionInfo)) { + return; + } WebSocketResourceDispatcher.dispatchOnError(connectionInfo, throwable); } diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListener.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListener.java index e2f4c5bb31f4..430aff950301 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListener.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListener.java @@ -33,8 +33,6 @@ import java.util.concurrent.CountDownLatch; -import static org.ballerinalang.net.http.HttpConstants.PROTOCOL_HTTP_PKG_ID; - /** * The handshake listener for the client. * @@ -64,10 +62,10 @@ public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonRespons //using only one service endpoint in the client as there can be only one connection. webSocketClient.set(WebSocketConstants.CLIENT_RESPONSE_FIELD, HttpUtil.createResponseStruct(carbonResponse)); - ObjectValue webSocketConnector = BallerinaValues.createObjectValue(PROTOCOL_HTTP_PKG_ID, + ObjectValue webSocketConnector = BallerinaValues.createObjectValue(WebSocketConstants.PROTOCOL_HTTP_PKG_ID, WebSocketConstants.WEBSOCKET_CONNECTOR); - WebSocketConnectionInfo connectionInfo = getWebSocketOpenConnectionInfo(webSocketConnection, - webSocketConnector); + WebSocketConnectionInfo connectionInfo = WebSocketUtil.getWebSocketOpenConnectionInfo(webSocketConnection, + webSocketConnector, webSocketClient, wsService); WebSocketUtil.populateWebSocketEndpoint(webSocketConnection, webSocketClient); clientConnectorListener.setConnectionInfo(connectionInfo); if (readyOnConnect) { @@ -82,19 +80,11 @@ public void onError(Throwable throwable, HttpCarbonResponse response) { if (response != null) { webSocketClient.set(WebSocketConstants.CLIENT_RESPONSE_FIELD, HttpUtil.createResponseStruct(response)); } - ObjectValue webSocketConnector = BallerinaValues.createObjectValue(PROTOCOL_HTTP_PKG_ID, + ObjectValue webSocketConnector = BallerinaValues.createObjectValue(WebSocketConstants.PROTOCOL_HTTP_PKG_ID, WebSocketConstants.WEBSOCKET_CONNECTOR); - WebSocketConnectionInfo connectionInfo = getWebSocketOpenConnectionInfo(null, webSocketConnector); + WebSocketConnectionInfo connectionInfo = WebSocketUtil.getWebSocketOpenConnectionInfo(null, + webSocketConnector, webSocketClient, wsService); countDownLatch.countDown(); WebSocketResourceDispatcher.dispatchOnError(connectionInfo, throwable); } - - private WebSocketConnectionInfo getWebSocketOpenConnectionInfo(WebSocketConnection webSocketConnection, - ObjectValue webSocketConnector) { - WebSocketConnectionInfo connectionInfo = new WebSocketConnectionInfo( - wsService, webSocketConnection, webSocketClient); - webSocketConnector.addNativeData(WebSocketConstants.NATIVE_DATA_WEBSOCKET_CONNECTION_INFO, connectionInfo); - webSocketClient.set(WebSocketConstants.CLIENT_CONNECTOR_FIELD, webSocketConnector); - return connectionInfo; - } } diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListenerForRetry.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListenerForRetry.java new file mode 100644 index 000000000000..85dc1254ed5a --- /dev/null +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/WebSocketClientHandshakeListenerForRetry.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2020, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.net.http.websocket.client; + +import org.ballerinalang.jvm.BallerinaValues; +import org.ballerinalang.jvm.values.ObjectValue; +import org.ballerinalang.net.http.HttpUtil; +import org.ballerinalang.net.http.websocket.WebSocketConstants; +import org.ballerinalang.net.http.websocket.WebSocketResourceDispatcher; +import org.ballerinalang.net.http.websocket.WebSocketService; +import org.ballerinalang.net.http.websocket.WebSocketUtil; +import org.ballerinalang.net.http.websocket.observability.WebSocketObservabilityUtil; +import org.ballerinalang.net.http.websocket.server.WebSocketConnectionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener; +import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection; +import org.wso2.transport.http.netty.message.HttpCarbonResponse; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * The retry handshake listener for the client. + * + * @since 1.2.0 + */ +public class WebSocketClientHandshakeListenerForRetry implements ClientHandshakeListener { + + private final WebSocketService wsService; + private final WebSocketClientConnectorListener clientConnectorListener; + private final boolean readyOnConnect; + private final ObjectValue webSocketClient; + private CountDownLatch countDownLatch; + private RetryContext retryConfig; + private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandshakeListenerForRetry.class); + + public WebSocketClientHandshakeListenerForRetry(ObjectValue webSocketClient, + WebSocketService wsService, + WebSocketClientConnectorListener clientConnectorListener, + boolean readyOnConnect, CountDownLatch countDownLatch, + RetryContext retryConfig) { + this.webSocketClient = webSocketClient; + this.wsService = wsService; + this.clientConnectorListener = clientConnectorListener; + this.readyOnConnect = readyOnConnect; + this.countDownLatch = countDownLatch; + this.retryConfig = retryConfig; + } + + @Override + public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse carbonResponse) { + ObjectValue webSocketConnector; + if (!retryConfig.isFirstConnectionMadeSuccessfully()) { + webSocketConnector = BallerinaValues.createObjectValue(WebSocketConstants.PROTOCOL_HTTP_PKG_ID, + WebSocketConstants.WEBSOCKET_CONNECTOR); + } else { + webSocketConnector = (ObjectValue) webSocketClient.get(WebSocketConstants.CLIENT_CONNECTOR_FIELD); + } + setWebSocketClient(webSocketClient, carbonResponse, webSocketConnection, retryConfig); + WebSocketConnectionInfo connectionInfo = WebSocketUtil.getWebSocketOpenConnectionInfo(webSocketConnection, + webSocketConnector, webSocketClient, wsService); + clientConnectorListener.setConnectionInfo(connectionInfo); + // Reads the next frame when `readyOnConnect` is true or `isReady` is true. + readNextFrame(readyOnConnect, webSocketClient, webSocketConnection, retryConfig); + countDownLatch.countDown(); + // Calls the countDown() to initial connection's countDown latch + countDownForHandshake(webSocketClient); + WebSocketObservabilityUtil.observeConnection(connectionInfo); + logger.debug(WebSocketConstants.LOG_MESSAGE, "Connected to ", webSocketClient.getStringValue( + WebSocketConstants.CLIENT_URL_CONFIG)); + // The following are created for future connections. + // Checks whether the config has a retry config or not. + // If it has a retry config, set these variables to the default variable. + adjustContextOnSuccess(retryConfig); + } + + @Override + public void onError(Throwable throwable, HttpCarbonResponse response) { + if (response != null) { + webSocketClient.set(WebSocketConstants.CLIENT_RESPONSE_FIELD, HttpUtil.createResponseStruct(response)); + } + ObjectValue webSocketConnector = BallerinaValues.createObjectValue(WebSocketConstants.PROTOCOL_HTTP_PKG_ID, + WebSocketConstants.WEBSOCKET_CONNECTOR); + WebSocketConnectionInfo connectionInfo = WebSocketUtil.getWebSocketOpenConnectionInfo(null, + webSocketConnector, webSocketClient, wsService); + countDownLatch.countDown(); + if (throwable instanceof IOException && WebSocketUtil.reconnect(connectionInfo)) { + return; + } + dispatchOnError(connectionInfo, throwable); + } + + private void setWebSocketClient(ObjectValue webSocketClient, HttpCarbonResponse carbonResponse, + WebSocketConnection webSocketConnection, RetryContext retryConfig) { + //Using only one service endpoint in the client as there can be only one connection. + webSocketClient.set(WebSocketConstants.CLIENT_RESPONSE_FIELD, HttpUtil.createResponseStruct(carbonResponse)); + if (retryConfig.isFirstConnectionMadeSuccessfully()) { + webSocketClient.set(WebSocketConstants.LISTENER_ID_FIELD, webSocketConnection.getChannelId()); + } else { + WebSocketUtil.populateWebSocketEndpoint(webSocketConnection, webSocketClient); + } + } + + /** + * Calls the `readNextFrame()` function. + * + * @param readyOnConnect - the ready on connect + * @param webSocketClient - the WebSocket client. + * @param webSocketConnection - the WebSocket connection. + */ + private static void readNextFrame(boolean readyOnConnect, ObjectValue webSocketClient, + WebSocketConnection webSocketConnection, RetryContext retryConfig) { + if (readyOnConnect || ((boolean) (webSocketClient.get(WebSocketConstants.CONNECTOR_IS_READY_FIELD)))) { + if (retryConfig.isFirstConnectionMadeSuccessfully()) { + webSocketConnection.readNextFrame(); + } else { + WebSocketUtil.readFirstFrame(webSocketConnection, webSocketClient); + } + } + } + + /** + * Counts the initialized `countDownLatch`. + * + * @param webSocketClient - the WebSocket client. + */ + private static void countDownForHandshake(ObjectValue webSocketClient) { + if (webSocketClient.getNativeData(WebSocketConstants.COUNT_DOWN_LATCH) != null) { + ((CountDownLatch) webSocketClient.getNativeData(WebSocketConstants.COUNT_DOWN_LATCH)).countDown(); + webSocketClient.addNativeData(WebSocketConstants.COUNT_DOWN_LATCH, null); + } + } + + /** + * Sets the value into the `retryContext`. + * + * @param retryConfig - the retry context that represents a retry config + */ + private void adjustContextOnSuccess(RetryContext retryConfig) { + retryConfig.setFirstConnectionMadeSuccessfully(); + retryConfig.setReconnectAttempts(0); + } + + private void dispatchOnError(WebSocketConnectionInfo connectionInfo, Throwable throwable) { + countDownForHandshake(webSocketClient); + WebSocketResourceDispatcher.dispatchOnError(connectionInfo, throwable); + } +} diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/server/WebSocketServerService.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/server/WebSocketServerService.java index f987e427c7f7..81e2345f258f 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/server/WebSocketServerService.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/server/WebSocketServerService.java @@ -63,7 +63,8 @@ private void populateConfigs() { MapValue configAnnotation = getServiceConfigAnnotation(); if (configAnnotation != null) { negotiableSubProtocols = WebSocketUtil.findNegotiableSubProtocols(configAnnotation); - idleTimeoutInSeconds = WebSocketUtil.findIdleTimeoutInSeconds(configAnnotation); + idleTimeoutInSeconds = WebSocketUtil.findTimeoutInSeconds(configAnnotation, + WebSocketConstants.ANNOTATION_ATTR_IDLE_TIMEOUT, 0); maxFrameSize = WebSocketUtil.findMaxFrameSize(configAnnotation); } // This will be overridden if there is an upgrade path diff --git a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/ClientErrorsTest.java b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/ClientErrorsTest.java index 1c861f844d74..b3859dadb3d8 100644 --- a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/ClientErrorsTest.java +++ b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/ClientErrorsTest.java @@ -91,6 +91,15 @@ public void testHandshakeError() throws InterruptedException { } + @Test(description = "Tests the ready function using the WebSocket client. When `readyOnConnect` is true," + + " calls the `ready()` function.") + public void testReadyOnConnect() throws InterruptedException { + sendTextAndAssertResponse( + "ready", + "error {ballerina/http}WsGenericError message=Already started reading frames"); + + } + private void sendTextAndAssertResponse(String msg, String expected) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); client.setCountDownLatch(countDownLatch); diff --git a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/RetryClientTest.java b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/RetryClientTest.java new file mode 100644 index 000000000000..3d32b0c7b277 --- /dev/null +++ b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/RetryClientTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http:www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http:www.apache.orglicensesLICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specif ic language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.test.service.websocket; + +import org.ballerinalang.test.context.BallerinaTestException; +import org.ballerinalang.test.util.websocket.client.WebSocketTestClient; +import org.ballerinalang.test.util.websocket.server.WebSocketRemoteServer; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * This Class tests the retry support of the WebSocket client and server. + */ +@Test(groups = {"websocket-test"}) +public class RetryClientTest extends WebSocketTestCommons { + + private WebSocketRemoteServer remoteServer; + private WebSocketTestClient client; + private static final String url = "ws://localhost:21030"; + private static final int port = 15300; + + @Test(description = "Tests the retry function using the WebSocket client (Restart the server and send the data") + public void testRetry() throws URISyntaxException, InterruptedException, BallerinaTestException { + remoteServer = initiateServer(); + client = initiateClient(url); + sendTextDataAndAssert("Hi"); + restartServerAndGiveTimeClientConnectToServer(); + sendTextDataAndAssert("Hi madam"); + closeConnection(); + } + + @Test(description = "Tests the retry function with the maximum count using the WebSocket client (" + + "Restart the server twice and send the data for every restart)") + public void testMultipleRetryAttempts() throws URISyntaxException, InterruptedException, BallerinaTestException { + remoteServer = initiateServer(); + client = initiateClient(url); + sendTextDataAndAssert("Hi"); + restartServerAndGiveTimeClientConnectToServer(); + sendTextDataAndAssert("Hi madam"); + restartServerAndGiveTimeClientConnectToServer(); + sendBinaryDataAndAssert(); + closeConnection(); + } + + @Test(description = "Tests the retry function using the WebSocket client (Restart the server and " + + "check the maximum count") + public void testBinaryFrameForRetryWithMaxCount() throws URISyntaxException, InterruptedException, + BallerinaTestException { + remoteServer = initiateServer(); + client = initiateClient("ws://localhost:21031"); + sendBinaryDataAndAssert(); + restartServerAndGiveTimeClientConnectToServer(); + CountDownLatch latchForSendBinary = new CountDownLatch(1); + client.setCountDownLatch(latchForSendBinary); + client.sendBinary(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 6})); + latchForSendBinary.await(TIMEOUT_IN_SECS, TimeUnit.SECONDS); + Assert.assertNull(client.getBufferReceived()); + closeConnection(); + } + + @Test(description = "Tests the `countDownLatch` for the retry function using the WebSocket client (" + + "Restart the server and check the countDownLatch for handshake)") + public void testCountdownLatchForRetry() throws URISyntaxException, InterruptedException, BallerinaTestException { + remoteServer = initiateServer(); + client = initiateClient(url); + sendBinaryDataAndAssert(); + restartServerAndGiveTimeClientConnectToServer(); + sendBinaryDataAndAssert(); + closeConnection(); + } + + private WebSocketRemoteServer initiateServer() throws InterruptedException, BallerinaTestException { + remoteServer = new WebSocketRemoteServer(port); + remoteServer.run(); + return remoteServer; + } + + private WebSocketTestClient initiateClient(String url) throws InterruptedException, URISyntaxException { + client = new WebSocketTestClient(url); + client.handshake(); + return client; + } + + private void closeConnection() throws InterruptedException { + client.shutDown(); + remoteServer.stop(); + } + + private void restartServerAndGiveTimeClientConnectToServer() throws InterruptedException, BallerinaTestException { + remoteServer.stop(); + CountDownLatch latchForRestart = new CountDownLatch(1); + latchForRestart.await(7, TimeUnit.SECONDS); + remoteServer.run(); + latchForRestart.await(2, TimeUnit.SECONDS); + } + + private void sendTextDataAndAssert(String text) throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + client.setCountDownLatch(countDownLatch); + client.sendText(text); + countDownLatch.await(TIMEOUT_IN_SECS, TimeUnit.SECONDS); + Assert.assertEquals(client.getTextReceived(), text); + } + + private void sendBinaryDataAndAssert() throws InterruptedException { + ByteBuffer data = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 6}); + CountDownLatch countDownLatch = new CountDownLatch(1); + client.setCountDownLatch(countDownLatch); + client.sendBinary(data); + countDownLatch.await(TIMEOUT_IN_SECS, TimeUnit.SECONDS); + Assert.assertEquals(client.getBufferReceived(), data); + } +} diff --git a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/SimpleProxyTestCommons.java b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/SimpleProxyTestCommons.java index ce3de0252ec7..85e707572f4a 100644 --- a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/SimpleProxyTestCommons.java +++ b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/SimpleProxyTestCommons.java @@ -83,7 +83,7 @@ public void testSendBinary() throws URISyntaxException, InterruptedException { } @AfterClass(description = "Stops Ballerina") - public void cleanup() { + public void cleanup() throws InterruptedException { remoteServer.stop(); } } diff --git a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/WebSocketTestCommons.java b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/WebSocketTestCommons.java index 6987a978215d..80ab8f5a84a3 100644 --- a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/WebSocketTestCommons.java +++ b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/service/websocket/WebSocketTestCommons.java @@ -46,7 +46,7 @@ public void start() throws BallerinaTestException { int[] requiredPorts = new int[]{21001, 21002, 21003, 21004, 21005, 21006, 21007, 21008, 21009, 21010, 21011, 21022, 21021, 21012, 21013, 21014, 21015, 21016, 21017, 21018, 21019, 21020, 21023, 21024, 21025, 21026, - 21027, 21028, 21029}; + 21027, 21028, 21029, 21030, 21031, 21032}; String balFile = new File("src" + File.separator + "test" + File.separator + "resources" + File.separator + "websocket").getAbsolutePath(); String keyStore = StringEscapeUtils.escapeJava( diff --git a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/util/websocket/server/WebSocketRemoteServer.java b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/util/websocket/server/WebSocketRemoteServer.java index 497ad3703d3e..440860ff13d1 100644 --- a/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/util/websocket/server/WebSocketRemoteServer.java +++ b/tests/jballerina-integration-test/src/test/java/org/ballerinalang/test/util/websocket/server/WebSocketRemoteServer.java @@ -63,10 +63,10 @@ public void run() throws InterruptedException, BallerinaTestException { bootstrap.bind(port).sync(); } - public void stop() { + public void stop() throws InterruptedException { log.info("Shutting down websocket remote server at '" + port + "'"); - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully().sync(); + workerGroup.shutdownGracefully().sync(); Utils.waitForPortsToClose(new int[]{port}, 30000); } } diff --git a/tests/jballerina-integration-test/src/test/resources/testng.xml b/tests/jballerina-integration-test/src/test/resources/testng.xml index cf5a27231224..6d2f7cae0d5e 100644 --- a/tests/jballerina-integration-test/src/test/resources/testng.xml +++ b/tests/jballerina-integration-test/src/test/resources/testng.xml @@ -185,6 +185,7 @@ + diff --git a/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/27_client_exceptions.bal b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/27_client_exceptions.bal index 026c3120d931..0f261271e32e 100644 --- a/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/27_client_exceptions.bal +++ b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/27_client_exceptions.bal @@ -49,7 +49,14 @@ service clientError on new http:Listener(21027) { checkpanic wsEp->pushText(err.toString()); } } else if (text == "handshake") { - http:WebSocketClient wsClientEp = new (REMOTE_BACKEND_URL, {callbackService: errorResourceService, subProtocols: ["abc"]}); + http:WebSocketClient wsClientEp = new (REMOTE_BACKEND_URL, {callbackService: errorResourceService, + subProtocols: ["abc"]}); + } else if (text == "ready") { + http:WebSocketClient wsClientEp = new (REMOTE_BACKEND_URL, {callbackService: errorResourceService}); + var returnVal = wsClientEp->ready(); + if (returnVal is error) { + checkpanic wsEp->pushText(returnVal.toString()); + } } else { checkpanic wsEp->pushText(text); } diff --git a/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/30_retry_client.bal b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/30_retry_client.bal new file mode 100644 index 000000000000..e414a8d70c86 --- /dev/null +++ b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/30_retry_client.bal @@ -0,0 +1,62 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/http; + +@http:WebSocketServiceConfig { +} +service on new http:Listener(21030) { + + resource function onOpen(http:WebSocketCaller wsEp) { + http:WebSocketClient wsClientEp = new("ws://localhost:15300/websocket", { callbackService: + retryClientCallbackService, readyOnConnect: false, retryConfig: {}, handShakeTimeoutInSeconds: 5}); + wsEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp); + wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsEp); + checkpanic wsClientEp->ready(); + } + + resource function onText(http:WebSocketCaller wsEp, string text) { + http:WebSocketClient clientEp = getAssociatedClientEndpoint(wsEp); + checkpanic clientEp->pushText(text); + } + + resource function onBinary(http:WebSocketCaller wsEp, byte[] data) { + http:WebSocketClient clientEp = getAssociatedClientEndpoint(wsEp); + checkpanic clientEp->pushBinary(data); + } + + resource function onClose(http:WebSocketCaller wsEp, int statusCode, string reason) { + http:WebSocketClient clientEp = getAssociatedClientEndpoint(wsEp); + checkpanic clientEp->close(statusCode, reason); + } +} + +service retryClientCallbackService = @http:WebSocketServiceConfig {} service { + resource function onText(http:WebSocketClient wsEp, string text) { + http:WebSocketCaller serviceEp = getAssociatedListener(wsEp); + checkpanic serviceEp->pushText(text); + } + + resource function onBinary(http:WebSocketClient wsEp, byte[] data) { + http:WebSocketCaller serviceEp = getAssociatedListener(wsEp); + checkpanic serviceEp->pushBinary(data); + } + + resource function onClose(http:WebSocketClient wsEp, int statusCode, string reason) { + http:WebSocketCaller serviceEp = getAssociatedListener(wsEp); + checkpanic serviceEp->close(statusCode = statusCode, reason = reason); + } +}; diff --git a/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/31_retry_client_with_configurations.bal b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/31_retry_client_with_configurations.bal new file mode 100644 index 000000000000..7c8696588e2e --- /dev/null +++ b/tests/jballerina-integration-test/src/test/resources/websocket/src/wsservices/31_retry_client_with_configurations.bal @@ -0,0 +1,53 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/http; + +@http:WebSocketServiceConfig { +} +service on new http:Listener(21031) { + + resource function onOpen(http:WebSocketCaller wsEp) { + http:WebSocketClient wsClientEp = new("ws://localhost:15300/websocket", { callbackService: + retryCallbackService, readyOnConnect: false, retryConfig: {maxCount: 10, intervalInMillis: 500, + backOffFactor: 0.5}}); + wsEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp); + wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsEp); + checkpanic wsClientEp->ready(); + } + + resource function onText(http:WebSocketCaller wsEp, string text) { + http:WebSocketClient clientEp = getAssociatedClientEndpoint(wsEp); + checkpanic clientEp->pushText(text); + } + + resource function onBinary(http:WebSocketCaller wsEp, byte[] data) { + http:WebSocketClient clientEp = getAssociatedClientEndpoint(wsEp); + checkpanic clientEp->pushBinary(data); + } +} + +service retryCallbackService = @http:WebSocketServiceConfig {} service { + resource function onText(http:WebSocketClient wsEp, string text) { + http:WebSocketCaller serviceEp = getAssociatedListener(wsEp); + checkpanic serviceEp->pushText(text); + } + + resource function onBinary(http:WebSocketClient wsEp, byte[] data) { + http:WebSocketCaller serviceEp = getAssociatedListener(wsEp); + checkpanic serviceEp->pushBinary(data); + } +}; From de7e116b16df3c7f4ce34193a1b87defe5096727 Mon Sep 17 00:00:00 2001 From: Kalaiyarasi Date: Fri, 31 Jan 2020 14:39:48 +0530 Subject: [PATCH 2/2] Make the change in the method comment --- .../ballerinalang/net/http/websocket/client/InitEndpoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java index c131466ec3a9..ae40f6e998d1 100644 --- a/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java +++ b/stdlib/http/src/main/java/org/ballerinalang/net/http/websocket/client/InitEndpoint.java @@ -155,7 +155,7 @@ private static void populateRetryConnectorConfig(MapValue retryC * Validate and create the webSocket service. * * @param clientEndpointConfig - a client endpoint config. - * @param strand - which holds the observer context being started + * @param strand - which holds the observer context being started. * @return webSocketService */ private static WebSocketService validateAndCreateWebSocketService(MapValue clientEndpointConfig,