Skip to content

Commit

Permalink
Switched boolean flag to immediate flushing to integer (#738)
Browse files Browse the repository at this point in the history
Introduces the ability to specify the flush interval for IO write operations, changing the default, to an immediate flush.

- Changed the value of configuration setting 'immediate_buffer_flush' from boolean to a number (expressing milliseconds)
- Added 'buffer_flush_millis' setting which overrides the deprecated 'immediate_buffer_flush'
- Changed default behavior from "no flush on every write" to flush on each write.
- Used the value of the new 'buffer_flush_millis' settings to initialize the auto flusher.
  • Loading branch information
andsel authored Mar 4, 2023
1 parent 399c503 commit b734501
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 40 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.17-SNAPSHOT:
[feature] introduce new `buffer_flush_millis` deprecating the old `immediate_buffer_flush` and switch default behavior to flush on every write (#738).
[refactory] purge of session state also on disconnect and reused logic (#715).
[feature] add `moquette.session_loop.debug` property to enable session loop checking assignments (#714).
[break] deprecate `persistent_store` to separate the enablement of persistence with `persistence_enabled` and the path `data_path` (#706).
Expand Down
13 changes: 13 additions & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,20 @@ public final class BrokerConstants {
public static final String NETTY_EPOLL_PROPERTY_NAME = "netty.epoll";
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
/**
* @deprecated use the BUFFER_FLUSH_MS_PROPERTY_NAME
* */
@Deprecated
public static final String IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME = "immediate_buffer_flush";
/**
* 0/immediate means immediate flush, like immediate_buffer_flush = true
* -1/full means no explicit flush, let Netty flush when write buffers are full, like immediate_buffer_flush = false
* a number of milliseconds to between flushes
* */
public static final String BUFFER_FLUSH_MS_PROPERTY_NAME = "buffer_flush_millis";
public static final int NO_BUFFER_FLUSH = -1;
public static final int IMMEDIATE_BUFFER_FLUSH = 0;

public static final String METRICS_ENABLE_PROPERTY_NAME = "use_metrics";
public static final String METRICS_LIBRATO_EMAIL_PROPERTY_NAME = "metrics.librato.email";
public static final String METRICS_LIBRATO_TOKEN_PROPERTY_NAME = "metrics.librato.token";
Expand Down
43 changes: 37 additions & 6 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,57 @@
import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;

import java.util.Locale;

class BrokerConfiguration {

private final boolean allowAnonymous;
private final boolean allowZeroByteClientId;
private final boolean reauthorizeSubscriptionsOnConnect;
private final boolean immediateBufferFlush;
private final int bufferFlushMillis;

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);
immediateBufferFlush = props.boolProp(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, false);

// BUFFER_FLUSH_MS_PROPERTY_NAME has precedence over the deprecated IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME
final String bufferFlushMillisProp = props.getProperty(BrokerConstants.BUFFER_FLUSH_MS_PROPERTY_NAME);
if (bufferFlushMillisProp != null && !bufferFlushMillisProp.isEmpty()) {
switch (bufferFlushMillisProp.toLowerCase(Locale.ROOT)) {
case "immediate":
bufferFlushMillis = BrokerConstants. IMMEDIATE_BUFFER_FLUSH;
break;
case "full":
bufferFlushMillis = BrokerConstants.NO_BUFFER_FLUSH;
break;
default:
final String errorMsg = String.format("Can't state value of %s property. Has to be 'immediate', " +
"'full' or a number >= -1, found %s", BrokerConstants.BUFFER_FLUSH_MS_PROPERTY_NAME, bufferFlushMillisProp);
try {
bufferFlushMillis = Integer.parseInt(bufferFlushMillisProp);
if (bufferFlushMillis < -1) {
throw new IllegalArgumentException(errorMsg);
}
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(errorMsg);
}
}
} else {
if (props.boolProp(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, true)) {
bufferFlushMillis = BrokerConstants.IMMEDIATE_BUFFER_FLUSH;
} else {
bufferFlushMillis = BrokerConstants.NO_BUFFER_FLUSH;
}
}
}

public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, boolean immediateBufferFlush) {
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this.allowAnonymous = allowAnonymous;
this.allowZeroByteClientId = allowZeroByteClientId;
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
this.immediateBufferFlush = immediateBufferFlush;
this.bufferFlushMillis = bufferFlushMillis;
}

public boolean isAllowAnonymous() {
Expand All @@ -52,7 +83,7 @@ public boolean isReauthorizeSubscriptionsOnConnect() {
return reauthorizeSubscriptionsOnConnect;
}

public boolean isImmediateBufferFlush() {
return immediateBufferFlush;
public int getBufferFlushMillis() {
return bufferFlushMillis;
}
}
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -535,7 +536,7 @@ void sendIfWritableElseDrop(MqttMessage msg) {
}

ChannelFuture channelFuture;
if (brokerConfig.isImmediateBufferFlush()) {
if (brokerConfig.getBufferFlushMillis() == BrokerConstants.IMMEDIATE_BUFFER_FLUSH) {
channelFuture = channel.writeAndFlush(retainedDup);
} else {
channelFuture = channel.write(retainedDup);
Expand Down
36 changes: 21 additions & 15 deletions broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void operationComplete(ChannelFuture future) throws Exception {

private Class<? extends ServerSocketChannel> channelClass;

public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslContextCreator sslCtxCreator) {
public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslContextCreator sslCtxCreator, BrokerConfiguration brokerConfiguration) {
LOG.debug("Initializing Netty acceptor");

nettySoBacklog = props.intProp(BrokerConstants.NETTY_SO_BACKLOG_PROPERTY_NAME, 128);
Expand Down Expand Up @@ -178,16 +178,16 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
} else {
this.errorsCather = Optional.empty();
}
initializePlainTCPTransport(mqttHandler, props);
initializeWebSocketTransport(mqttHandler, props);
initializePlainTCPTransport(mqttHandler, props, brokerConfiguration);
initializeWebSocketTransport(mqttHandler, props, brokerConfiguration);
if (securityPortsConfigured(props)) {
SslContext sslContext = sslCtxCreator.initSSLContext();
if (sslContext == null) {
LOG.error("Can't initialize SSLHandler layer! Exiting, check your configuration of jks");
return;
}
initializeSSLTCPTransport(mqttHandler, props, sslContext);
initializeWSSTransport(mqttHandler, props, sslContext);
initializeSSLTCPTransport(mqttHandler, props, sslContext, brokerConfiguration);
initializeWSSTransport(mqttHandler, props, sslContext, brokerConfiguration);
}
}

Expand Down Expand Up @@ -239,7 +239,7 @@ public int getSslPort() {
return ports.computeIfAbsent(SSL_MQTT_PROTO, i -> 0);
}

private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig props) {
private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig props, BrokerConfiguration brokerConfiguration) {
LOG.debug("Configuring TCP MQTT transport");
final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
Expand All @@ -250,26 +250,29 @@ private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig pr
return;
}
int port = Integer.parseInt(tcpPortProp);
final int writeFlushMillis = brokerConfiguration.getBufferFlushMillis();
initFactory(host, port, PLAIN_MQTT_PROTO, new PipelineInitializer() {

@Override
void init(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
configureMQTTPipeline(pipeline, timeoutHandler, handler);
configureMQTTPipeline(pipeline, timeoutHandler, handler, writeFlushMillis);
}
});
}

private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler,
NewNettyMQTTHandler handler) {
NewNettyMQTTHandler handler, int writeFlushMillis) {
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
if (errorsCather.isPresent()) {
pipeline.addLast("bugsnagCatcher", errorsCather.get());
}
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(bytesMetricsCollector));
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
if (writeFlushMillis > IMMEDIATE_BUFFER_FLUSH) {
pipeline.addLast("autoflush", new AutoFlushHandler(writeFlushMillis, TimeUnit.MILLISECONDS));
}
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("metrics", new MessageMetricsHandler(metricsCollector));
Expand All @@ -280,7 +283,7 @@ private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeout
pipeline.addLast("handler", handler);
}

private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, IConfig props) {
private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, IConfig props, BrokerConfiguration brokerConfiguration) {
LOG.debug("Configuring Websocket MQTT transport");
String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
Expand All @@ -296,6 +299,7 @@ private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, ICo
String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
String path = props.getProperty(BrokerConstants.WEB_SOCKET_PATH_PROPERTY_NAME, BrokerConstants.WEBSOCKET_PATH);
int maxFrameSize = props.intProp(BrokerConstants.WEB_SOCKET_MAX_FRAME_SIZE_PROPERTY_NAME, 65536);
final int writeFlushMillis = brokerConfiguration.getBufferFlushMillis();
initFactory(host, port, "Websocket MQTT", new PipelineInitializer() {

@Override
Expand All @@ -307,12 +311,12 @@ void init(SocketChannel channel) {
new WebSocketServerProtocolHandler(path, MQTT_SUBPROTOCOL_CSV_LIST, false, maxFrameSize));
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
configureMQTTPipeline(pipeline, timeoutHandler, handler);
configureMQTTPipeline(pipeline, timeoutHandler, handler, writeFlushMillis);
}
});
}

