Skip to content

Commit

Permalink
Add dleger commitlog with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dongeforever committed Nov 20, 2018
1 parent 0f153b9 commit 515bc35
Show file tree
Hide file tree
Showing 13 changed files with 914 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
Expand Down
15 changes: 15 additions & 0 deletions store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@
<name>rocketmq-store ${project.version}</name>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-dleger</artifactId>
<version>0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
Expand Down
16 changes: 8 additions & 8 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,23 @@
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = -875286124;
protected final static int BLANK_MAGIC_CODE = -875286124;
private final MappedFileQueue mappedFileQueue;
private final DefaultMessageStore defaultMessageStore;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;

//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;

private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
private volatile long confirmOffset = -1L;
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;

private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
protected volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;

public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
Expand Down Expand Up @@ -366,7 +366,7 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
return new DispatchRequest(-1, false /* success */);
}

private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
+ 4 //MAGICCODE
+ 4 //BODYCRC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void correctMinOffset(long phyMinOffset) {
long tagsCode = result.getByteBuffer().getLong();

if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
this.minLogicOffset = mappedFile.getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dleger.DLegerCommitLog;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
Expand Down Expand Up @@ -119,7 +120,11 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.commitLog = new CommitLog(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLegerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);

this.flushConsumeQueueService = new FlushConsumeQueueService();
Expand Down Expand Up @@ -1763,7 +1768,7 @@ private void doReput() {
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

if (dispatchRequest.isSuccess()) {
if (size > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
private final int msgSize;
private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
Expand All @@ -35,6 +35,8 @@ public class DispatchRequest {
private final Map<String, String> propertiesMap;
private byte[] bitMap;

private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something

public DispatchRequest(
final String topic,
final int queueId,
Expand Down Expand Up @@ -156,4 +158,16 @@ public byte[] getBitMap() {
public void setBitMap(byte[] bitMap) {
this.bitMap = bitMap;
}

public void setMsgSize(int msgSize) {
this.msgSize = msgSize;
}

public int getBufferSize() {
return bufferSize;
}

public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public void setSize(final int s) {
this.byteBuffer.limit(this.size);
}

public MappedFile getMappedFile() {
/* public MappedFile getMappedFile() {
return mappedFile;
}
}*/

// @Override
// protected void finalize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;

private boolean enableDLegerCommitLog;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;

public boolean isDebugLockEnable() {
return debugLockEnable;
}
Expand Down Expand Up @@ -666,4 +671,35 @@ public void setCommitCommitLogThoroughInterval(final int commitCommitLogThorough
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}

public String getdLegerGroup() {
return dLegerGroup;
}

public void setdLegerGroup(String dLegerGroup) {
this.dLegerGroup = dLegerGroup;
}

public String getdLegerPeers() {
return dLegerPeers;
}

public void setdLegerPeers(String dLegerPeers) {
this.dLegerPeers = dLegerPeers;
}

public String getdLegerSelfId() {
return dLegerSelfId;
}

public void setdLegerSelfId(String dLegerSelfId) {
this.dLegerSelfId = dLegerSelfId;
}

public boolean isEnableDLegerCommitLog() {
return enableDLegerCommitLog;
}

public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
}
Loading

0 comments on commit 515bc35

Please sign in to comment.