Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#8859] Fix duplicate mapped file in mutil commitlog store path mode. #8897

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class AllocateMappedFileService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int waitTimeOut = 1000 * 5;
private ConcurrentMap<String, AllocateRequest> requestTable =
private ConcurrentMap<String/*FileName*/, AllocateRequest> requestTable =
new ConcurrentHashMap<>();
private PriorityBlockingQueue<AllocateRequest> requestQueue =
new PriorityBlockingQueue<>();
Expand All @@ -50,6 +50,11 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}

public String getCachedMappedFilePath(long createOffset) {
AllocateRequest req = requestTable.get(UtilAll.offset2FileName(createOffset));
return req == null ? null : req.getFilePath();
}

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.isTransientStorePoolEnable()) {
Expand All @@ -59,14 +64,15 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
}
}

String nextFileName = nextFilePath.substring(nextFilePath.lastIndexOf(File.separator) + 1);
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
boolean nextPutOK = this.requestTable.putIfAbsent(nextFileName, nextReq) == null;

if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextFilePath);
this.requestTable.remove(nextFileName);
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq);
Expand All @@ -76,13 +82,14 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
canSubmitRequests--;
}

String nextNextFileName = nextNextFilePath.substring(nextNextFilePath.lastIndexOf(File.separator) + 1);
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFileName, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextNextFilePath);
this.requestTable.remove(nextNextFileName);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
Expand All @@ -96,7 +103,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
return null;
}

AllocateRequest result = this.requestTable.get(nextFilePath);
AllocateRequest result = this.requestTable.get(nextFileName);
try {
if (result != null) {
messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
Expand All @@ -106,7 +113,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
this.requestTable.remove(nextFileName);
return result.getMappedFile();
}
} else {
Expand Down Expand Up @@ -156,7 +163,8 @@ private boolean mmapOperation() {
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
String fileName = req.getFilePath().substring(req.getFilePath().lastIndexOf(File.separator) + 1);
AllocateRequest expectedRequest = this.requestTable.get(fileName);
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,24 @@ public MappedFile tryCreateMappedFile(long createOffset) {

String[] paths = availableStorePath.toArray(new String[]{});
Arrays.sort(paths);
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator

String nextFilePath = null;
String nextNextFilePath = null;
if (allocateMappedFileService != null) {
nextFilePath = allocateMappedFileService.getCachedMappedFilePath(
createOffset);
nextNextFilePath = allocateMappedFileService.getCachedMappedFilePath(
createOffset + this.mappedFileSize);
}
if (nextFilePath == null) {
nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
}
if (nextNextFilePath == null) {
nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
}

return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
package org.apache.rocketmq.store;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Test;


Expand Down Expand Up @@ -153,4 +160,45 @@ public void testFullStorePath() {
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}

@Test
public void testUniqueNextNextMappedFile() throws IOException {
Set<String> fullStorePath = new HashSet<>();

MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a" + MixAll.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b" + MixAll.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c");

DefaultMessageStore messageStore = new DefaultMessageStore(config,
new BrokerStatsManager("CommitlogTest", true),
null,
new BrokerConfig(),
new ConcurrentHashMap<>());

AllocateMappedFileService allocateMappedFileService = new AllocateMappedFileService(messageStore);
allocateMappedFileService.start();

MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, allocateMappedFileService, () -> fullStorePath);
String[] storePaths = config.getStorePathCommitLog().trim().split(MixAll.MULTI_PATH_SPLITTER);
assertThat(storePaths.length).isEqualTo(3);

// the nextFilePath is "target/unit_test_store/a/00000000000000000000"
// the first invoke will insert nextNextFilePath "target/unit_test_store/b/00000000000000001024" into requestTable
mappedFileQueue.tryCreateMappedFile(0);

// mark target/unit_test_store/b/ as full
fullStorePath.add("target/unit_test_store/b");
// the nextFilePath is still "target/unit_test_store/b/00000000000000001024"
MappedFile mappedFile1 = mappedFileQueue.tryCreateMappedFile(1024);

assertThat(mappedFile1).isNotNull();
assertThat(mappedFile1.getFile().getPath()).isEqualTo(
String.join(File.separator, Arrays.asList("target", "unit_test_store", "b", "00000000000000001024"))
);

mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
allocateMappedFileService.shutdown();
}
}