diff --git a/broker/src/main/java/io/moquette/broker/IQueueRepository.java b/broker/src/main/java/io/moquette/broker/IQueueRepository.java index 602949f23..9ccadc334 100644 --- a/broker/src/main/java/io/moquette/broker/IQueueRepository.java +++ b/broker/src/main/java/io/moquette/broker/IQueueRepository.java @@ -1,8 +1,11 @@ package io.moquette.broker; +import java.util.Map; import java.util.Queue; public interface IQueueRepository { Queue createQueue(String cli, boolean clean); + + Map> listAllQueues(); } diff --git a/broker/src/main/java/io/moquette/broker/MemoryQueueRepository.java b/broker/src/main/java/io/moquette/broker/MemoryQueueRepository.java index 1b8405297..27d75bbe9 100644 --- a/broker/src/main/java/io/moquette/broker/MemoryQueueRepository.java +++ b/broker/src/main/java/io/moquette/broker/MemoryQueueRepository.java @@ -1,12 +1,24 @@ package io.moquette.broker; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class MemoryQueueRepository implements IQueueRepository { + private Map> queues = new HashMap<>(); + @Override public Queue createQueue(String cli, boolean clean) { - return new ConcurrentLinkedQueue<>(); + final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + queues.put(cli, queue); + return queue; + } + + @Override + public Map> listAllQueues() { + return Collections.unmodifiableMap(queues); } } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 55c1c903d..e6e92feca 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -88,7 +88,7 @@ static final class Will { private final String clientId; private boolean clean; private Will will; - private Queue sessionQueue; + private final Queue sessionQueue; private final AtomicReference status = new AtomicReference<>(SessionStatus.DISCONNECTED); private MQTTConnection mqttConnection; private List subscriptions = new ArrayList<>(); @@ -103,6 +103,9 @@ static final class Will { } Session(String clientId, boolean clean, Queue sessionQueue) { + if (sessionQueue == null) { + throw new IllegalArgumentException("sessionQueue parameter can't be null"); + } this.clientId = clientId; this.clean = clean; this.sessionQueue = sessionQueue; diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index d86cedb79..903f9af35 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -28,6 +28,8 @@ import java.net.InetSocketAddress; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -56,7 +58,7 @@ static final class PubRelMarker extends EnqueuedMessage { } public enum CreationModeEnum { - CREATED_CLEAN_NEW, REOPEN_EXISTING, DROP_EXISTING; + CREATED_CLEAN_NEW, REOPEN_EXISTING, DROP_EXISTING } public static class SessionCreationResult { @@ -86,6 +88,24 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr this.subscriptionsDirectory = subscriptionsDirectory; this.queueRepository = queueRepository; this.authorizator = authorizator; + reloadPersistentQueues(); + recreateSessionPool(); + } + + private void reloadPersistentQueues() { + final Map> persistentQueues = queueRepository.listAllQueues(); + persistentQueues.forEach(queues::put); + } + + private void recreateSessionPool() { + for (String clientId : subscriptionsDirectory.listAllSessionIds()) { + // if the subscriptions are present is obviously false + final Queue persistentQueue = queues.get(clientId); + if (persistentQueue != null) { + Session rehydrated = new Session(clientId, false, persistentQueue); + pool.put(clientId, rehydrated); + } + } } SessionCreationResult createOrReopenSession(MqttConnectMessage msg, String clientId, String username) { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index 3dc71c198..0fbe725d8 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -49,6 +49,19 @@ public void init(ISubscriptionsRepository subscriptionsRepository) { } } + /** + * @return the list of client ids that has a subscription stored. + */ + @Override + public Set listAllSessionIds() { + final List subscriptions = subscriptionsRepository.listAllSubscriptions(); + final Set clientIds = new HashSet<>(subscriptions.size()); + for (Subscription subscription : subscriptions) { + clientIds.add(subscription.clientId); + } + return clientIds; + } + Optional lookup(Topic topic) { return ctrie.lookup(topic); } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index 9f67db2d0..97f320011 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -24,6 +24,8 @@ public interface ISubscriptionsDirectory { void init(ISubscriptionsRepository sessionsRepository); + Set listAllSessionIds(); + Set matchWithoutQosSharpening(Topic topic); Set matchQosSharpening(Topic topic); diff --git a/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java b/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java index 84e940ca3..8264cb162 100644 --- a/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java @@ -1,9 +1,11 @@ package io.moquette.persistence; import io.moquette.broker.IQueueRepository; -import io.moquette.broker.SessionRegistry; +import io.moquette.broker.SessionRegistry.EnqueuedMessage; import org.h2.mvstore.MVStore; +import java.util.HashMap; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -16,10 +18,20 @@ public H2QueueRepository(MVStore mvStore) { } @Override - public Queue createQueue(String cli, boolean clean) { + public Queue createQueue(String cli, boolean clean) { if (!clean) { return new H2PersistentQueue<>(mvStore, cli); } return new ConcurrentLinkedQueue<>(); } + + @Override + public Map> listAllQueues() { + Map> result = new HashMap<>(); + mvStore.getMapNames().stream() + .filter(name -> name.startsWith("queue_") && !name.endsWith("_meta")) + .map(name -> name.substring("queue_".length())) + .forEach(name -> result.put(name, new H2PersistentQueue(mvStore, name))); + return result; + } } diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoTest.java index 9154aabbd..2596cdfbf 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoTest.java @@ -151,7 +151,6 @@ public void checkSubscribersGetCorrectQosNotifications() throws Exception { @Test public void testSubcriptionDoesntStayActiveAfterARestart() throws Exception { - LOG.info("*** testSubcriptionDoesntStayActiveAfterARestart ***"); // clientForSubscribe1 connect and subscribe to /topic QoS2 MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence( IntegrationUtils.newFolder(tempFolder, "clientForSubscribe1").getAbsolutePath()); diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationRestartTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationRestartTest.java index f7a55cfc0..57798abe5 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationRestartTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationRestartTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -93,6 +94,27 @@ public void tearDown() throws Exception { m_server.stopServer(); } + @DisplayName("given not clean session then after a server restart the session should be present") + @Test + public void testNotCleanSessionIsVisibleAfterServerRestart() throws Exception { + m_subscriber.connect(CLEAN_SESSION_OPT); + m_subscriber.subscribe("/topic", 1); + m_subscriber.disconnect(); + + m_server.stopServer(); + m_server.startServer(IntegrationUtils.prepareTestProperties(dbPath)); + + //publish a message + m_publisher.connect(); + m_publisher.publish("/topic", "Hello world MQTT!!".getBytes(UTF_8), 1, false); + + //reconnect subscriber and topic should be sent + m_subscriber.connect(CLEAN_SESSION_OPT); + // verify the sent message while offline is read + MqttMessage msg = m_messageCollector.waitMessage(1); + assertEquals("Hello world MQTT!!", new String(msg.getPayload(), UTF_8)); + } + @Test public void checkRestartCleanSubscriptionTree() throws Exception { // subscribe to /topic