Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix V2 LWT #63

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ abstract class MQTT3SessionHandler extends MQTTMessageHandler implements IMQTT3S
private final long idleTimeoutNanos;
private final boolean cleanSession;
private final boolean sessionPresent;
private final WillMessage willMessage;
private WillMessage willMessage;
private final LinkedMap<Integer, UnconfirmedQoS1Message> unconfirmedQoS1Messages = new LinkedMap<>();
// key: id used in qos2 protocol interaction, value: message's original messageId
private final LinkedMap<Integer, QoS2MessageKey> unconfirmedQoS2Indexes = new LinkedMap<>();
Expand Down Expand Up @@ -232,6 +232,16 @@ public void handlerAdded(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (idleTimeoutTask != null) {
idleTimeoutTask.cancel(true);
}
if (resendUnconfirmedTask != null) {
resendUnconfirmedTask.cancel(true);
}
if (willMessage != null) {
submitBgTask(() -> distWillMessage(willMessage));
}

sessionCtx.localSessionRegistry.remove(channelId(), this);
sessionRegister.stop();
tenantMeter.recordCount(MqttDisconnectCount);
Expand Down Expand Up @@ -292,18 +302,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof ConnectionWillClose) {
if (idleTimeoutTask != null) {
idleTimeoutTask.cancel(true);
}
if (resendUnconfirmedTask != null) {
resendUnconfirmedTask.cancel(true);
}
log.debug("Session closed: lwt={}, reason={}", willMessage, ((ConnectionWillClose) evt).reason);
// don't send last will if disconnect by client, MQTT Spec 3.1.2.5 or kicked
if (willMessage != null && !(((ConnectionWillClose) evt).reason instanceof ByClient)
&& (!(((ConnectionWillClose) evt).reason instanceof Kicked)
|| !isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) {
submitBgTask(() -> distWillMessage(willMessage));
if (((ConnectionWillClose) evt).reason instanceof ByClient
|| (((ConnectionWillClose) evt).reason instanceof Kicked
&& isSelfKick((Kicked) ((ConnectionWillClose) evt).reason))) {
willMessage = null;
}
}
ctx.fireUserEventTriggered(evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,48 @@ public void resetMocks() {
clearInvocations(mqttTest.eventCollector);
}

@Test(groups = "integration")
public void lastWillByCloseClientForcibly() {
String deviceKey = "testDevice";
String userName = tenantId + "/" + deviceKey;
String willTopic = "willTopic";
ByteString willPayload = ByteString.copyFromUtf8("bye");
when(mqttTest.authProvider.auth(any(MQTT3AuthData.class)))
.thenReturn(CompletableFuture.completedFuture(MQTT3AuthResult.newBuilder()
.setOk(Ok.newBuilder()
.setTenantId(tenantId)
.setUserId(deviceKey)
.build())
.build()));
when(mqttTest.authProvider.check(any(ClientInfo.class), any(MQTTAction.class)))
.thenReturn(CompletableFuture.completedFuture(true));


MqttConnectOptions lwtPubConnOpts = new MqttConnectOptions();
lwtPubConnOpts.setCleanSession(true);
lwtPubConnOpts.setWill(willTopic, willPayload.toByteArray(), 0, false);
lwtPubConnOpts.setUserName(userName);
MqttTestClient lwtPubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtPubclient");
lwtPubClient.connect(lwtPubConnOpts);

MqttConnectOptions lwtSubConnOpts = new MqttConnectOptions();
lwtSubConnOpts.setCleanSession(true);
lwtSubConnOpts.setUserName(userName);

MqttTestClient lwtSubClient = new MqttTestClient(MQTTTest.brokerURI, "lwtSubClient");
lwtSubClient.connect(lwtSubConnOpts);
Observable<MqttMsg> topicSub = lwtSubClient.subscribe(willTopic, 0);

log.info("Close client forcibly");
lwtPubClient.closeForcibly();

MqttMsg msg = topicSub.blockingFirst();
assertEquals(msg.topic, willTopic);
assertEquals(msg.qos, 0);
assertEquals(msg.payload, willPayload);
assertFalse(msg.isRetain);
}

@Test(groups = "integration")
public void lastWillQoS1() {
String deviceKey = "testDevice";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void close() {

public void closeForcibly() {
try {
client.disconnectForcibly();
client.disconnectForcibly(0, 0, false);
client.close(true);
} catch (Throwable e) {
// ignore intentionally;
Expand Down
Loading