Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created adapter layer to use Segmented queues #704

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Version 0.17-SNAPSHOT:
[enhancement] introduced new queue implementation based on segments in memory mapped files. The type of queue implementation
could be selected by setting `persistent_queue_type` (#691, #704).

Version 0.16:
[build] drop generation of broker-test, removed distribution and embedding_moquette modules from deploy phase (#616)
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class BrokerConstants {
public static final String INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler";
public static final String BROKER_INTERCEPTOR_THREAD_POOL_SIZE = "intercept.thread_pool.size";
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = "persistent_queue_type"; // h2 or segmented, default h2
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
public static final String PORT_PROPERTY_NAME = "port";
Expand Down
5 changes: 4 additions & 1 deletion broker/src/main/java/io/moquette/broker/DebugUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
public final class DebugUtils {

public static String payload2Str(ByteBuf content) {
return content.copy().toString(StandardCharsets.UTF_8);
final int readerPin = content.readableBytes();
final String result = content.toString(StandardCharsets.UTF_8);
content.readerIndex(readerPin);
return result;
}

private DebugUtils() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.moquette.broker;

import java.util.Queue;
import java.util.Set;

public interface IQueueRepository {
Expand All @@ -10,4 +9,6 @@ public interface IQueueRepository {
boolean containsQueue(String clientId);

SessionMessageQueue<SessionRegistry.EnqueuedMessage> getOrCreateQueue(String clientId);

void close();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.moquette.broker;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MemoryQueueRepository implements IQueueRepository {

Expand All @@ -28,6 +30,11 @@ public SessionMessageQueue<SessionRegistry.EnqueuedMessage> getOrCreateQueue(Str
return queue;
}

@Override
public void close() {
queues.clear();
}

void dropQueue(String queueName) {
queues.remove(queueName);
}
Expand Down
40 changes: 35 additions & 5 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.security.PermitAllAuthorizatorPolicy;
import io.moquette.broker.security.ResourceAuthenticator;
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.InterceptHandler;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.persistence.SegmentQueueRepository;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,7 +170,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}

public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) {
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException {
final long start = System.currentTimeMillis();
if (handlers == null) {
handlers = Collections.emptyList();
Expand All @@ -181,8 +183,6 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
if (handlerProp != null) {
config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
}
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.debug("Configuring Using persistent store file, path: {}", persistencePath);
initInterceptors(config, handlers);
LOG.debug("Initialized MQTT protocol processor");
if (sslCtxCreator == null) {
Expand All @@ -195,11 +195,14 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final ISubscriptionsRepository subscriptionsRepository;
final IQueueRepository queueRepository;
final IRetainedRepository retainedRepository;
final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
LOG.debug("Configuring Using persistent store file, path: {}", persistencePath);
if (persistencePath != null && !persistencePath.isEmpty()) {
LOG.trace("Configuring H2 subscriptions store to {}", persistencePath);
LOG.info("Configuring H2 subscription store to {}", persistencePath);
h2Builder = new H2Builder(config, scheduler).initStore();
queueRepository = initQueuesRepository(config, persistencePath, h2Builder);
LOG.trace("Configuring H2 subscriptions store to {}", persistencePath);
subscriptionsRepository = h2Builder.subscriptionsRepository();
queueRepository = h2Builder.queueRepository();
retainedRepository = h2Builder.retainedRepository();
} else {
LOG.trace("Configuring in-memory subscriptions store");
Expand Down Expand Up @@ -233,6 +236,27 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
initialized = true;
}

private static IQueueRepository initQueuesRepository(IConfig config, String persistencePath, H2Builder h2Builder) throws IOException {
final IQueueRepository queueRepository;
final String queueType = config.getProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "h2");
if ("h2".equalsIgnoreCase(queueType)) {
LOG.info("Configuring H2 queue store to {}", persistencePath);
queueRepository = h2Builder.queueRepository();
} else if ("segmented".equalsIgnoreCase(queueType)) {
final String segmentedPath = persistencePath.substring(0, persistencePath.lastIndexOf("/"));
LOG.info("Configuring segmented queue store to {}", segmentedPath);
try {
queueRepository = new SegmentQueueRepository(segmentedPath);
} catch (QueueException e) {
throw new IOException("Problem in configuring persistent queue on path " + segmentedPath, e);
}
} else {
final String errMsg = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, queueType);
throw new RuntimeException(errMsg);
}
return queueRepository;
}

private void collectAndSendTelemetryDataAsynch(IConfig config) {
final Thread telCollector = new Thread(() -> collectAndSendTelemetryData(config));
telCollector.start();
Expand Down Expand Up @@ -505,6 +529,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg, final String clien

public void stopServer() {
LOG.info("Unbinding integration from the configured ports");
if (acceptor == null) {
LOG.error("Closing a badly started server, exit immediately");
return;
}
acceptor.close();
LOG.trace("Stopping MQTT protocol processor");
initialized = false;
Expand All @@ -513,6 +541,8 @@ public void stopServer() {
// and SessionsRepository does not stop its tasks. Thus shutdownNow().
scheduler.shutdownNow();

sessions.close();

if (h2Builder != null) {
LOG.trace("Shutting down H2 persistence {}");
h2Builder.closeStore();
Expand Down
8 changes: 7 additions & 1 deletion broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
oldSession.closeImmediately();
}


if (newIsClean) {
boolean result = oldSession.assignState(SessionStatus.DISCONNECTED, SessionStatus.DESTROYED);
if (!result) {
Expand Down Expand Up @@ -290,4 +289,11 @@ private Optional<ClientDescriptor> createClientDescriptor(Session s) {
final Optional<InetSocketAddress> remoteAddressOpt = s.remoteAddress();
return remoteAddressOpt.map(r -> new ClientDescriptor(clientID, r.getHostString(), r.getPort()));
}

/**
* Close all resources related to session management
* */
public void close() {
queueRepository.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -151,6 +152,10 @@ public static QueuePool loadQueues(Path dataPath) throws QueueException {
return queuePool;
}

public Set<String> queueNames() {
return queues.keySet().stream().map(qn -> qn.name).collect(Collectors.toSet());
}

private static Properties createOrLoadCheckpointFile(Path dataPath) throws QueueException {
final Path checkpointPath = dataPath.resolve("checkpoint.properties");
if (!Files.exists(checkpointPath)) {
Expand All @@ -159,6 +164,7 @@ private static Properties createOrLoadCheckpointFile(Path dataPath) throws Queue
try {
notExisted = checkpointPath.toFile().createNewFile();
} catch (IOException e) {
LOG.error("IO Error creating the file {}", checkpointPath, e);
throw new QueueException("Reached an IO error during the bootstrapping of empty 'checkpoint.properties'", e);
}
if (!notExisted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import org.h2.mvstore.MVStore;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

public class H2QueueRepository implements IQueueRepository {
Expand Down Expand Up @@ -52,4 +48,9 @@ public boolean containsQueue(String queueName) {
public SessionMessageQueue<EnqueuedMessage> getOrCreateQueue(String clientId) {
return new H2PersistentQueue(mvStore, clientId);
}

@Override
public void close() {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.moquette.persistence;

import io.moquette.broker.AbstractSessionMessageQueue;
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.unsafequeues.Queue;
import io.moquette.broker.unsafequeues.QueueException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SegmentPersistentQueue extends AbstractSessionMessageQueue<SessionRegistry.EnqueuedMessage> {

private static class SerDes {

private enum MessageType {PUB_REL_MARKER, PUBLISHED_MESSAGE}

public ByteBuffer toBytes(SessionRegistry.EnqueuedMessage message) {
final int memorySize = getMemory(message);
final ByteBuffer payload = ByteBuffer.allocate(memorySize);
payload.mark();
write(message, payload);
payload.reset();
return payload;
}

private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
if (obj instanceof SessionRegistry.PublishedMessage) {
buff.put((byte) MessageType.PUBLISHED_MESSAGE.ordinal());

final SessionRegistry.PublishedMessage casted = (SessionRegistry.PublishedMessage) obj;
buff.put((byte) casted.getPublishingQos().value());

final String topic = casted.getTopic().toString();

writeTopic(buff, topic);
writePayload(buff, casted.getPayload());
} else if (obj instanceof SessionRegistry.PubRelMarker) {
buff.put((byte) MessageType.PUB_REL_MARKER.ordinal());
} else {
throw new IllegalArgumentException("Unrecognized message class " + obj.getClass());
}
}

private void writePayload(ByteBuffer target, ByteBuf source) {
final int payloadSize = source.readableBytes();
byte[] rawBytes = new byte[payloadSize];
final int pinPoint = source.readerIndex();
source.readBytes(rawBytes).release();
source.readerIndex(pinPoint);
target.putInt(payloadSize);
target.put(rawBytes);
}

private void writeTopic(ByteBuffer buff, String topic) {
final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
buff.putInt(topicBytes.length).put(topicBytes);
}

private int getMemory(SessionRegistry.EnqueuedMessage obj) {
if (obj instanceof SessionRegistry.PubRelMarker) {
return 1;
}
final SessionRegistry.PublishedMessage casted = (SessionRegistry.PublishedMessage) obj;
return 1 + // message type
1 + // qos
topicMemorySize(casted.getTopic()) +
payloadMemorySize(casted.getPayload());
}

private int payloadMemorySize(ByteBuf payload) {
return 4 + // size
payload.readableBytes();
}

private int topicMemorySize(Topic topic) {
return 4 + // size
topic.toString().getBytes(StandardCharsets.UTF_8).length;
}

public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer buff) {
final byte messageType = buff.get();
if (messageType == MessageType.PUB_REL_MARKER.ordinal()) {
return new SessionRegistry.PubRelMarker();
} else if (messageType == MessageType.PUBLISHED_MESSAGE.ordinal()) {
final MqttQoS qos = MqttQoS.valueOf(buff.get());
final String topicStr = readTopic(buff);
final ByteBuf payload = readPayload(buff);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false);
} else {
throw new IllegalArgumentException("Can't recognize record of type: " + messageType);
}
}

private String readTopic(ByteBuffer buff) {
final int stringLen = buff.getInt();
final byte[] rawString = new byte[stringLen];
buff.get(rawString);
return new String(rawString, StandardCharsets.UTF_8);
}

private ByteBuf readPayload(ByteBuffer buff) {
final int payloadSize = buff.getInt();
byte[] payload = new byte[payloadSize];
buff.get(payload);
return Unpooled.wrappedBuffer(payload);
}
}

private final Queue segmentedQueue;
private final SerDes serdes = new SerDes();

public SegmentPersistentQueue(Queue segmentedQueue) {
this.segmentedQueue = segmentedQueue;
}

@Override
public void enqueue(SessionRegistry.EnqueuedMessage message) {
checkEnqueuePreconditions(message);

final ByteBuffer payload = serdes.toBytes(message);
try {
segmentedQueue.enqueue(payload);
} catch (QueueException e) {
throw new RuntimeException(e);
}
}

@Override
public SessionRegistry.EnqueuedMessage dequeue() {
checkDequeuePreconditions();

final Optional<ByteBuffer> dequeue;
try {
dequeue = segmentedQueue.dequeue();
} catch (QueueException e) {
throw new RuntimeException(e);
}
if (!dequeue.isPresent()) {
return null;
}

final ByteBuffer content = dequeue.get();
return serdes.fromBytes(content);
}

@Override
public boolean isEmpty() {
return segmentedQueue.isEmpty();
}

@Override
public void closeAndPurge() {
closed = true;
}
}
Loading