Skip to content

Commit

Permalink
GH-3732: Fix NPE in Mqttv5PahoMessageDrivenChA
Browse files Browse the repository at this point in the history
Fixes #3732

The `Mqttv5PahoMessageDrivenChannelAdapter` unconditionally tries to (un)subscribe
to/from topics when the `mqqtClient` might not be initialized yet.

* Add `mqqtClient` initialization check before adding or removing topic.
Re-align logic with the `MqttPahoMessageDrivenChannelAdapter`

**Cherry-pick to `5.5.x`**
  • Loading branch information
Mikhail Polivakha authored and artembilan committed Mar 18, 2022
1 parent 69efbd3 commit c1d29ab
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.mqtt.inbound;

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -40,6 +41,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Trung Pham
* @author Mikhail Polivakha
*
* @since 4.0
*
Expand Down Expand Up @@ -331,15 +333,7 @@ public boolean equals(Object obj) {
return false;
}
Topic other = (Topic) obj;
if (this.topic == null) {
if (other.topic != null) {
return false;
}
}
else if (!this.topic.equals(other.topic)) {
return false;
}
return true;
return Objects.equals(this.topic, other.topic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,6 +65,7 @@
* See {@link #setPayloadType} for more information about type conversion.
*
* @author Artem Bilan
* @author Mikhail Polivakha
*
* @since 5.5.5
*
Expand Down Expand Up @@ -141,13 +142,15 @@ public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
@Override
protected void onInit() {
super.onInit();
try {
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
this.mqttClient.setCallback(this);
this.mqttClient.setManualAcks(isManualAcks());
}
catch (MqttException ex) {
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
if (this.mqttClient == null) {
try {
this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
this.mqttClient.setCallback(this);
this.mqttClient.setManualAcks(isManualAcks());
}
catch (MqttException ex) {
throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
}
}
if (this.messageConverter == null) {
setMessageConverter(getBeanFactory()
Expand Down Expand Up @@ -189,8 +192,10 @@ protected void doStop() {
this.topicLock.lock();
String[] topics = getTopic();
try {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
}
}
catch (MqttException ex) {
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
Expand All @@ -204,7 +209,9 @@ protected void doStop() {
public void destroy() {
super.destroy();
try {
this.mqttClient.close(true);
if (this.mqttClient != null) {
this.mqttClient.close(true);
}
}
catch (MqttException ex) {
logger.error(ex, "Failed to close 'MqttAsyncClient'");
Expand All @@ -215,8 +222,10 @@ public void destroy() {
public void addTopic(String topic, int qos) {
this.topicLock.lock();
try {
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
super.addTopic(topic, qos);
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
}
}
catch (MqttException ex) {
throw new MessagingException("Failed to subscribe to topic " + topic, ex);
Expand All @@ -230,7 +239,9 @@ public void addTopic(String topic, int qos) {
public void removeTopic(String... topic) {
this.topicLock.lock();
try {
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
}
super.removeTopic(topic);
}
catch (MqttException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -55,6 +56,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Mikhail Polivakha
*
* @since 5.5.5
*
Expand All @@ -73,6 +75,18 @@ public class Mqttv5BackToBackTests implements MosquittoContainerTest {
@Autowired
private Config config;

@Test //GH-3732
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicAddition() {
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
Assertions.assertDoesNotThrow(() -> channelAdapter.addTopic("abc", 1));
}

@Test //GH-3732
public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() {
Mqttv5PahoMessageDrivenChannelAdapter channelAdapter = new Mqttv5PahoMessageDrivenChannelAdapter("tcp://mock-url.com:8091", "mock-client-id", "123");
Assertions.assertDoesNotThrow(() -> channelAdapter.removeTopic("abc"));
}

@Test
public void testSimpleMqttv5Interaction() {
String testPayload = "foo";
Expand Down

0 comments on commit c1d29ab

Please sign in to comment.