diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3SessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3SessionHandler.java index 8d6cdf0dd..35035dd6d 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3SessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3SessionHandler.java @@ -160,7 +160,7 @@ abstract class MQTT3SessionHandler extends MQTTMessageHandler implements IMQTT3S private final long idleTimeoutNanos; private final boolean cleanSession; private final boolean sessionPresent; - private final WillMessage willMessage; + private WillMessage willMessage; private final LinkedMap unconfirmedQoS1Messages = new LinkedMap<>(); // key: id used in qos2 protocol interaction, value: message's original messageId private final LinkedMap unconfirmedQoS2Indexes = new LinkedMap<>(); @@ -232,6 +232,16 @@ public void handlerAdded(ChannelHandlerContext ctx) { @Override public void channelInactive(ChannelHandlerContext ctx) { super.channelInactive(ctx); + if (idleTimeoutTask != null) { + idleTimeoutTask.cancel(true); + } + if (resendUnconfirmedTask != null) { + resendUnconfirmedTask.cancel(true); + } + if (willMessage != null) { + submitBgTask(() -> distWillMessage(willMessage)); + } + sessionCtx.localSessionRegistry.remove(channelId(), this); sessionRegister.stop(); tenantMeter.recordCount(MqttDisconnectCount); @@ -292,18 +302,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof ConnectionWillClose) { - if (idleTimeoutTask != null) { - idleTimeoutTask.cancel(true); - } - if (resendUnconfirmedTask != null) { - resendUnconfirmedTask.cancel(true); - } log.debug("Session closed: lwt={}, reason={}", willMessage, ((ConnectionWillClose) evt).reason); // don't send last will if disconnect by client, MQTT Spec 3.1.2.5 or kicked - if (willMessage != null && !(((ConnectionWillClose) evt).reason instanceof ByClient) - && (!(((ConnectionWillClose) evt).reason instanceof Kicked) - || !isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) { - submitBgTask(() -> distWillMessage(willMessage)); + if (((ConnectionWillClose) evt).reason instanceof ByClient + || (((ConnectionWillClose) evt).reason instanceof Kicked + && isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) { + willMessage = null; } } ctx.fireUserEventTriggered(evt); diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/MQTTLastWillTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/MQTTLastWillTest.java index 5eba80f04..716f8633a 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/MQTTLastWillTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/MQTTLastWillTest.java @@ -49,6 +49,48 @@ public void resetMocks() { clearInvocations(mqttTest.eventCollector); } + @Test(groups = "integration") + public void lastWillByCloseClientForcibly() { + String deviceKey = "testDevice"; + String userName = tenantId + "/" + deviceKey; + String willTopic = "willTopic"; + ByteString willPayload = ByteString.copyFromUtf8("bye"); + when(mqttTest.authProvider.auth(any(MQTT3AuthData.class))) + .thenReturn(CompletableFuture.completedFuture(MQTT3AuthResult.newBuilder() + .setOk(Ok.newBuilder() + .setTenantId(tenantId) + .setUserId(deviceKey) + .build()) + .build())); + when(mqttTest.authProvider.check(any(ClientInfo.class), any(MQTTAction.class))) + .thenReturn(CompletableFuture.completedFuture(true)); + + + MqttConnectOptions lwtPubConnOpts = new MqttConnectOptions(); + lwtPubConnOpts.setCleanSession(true); + lwtPubConnOpts.setWill(willTopic, willPayload.toByteArray(), 0, false); + lwtPubConnOpts.setUserName(userName); + MqttTestClient lwtPubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtPubclient"); + lwtPubClient.connect(lwtPubConnOpts); + + MqttConnectOptions lwtSubConnOpts = new MqttConnectOptions(); + lwtSubConnOpts.setCleanSession(true); + lwtSubConnOpts.setUserName(userName); + + MqttTestClient lwtSubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtSubClient"); + lwtSubClient.connect(lwtSubConnOpts); + Observable topicSub = lwtSubClient.subscribe(willTopic, 0); + + log.info("Close client forcibly"); + lwtPubClient.closeForcibly(); + + MqttMsg msg = topicSub.blockingFirst(); + assertEquals(msg.topic, willTopic); + assertEquals(msg.qos, 0); + assertEquals(msg.payload, willPayload); + assertFalse(msg.isRetain); + } + @Test(groups = "integration") public void lastWillQoS1() { String deviceKey = "testDevice"; diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/client/MqttTestClient.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/client/MqttTestClient.java index 891fa706d..01df870dc 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/client/MqttTestClient.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/client/MqttTestClient.java @@ -90,7 +90,7 @@ public void close() { public void closeForcibly() { try { - client.disconnectForcibly(); + client.disconnectForcibly(0, 0, false); client.close(true); } catch (Throwable e) { // ignore intentionally;