private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig props, SslContext sslContext) {
private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig props, SslContext sslContext, BrokerConfiguration brokerConfiguration) {
LOG.debug("Configuring SSL MQTT transport");
String sslPortProp = props.getProperty(SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
if (DISABLED_PORT_BIND.equals(sslPortProp)) {
Expand All @@ -329,18 +333,19 @@ private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig prop
String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
String sNeedsClientAuth = props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false");
final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth);
final int writeFlushMillis = brokerConfiguration.getBufferFlushMillis();
initFactory(host, sslPort, SSL_MQTT_PROTO, new PipelineInitializer() {

@Override
void init(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("ssl", createSslHandler(channel, sslContext, needsClientAuth));
configureMQTTPipeline(pipeline, timeoutHandler, handler);
configureMQTTPipeline(pipeline, timeoutHandler, handler, writeFlushMillis);
}
});
}

private void initializeWSSTransport(NewNettyMQTTHandler handler, IConfig props, SslContext sslContext) {
private void initializeWSSTransport(NewNettyMQTTHandler handler, IConfig props, SslContext sslContext, BrokerConfiguration brokerConfiguration) {
LOG.debug("Configuring secure websocket MQTT transport");
String sslPortProp = props.getProperty(WSS_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
if (DISABLED_PORT_BIND.equals(sslPortProp)) {
Expand All @@ -356,6 +361,7 @@ private void initializeWSSTransport(NewNettyMQTTHandler handler, IConfig props,
int maxFrameSize = props.intProp(BrokerConstants.WEB_SOCKET_MAX_FRAME_SIZE_PROPERTY_NAME, 65536);
String sNeedsClientAuth = props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false");
final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth);
final int writeFlushMillis = brokerConfiguration.getBufferFlushMillis();
initFactory(host, sslPort, "Secure websocket", new PipelineInitializer() {

@Override
Expand All @@ -370,7 +376,7 @@ void init(SocketChannel channel) throws Exception {
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());

configureMQTTPipeline(pipeline, timeoutHandler, handler);
configureMQTTPipeline(pipeline, timeoutHandler, handler, writeFlushMillis);
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler

final NewNettyMQTTHandler mqttHandler = new NewNettyMQTTHandler(connectionFactory);
acceptor = new NewNettyAcceptor();
acceptor.initialize(mqttHandler, config, sslCtxCreator);
acceptor.initialize(mqttHandler, config, sslCtxCreator, brokerConfig);

final long startTime = System.currentTimeMillis() - start;
LOG.info("Moquette integration has been started successfully in {} ms", startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Properties;

import static io.moquette.BrokerConstants.IMMEDIATE_BUFFER_FLUSH;
import static org.junit.jupiter.api.Assertions.*;

public class BrokerConfigurationTest {
Expand All @@ -32,7 +33,7 @@ public void defaultConfig() {
assertTrue(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertFalse(brokerConfiguration.isImmediateBufferFlush());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
}

@Test
Expand All @@ -44,7 +45,7 @@ public void configureAllowAnonymous() {
assertFalse(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertFalse(brokerConfiguration.isImmediateBufferFlush());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
}

@Test
Expand All @@ -56,7 +57,7 @@ public void configureAllowZeroByteClientId() {
assertTrue(brokerConfiguration.isAllowAnonymous());
assertTrue(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertFalse(brokerConfiguration.isImmediateBufferFlush());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
}

@Test
Expand All @@ -68,7 +69,7 @@ public void configureReauthorizeSubscriptionsOnConnect() {
assertTrue(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertTrue(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertFalse(brokerConfiguration.isImmediateBufferFlush());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
}

@Test
Expand All @@ -80,6 +81,6 @@ public void configureImmediateBufferFlush() {
assertTrue(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertTrue(brokerConfiguration.isImmediateBufferFlush());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "No immediate flush by default");
}
}
Loading

0 comments on commit b734501

Please sign in to comment.