diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt/MqttClientConnection.java b/src/main/java/software/amazon/awssdk/crt/mqtt/MqttClientConnection.java index cf79504c0..edee81cc5 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt/MqttClientConnection.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt/MqttClientConnection.java @@ -14,6 +14,9 @@ import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.io.TlsContext; import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig; +import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions; +import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -49,6 +52,32 @@ void deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain) } } + /** + * Static help function to create a MqttConnectionConfig from a + * Mqtt5ClientOptions + */ + private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options) { + MqttConnectionConfig options = new MqttConnectionConfig(); + options.setEndpoint(mqtt5options.getHostName()); + options.setPort(mqtt5options.getPort() != null ? Math.toIntExact(mqtt5options.getPort()) : 0); + options.setSocketOptions(mqtt5options.getSocketOptions()); + if (mqtt5options.getConnectOptions() != null) { + options.setClientId(mqtt5options.getConnectOptions().getClientId()); + options.setKeepAliveSecs( + mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds() != null + ? Math.toIntExact(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds()) + : 0); + } + options.setCleanSession( + mqtt5options.getSessionBehavior().compareTo(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN) <= 0); + options.setPingTimeoutMs( + mqtt5options.getPingTimeoutMs() != null ? Math.toIntExact(mqtt5options.getPingTimeoutMs()) : 0); + options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null + ? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000 + : 0); + return options; + } + /** * Constructs a new MqttClientConnection. Connections are reusable after being * disconnected. @@ -71,8 +100,62 @@ public MqttClientConnection(MqttConnectionConfig config) throws MqttException { } try { - acquireNativeHandle(mqttClientConnectionNew(config.getMqttClient().getNativeHandle(), this)); + acquireNativeHandle(mqttClientConnectionNewFrom311Client(config.getMqttClient().getNativeHandle(), this)); + SetupConfig(config); + + } catch (CrtRuntimeException ex) { + throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage()); + } + } + + /** + * Constructs a new MqttClientConnection from a Mqtt5Client. Connections are + * reusable after being + * disconnected. + * + * @param mqtt5client the mqtt5 client to setup from + * @param callbacks connection callbacks triggered when receive connection + * events + * + * @throws MqttException If mqttClient is null + */ + public MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks) throws MqttException { + if (mqtt5client == null) { + throw new MqttException("mqttClient must not be null"); + } + try (MqttConnectionConfig config = s_toMqtt3ConnectionConfig(mqtt5client.getClientOptions())) { + config.setMqtt5Client(mqtt5client); + if (callbacks != null) { + config.setConnectionCallbacks(callbacks); + } + + if (config.getClientId() == null) { + throw new MqttException("clientId must not be null"); + } + if (config.getEndpoint() == null) { + throw new MqttException("endpoint must not be null"); + } + if (config.getPort() <= 0 || config.getPort() > 65535) { + throw new MqttException("port must be a positive integer between 1 and 65535"); + } + + try { + acquireNativeHandle( + mqttClientConnectionNewFrom5Client(config.getMqtt5Client().getNativeHandle(), this)); + SetupConfig(config); + + } catch (CrtRuntimeException ex) { + throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage()); + } + } catch (Exception e) { + throw new MqttException("Failed to setup mqtt3 connection : " + e.getMessage()); + } + + } + + private void SetupConfig(MqttConnectionConfig config) throws MqttException { + try { if (config.getUsername() != null) { mqttClientConnectionSetLogin(getNativeHandle(), config.getUsername(), config.getPassword()); } @@ -200,7 +283,12 @@ private void onConnectionClosed() { */ public CompletableFuture connect() throws MqttException { - TlsContext tls = config.getMqttClient().getTlsContext(); + TlsContext tls = null; + if (config.getMqttClient() != null) { + tls = config.getMqttClient().getTlsContext(); + } else if (config.getMqtt5Client() != null) { + tls = config.getMqtt5Client().getClientOptions().getTlsContext(); + } // Just clamp the pingTimeout, no point in throwing short pingTimeout = (short) Math.max(0, Math.min(config.getPingTimeoutMs(), Short.MAX_VALUE)); @@ -362,7 +450,9 @@ private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserD } /** - * Returns statistics about the current state of the MqttClientConnection's queue of operations. + * Returns statistics about the current state of the MqttClientConnection's + * queue of operations. + * * @return Current state of the connection's queue of operations. */ public MqttClientConnectionOperationStatistics getOperationStatistics() { @@ -372,7 +462,10 @@ public MqttClientConnectionOperationStatistics getOperationStatistics() { /******************************************************************************* * Native methods ******************************************************************************/ - private static native long mqttClientConnectionNew(long client, MqttClientConnection thisObj) + private static native long mqttClientConnectionNewFrom311Client(long client, MqttClientConnection thisObj) + throws CrtRuntimeException; + + private static native long mqttClientConnectionNewFrom5Client(long client, MqttClientConnection thisObj) throws CrtRuntimeException; private static native void mqttClientConnectionDestroy(long connection); @@ -419,6 +512,7 @@ private static native void mqttClientConnectionSetHttpProxyOptions(long connecti String proxyAuthorizationUsername, String proxyAuthorizationPassword) throws CrtRuntimeException; - private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics(long connection); + private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics( + long connection); }; diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt/MqttConnectionConfig.java b/src/main/java/software/amazon/awssdk/crt/mqtt/MqttConnectionConfig.java index f61b7e34c..255697852 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt/MqttConnectionConfig.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt/MqttConnectionConfig.java @@ -11,6 +11,7 @@ import software.amazon.awssdk.crt.http.HttpProxyOptions; import software.amazon.awssdk.crt.io.ClientTlsContext; import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; /** * Encapsulates all per-mqtt-connection configuration @@ -23,6 +24,7 @@ public final class MqttConnectionConfig extends CrtResource { /* mqtt */ private MqttClient mqttClient; + private Mqtt5Client mqtt5Client; private String clientId; private String username; private String password; @@ -314,6 +316,25 @@ public MqttClient getMqttClient() { return mqttClient; } + /** + * Configures the mqtt5 client to use for a connection + * + * @param mqtt5Client the mqtt client to use + */ + public void setMqtt5Client(Mqtt5Client mqtt5Client) { + swapReferenceTo(this.mqtt5Client, mqtt5Client); + this.mqtt5Client = mqtt5Client; + } + + /** + * Queries the mqtt5 client to use for a connection + * + * @return the mqtt5 client to use + */ + public Mqtt5Client getMqtt5Client() { + return mqtt5Client; + } + /** * Sets the login credentials for a connection. * @@ -529,6 +550,7 @@ public MqttConnectionConfig clone() { clone.setSocketOptions(getSocketOptions()); clone.setMqttClient(getMqttClient()); + clone.setMqtt5Client(getMqtt5Client()); clone.setClientId(getClientId()); clone.setUsername(getUsername()); clone.setPassword(getPassword()); diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java index ed68eb61c..6cfb92b9c 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java @@ -46,6 +46,11 @@ public class Mqtt5Client extends CrtResource { */ private boolean isConnected; + /** + * A private config used to save config for mqtt3 connection creation + */ + private Mqtt5ClientOptions clientOptions; + /** * Creates a Mqtt5Client instance using the provided Mqtt5ClientOptions. Once the Mqtt5Client is created, * changing the settings will not cause a change in already created Mqtt5Client's. @@ -54,6 +59,7 @@ public class Mqtt5Client extends CrtResource { * @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure */ public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException { + clientOptions = options; ClientBootstrap bootstrap = options.getBootstrap(); SocketOptions socketOptions = options.getSocketOptions(); TlsContext tlsContext = options.getTlsContext(); @@ -202,6 +208,21 @@ private synchronized void setIsConnected(boolean connected) { isConnected = connected; } + + /******************************************************************************* + * Mqtt5 to Mqtt3 Adapter + ******************************************************************************/ + + /** + * Returns the Mqtt5ClientOptions used for the Mqtt5Client + * + * @return Mqtt5ClientOptions + */ + public Mqtt5ClientOptions getClientOptions() + { + return clientOptions; + } + /******************************************************************************* * websocket methods ******************************************************************************/ @@ -228,6 +249,7 @@ private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserD } } + /******************************************************************************* * native methods ******************************************************************************/ diff --git a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java index 1d03daf9c..10c227bc3 100644 --- a/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java +++ b/src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions.java @@ -11,6 +11,7 @@ import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode; import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; +import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig; import java.util.Map; import java.util.function.Function; diff --git a/src/native/mqtt_connection.c b/src/native/mqtt_connection.c index ba2cc0fdf..b90928427 100644 --- a/src/native/mqtt_connection.c +++ b/src/native/mqtt_connection.c @@ -29,6 +29,7 @@ #include "http_request_utils.h" #include "java_class_ids.h" +#include "mqtt5_client_jni.h" /******************************************************************************* * mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt @@ -82,25 +83,21 @@ static void s_mqtt_jni_connection_release(struct mqtt_jni_connection *connection size_t old_value = aws_atomic_fetch_sub(&connection->ref_count, 1); AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection release, ref count now = %d", (int)old_value - 1); +} - if (old_value == 1) { +/* The destroy function is called on Java MqttClientConnection resource release. */ +static void s_mqtt_jni_connection_destroy(struct mqtt_jni_connection *connection) { + /* For mqtt311 client, we have to call aws_mqtt_client_connection_disconnect before releasing the underlying c + * connection.*/ + if (aws_mqtt_client_connection_disconnect( + connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) { - /** - * Disable the onClosed callback, so it is not invoked on the last disconnect for clean-up. - * (We do not invoke any callbacks on the last disconnect during clean-up/shutdown. - * Calling the callback on the final disconnect will cause a segfault!) + /* + * This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown + * time. Log it (in case it was unexpected) and then shutdown the underlying connection manually. */ - aws_mqtt_client_connection_set_connection_closed_handler(connection->client_connection, NULL, NULL); - - if (aws_mqtt_client_connection_disconnect( - connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) { - - /* - * This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown - * time. Log it (in case it was unexpected) and then invoke the shutdown callback manually. - */ - s_on_shutdown_disconnect_complete(connection->client_connection, connection); - } + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Client disconnect failed. Release the client connection."); + s_on_shutdown_disconnect_complete(connection->client_connection, NULL); } } @@ -329,14 +326,39 @@ static void s_on_connection_closed( } aws_jni_release_thread_env(connection->jvm, env); /********** JNI ENV RELEASE **********/ +} - /* Release the connection acquired during the disconnect function invoke */ - s_mqtt_jni_connection_release(connection); +static void s_on_connection_terminated(void *user_data) { + + struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data; + + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm); + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + return; + } + + jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection); + if (mqtt_connection != NULL) { + (*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references); + + (*env)->DeleteLocalRef(env, mqtt_connection); + + aws_jni_check_and_clear_exception(env); + } + + JavaVM *jvm = jni_connection->jvm; + + s_mqtt_connection_destroy(env, jni_connection); + aws_jni_release_thread_env(jvm, env); + /********** JNI ENV RELEASE **********/ } static struct mqtt_jni_connection *s_mqtt_connection_new( JNIEnv *env, - struct aws_mqtt_client *client, + struct aws_mqtt_client *client3, + struct aws_mqtt5_client_java_jni *client5_jni, jobject java_mqtt_connection) { struct aws_allocator *allocator = aws_jni_get_allocator(); @@ -348,12 +370,17 @@ static struct mqtt_jni_connection *s_mqtt_connection_new( } aws_atomic_store_int(&connection->ref_count, 1); - connection->client = client; connection->java_mqtt_connection = (*env)->NewWeakGlobalRef(env, java_mqtt_connection); jint jvmresult = (*env)->GetJavaVM(env, &connection->jvm); AWS_FATAL_ASSERT(jvmresult == 0); - connection->client_connection = aws_mqtt_client_connection_new(client); + if (client3 != NULL) { + connection->client = client3; + connection->client_connection = aws_mqtt_client_connection_new(client3); + } else if (client5_jni != NULL) { + connection->client_connection = aws_mqtt_client_connection_new_from_mqtt5_client(client5_jni->client); + } + if (!connection->client_connection) { aws_jni_throw_runtime_exception( env, @@ -362,6 +389,15 @@ static struct mqtt_jni_connection *s_mqtt_connection_new( goto on_error; } + if (aws_mqtt_client_connection_set_connection_termination_handler( + connection->client_connection, s_on_connection_terminated, connection)) { + aws_jni_throw_runtime_exception( + env, + "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to set termination " + "callback"); + goto on_error; + } + return connection; on_error: @@ -384,15 +420,13 @@ static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *c (*env)->DeleteWeakGlobalRef(env, connection->java_mqtt_connection); } - aws_mqtt_client_connection_release(connection->client_connection); - aws_tls_connection_options_clean_up(&connection->tls_options); struct aws_allocator *allocator = aws_jni_get_allocator(); aws_mem_release(allocator, connection); } -JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNew( +JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client( JNIEnv *env, jclass jni_class, jlong jni_client, @@ -400,14 +434,14 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnectio (void)jni_class; aws_cache_jni_ids(env); - struct aws_mqtt_client *client = (struct aws_mqtt_client *)jni_client; - if (!client) { - aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Client is invalid/null"); + struct mqtt_jni_connection *connection = NULL; + struct aws_mqtt_client *client3 = (struct aws_mqtt_client *)jni_client; + if (!client3) { + aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt3 Client is invalid/null"); return (jlong)NULL; } - /* any error after this point needs to jump to error_cleanup */ - struct mqtt_jni_connection *connection = s_mqtt_connection_new(env, client, jni_mqtt_connection); + connection = s_mqtt_connection_new(env, client3, NULL, jni_mqtt_connection); if (!connection) { return (jlong)NULL; } @@ -420,33 +454,44 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnectio return (jlong)connection; } -static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) { - (void)connection; - - struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data; +JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client( + JNIEnv *env, + jclass jni_class, + jlong jni_client, + jobject jni_mqtt_connection) { + (void)jni_class; + aws_cache_jni_ids(env); - AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references"); + struct mqtt_jni_connection *connection = NULL; + struct aws_mqtt5_client_java_jni *client5_jni = (struct aws_mqtt5_client_java_jni *)jni_client; + if (!client5_jni) { + aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt5 Client is invalid/null"); + return (jlong)NULL; + } - /********** JNI ENV ACQUIRE **********/ - JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm); - if (env == NULL) { - /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ - return; + connection = s_mqtt_connection_new(env, NULL, client5_jni, jni_mqtt_connection); + if (!connection) { + return (jlong)NULL; } - jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection); - if (mqtt_connection != NULL) { - (*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references); + aws_mqtt_client_connection_set_connection_interruption_handlers( + connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection); + aws_mqtt_client_connection_set_connection_closed_handler( + connection->client_connection, s_on_connection_closed, connection); - (*env)->DeleteLocalRef(env, mqtt_connection); + return (jlong)connection; +} - aws_jni_check_and_clear_exception(env); - } +/* The disconnect callback called on shutdown. We will release the underlying connection here, which should init the +** client shutdown process. Then on termination callback, we will finally release all jni resources. +*/ +static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) { + (void)user_data; - JavaVM *jvm = jni_connection->jvm; - s_mqtt_connection_destroy(env, jni_connection); - aws_jni_release_thread_env(jvm, env); - /********** JNI ENV RELEASE **********/ + AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references"); + + /* Release the underlying mqtt connection */ + aws_mqtt_client_connection_release(connection); } /******************************************************************************* @@ -461,7 +506,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection aws_cache_jni_ids(env); struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection; - s_mqtt_jni_connection_release(connection); + s_mqtt_jni_connection_destroy(connection); } /******************************************************************************* @@ -583,8 +628,6 @@ void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClien return; } - s_mqtt_jni_connection_acquire(connection); - if (aws_mqtt_client_connection_disconnect( connection->client_connection, s_on_connection_disconnected, disconnect_callback) != AWS_OP_SUCCESS) { int error = aws_last_error(); @@ -598,8 +641,6 @@ void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClien error, aws_error_str(error)); s_on_connection_disconnected(connection->client_connection, disconnect_callback); - // Release the reference manually - s_mqtt_jni_connection_release(connection); } } diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index 3d01c07ef..6dc60670d 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -43,7 +43,6 @@ import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType; import java.util.ArrayList; -import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -51,155 +50,11 @@ import java.util.function.Consumer; /* For environment variable setup, see SetupCrossCICrtEnvironment in the CRT builder */ -public class Mqtt5ClientTest extends CrtTestFixture { - - // MQTT5 Codebuild/Direct connections data - static final String AWS_TEST_MQTT5_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_HOST"); - static final String AWS_TEST_MQTT5_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"); - static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST"); - static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT"); - static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST"); - static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT"); - // MQTT5 Codebuild/Websocket connections data - static final String AWS_TEST_MQTT5_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_HOST"); - static final String AWS_TEST_MQTT5_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_PORT"); - static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST"); - static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT"); - static final String AWS_TEST_MQTT5_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_HOST"); - static final String AWS_TEST_MQTT5_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_PORT"); - // MQTT5 Codebuild misc connections data - static final String AWS_TEST_MQTT5_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_USERNAME"); - static final String AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD"); - static final String AWS_TEST_MQTT5_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT5_CERTIFICATE_FILE"); - static final String AWS_TEST_MQTT5_KEY_FILE = System.getProperty("AWS_TEST_MQTT5_KEY_FILE"); - // MQTT5 Proxy - static final String AWS_TEST_MQTT5_PROXY_HOST = System.getProperty("AWS_TEST_MQTT5_PROXY_HOST"); - static final String AWS_TEST_MQTT5_PROXY_PORT = System.getProperty("AWS_TEST_MQTT5_PROXY_PORT"); - // MQTT5 Endpoint/Host credentials - static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST"); - static final String AWS_TEST_MQTT5_IOT_CORE_REGION = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_REGION"); - static final String AWS_TEST_MQTT5_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT"); - static final String AWS_TEST_MQTT5_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY"); - // MQTT5 Static credential related - static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY"); - static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY"); - static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN"); - // MQTT5 Cognito - static final String AWS_TEST_MQTT5_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_COGNITO_ENDPOINT"); - static final String AWS_TEST_MQTT5_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT5_COGNITO_IDENTITY"); - // MQTT5 Keystore - static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT"); - static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE"); - static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD"); - static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS"); - static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD"); - // MQTT5 PKCS12 - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY"); - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD"); - // MQTT5 PKCS11 - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB"); - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL"); - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN"); - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL"); - static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE"); - // MQTT5 X509 - static final String AWS_TEST_MQTT5_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_CERT"); - static final String AWS_TEST_MQTT5_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_KEY"); - static final String AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT"); - static final String AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS"); - static final String AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME"); - // MQTT5 Windows Cert Store - static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS"); - static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE"); - - private int OPERATION_TIMEOUT_TIME = 30; +public class Mqtt5ClientTest extends Mqtt5ClientTestFixture { public Mqtt5ClientTest() { } - /** - * ============================================================ - * TEST HELPER FUNCTIONS - * ============================================================ - */ - - static final class LifecycleEvents_Futured implements Mqtt5ClientOptions.LifecycleEvents { - CompletableFuture connectedFuture = new CompletableFuture<>(); - CompletableFuture stopFuture = new CompletableFuture<>(); - - ConnAckPacket connectSuccessPacket = null; - NegotiatedSettings connectSuccessSettings = null; - - int connectFailureCode = 0; - ConnAckPacket connectFailurePacket = null; - - int disconnectFailureCode = 0; - DisconnectPacket disconnectPacket = null; - - @Override - public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {} - - @Override - public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) { - ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket(); - NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings(); - connectSuccessPacket = connAckData; - connectSuccessSettings = negotiatedSettings; - connectedFuture.complete(null); - } - - @Override - public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { - connectFailureCode = onConnectionFailureReturn.getErrorCode(); - connectFailurePacket = onConnectionFailureReturn.getConnAckPacket(); - connectedFuture.completeExceptionally(new Exception("Could not connect! Error name: " + CRT.awsErrorName(connectFailureCode))); - } - - @Override - public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) { - disconnectFailureCode = onDisconnectionReturn.getErrorCode(); - disconnectPacket = onDisconnectionReturn.getDisconnectPacket(); - } - - @Override - public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { - stopFuture.complete(null); - } - } - - static final class PublishEvents_Futured implements PublishEvents { - CompletableFuture publishReceivedFuture = new CompletableFuture<>(); - PublishPacket publishPacket = null; - - @Override - public void onMessageReceived(Mqtt5Client client, PublishReturn result) { - publishPacket = result.getPublishPacket(); - publishReceivedFuture.complete(null); - } - } - - static final class PublishEvents_Futured_Counted implements PublishEvents { - CompletableFuture publishReceivedFuture = new CompletableFuture<>(); - int currentPublishCount = 0; - int desiredPublishCount = 0; - List publishPacketsReceived = new ArrayList(); - - @Override - public void onMessageReceived(Mqtt5Client client, PublishReturn result) { - currentPublishCount += 1; - if (currentPublishCount == desiredPublishCount) { - publishReceivedFuture.complete(null); - } else if (currentPublishCount > desiredPublishCount) { - publishReceivedFuture.completeExceptionally(new Throwable("Too many publish packets received")); - } - - if (publishPacketsReceived.contains(result)) { - publishReceivedFuture.completeExceptionally(new Throwable("Duplicate publish packet received!")); - } - publishPacketsReceived.add(result.getPublishPacket()); - } - } - /** * ============================================================ * CREATION TEST CASES @@ -608,7 +463,7 @@ public void ConnDC_UC6() { willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); - connectBuilder.withClientId("MQTT5 CRT"); + connectBuilder.withClientId("MQTT5 CRT" + UUID.randomUUID().toString()); connectBuilder.withKeepAliveIntervalSeconds(1000L); connectBuilder.withMaximumPacketSizeBytes(1000L); connectBuilder.withPassword(AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD.getBytes()); @@ -876,7 +731,7 @@ public void ConnWS_UC6() { willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()).withTopic("test/topic"); ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); - connectBuilder.withClientId("MQTT5 CRT"); + connectBuilder.withClientId("MQTT5 CRT"+UUID.randomUUID().toString()); connectBuilder.withKeepAliveIntervalSeconds(1000L); connectBuilder.withMaximumPacketSizeBytes(1000L); connectBuilder.withPassword(AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD.getBytes()); @@ -1984,6 +1839,7 @@ public void Negotiated_Rejoin_Always() { events.connectSuccessSettings.getRejoinedSession()); client.stop(new DisconnectPacketBuilder().build()); + events.stopFuture.get(); } builder.withSessionBehavior(ClientSessionBehavior.REJOIN_ALWAYS); diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTestFixture.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTestFixture.java new file mode 100644 index 000000000..85008668f --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTestFixture.java @@ -0,0 +1,168 @@ +package software.amazon.awssdk.crt.test; + + +import software.amazon.awssdk.crt.*; + +import software.amazon.awssdk.crt.mqtt5.*; + +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.PublishEvents; +import software.amazon.awssdk.crt.mqtt5.packets.*; + + +import java.util.ArrayList; +import java.util.List; + +import java.util.concurrent.CompletableFuture; + + +/* For environment variable setup, see SetupCrossCICrtEnvironment in the CRT builder */ +public class Mqtt5ClientTestFixture extends CrtTestFixture { + + // MQTT5 Codebuild/Direct connections data + static final String AWS_TEST_MQTT5_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_HOST"); + static final String AWS_TEST_MQTT5_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"); + static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST"); + static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT"); + static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST"); + static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT"); + // MQTT5 Codebuild/Websocket connections data + static final String AWS_TEST_MQTT5_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_HOST"); + static final String AWS_TEST_MQTT5_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_PORT"); + static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST"); + static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT"); + static final String AWS_TEST_MQTT5_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_HOST"); + static final String AWS_TEST_MQTT5_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_PORT"); + // MQTT5 Codebuild misc connections data + static final String AWS_TEST_MQTT5_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_USERNAME"); + static final String AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD"); + static final String AWS_TEST_MQTT5_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT5_CERTIFICATE_FILE"); + static final String AWS_TEST_MQTT5_KEY_FILE = System.getProperty("AWS_TEST_MQTT5_KEY_FILE"); + // MQTT5 Proxy + static final String AWS_TEST_MQTT5_PROXY_HOST = System.getProperty("AWS_TEST_MQTT5_PROXY_HOST"); + static final String AWS_TEST_MQTT5_PROXY_PORT = System.getProperty("AWS_TEST_MQTT5_PROXY_PORT"); + // MQTT5 Endpoint/Host credentials + static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST"); + static final String AWS_TEST_MQTT5_IOT_CORE_REGION = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_REGION"); + static final String AWS_TEST_MQTT5_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT"); + static final String AWS_TEST_MQTT5_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY"); + // MQTT5 Static credential related + static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY"); + static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY"); + static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN"); + // MQTT5 Cognito + static final String AWS_TEST_MQTT5_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_COGNITO_ENDPOINT"); + static final String AWS_TEST_MQTT5_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT5_COGNITO_IDENTITY"); + // MQTT5 Keystore + static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT"); + static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE"); + static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD"); + static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS"); + static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD"); + // MQTT5 PKCS12 + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY"); + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD"); + // MQTT5 PKCS11 + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB"); + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL"); + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN"); + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL"); + static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE"); + // MQTT5 X509 + static final String AWS_TEST_MQTT5_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_CERT"); + static final String AWS_TEST_MQTT5_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_KEY"); + static final String AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT"); + static final String AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS"); + static final String AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME"); + // MQTT5 Windows Cert Store + static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS"); + static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE"); + + protected int OPERATION_TIMEOUT_TIME = 30; + + public Mqtt5ClientTestFixture() { + } + + /** + * ============================================================ + * TEST HELPER FUNCTIONS + * ============================================================ + */ + + static final class LifecycleEvents_Futured implements Mqtt5ClientOptions.LifecycleEvents { + CompletableFuture connectedFuture = new CompletableFuture<>(); + CompletableFuture stopFuture = new CompletableFuture<>(); + + ConnAckPacket connectSuccessPacket = null; + NegotiatedSettings connectSuccessSettings = null; + + int connectFailureCode = 0; + ConnAckPacket connectFailurePacket = null; + + int disconnectFailureCode = 0; + DisconnectPacket disconnectPacket = null; + + @Override + public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {} + + @Override + public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) { + ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket(); + NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings(); + connectSuccessPacket = connAckData; + connectSuccessSettings = negotiatedSettings; + connectedFuture.complete(null); + } + + @Override + public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { + connectFailureCode = onConnectionFailureReturn.getErrorCode(); + connectFailurePacket = onConnectionFailureReturn.getConnAckPacket(); + connectedFuture.completeExceptionally(new Exception("Could not connect! Error name: " + CRT.awsErrorName(connectFailureCode))); + } + + @Override + public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) { + disconnectFailureCode = onDisconnectionReturn.getErrorCode(); + disconnectPacket = onDisconnectionReturn.getDisconnectPacket(); + } + + @Override + public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { + stopFuture.complete(null); + } + } + + static final class PublishEvents_Futured implements PublishEvents { + CompletableFuture publishReceivedFuture = new CompletableFuture<>(); + PublishPacket publishPacket = null; + + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn result) { + publishPacket = result.getPublishPacket(); + publishReceivedFuture.complete(null); + } + } + + static final class PublishEvents_Futured_Counted implements PublishEvents { + CompletableFuture publishReceivedFuture = new CompletableFuture<>(); + int currentPublishCount = 0; + int desiredPublishCount = 0; + List publishPacketsReceived = new ArrayList(); + + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn result) { + currentPublishCount += 1; + if (currentPublishCount == desiredPublishCount) { + publishReceivedFuture.complete(null); + } else if (currentPublishCount > desiredPublishCount) { + publishReceivedFuture.completeExceptionally(new Throwable("Too many publish packets received")); + } + + if (publishPacketsReceived.contains(result)) { + publishReceivedFuture.completeExceptionally(new Throwable("Duplicate publish packet received!")); + } + publishPacketsReceived.add(result.getPublishPacket()); + } + } + +} diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java new file mode 100644 index 000000000..9a3a9a33d --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5to3AdapterConnectionTest.java @@ -0,0 +1,941 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.test; + +import org.junit.Assume; +import org.junit.Test; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import software.amazon.awssdk.crt.*; +import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider; +import software.amazon.awssdk.crt.auth.credentials.CognitoCredentialsProvider.CognitoCredentialsProviderBuilder; +import software.amazon.awssdk.crt.auth.credentials.DefaultChainCredentialsProvider.DefaultChainCredentialsProviderBuilder; +import software.amazon.awssdk.crt.auth.credentials.StaticCredentialsProvider.StaticCredentialsProviderBuilder; +import software.amazon.awssdk.crt.auth.credentials.X509CredentialsProvider.X509CredentialsProviderBuilder; +import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; +import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSigningAlgorithm; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpProxyOptions.HttpProxyConnectionType; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.Pkcs11Lib; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.io.TlsContextPkcs11Options; +import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode; +import software.amazon.awssdk.crt.mqtt5.*; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.ClientOfflineQueueBehavior; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.ClientSessionBehavior; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.ExtendedValidationAndFlowControlOptions; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.LifecycleEvents; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.PublishEvents; +import software.amazon.awssdk.crt.mqtt5.packets.*; +import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder; +import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectPacketBuilder; +import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectReasonCode; +import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PublishPacketBuilder; +import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.SubscribePacketBuilder; +import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket.UnsubscribePacketBuilder; +import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType; +import software.amazon.awssdk.crt.mqtt.MqttClient; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents; +import software.amazon.awssdk.crt.mqtt.OnConnectionSuccessReturn; +import software.amazon.awssdk.crt.mqtt.QualityOfService; +import software.amazon.awssdk.crt.mqtt.OnConnectionFailureReturn; +import software.amazon.awssdk.crt.mqtt.OnConnectionClosedReturn; +import software.amazon.awssdk.crt.mqtt.MqttMessage; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.junit.Test; + +/* For environment variable setup, see SetupCrossCICrtEnvironment in the CRT builder */ +public class Mqtt5to3AdapterConnectionTest extends Mqtt5ClientTestFixture { + + private boolean disconnecting = false; + MqttClientConnection connection = null; + private CompletableFuture onConnectionSuccessFuture = new CompletableFuture(); + private CompletableFuture onConnectionFailureFuture = new CompletableFuture(); + private CompletableFuture onConnectionClosedFuture = new CompletableFuture(); + int pubsAcked = 0; + int subsAcked = 0; + // Connection callback events + private MqttClientConnectionEvents events = new MqttClientConnectionEvents() { + @Override + public void onConnectionResumed(boolean sessionPresent) { + System.out.println("Connection resumed"); + } + + @Override + public void onConnectionInterrupted(int errorCode) { + if (!disconnecting) { + System.out.println( + "Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode)); + } + } + + @Override + public void onConnectionFailure(OnConnectionFailureReturn data) { + System.out.println("Connection failed with error: " + data.getErrorCode() + " " + + CRT.awsErrorString(data.getErrorCode())); + onConnectionFailureFuture.complete(data); + } + + @Override + public void onConnectionSuccess(OnConnectionSuccessReturn data) { + System.out.println("Connection success. Session present: " + data.getSessionPresent()); + onConnectionSuccessFuture.complete(data); + } + + @Override + public void onConnectionClosed(OnConnectionClosedReturn data) { + System.out.println("Connection disconnected successfully"); + onConnectionClosedFuture.complete(data); + } + }; + + public Mqtt5to3AdapterConnectionTest() { + } + + Consumer connectionMessageTransfomer = null; + + void setConnectionMessageTransformer(Consumer connectionMessageTransfomer) { + this.connectionMessageTransfomer = connectionMessageTransfomer; + } + + void Mqtt3ConnectionClose() { + connection.close(); + } + + void Mqtt3ConnectionDisconnect() { + disconnecting = true; + try { + CompletableFuture disconnected = connection.disconnect(); + disconnected.get(); + } catch (Exception ex) { + fail("Exception during disconnect: " + ex.getMessage()); + } + + } + + boolean Mqtt3Connect(Mqtt5Client client) throws Exception { + try { + connection = new MqttClientConnection(client, events); + if (connectionMessageTransfomer != null) { + connection.onMessage(connectionMessageTransfomer); + } + CompletableFuture connected = connection.connect(); + connected.get(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + return true; + } + + /* Minimal creation and clean up */ + @Test + public void TestCreationMinimal() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_DIRECT_MQTT_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_PORT); + try { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_PORT)); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + assertNotNull(client); + MqttClientConnection connection = new MqttClientConnection(client, null); + connection.close(); + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Maximum creation and cleanup */ + @Test + public void TestCreationFull() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull( + AWS_TEST_MQTT5_DIRECT_MQTT_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_PORT, + AWS_TEST_MQTT5_BASIC_AUTH_USERNAME, AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD, + AWS_TEST_MQTT5_PROXY_HOST, AWS_TEST_MQTT5_PROXY_PORT); + try { + try ( + EventLoopGroup elg = new EventLoopGroup(1); + HostResolver hr = new HostResolver(elg); + ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); + SocketOptions socketOptions = new SocketOptions();) { + + PublishPacketBuilder willPacketBuilder = new PublishPacketBuilder(); + willPacketBuilder.withQOS(QOS.AT_LEAST_ONCE).withPayload("Hello World".getBytes()) + .withTopic("test/topic"); + + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("MQTT5 CRT") + .withKeepAliveIntervalSeconds(1000L) + .withMaximumPacketSizeBytes(1000L) + .withPassword(AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD.getBytes()) + .withReceiveMaximum(1000L) + .withRequestProblemInformation(true) + .withRequestResponseInformation(true) + .withSessionExpiryIntervalSeconds(1000L) + .withUsername(AWS_TEST_MQTT5_BASIC_AUTH_USERNAME) + .withWill(willPacketBuilder.build()) + .withWillDelayIntervalSeconds(1000L); + + ArrayList userProperties = new ArrayList(); + userProperties.add(new UserProperty("Hello", "World")); + connectBuilder.withUserProperties(userProperties); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_PORT)); + builder.withBootstrap(bootstrap) + .withConnackTimeoutMs(100L) + .withConnectOptions(connectBuilder.build()) + .withExtendedValidationAndFlowControlOptions(ExtendedValidationAndFlowControlOptions.NONE) + .withLifecycleEvents(new software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.LifecycleEvents() { + @Override + public void onAttemptingConnect(Mqtt5Client client, + software.amazon.awssdk.crt.mqtt5.OnAttemptingConnectReturn onAttemptingConnectReturn) { + } + + @Override + public void onConnectionSuccess(Mqtt5Client client, + software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn onConnectionSuccessReturn) { + } + + @Override + public void onConnectionFailure(Mqtt5Client client, + software.amazon.awssdk.crt.mqtt5.OnConnectionFailureReturn onConnectionFailureReturn) { + } + + @Override + public void onDisconnection(Mqtt5Client client, + OnDisconnectionReturn onDisconnectionReturn) { + } + + @Override + public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { + } + }) + .withMaxReconnectDelayMs(1000L) + .withMinConnectedTimeToResetReconnectDelayMs(1000L) + .withMinReconnectDelayMs(1000L) + .withOfflineQueueBehavior(ClientOfflineQueueBehavior.FAIL_ALL_ON_DISCONNECT) + .withAckTimeoutSeconds(1000L) + .withPingTimeoutMs(1000L) + .withPublishEvents(new PublishEvents() { + @Override + public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { + } + }) + .withRetryJitterMode(JitterMode.Default) + .withSessionBehavior(ClientSessionBehavior.CLEAN) + .withSocketOptions(socketOptions); + // Skip websocket and TLS options - those are all different tests + + HttpProxyOptions proxyOptions = new HttpProxyOptions(); + proxyOptions.setHost(AWS_TEST_MQTT5_PROXY_HOST); + proxyOptions.setPort((Integer.parseInt(AWS_TEST_MQTT5_PROXY_PORT))); + proxyOptions.setConnectionType(HttpProxyConnectionType.Tunneling); + builder.withHttpProxyOptions(proxyOptions); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + assertNotNull(client); + MqttClientConnection connection = new MqttClientConnection(client, null); + connection.close(); + } + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /**************************************************************** + * CONNECT THROUGH MQTT5 INTERFACE + ****************************************************************/ + /* Happy path. Direct connection with minimal configuration */ + @Test + public void TestDirectConnectThroughMqtt5() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_DIRECT_MQTT_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_PORT); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_PORT)); + builder.withLifecycleEvents(events); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null)) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + DisconnectPacketBuilder disconnect = new DisconnectPacketBuilder(); + client.stop(disconnect.build()); + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Direct connection with basic authentication */ + @Test + public void TestBasicAuthConnectThroughMqtt5() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull( + AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT, + AWS_TEST_MQTT5_BASIC_AUTH_USERNAME, AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT)); + builder.withLifecycleEvents(events); + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withUsername(AWS_TEST_MQTT5_BASIC_AUTH_USERNAME) + .withPassword(AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD.getBytes()) + .withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectOptions.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + DisconnectPacketBuilder disconnect = new DisconnectPacketBuilder(); + client.stop(disconnect.build()); + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Direct connection with mTLS */ + @Test + public void TestmTLSConnectThroughMqtt5() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + try ( + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions);) { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectOptions.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + DisconnectPacketBuilder disconnect = new DisconnectPacketBuilder(); + client.stop(disconnect.build()); + } + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Happy path. Websocket connection with minimal configuration */ + @Test + public void TestWebsocketMinimalConnectThroughMqtt5() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_WS_MQTT_HOST, AWS_TEST_MQTT5_WS_MQTT_PORT); + try { + + try ( + EventLoopGroup elg = new EventLoopGroup(1); + HostResolver hr = new HostResolver(elg); + ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) { + + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_WS_MQTT_HOST, Long.parseLong(AWS_TEST_MQTT5_WS_MQTT_PORT)); + builder.withLifecycleEvents(events); + builder.withBootstrap(bootstrap); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + Consumer websocketTransform = new Consumer() { + @Override + public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { + t.complete(t.getHttpRequest()); + } + }; + builder.withWebsocketHandshakeTransform(websocketTransform); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + DisconnectPacketBuilder disconnect = new DisconnectPacketBuilder(); + client.stop(disconnect.build()); + } + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Websocket connection with HttpProxyOptions */ + @Test + public void TestWebsocketHttpProxyConnectThroughMqtt5() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull( + AWS_TEST_MQTT5_WS_MQTT_TLS_HOST, AWS_TEST_MQTT5_WS_MQTT_TLS_PORT, + AWS_TEST_MQTT5_PROXY_HOST, AWS_TEST_MQTT5_PROXY_PORT); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + EventLoopGroup elg = new EventLoopGroup(1); + HostResolver hr = new HostResolver(elg); + ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_WS_MQTT_TLS_HOST, Long.parseLong(AWS_TEST_MQTT5_WS_MQTT_TLS_PORT)); + builder.withLifecycleEvents(events); + builder.withBootstrap(bootstrap); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient(); + tlsOptions.withVerifyPeer(false); + TlsContext tlsContext = new TlsContext(tlsOptions); + builder.withTlsContext(tlsContext); + + Consumer websocketTransform = new Consumer() { + @Override + public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { + t.complete(t.getHttpRequest()); + } + }; + builder.withWebsocketHandshakeTransform(websocketTransform); + + HttpProxyOptions proxyOptions = new HttpProxyOptions(); + proxyOptions.setHost(AWS_TEST_MQTT5_PROXY_HOST); + proxyOptions.setPort(Integer.parseInt(AWS_TEST_MQTT5_PROXY_PORT)); + proxyOptions.setConnectionType(HttpProxyConnectionType.Tunneling); + builder.withHttpProxyOptions(proxyOptions); + + Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null); + + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + DisconnectPacketBuilder disconnect = new DisconnectPacketBuilder(); + client.stop(disconnect.build()); + + connection.close(); + client.close(); + tlsContext.close(); + tlsOptions.close(); + elg.close(); + hr.close(); + bootstrap.close(); + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /**************************************************************** + * CONNECT THROUGH MQTT311 INTERFACE + ****************************************************************/ + /* Happy path. Direct connection with minimal configuration */ + @Test + public void TestDirectConnectThroughMqtt311() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_DIRECT_MQTT_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_PORT); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_PORT)); + builder.withLifecycleEvents(events); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + Mqtt3Connect(client); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Direct connection with basic authentication */ + @Test + public void TestBasicAuthConnectThroughMqtt311() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull( + AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST, AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT, + AWS_TEST_MQTT5_BASIC_AUTH_USERNAME, AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST, + Long.parseLong(AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT)); + builder.withLifecycleEvents(events); + + ConnectPacketBuilder connectOptions = new ConnectPacketBuilder(); + connectOptions.withUsername(AWS_TEST_MQTT5_BASIC_AUTH_USERNAME) + .withPassword(AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD.getBytes()) + .withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectOptions.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + Mqtt3Connect(client); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Direct connection with mTLS */ + @Test + public void TestmTLSConnectThroughMqtt311() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + try ( + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions);) { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + Mqtt3Connect(client); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Happy path. Websocket connection with minimal configuration */ + @Test + public void TestWebsocketMinimalConnectThroughMqtt311() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_WS_MQTT_HOST, AWS_TEST_MQTT5_WS_MQTT_PORT); + try { + + try ( + EventLoopGroup elg = new EventLoopGroup(1); + HostResolver hr = new HostResolver(elg); + ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) { + + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_WS_MQTT_HOST, Long.parseLong(AWS_TEST_MQTT5_WS_MQTT_PORT)); + builder.withLifecycleEvents(events); + builder.withBootstrap(bootstrap); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + Consumer websocketTransform = new Consumer() { + @Override + public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { + t.complete(t.getHttpRequest()); + } + }; + builder.withWebsocketHandshakeTransform(websocketTransform); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + Mqtt3Connect(client); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /* Websocket connection with HttpProxyOptions */ + @Test + public void TestWebsocketHttpProxyConnectThroughMqtt311() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull( + AWS_TEST_MQTT5_WS_MQTT_TLS_HOST, AWS_TEST_MQTT5_WS_MQTT_TLS_PORT, + AWS_TEST_MQTT5_PROXY_HOST, AWS_TEST_MQTT5_PROXY_PORT); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + EventLoopGroup elg = new EventLoopGroup(1); + HostResolver hr = new HostResolver(elg); + ClientBootstrap bootstrap = new ClientBootstrap(elg, hr); + + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder( + AWS_TEST_MQTT5_WS_MQTT_TLS_HOST, Long.parseLong(AWS_TEST_MQTT5_WS_MQTT_TLS_PORT)); + builder.withLifecycleEvents(events); + builder.withBootstrap(bootstrap); + + TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient(); + tlsOptions.withVerifyPeer(false); + TlsContext tlsContext = new TlsContext(tlsOptions); + builder.withTlsContext(tlsContext); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + + Consumer websocketTransform = new Consumer() { + @Override + public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { + t.complete(t.getHttpRequest()); + } + }; + builder.withWebsocketHandshakeTransform(websocketTransform); + + HttpProxyOptions proxyOptions = new HttpProxyOptions(); + proxyOptions.setHost(AWS_TEST_MQTT5_PROXY_HOST); + proxyOptions.setPort(Integer.parseInt(AWS_TEST_MQTT5_PROXY_PORT)); + proxyOptions.setConnectionType(HttpProxyConnectionType.Tunneling); + builder.withHttpProxyOptions(proxyOptions); + + Mqtt5Client client = new Mqtt5Client(builder.build()); + + Mqtt3Connect(client); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + + client.close(); + tlsContext.close(); + tlsOptions.close(); + elg.close(); + hr.close(); + bootstrap.close(); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /**************************************************************** + * OPERATION TEST CASE + ****************************************************************/ + @Test + public void TestOperationSubUnsub() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + String testUUID = UUID.randomUUID().toString(); + String testTopic = "test/MQTT5to3Adapter_Binding_Java_" + testUUID; + String clientId = "test/MQTT5TO3Adapter_ClientId" + testUUID; + String testPayload = "PUBLISH ME!"; + + Consumer messageHandler = (message) -> { + byte[] payload = message.getPayload(); + try { + assertEquals(testTopic, message.getTopic()); + String contents = new String(payload, "UTF-8"); + assertEquals("Message is intact", testPayload, contents); + } catch (Exception ex) { + fail("Unable to decode payload: " + ex.getMessage()); + } + }; + + try { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions); + tlsOptions.close(); + builder.withTlsContext(tlsContext); + + PublishEvents_Futured publishEvents = new PublishEvents_Futured(); + builder.withPublishEvents(publishEvents); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId(clientId); + builder.withConnectOptions(connectBuilder.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection = new MqttClientConnection(client, null);) { + connection.onMessage(messageHandler); + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + CompletableFuture receivedFuture = new CompletableFuture<>(); + Consumer subscriberMessageHandler = (message) -> { + receivedFuture.complete(message); + }; + + CompletableFuture subscribed = connection.subscribe(testTopic, QualityOfService.AT_LEAST_ONCE, + subscriberMessageHandler); + subscribed.thenApply(unused -> subsAcked++); + int packetId = subscribed.get(); + + assertNotSame(0, packetId); + assertEquals("Single subscription", 1, subsAcked); + + MqttMessage message = new MqttMessage(testTopic, testPayload.getBytes(), QualityOfService.AT_LEAST_ONCE, + false); + CompletableFuture published = connection.publish(message); + published.thenApply(unused -> pubsAcked++); + packetId = published.get(); + + assertNotSame(0, packetId); + assertEquals("Published", 1, pubsAcked); + + published = connection.publish(message); + published.thenApply(unused -> pubsAcked++); + packetId = published.get(); + + assertNotSame(0, packetId); + assertEquals("Published", 2, pubsAcked); + + MqttMessage received = receivedFuture.get(); + assertEquals("Received", message.getTopic(), received.getTopic()); + assertArrayEquals("Received", message.getPayload(), received.getPayload()); + assertEquals("Received", message.getQos(), received.getQos()); + assertEquals("Received", message.getRetain(), received.getRetain()); + + CompletableFuture unsubscribed = connection.unsubscribe(testTopic); + unsubscribed.thenApply(unused -> subsAcked--); + packetId = unsubscribed.get(); + + assertNotSame(0, packetId); + + client.stop(new DisconnectPacketBuilder().build()); + } + + if (tlsContext != null) { + tlsContext.close(); + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /**************************************************************** + * MQTT311 LIFECYCLE CALLBACK TEST CASE + ****************************************************************/ + @Test + public void TestConnectionSuccessCallback() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + try ( + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions);) { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + connection = new MqttClientConnection(client, this.events); + connection.connect(); + onConnectionSuccessFuture.get(); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + @Test + public void TestConnectionFailureCallback() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + try { + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + + try ( + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions);) { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder("badhost", + 8883l); + builder.withLifecycleEvents(events); + builder.withTlsContext(tlsContext); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId("test/MQTT5to3Adapter" + + UUID.randomUUID().toString()); + builder.withConnectOptions(connectBuilder.build()); + try (Mqtt5Client client = new Mqtt5Client(builder.build())) { + connection = new MqttClientConnection(client, this.events); + connection.connect(); + onConnectionFailureFuture.get(); + Mqtt3ConnectionDisconnect(); + Mqtt3ConnectionClose(); + } + } + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + + /**************************************************************** + * MQTT311 ADAPTER TEST CASE + ****************************************************************/ + + @Test + public void TestMultipleAdapter() { + skipIfNetworkUnavailable(); + Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, + AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + String testUUID = UUID.randomUUID().toString(); + String testTopic1 = "test/MQTT5to3Adapter1" + testUUID; + String testTopic2 = "test/MQTT5to3Adapter2" + testUUID; + String clientId = "test/MQTT5TO3Adapter_ClientId" + testUUID; + + String testPayload = "PUBLISH ME!"; + + try { + Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l); + LifecycleEvents_Futured events = new LifecycleEvents_Futured(); + builder.withLifecycleEvents(events); + + TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath( + AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY); + TlsContext tlsContext = new TlsContext(tlsOptions); + tlsOptions.close(); + builder.withTlsContext(tlsContext); + + PublishEvents_Futured publishEvents = new PublishEvents_Futured(); + builder.withPublishEvents(publishEvents); + ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); + connectBuilder.withClientId(clientId); + builder.withConnectOptions(connectBuilder.build()); + + try (Mqtt5Client client = new Mqtt5Client(builder.build()); + MqttClientConnection connection1 = new MqttClientConnection(client, null); + MqttClientConnection connection2 = new MqttClientConnection(client, null);) { + // Connect + client.start(); + events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + + CompletableFuture receivedFuture1 = new CompletableFuture<>(); + Consumer subscriberMessageHandler1 = (message) -> { + receivedFuture1.complete(message); + }; + CompletableFuture receivedFuture2 = new CompletableFuture<>(); + Consumer subscriberMessageHandler2 = (message) -> { + receivedFuture2.complete(message); + }; + + CompletableFuture subscribed = connection1.subscribe(testTopic1, + QualityOfService.AT_LEAST_ONCE, + subscriberMessageHandler1); + int packetId = subscribed.get(); + assertNotSame(0, packetId); + + subscribed = connection2.subscribe(testTopic2, QualityOfService.AT_LEAST_ONCE, + subscriberMessageHandler2); + packetId = subscribed.get(); + assertNotSame(0, packetId); + + MqttMessage message1 = new MqttMessage(testTopic1, testPayload.getBytes(), + QualityOfService.AT_LEAST_ONCE, + false); + CompletableFuture published = connection1.publish(message1); + packetId = published.get(); + + assertNotSame(0, packetId); + + MqttMessage message2 = new MqttMessage(testTopic2, testPayload.getBytes(), + QualityOfService.AT_LEAST_ONCE, + false); + published = connection2.publish(message2); + packetId = published.get(); + + assertNotSame(0, packetId); + + assertEquals("Received", message1.getTopic(), receivedFuture1.get().getTopic()); + assertArrayEquals("Received", message1.getPayload(), receivedFuture1.get().getPayload()); + assertEquals("Received", message2.getQos(), receivedFuture2.get().getQos()); + assertEquals("Received", message2.getRetain(), receivedFuture2.get().getRetain()); + + client.stop(new DisconnectPacketBuilder().build()); + } + + if (tlsContext != null) { + tlsContext.close(); + } + + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + +};