Skip to content

Commit

Permalink
Merge pull request #248 from qianwj/authentication-exchange
Browse files Browse the repository at this point in the history
Support auth message both client side and server side.
  • Loading branch information
vietj authored Oct 1, 2024
2 parents bdadb7b + 1a1033f commit d69618b
Show file tree
Hide file tree
Showing 13 changed files with 474 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ that will be called when the server is really closed.
{@link examples.VertxMqttServerExamples#example9}
----

=== Handling client auth packet/Sending AUTH packet to remote client(Only in MQTT version 5)

After a connection is established between client and server, the client can send an auth packet to server using the
AUTH message. The {@link io.vertx.mqtt.MqttEndpoint} interface allows to specify a handler for the incoming auth packet
using the {@link io.vertx.mqtt.MqttEndpoint#authenticationExchangeHandler(io.vertx.core.Handler)} method. Such handler
receives an instance of the {@link io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage} interface which brings the
reason code, the authentication method and data. The server could continue to send AUTH packet using
the {@link io.vertx.mqtt.MqttEndpoint#authenticationExchange(io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage)}
for authentication or just passed it.

[source,$lang]
----
{@link examples.VertxMqttServerExamples#example14}
----


=== Automatic clean-up in verticles

If you’re creating MQTT servers from inside verticles, those servers will be automatically closed when the verticle is undeployed.
Expand Down Expand Up @@ -291,6 +307,21 @@ Let's take a look at the example:
----
In the example we send message to topic with name "temperature".

=== Handling server auth request/Sending AUTH packet to server(Only in MQTT version 5)

After a connection is established between client and server, the client can send an auth request to server using
the {@link io.vertx.mqtt.MqttClient#authenticationExchange(io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage)}
for authentication. The Server may return an AUTH packet. The {@link io.vertx.mqtt.MqttClient} interface allows to
specify a handler for the incoming auth packet using
the {@link io.vertx.mqtt.MqttClient#authenticationExchangeHandler(io.vertx.core.Handler)} method.
Such handler receives an instance of the {@link io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage} interface
which brings the reason code, the authentication method and data.

[source,$lang]
----
{@link examples.VertxMqttClientExamples#example10}
----

=== Keep connection with server alive
In order to keep connection with server you should time to time send something to server otherwise server will close the connection.
The right way to keep connection alive is a {@link io.vertx.mqtt.MqttClient#ping()} method.
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/examples/VertxMqttClientExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package examples;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -24,6 +25,8 @@
import io.vertx.core.net.PemTrustOptions;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.codes.MqttAuthenticateReasonCode;

public class VertxMqttClientExamples {

Expand Down Expand Up @@ -153,6 +156,19 @@ public void example9(MqttClient client) {
});
}

/**
* Example for authenticationExchangeHandler and authenticationExchange method demonstration
*
* @param client
*/
public void example10(MqttClient client) {
client.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
client.authenticationExchangeHandler(auth -> {
//The handler will be called time to time by default
System.out.println("We have just received AUTH packet: " + auth.reasonCode());
});
}

public void tls(Vertx vertx, String algo) {
MqttClientOptions options = new MqttClientOptions();
options
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/examples/VertxMqttServerExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.codes.MqttAuthenticateReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -325,4 +327,17 @@ public void example13(Vertx vertx) {
}
});
}

/**
* Example for authentication exchange
*
* @param endpoint
*/
public void example14(MqttEndpoint endpoint) {
endpoint.authenticationExchange(MqttAuthenticationExchangeMessage.create(MqttAuthenticateReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES));
// handling auth from client
endpoint.authenticationExchangeHandler(auth -> {
System.out.println("AUTH packet received from client. code: " + auth.reasonCode());
});
}
}
18 changes: 18 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.impl.MqttClientImpl;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
Expand Down Expand Up @@ -202,6 +203,23 @@ static MqttClient create(Vertx vertx) {
*/
Future<Integer> unsubscribe(List<String> topics);

/**
* Sets handler which will be called after AUTH packet receiving
*
* @param authenticationExchangeHandler handler to call
* @return current MQTT client instance
*/
@Fluent
MqttClient authenticationExchangeHandler(Handler<MqttAuthenticationExchangeMessage> authenticationExchangeHandler);

/**
* It is used for Enhanced Authentication and is able to carry an authentication method and authentication data.
*
* @param message authentication exchange message
* @return a {@code Future} completed after AUTH packet sent
*/
Future<Void> authenticationExchange(MqttAuthenticationExchangeMessage message);

/**
* Sets handler which will be called after PINGRESP packet receiving
*
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.codes.MqttDisconnectReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubCompReasonCode;
Expand Down Expand Up @@ -337,6 +338,16 @@ public interface MqttEndpoint {
@Fluent
MqttEndpoint publishCompletionMessageHandler(Handler<MqttPubCompMessage> handler);

/**
* Set the auth handler on the MQTT endpoint. This handler is called when a AUTH message is
* received by the remote MQTT client
*
* @param handler the handler
* @return a reference to this, so the API can be used fluently
*/
@Fluent
MqttEndpoint authenticationExchangeHandler(Handler<MqttAuthenticationExchangeMessage> handler);

/**
* Set the pingreq handler on the MQTT endpoint. This handler is called when a PINGREQ
* message is received by the remote MQTT client. In any case the endpoint sends the
Expand Down Expand Up @@ -575,6 +586,16 @@ public interface MqttEndpoint {
@GenIgnore(GenIgnore.PERMITTED_TYPE)
Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId, MqttProperties properties);


/**
* Sends the AUTH message to the remote MQTT client
*
* @param message
* @return a reference to this, so the API can be used fluently
*/
@Fluent
MqttEndpoint authenticationExchange(MqttAuthenticationExchangeMessage message);

/**
* Sends the PINGRESP message to the remote MQTT client
*
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
Expand All @@ -58,6 +60,8 @@
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.codes.MqttAuthenticateReasonCode;
import io.vertx.mqtt.messages.impl.MqttPublishMessageImpl;

import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -109,6 +113,9 @@ private enum Status { CLOSED, CONNECTING, CONNECTED, CLOSING }
private Handler<MqttPublishMessage> publishHandler;
// handler to call when a subscribe request is completed
private Handler<MqttSubAckMessage> subscribeCompletionHandler;
// handler to call when an auth message comes in
private Handler<MqttAuthenticationExchangeMessage> authenticationExchangeHandler;

// handler to call when a connection request is completed
private Promise<MqttConnAckMessage> connectPromise;
// handler to call when a connection disconnects
Expand Down Expand Up @@ -565,6 +572,24 @@ public Future<Integer> unsubscribe(List<String> topics) {
return ctx.succeededFuture(variableHeader.messageId());
}

private synchronized Handler<MqttAuthenticationExchangeMessage> authenticationExchangeHandler() {
return this.authenticationExchangeHandler;
}

@Override
public MqttClient authenticationExchangeHandler(Handler<MqttAuthenticationExchangeMessage> authenticationExchangeHandler) {
this.authenticationExchangeHandler = authenticationExchangeHandler;
return this;
}

@Override
public Future<Void> authenticationExchange(MqttAuthenticationExchangeMessage message) {
io.netty.handler.codec.mqtt.MqttMessage auth = MqttMessageBuilders.auth()
.reasonCode(message.reasonCode().value()).properties(message.properties()).build();
this.write(auth);
return ctx.succeededFuture();
}

/**
* See {@link MqttClient#pingResponseHandler(Handler)} for more details
*/
Expand Down Expand Up @@ -902,6 +927,13 @@ private void handleMessage(ChannelHandlerContext chctx, Object msg) {
handleUnsuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;

case AUTH:
MqttReasonCodeAndPropertiesVariableHeader header = (MqttReasonCodeAndPropertiesVariableHeader) mqttMessage.variableHeader();
MqttAuthenticateReasonCode reasonCode = MqttAuthenticateReasonCode.valueOf(header.reasonCode());
MqttAuthenticationExchangeMessage message = MqttAuthenticationExchangeMessage.create(reasonCode, header.properties());
handleAuth(message);
break;

case PINGRESP:
handlePingresp();
break;
Expand Down Expand Up @@ -1157,6 +1189,18 @@ private void handlePubrel(int pubrelMessageId) {

}

/**
* Used for calling the auth handler when the server sent an AUTH message
*
* @param msg AUTH message
*/
private void handleAuth(MqttAuthenticationExchangeMessage msg) {
Handler<MqttAuthenticationExchangeMessage> handler = this.authenticationExchangeHandler();
if (handler != null) {
handler.handle(msg);
}
}

/**
* Used for calling the connect handler when the server replies to the request
*
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
Expand All @@ -49,17 +50,20 @@
import io.vertx.mqtt.messages.MqttPubCompMessage;
import io.vertx.mqtt.messages.MqttPubRecMessage;
import io.vertx.mqtt.messages.MqttPubRelMessage;
import io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage;
import io.vertx.mqtt.messages.codes.MqttDisconnectReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubCompReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubRecReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubRelReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttAuthenticateReasonCode;

import javax.net.ssl.SSLSession;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -106,6 +110,9 @@ public class MqttEndpointImpl implements MqttEndpoint {
// handler to call when a pubcomp message comes in
private Handler<Integer> pubcompHandler;
private Handler<MqttPubCompMessage> pubcompHandlerWithMessage;
// handler to call when an auth message comes in
private Handler<MqttAuthenticationExchangeMessage> authHandler;

// handler to call when a disconnect request comes in
private Handler<Void> disconnectHandler;
private Handler<MqttDisconnectMessage> disconnectHandlerWithMessage;
Expand Down Expand Up @@ -349,6 +356,15 @@ public MqttEndpointImpl publishCompletionMessageHandler(Handler<MqttPubCompMessa
}
}

@Override
public MqttEndpoint authenticationExchangeHandler(Handler<MqttAuthenticationExchangeMessage> handler) {
synchronized (this.conn) {
this.checkClosed();
this.authHandler = handler;
return this;
}
}


public MqttEndpointImpl pingHandler(Handler<Void> handler) {

Expand Down Expand Up @@ -585,6 +601,25 @@ public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, b
return this.write(publish).map(variableHeader.packetId());
}

@Override
public MqttEndpoint authenticationExchange(MqttAuthenticationExchangeMessage message) {
if (this.protocolVersion != MqttVersion.MQTT_5.protocolLevel()) {
Optional<MqttVersion> current = Optional.empty();
for (MqttVersion value : MqttVersion.values()) {
if (this.protocolVersion == value.protocolLevel()) {
current = Optional.of(value);
break;
}
}
throw new IllegalArgumentException("Enhanced authentication can only be sent under the MQTTv5 protocol, " +
"and the current client version(" + current.map(MqttVersion::toString).orElse("Unknown") + ") is not applicable.");
}
io.netty.handler.codec.mqtt.MqttMessage auth = MqttMessageBuilders.auth()
.reasonCode(message.reasonCode().value()).properties(message.properties()).build();
this.write(auth);
return this;
}

public MqttEndpointImpl pong() {

MqttFixedHeader fixedHeader =
Expand Down Expand Up @@ -762,6 +797,17 @@ void handlePubcomp(int pubcompMessageId, MqttPubCompReasonCode code, MqttPropert
}
}

/**
* Used for calling the auth handler when the remote MQTT client with auth
*/
void handleAuth(MqttAuthenticateReasonCode code, MqttProperties properties) {
synchronized (this.conn) {
if (this.authHandler != null) {
this.authHandler.handle(MqttAuthenticationExchangeMessage.create(code, properties));
}
}
}

/**
* Used internally for handling the pinreq from the remote MQTT client
*/
Expand Down
Loading

0 comments on commit d69618b

Please sign in to comment.