diff --git a/ChangeLog.txt b/ChangeLog.txt index 5461e1436..7d747c666 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.17-SNAPSHOT: + [fix] resolved issue #633 of bad perfomance when adding many subscriptions to few topics, resolved in #758. [fix] resolved issue #629 that originated from subscription trees wide and flat, resolved in #630 [dependency] updated Netty to 4.1.93 and tcnative to 2.0.61 (#755) [feature] add saved session expiry configurable through the `persistent_client_expiration` setting (#739). diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index d56a0bfd8..fa2631f2a 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -455,7 +455,7 @@ public int countBatches() { private RoutingResults publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos, Set filterTargetClients) { - Set topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); + List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); if (topicMatchingSubscriptions.isEmpty()) { // no matching subscriptions, clean exit LOG.trace("No matching subscriptions for topic: {}", topic); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index b7ce922de..be2020d9f 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -21,18 +21,18 @@ class CNode implements Comparable { private final Token token; private final List children; - Set subscriptions; + List subscriptions; CNode(Token token) { this.children = new ArrayList<>(); - this.subscriptions = new HashSet<>(); + this.subscriptions = new ArrayList<>(); this.token = token; } //Copy constructor - private CNode(Token token, List children, Set subscriptions) { + private CNode(Token token, List children, List subscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. - this.subscriptions = new HashSet<>(subscriptions); + this.subscriptions = new ArrayList<>(subscriptions); this.children = new ArrayList<>(children); } @@ -86,16 +86,15 @@ public void remove(INode node) { CNode addSubscription(Subscription newSubscription) { // if already contains one with same topic and same client, keep that with higher QoS - if (subscriptions.contains(newSubscription)) { - final Subscription existing = subscriptions.stream() - .filter(s -> s.equals(newSubscription)) - .findFirst().get(); + int idx = Collections.binarySearch(subscriptions, newSubscription); + if (idx >= 0) { + // Subscription already exists + final Subscription existing = subscriptions.get(idx); if (existing.getRequestedQos().value() < newSubscription.getRequestedQos().value()) { - subscriptions.remove(existing); - subscriptions.add(new Subscription(newSubscription)); + subscriptions.set(idx, newSubscription); } } else { - this.subscriptions.add(new Subscription(newSubscription)); + this.subscriptions.add(-1 - idx, new Subscription(newSubscription)); } return this; } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 7dbe006c5..c8669da1f 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -1,7 +1,9 @@ package io.moquette.broker.subscriptions; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; @@ -64,24 +66,24 @@ private NavigationAction evaluate(Topic topic, CNode cnode) { return NavigationAction.GODEEP; } - public Set recursiveMatch(Topic topic) { + public List recursiveMatch(Topic topic) { return recursiveMatch(topic, this.root); } - private Set recursiveMatch(Topic topic, INode inode) { + private List recursiveMatch(Topic topic, INode inode) { CNode cnode = inode.mainNode(); if (cnode instanceof TNode) { - return Collections.emptySet(); + return Collections.emptyList(); } NavigationAction action = evaluate(topic, cnode); if (action == NavigationAction.MATCH) { return cnode.subscriptions; } if (action == NavigationAction.STOP) { - return Collections.emptySet(); + return Collections.emptyList(); } Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topic : topic.exceptHeadToken(); - Set subscriptions = new HashSet<>(); + List subscriptions = new ArrayList<>(); // We should only consider the maximum three children children of // type #, + or exact match diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index 00673538b..5ce0f32ee 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -76,13 +76,13 @@ Optional lookup(Topic topic) { * @return the list of matching subscriptions, or empty if not matching. */ @Override - public Set matchWithoutQosSharpening(Topic topic) { + public List matchWithoutQosSharpening(Topic topic) { return ctrie.recursiveMatch(topic); } @Override - public Set matchQosSharpening(Topic topic) { - final Set subscriptions = matchWithoutQosSharpening(topic); + public List matchQosSharpening(Topic topic) { + final List subscriptions = matchWithoutQosSharpening(topic); Map subsGroupedByClient = new HashMap<>(); for (Subscription sub : subscriptions) { @@ -92,7 +92,7 @@ public Set matchQosSharpening(Topic topic) { subsGroupedByClient.put(sub.clientId, sub); } } - return new HashSet<>(subsGroupedByClient.values()); + return new ArrayList<>(subsGroupedByClient.values()); } @Override diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index 97f320011..a524e5504 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -26,9 +26,9 @@ public interface ISubscriptionsDirectory { Set listAllSessionIds(); - Set matchWithoutQosSharpening(Topic topic); + List matchWithoutQosSharpening(Topic topic); - Set matchQosSharpening(Topic topic); + List matchQosSharpening(Topic topic); void add(Subscription newSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index da47792eb..e0c27993d 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -42,6 +42,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import java.util.List; import static org.junit.jupiter.api.Assertions.*; public class PostOfficeInternalPublishTest { @@ -337,7 +338,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); - final Set matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 4b95ee531..3a6355f6f 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -196,7 +196,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); - final Set matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 8e4d4134a..dd97da478 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -144,7 +144,7 @@ protected void subscribe(EmbeddedChannel channel, String topic, MqttQoS desiredQ final String clientId = NettyUtils.clientID(channel); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); - final Set matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); @@ -164,7 +164,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); - final Set matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 3e407d24a..91df197b5 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -41,6 +41,7 @@ import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -123,7 +124,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), desiredQos); - final Set matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); + final List matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); //assertTrue(matchedSubscriptions.size() >=1); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 567394cbd..7dfc9104f 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -16,8 +16,10 @@ package io.moquette.broker.subscriptions; import static io.moquette.broker.subscriptions.Topic.asTopic; +import io.netty.handler.codec.mqtt.MqttQoS; import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -35,6 +37,13 @@ static Subscription clientSubOnTopic(String clientID, String topicName) { return new Subscription(clientID, asTopic(topicName), null); } + @Test + @Timeout(value = MAX_DURATION_S) + public void testManyClientsFewTopics() { + List subscriptionList = prepareSubscriptionsManyClientsFewTopic(); + createSubscriptions(subscriptionList); + } + @Test @Timeout(value = MAX_DURATION_S) public void testFlat() { @@ -73,6 +82,15 @@ public void createSubscriptions(List results) { LOGGER.info("Added " + count + " subscriptions in " + duration + " ms (" + Math.round(1000.0 * count / duration) + "/s)"); } + public List prepareSubscriptionsManyClientsFewTopic() { + List subscriptionList = new ArrayList<>(TOTAL_SUBSCRIPTIONS); + for (int i = 0; i < TOTAL_SUBSCRIPTIONS; i++) { + Topic topic = asTopic("topic/test/" + new Random().nextInt(1 + i % 10) + "/test"); + subscriptionList.add(new Subscription("TestClient-" + i, topic, MqttQoS.AT_LEAST_ONCE)); + } + return subscriptionList; + } + public List prepareSubscriptionsFlat() { List results = new ArrayList<>(TOTAL_SUBSCRIPTIONS); int count = 0; diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index a3a57a034..6deddc6c0 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -27,6 +27,7 @@ import static io.moquette.broker.subscriptions.CTrieTest.clientSubOnTopic; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -203,7 +204,7 @@ public void testOverlappingSubscriptions() { sut.add(specificSub); //Exercise - final Set matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); + final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); // Verify assertThat(matchingForSpecific.size()).isEqualTo(1); @@ -234,7 +235,7 @@ public void removeSubscription_sameClients_subscribedSameTopic() { sut.removeSubscription(asTopic("/topic"), slashSub.clientId); // Verify - final Set matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); + final List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); assertThat(matchingSubscriptions).isEmpty(); } @@ -252,7 +253,7 @@ public void duplicatedSubscriptionsWithDifferentQos() { this.sut.add(client1SubQoS2); // Verify - Set subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); + List subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); assertThat(subscriptions).contains(client1SubQoS2); assertThat(subscriptions).contains(client2Sub); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 4b7764b14..88aa6394e 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -24,6 +24,7 @@ import java.util.Set; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -94,7 +95,7 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final Set subscriptions = matchedNode.get().subscriptions; + final List subscriptions = matchedNode.get().subscriptions; assertTrue(subscriptions.contains(newSubscription)); } @@ -109,7 +110,7 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final Set subscriptions = matchedNode.get().subscriptions; + final List subscriptions = matchedNode.get().subscriptions; assertTrue(subscriptions.contains(happinessSensor)); } @@ -176,7 +177,7 @@ public void givenTreeWithSomeNodeHierarchyWhenRemoveContainedSubscriptionThenNod sut.removeFromTree(asTopic("/temp/1"), "TempSensor1"); sut.removeFromTree(asTopic("/temp/1"), "TempSensor1"); - final Set matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); + final List matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttQoS.AT_MOST_ONCE); @@ -191,8 +192,8 @@ public void givenTreeWithSomeNodeHierarchWhenRemoveContainedSubscriptionSmallerT //Exercise sut.removeFromTree(asTopic("/temp"), "TempSensor1"); - final Set matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); - final Set matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); + final List matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); + final List matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); //Verify // not clear to me, but I believe /temp unsubscribe should not unsub you from downstream /temp/1 or /temp/2 @@ -218,7 +219,7 @@ public void testMatchSubscriptionNoWildcards() { sut.addToTree(clientSubOnTopic("TempSensor1", "/temp")); //Exercise - final Set matchingSubs = sut.recursiveMatch(asTopic("/temp")); + final List matchingSubs = sut.recursiveMatch(asTopic("/temp")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttQoS.AT_MOST_ONCE); @@ -231,8 +232,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.addToTree(clientSubOnTopic("TempSensor1", "temp/1")); //Exercise - final Set matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); @@ -244,8 +245,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.removeFromTree(asTopic("temp"), "TempSensor1"); //Exercise - final Set matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -257,8 +258,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.addToTree(clientSubOnTopic("TempSensor2", "temp/1")); //Exercise - final Set matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); @@ -270,8 +271,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.removeFromTree(asTopic("temp"), "TempSensor1"); //Exercise - final Set matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -283,8 +284,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.addToTree(clientSubOnTopic("TempSensor2", "temp/1")); //Exercise - final Set matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttQoS.AT_MOST_ONCE); @@ -296,8 +297,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.removeFromTree(asTopic("temp/1"), "TempSensor2"); //Exercise - final Set matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final Set matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).contains(expectedMatchingsub1); assertThat(matchingSubs4).doesNotContain(expectedMatchingsub2);