Skip to content

Commit

Permalink
Use session expiry from CONNECT message properties (#753)
Browse files Browse the repository at this point in the history
Leverage the existing global expiry harness introduced with #739, reading the expiration time from the CONNECT's MQTT property SESSION_EXPIRY_INTERVAL.
  • Loading branch information
andsel authored Jun 21, 2023
1 parent abab3b2 commit 0cab6e6
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
18 changes: 16 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,9 +299,22 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
} else {
queue = new InMemoryQueue();
}
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
final int expiryInterval = clean ? 0 : globalExpirySeconds;
final int expiryInterval;
final MqttVersion mqttVersion = Utils.versionFromConnect(msg);
if (mqttVersion != MqttVersion.MQTT_5) {
// in MQTT3 cleanSession = true means expiryInterval=0 else infinite
expiryInterval = clean ? 0 : globalExpirySeconds;
} else {
final MqttProperties.MqttProperty<Integer> expiryIntervalProperty =
(MqttProperties.MqttProperty<Integer>) msg.variableHeader().properties()
.getProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value());
if (expiryIntervalProperty != null) {
expiryInterval = expiryIntervalProperty.value();
} else {
// the connect doesn't provide any expiry, fallback to global expiry
expiryInterval = clean ? 0 : globalExpirySeconds;
}
}
final ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(clientId,
mqttVersion, expiryInterval, clock);
if (msg.variableHeader().isWillFlag()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.moquette.broker;

import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class SessionRegistryMQTT5Test extends SessionRegistryTest {
private static final Logger LOG = LoggerFactory.getLogger(SessionRegistryMQTT5Test.class);

@Test
public void givenSessionWithConnectionExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved() {
LOG.info("givenSessionWithExpireTimeWhenAfterExpirationIsPassedThenSessionIsRemoved");

// insert a not clean session that should expire in connect selected expiry time
final String clientId = "client_to_be_removed";
final MqttProperties connectProperties = new MqttProperties();
int customExpirySeconds = 60;
connectProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL.value(), customExpirySeconds));
final MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
.protocolVersion(MqttVersion.MQTT_5)
.cleanSession(false)
.properties(connectProperties)
.build();
final SessionRegistry.SessionCreationResult res = sut.createOrReopenSession(connectMessage, clientId, "User");
assertEquals(SessionRegistry.CreationModeEnum.CREATED_CLEAN_NEW, res.mode, "Not clean session must be created");

// remove it, so that it's tracked in the inner delay queue
sut.connectionClosed(res.session);
assertEquals(1, sessionRepository.list().size(), "Not clean session must be persisted");

// move time forward
Duration moreThenSessionExpiration = Duration.ofSeconds(customExpirySeconds).plusSeconds(10);
slidingClock.forward(moreThenSessionExpiration);

// check the session has been removed
Awaitility
.await()
.atMost(3 * SessionRegistry.EXPIRED_SESSION_CLEANER_TASK_INTERVAL.toMillis(), TimeUnit.MILLISECONDS)
.until(sessionsList(), Matchers.empty());
}
}
10 changes: 5 additions & 5 deletions broker/src/test/java/io/moquette/broker/SessionRegistryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ public class SessionRegistryTest {

private MQTTConnection connection;
private EmbeddedChannel channel;
private SessionRegistry sut;
private MqttMessageBuilders.ConnectBuilder connMsg;
protected SessionRegistry sut;
protected MqttMessageBuilders.ConnectBuilder connMsg;
private static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZEROBYTE_CLIENT_ID =
new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH);
private MemoryQueueRepository queueRepository;
private ScheduledExecutorService scheduler;
private final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2023-03-26T18:09:30.00Z"), ZoneId.of("Europe/Rome"));
private ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock);
private ISessionsRepository sessionRepository;
protected ForwardableClock slidingClock = new ForwardableClock(pointInTimeFixedClock);
protected ISessionsRepository sessionRepository;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -317,7 +317,7 @@ public void givenSessionThatExpiresWhenReopenIsNotAnymoreTrackedForExpiration()
.until(sessionsList(), Matchers.not(Matchers.empty()));
}

private Callable<Collection<ISessionsRepository.SessionData>> sessionsList() {
protected Callable<Collection<ISessionsRepository.SessionData>> sessionsList() {
return () -> sessionRepository.list();
}
}

0 comments on commit 0cab6e6

Please sign in to comment.