Skip to content

Commit

Permalink
Created adapter layer to use Segmented queues (#704)
Browse files Browse the repository at this point in the history
Wraps the segmented queues into session queues. This storage engine could be enabled with configuration `persistent_queue_type` setting, which accepts `h2` or segmented`, defaulting to `segmented` if option is not specified.
Added close method to session registry to flush all the meta information that compose checkpoint file of segmented queues.
  • Loading branch information
andsel authored Jan 21, 2023
1 parent f520c82 commit b8438e6
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 24 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Version 0.17-SNAPSHOT:
[enhancement] introduced new queue implementation based on segments in memory mapped files. The type of queue implementation
could be selected by setting `persistent_queue_type` (#691, #704).

Version 0.16:
[build] drop generation of broker-test, removed distribution and embedding_moquette modules from deploy phase (#616)
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class BrokerConstants {
public static final String INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler";
public static final String BROKER_INTERCEPTOR_THREAD_POOL_SIZE = "intercept.thread_pool.size";
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = "persistent_queue_type"; // h2 or segmented, default h2
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
public static final String PORT_PROPERTY_NAME = "port";
Expand Down
5 changes: 4 additions & 1 deletion broker/src/main/java/io/moquette/broker/DebugUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
public final class DebugUtils {

public static String payload2Str(ByteBuf content) {
return content.copy().toString(StandardCharsets.UTF_8);
final int readerPin = content.readableBytes();
final String result = content.toString(StandardCharsets.UTF_8);
content.readerIndex(readerPin);
return result;
}

private DebugUtils() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.moquette.broker;

import java.util.Queue;
import java.util.Set;

public interface IQueueRepository {
Expand All @@ -10,4 +9,6 @@ public interface IQueueRepository {
boolean containsQueue(String clientId);

SessionMessageQueue<SessionRegistry.EnqueuedMessage> getOrCreateQueue(String clientId);

void close();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.moquette.broker;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MemoryQueueRepository implements IQueueRepository {

Expand All @@ -28,6 +30,11 @@ public SessionMessageQueue<SessionRegistry.EnqueuedMessage> getOrCreateQueue(Str
return queue;
}

@Override
public void close() {
queues.clear();
}

void dropQueue(String queueName) {
queues.remove(queueName);
}
Expand Down
40 changes: 35 additions & 5 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.security.PermitAllAuthorizatorPolicy;
import io.moquette.broker.security.ResourceAuthenticator;
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.InterceptHandler;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.persistence.SegmentQueueRepository;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,7 +170,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}

public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) {
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException {
final long start = System.currentTimeMillis();
if (handlers == null) {
handlers = Collections.emptyList();
Expand All @@ -181,8 +183,6 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
if (handlerProp != null) {
config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
}
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.debug("Configuring Using persistent store file, path: {}", persistencePath);
initInterceptors(config, handlers);
LOG.debug("Initialized MQTT protocol processor");
if (sslCtxCreator == null) {
Expand All @@ -195,11 +195,14 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final ISubscriptionsRepository subscriptionsRepository;
final IQueueRepository queueRepository;
final IRetainedRepository retainedRepository;
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.debug("Configuring Using persistent store file, path: {}", persistencePath);
if (persistencePath != null && !persistencePath.isEmpty()) {
LOG.trace("Configuring H2 subscriptions store to {}", persistencePath);
LOG.info("Configuring H2 subscription store to {}", persistencePath);
h2Builder = new H2Builder(config, scheduler).initStore();
queueRepository = initQueuesRepository(config, persistencePath, h2Builder);
LOG.trace("Configuring H2 subscriptions store to {}", persistencePath);
subscriptionsRepository = h2Builder.subscriptionsRepository();
queueRepository = h2Builder.queueRepository();
retainedRepository = h2Builder.retainedRepository();
} else {
LOG.trace("Configuring in-memory subscriptions store");
Expand Down Expand Up @@ -233,6 +236,27 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
initialized = true;
}

private static IQueueRepository initQueuesRepository(IConfig config, String persistencePath, H2Builder h2Builder) throws IOException {
final IQueueRepository queueRepository;
final String queueType = config.getProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "h2");
if ("h2".equalsIgnoreCase(queueType)) {
LOG.info("Configuring H2 queue store to {}", persistencePath);
queueRepository = h2Builder.queueRepository();
} else if ("segmented".equalsIgnoreCase(queueType)) {
final String segmentedPath = persistencePath.substring(0, persistencePath.lastIndexOf("/"));
LOG.info("Configuring segmented queue store to {}", segmentedPath);
try {
queueRepository = new SegmentQueueRepository(segmentedPath);
} catch (QueueException e) {
throw new IOException("Problem in configuring persistent queue on path " + segmentedPath, e);
}
} else {
final String errMsg = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, queueType);
throw new RuntimeException(errMsg);
}
return queueRepository;
}

private void collectAndSendTelemetryDataAsynch(IConfig config) {
final Thread telCollector = new Thread(() -> collectAndSendTelemetryData(config));
telCollector.start();
Expand Down Expand Up @@ -505,6 +529,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg, final String clien

public void stopServer() {
LOG.info("Unbinding integration from the configured ports");
if (acceptor == null) {
LOG.error("Closing a badly started server, exit immediately");
return;
}
acceptor.close();
LOG.trace("Stopping MQTT protocol processor");
initialized = false;
Expand All @@ -513,6 +541,8 @@ public void stopServer() {
// and SessionsRepository does not stop its tasks. Thus shutdownNow().
scheduler.shutdownNow();

sessions.close();

if (h2Builder != null) {
LOG.trace("Shutting down H2 persistence {}");
h2Builder.closeStore();
Expand Down
8 changes: 7 additions & 1 deletion broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
oldSession.closeImmediately();
}


if (newIsClean) {
boolean result = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.DESTROYED);
if (!result) {
Expand Down Expand Up @@ -290,4 +289,11 @@ private Optional<ClientDescriptor> createClientDescriptor(Session s) {
final Optional<InetSocketAddress> remoteAddressOpt = s.remoteAddress();
return remoteAddressOpt.map(r -> new ClientDescriptor(clientID, r.getHostString(), r.getPort()));
}

/**
* Close all resources related to session management
* */
public void close() {
queueRepository.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -151,6 +152,10 @@ public static QueuePool loadQueues(Path dataPath) throws QueueException {
return queuePool;
}

public Set<String> queueNames() {
return queues.keySet().stream().map(qn -> qn.name).collect(Collectors.toSet());
}

private static Properties createOrLoadCheckpointFile(Path dataPath) throws QueueException {
final Path checkpointPath = dataPath.resolve("checkpoint.properties");
if (!Files.exists(checkpointPath)) {
Expand All @@ -159,6 +164,7 @@ private static Properties createOrLoadCheckpointFile(Path dataPath) throws Queue
try {
notExisted = checkpointPath.toFile().createNewFile();
} catch (IOException e) {
LOG.error("IO Error creating the file {}", checkpointPath, e);
throw new QueueException("Reached an IO error during the bootstrapping of empty 'checkpoint.properties'", e);
}
if (!notExisted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import org.h2.mvstore.MVStore;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

public class H2QueueRepository implements IQueueRepository {
Expand Down Expand Up @@ -52,4 +48,9 @@ public boolean containsQueue(String queueName) {
public SessionMessageQueue<EnqueuedMessage> getOrCreateQueue(String clientId) {
return new H2PersistentQueue(mvStore, clientId);
}

@Override
public void close() {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.moquette.persistence;

import io.moquette.broker.AbstractSessionMessageQueue;
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.unsafequeues.Queue;
import io.moquette.broker.unsafequeues.QueueException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SegmentPersistentQueue extends AbstractSessionMessageQueue<SessionRegistry.EnqueuedMessage> {

private static class SerDes {

private enum MessageType {PUB_REL_MARKER, PUBLISHED_MESSAGE}

public ByteBuffer toBytes(SessionRegistry.EnqueuedMessage message) {
final int memorySize = getMemory(message);
final ByteBuffer payload = ByteBuffer.allocate(memorySize);
payload.mark();
write(message, payload);
payload.reset();
return payload;
}

private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
if (obj instanceof SessionRegistry.PublishedMessage) {
buff.put((byte) MessageType.PUBLISHED_MESSAGE.ordinal());

final SessionRegistry.PublishedMessage casted = (SessionRegistry.PublishedMessage) obj;
buff.put((byte) casted.getPublishingQos().value());

final String topic = casted.getTopic().toString();

writeTopic(buff, topic);
writePayload(buff, casted.getPayload());
} else if (obj instanceof SessionRegistry.PubRelMarker) {
buff.put((byte) MessageType.PUB_REL_MARKER.ordinal());
} else {
throw new IllegalArgumentException("Unrecognized message class " + obj.getClass());
}
}

private void writePayload(ByteBuffer target, ByteBuf source) {
final int payloadSize = source.readableBytes();
byte[] rawBytes = new byte[payloadSize];
final int pinPoint = source.readerIndex();
source.readBytes(rawBytes).release();
source.readerIndex(pinPoint);
target.putInt(payloadSize);
target.put(rawBytes);
}

private void writeTopic(ByteBuffer buff, String topic) {
final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
buff.putInt(topicBytes.length).put(topicBytes);
}

private int getMemory(SessionRegistry.EnqueuedMessage obj) {
if (obj instanceof SessionRegistry.PubRelMarker) {
return 1;
}
final SessionRegistry.PublishedMessage casted = (SessionRegistry.PublishedMessage) obj;
return 1 + // message type
1 + // qos
topicMemorySize(casted.getTopic()) +
payloadMemorySize(casted.getPayload());
}

private int payloadMemorySize(ByteBuf payload) {
return 4 + // size
payload.readableBytes();
}

private int topicMemorySize(Topic topic) {
return 4 + // size
topic.toString().getBytes(StandardCharsets.UTF_8).length;
}

public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer buff) {
final byte messageType = buff.get();
if (messageType == MessageType.PUB_REL_MARKER.ordinal()) {
return new SessionRegistry.PubRelMarker();
} else if (messageType == MessageType.PUBLISHED_MESSAGE.ordinal()) {
final MqttQoS qos = MqttQoS.valueOf(buff.get());
final String topicStr = readTopic(buff);
final ByteBuf payload = readPayload(buff);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false);
} else {
throw new IllegalArgumentException("Can't recognize record of type: " + messageType);
}
}

private String readTopic(ByteBuffer buff) {
final int stringLen = buff.getInt();
final byte[] rawString = new byte[stringLen];
buff.get(rawString);
return new String(rawString, StandardCharsets.UTF_8);
}

private ByteBuf readPayload(ByteBuffer buff) {
final int payloadSize = buff.getInt();
byte[] payload = new byte[payloadSize];
buff.get(payload);
return Unpooled.wrappedBuffer(payload);
}
}

private final Queue segmentedQueue;
private final SerDes serdes = new SerDes();

public SegmentPersistentQueue(Queue segmentedQueue) {
this.segmentedQueue = segmentedQueue;
}

@Override
public void enqueue(SessionRegistry.EnqueuedMessage message) {
checkEnqueuePreconditions(message);

final ByteBuffer payload = serdes.toBytes(message);
try {
segmentedQueue.enqueue(payload);
} catch (QueueException e) {
throw new RuntimeException(e);
}
}

@Override
public SessionRegistry.EnqueuedMessage dequeue() {
checkDequeuePreconditions();

final Optional<ByteBuffer> dequeue;
try {
dequeue = segmentedQueue.dequeue();
} catch (QueueException e) {
throw new RuntimeException(e);
}
if (!dequeue.isPresent()) {
return null;
}

final ByteBuffer content = dequeue.get();
return serdes.fromBytes(content);
}

@Override
public boolean isEmpty() {
return segmentedQueue.isEmpty();
}

@Override
public void closeAndPurge() {
closed = true;
}
}
Loading

0 comments on commit b8438e6

Please sign in to comment.