Skip to content

Commit

Permalink
fix a bug which causes will message cannot be sent when client is clo…
Browse files Browse the repository at this point in the history
…sed forcibly (#63)
  • Loading branch information
zhongyuan17 authored Mar 12, 2024
1 parent 87eaad2 commit 5595e79
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ abstract class MQTT3SessionHandler extends MQTTMessageHandler implements IMQTT3S
private final long idleTimeoutNanos;
private final boolean cleanSession;
private final boolean sessionPresent;
private final WillMessage willMessage;
private WillMessage willMessage;
private final LinkedMap<Integer, UnconfirmedQoS1Message> unconfirmedQoS1Messages = new LinkedMap<>();
// key: id used in qos2 protocol interaction, value: message's original messageId
private final LinkedMap<Integer, QoS2MessageKey> unconfirmedQoS2Indexes = new LinkedMap<>();
Expand Down Expand Up @@ -232,6 +232,16 @@ public void handlerAdded(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (idleTimeoutTask != null) {
idleTimeoutTask.cancel(true);
}
if (resendUnconfirmedTask != null) {
resendUnconfirmedTask.cancel(true);
}
if (willMessage != null) {
submitBgTask(() -> distWillMessage(willMessage));
}

sessionCtx.localSessionRegistry.remove(channelId(), this);
sessionRegister.stop();
tenantMeter.recordCount(MqttDisconnectCount);
Expand Down Expand Up @@ -292,18 +302,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof ConnectionWillClose) {
if (idleTimeoutTask != null) {
idleTimeoutTask.cancel(true);
}
if (resendUnconfirmedTask != null) {
resendUnconfirmedTask.cancel(true);
}
log.debug("Session closed: lwt={}, reason={}", willMessage, ((ConnectionWillClose) evt).reason);
// don't send last will if disconnect by client, MQTT Spec 3.1.2.5 or kicked
if (willMessage != null && !(((ConnectionWillClose) evt).reason instanceof ByClient)
&& (!(((ConnectionWillClose) evt).reason instanceof Kicked)
|| !isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) {
submitBgTask(() -> distWillMessage(willMessage));
if (((ConnectionWillClose) evt).reason instanceof ByClient
|| (((ConnectionWillClose) evt).reason instanceof Kicked
&& isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) {
willMessage = null;
}
}
ctx.fireUserEventTriggered(evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,48 @@ public void resetMocks() {
clearInvocations(mqttTest.eventCollector);
}

@Test(groups = "integration")
public void lastWillByCloseClientForcibly() {
String deviceKey = "testDevice";
String userName = tenantId + "/" + deviceKey;
String willTopic = "willTopic";
ByteString willPayload = ByteString.copyFromUtf8("bye");
when(mqttTest.authProvider.auth(any(MQTT3AuthData.class)))
.thenReturn(CompletableFuture.completedFuture(MQTT3AuthResult.newBuilder()
.setOk(Ok.newBuilder()
.setTenantId(tenantId)
.setUserId(deviceKey)
.build())
.build()));
when(mqttTest.authProvider.check(any(ClientInfo.class), any(MQTTAction.class)))
.thenReturn(CompletableFuture.completedFuture(true));


MqttConnectOptions lwtPubConnOpts = new MqttConnectOptions();
lwtPubConnOpts.setCleanSession(true);
lwtPubConnOpts.setWill(willTopic, willPayload.toByteArray(), 0, false);
lwtPubConnOpts.setUserName(userName);
MqttTestClient lwtPubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtPubclient");
lwtPubClient.connect(lwtPubConnOpts);

MqttConnectOptions lwtSubConnOpts = new MqttConnectOptions();
lwtSubConnOpts.setCleanSession(true);
lwtSubConnOpts.setUserName(userName);

MqttTestClient lwtSubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtSubClient");
lwtSubClient.connect(lwtSubConnOpts);
Observable<MqttMsg> topicSub = lwtSubClient.subscribe(willTopic, 0);

log.info("Close client forcibly");
lwtPubClient.closeForcibly();

MqttMsg msg = topicSub.blockingFirst();
assertEquals(msg.topic, willTopic);
assertEquals(msg.qos, 0);
assertEquals(msg.payload, willPayload);
assertFalse(msg.isRetain);
}

@Test(groups = "integration")
public void lastWillQoS1() {
String deviceKey = "testDevice";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void close() {

public void closeForcibly() {
try {
client.disconnectForcibly();
client.disconnectForcibly(0, 0, false);
client.close(true);
} catch (Throwable e) {
// ignore intentionally;
Expand Down

0 comments on commit 5595e79

Please sign in to comment.