Skip to content

Commit

Permalink
Simplified if-conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 7, 2020
1 parent c21fc5f commit b38542d
Showing 1 changed file with 36 additions and 37 deletions.
73 changes: 36 additions & 37 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ static final class PubRelMarker extends EnqueuedMessage {
}

public enum CreationModeEnum {
NOT_CREATED, CREATED_CLEAN_NEW, REOPEN_EXISTING, DROP_EXISTING;
CREATED_CLEAN_NEW, REOPEN_EXISTING, DROP_EXISTING;
}

public static class SessionCreationResult {

final static SessionCreationResult NONE = new SessionCreationResult(null, CreationModeEnum.NOT_CREATED, false);

final Session session;
final CreationModeEnum mode;
final boolean alreadyStored;
Expand Down Expand Up @@ -117,48 +115,49 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
Session newSession, String username) {
final boolean newIsClean = msg.variableHeader().isCleanSession();
final Session oldSession = pool.get(clientId);
SessionCreationResult creationResult = SessionCreationResult.NONE;
if (newIsClean && oldSession.disconnected()) {
// case 2
dropQueuesForClient(clientId);
unsubscribe(oldSession);

// publish new session
boolean result = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.CONNECTING);
if (!result) {
throw new SessionCorruptedException("old session was already changed state");
}
copySessionConfig(msg, oldSession);
LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
creationResult = new SessionCreationResult(oldSession, CreationModeEnum.CREATED_CLEAN_NEW, true);
} else if (!newIsClean && oldSession.disconnected()) {
final boolean connecting = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.CONNECTING);
if (!connecting) {
throw new SessionCorruptedException("old session moved in connected state by other thread");
final SessionCreationResult creationResult;
if (oldSession.disconnected()) {
if (newIsClean) {
boolean result = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.CONNECTING);
if (!result) {
throw new SessionCorruptedException("old session was already changed state");
}

// case 2
// publish new session
dropQueuesForClient(clientId);
unsubscribe(oldSession);
copySessionConfig(msg, oldSession);

LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
creationResult = new SessionCreationResult(oldSession, CreationModeEnum.CREATED_CLEAN_NEW, true);
} else {
final boolean connecting = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.CONNECTING);
if (!connecting) {
throw new SessionCorruptedException("old session moved in connected state by other thread");
}
// case 3
reactivateSubscriptions(oldSession, username);

LOG.trace("case 3, oldSession with same CId {} disconnected", clientId);
creationResult = new SessionCreationResult(oldSession, CreationModeEnum.REOPEN_EXISTING, true);
}
// case 3
reactivateSubscriptions(oldSession, username);

LOG.trace("case 3, oldSession with same CId {} disconnected", clientId);
creationResult = new SessionCreationResult(oldSession, CreationModeEnum.REOPEN_EXISTING, true);
} else if (oldSession.connected()) {
} else {
// case 4
LOG.trace("case 4, oldSession with same CId {} still connected, force to close", clientId);
oldSession.closeImmediately();
//remove(clientId);
creationResult = new SessionCreationResult(newSession, CreationModeEnum.DROP_EXISTING, true);
}

if (creationResult != SessionCreationResult.NONE) {
final boolean published;
if (creationResult.mode == CreationModeEnum.DROP_EXISTING) {
published = pool.replace(clientId, oldSession, newSession);
} else {
published = pool.replace(clientId, oldSession, oldSession);
}
if (!published) {
throw new SessionCorruptedException("old session was already removed");
}
final boolean published;
if (creationResult.mode == CreationModeEnum.DROP_EXISTING) {
published = pool.replace(clientId, oldSession, newSession);
} else {
published = pool.replace(clientId, oldSession, oldSession);
}
if (!published) {
throw new SessionCorruptedException("old session was already removed");
}

// case not covered new session is clean true/false and old session not in CONNECTED/DISCONNECTED
Expand Down

0 comments on commit b38542d

Please sign in to comment.