From 1c040a6f946bb9cba55ad87297ddc1e3154d8367 Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Wed, 6 Nov 2024 21:07:01 +0800 Subject: [PATCH 1/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../store/AllocateMappedFileService.java | 24 ++++++---- .../store/MultiPathMappedFileQueue.java | 24 ++++++++-- .../store/MultiPathMappedFileQueueTest.java | 46 +++++++++++++++++++ 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index d9cd602a65c..8ebba88da1b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -39,7 +39,7 @@ public class AllocateMappedFileService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int waitTimeOut = 1000 * 5; - private ConcurrentMap requestTable = + private ConcurrentMap requestTable = new ConcurrentHashMap<>(); private PriorityBlockingQueue requestQueue = new PriorityBlockingQueue<>(); @@ -50,6 +50,11 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) { this.messageStore = messageStore; } + public String getCachedMappedFilePath(long createOffset) { + AllocateRequest req = requestTable.get(UtilAll.offset2FileName(createOffset)); + return req == null ? null : req.getFilePath(); + } + public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { int canSubmitRequests = 2; if (this.messageStore.isTransientStorePoolEnable()) { @@ -59,14 +64,15 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next } } + String nextFileName = nextFilePath.substring(nextFilePath.lastIndexOf("/") + 1); AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize); - boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null; + boolean nextPutOK = this.requestTable.putIfAbsent(nextFileName, nextReq) == null; if (nextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); - this.requestTable.remove(nextFilePath); + this.requestTable.remove(nextFileName); return null; } boolean offerOK = this.requestQueue.offer(nextReq); @@ -76,13 +82,14 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next canSubmitRequests--; } + String nextNextFileName = nextNextFilePath.substring(nextNextFilePath.lastIndexOf("/") + 1); AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize); - boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null; + boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFileName, nextNextReq) == null; if (nextNextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs()); - this.requestTable.remove(nextNextFilePath); + this.requestTable.remove(nextNextFileName); } else { boolean offerOK = this.requestQueue.offer(nextNextReq); if (!offerOK) { @@ -96,7 +103,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next return null; } - AllocateRequest result = this.requestTable.get(nextFilePath); + AllocateRequest result = this.requestTable.get(nextFileName); try { if (result != null) { messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS"); @@ -106,7 +113,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize()); return null; } else { - this.requestTable.remove(nextFilePath); + this.requestTable.remove(nextFileName); return result.getMappedFile(); } } else { @@ -156,7 +163,8 @@ private boolean mmapOperation() { AllocateRequest req = null; try { req = this.requestQueue.take(); - AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); + String fileName = req.getFilePath().substring(req.getFilePath().lastIndexOf("/") + 1); + AllocateRequest expectedRequest = this.requestTable.get(fileName); if (null == expectedRequest) { log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " + req.getFileSize()); diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java index 8ff050dfe3b..c1f7413d92f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -37,8 +37,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { private final Supplier> fullStorePathsSupplier; public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, - AllocateMappedFileService allocateMappedFileService, - Supplier> fullStorePathsSupplier) { + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); this.config = messageStoreConfig; this.fullStorePathsSupplier = fullStorePathsSupplier; @@ -81,7 +81,7 @@ public MappedFile tryCreateMappedFile(long createOffset) { Set storePath = getPaths(); Set readonlyPathSet = getReadonlyPaths(); Set fullStorePaths = - fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); + fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); HashSet availableStorePath = new HashSet<>(storePath); @@ -99,10 +99,24 @@ public MappedFile tryCreateMappedFile(long createOffset) { String[] paths = availableStorePath.toArray(new String[]{}); Arrays.sort(paths); - String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator + + String nextFilePath = null; + String nextNextFilePath = null; + if (allocateMappedFileService != null) { + nextFilePath = allocateMappedFileService.getCachedMappedFilePath( + createOffset); + nextNextFilePath = allocateMappedFileService.getCachedMappedFilePath( + createOffset + this.mappedFileSize); + } + if (nextFilePath == null) { + nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator + } + if (nextNextFilePath == null) { + nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + } + return doCreateMappedFile(nextFilePath, nextNextFilePath); } diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java index 07037aa03c8..ef285f88a69 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -18,12 +18,19 @@ package org.apache.rocketmq.store; import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; import java.util.HashSet; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Test; @@ -153,4 +160,43 @@ public void testFullStorePath() { mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); } + + @Test + public void testUniqueNextNextMappedFile() throws IOException { + Set fullStorePath = new HashSet<>(); + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a" + MixAll.MULTI_PATH_SPLITTER + + "target/unit_test_store/b" + MixAll.MULTI_PATH_SPLITTER + + "target/unit_test_store/c"); + + DefaultMessageStore messageStore = new DefaultMessageStore(config, + new BrokerStatsManager("CommitlogTest", true), + (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {}, + new BrokerConfig(), + new ConcurrentHashMap<>()); + + AllocateMappedFileService allocateMappedFileService = new AllocateMappedFileService(messageStore); + allocateMappedFileService.start(); + + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, allocateMappedFileService, () -> fullStorePath); + String[] storePaths = config.getStorePathCommitLog().trim().split(MixAll.MULTI_PATH_SPLITTER); + assertThat(storePaths.length).isEqualTo(3); + + // the nextFilePath is "target/unit_test_store/a/00000000000000000000" + // the first invoke will insert nextNextFilePath "target/unit_test_store/b/00000000000000001024" into requestTable + mappedFileQueue.tryCreateMappedFile(0); + + // mark target/unit_test_store/b/ as full + fullStorePath.add("target/unit_test_store/b"); + // the nextFilePath is still "target/unit_test_store/b/00000000000000001024" + MappedFile mappedFile1 = mappedFileQueue.tryCreateMappedFile(1024); + + assertThat(mappedFile1).isNotNull(); + assertThat(mappedFile1.getFile().getPath()).isEqualTo("target/unit_test_store/b/00000000000000001024"); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + allocateMappedFileService.shutdown(); + } } \ No newline at end of file From 895445c642ec44f843a36f1c718cacc93d43f853 Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Thu, 7 Nov 2024 11:17:22 +0800 Subject: [PATCH 2/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../org/apache/rocketmq/store/MultiPathMappedFileQueue.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java index c1f7413d92f..5f2281f0cf4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -37,8 +37,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { private final Supplier> fullStorePathsSupplier; public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, - AllocateMappedFileService allocateMappedFileService, - Supplier> fullStorePathsSupplier) { + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); this.config = messageStoreConfig; this.fullStorePathsSupplier = fullStorePathsSupplier; @@ -81,7 +81,7 @@ public MappedFile tryCreateMappedFile(long createOffset) { Set storePath = getPaths(); Set readonlyPathSet = getReadonlyPaths(); Set fullStorePaths = - fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); + fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); HashSet availableStorePath = new HashSet<>(storePath); From 6b072f393f4b32fe9c61aca298c5c8ceea56d16e Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Sat, 9 Nov 2024 14:09:22 +0800 Subject: [PATCH 3/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../org/apache/rocketmq/store/AllocateMappedFileService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 8ebba88da1b..273f732dac0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -64,7 +64,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next } } - String nextFileName = nextFilePath.substring(nextFilePath.lastIndexOf("/") + 1); + String nextFileName = nextFilePath.substring(nextFilePath.lastIndexOf(File.separator) + 1); AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize); boolean nextPutOK = this.requestTable.putIfAbsent(nextFileName, nextReq) == null; @@ -82,7 +82,7 @@ public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String next canSubmitRequests--; } - String nextNextFileName = nextNextFilePath.substring(nextNextFilePath.lastIndexOf("/") + 1); + String nextNextFileName = nextNextFilePath.substring(nextNextFilePath.lastIndexOf(File.separator) + 1); AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize); boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFileName, nextNextReq) == null; if (nextNextPutOK) { From 06631368834ae81da1f87d5a6a2459eddcf09fca Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Mon, 11 Nov 2024 10:30:51 +0800 Subject: [PATCH 4/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../apache/rocketmq/store/MultiPathMappedFileQueueTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java index ef285f88a69..991757dc8d5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -19,10 +19,8 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.File; import java.io.IOException; import java.util.HashSet; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; @@ -172,7 +170,7 @@ public void testUniqueNextNextMappedFile() throws IOException { DefaultMessageStore messageStore = new DefaultMessageStore(config, new BrokerStatsManager("CommitlogTest", true), - (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {}, + null, new BrokerConfig(), new ConcurrentHashMap<>()); From f2759f8355050d799a9295747af9201c2327e8bd Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Mon, 11 Nov 2024 17:56:32 +0800 Subject: [PATCH 5/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../apache/rocketmq/store/AllocateMappedFileService.java | 2 +- .../apache/rocketmq/store/MultiPathMappedFileQueueTest.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 273f732dac0..79227a3567a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -163,7 +163,7 @@ private boolean mmapOperation() { AllocateRequest req = null; try { req = this.requestQueue.take(); - String fileName = req.getFilePath().substring(req.getFilePath().lastIndexOf("/") + 1); + String fileName = req.getFilePath().substring(req.getFilePath().lastIndexOf(File.separator) + 1); AllocateRequest expectedRequest = this.requestTable.get(fileName); if (null == expectedRequest) { log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java index 991757dc8d5..3b22386789f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -19,7 +19,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -191,7 +193,9 @@ public void testUniqueNextNextMappedFile() throws IOException { MappedFile mappedFile1 = mappedFileQueue.tryCreateMappedFile(1024); assertThat(mappedFile1).isNotNull(); - assertThat(mappedFile1.getFile().getPath()).isEqualTo("target/unit_test_store/b/00000000000000001024"); + assertThat(mappedFile1.getFile().getPath()).isEqualTo( + String.join(File.separator, Arrays.asList("target", "unit_test_store", "b", "00000000000000001024")) + ); mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); From c22f48b21dba1b42eea8c81d40d9fc7f4486fd8f Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Fri, 15 Nov 2024 10:53:01 +0800 Subject: [PATCH 6/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../org/apache/rocketmq/store/AllocateMappedFileService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 79227a3567a..8646c7e4e2e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -51,6 +51,7 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) { } public String getCachedMappedFilePath(long createOffset) { + AllocateRequest req = requestTable.get(UtilAll.offset2FileName(createOffset)); return req == null ? null : req.getFilePath(); } From 135f0a27fa92eec857a986e264024949da687450 Mon Sep 17 00:00:00 2001 From: hexueyuan Date: Fri, 15 Nov 2024 10:53:27 +0800 Subject: [PATCH 7/7] Fix duplicate mapped file in mutil commitlog store path mode. --- .../org/apache/rocketmq/store/AllocateMappedFileService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 8646c7e4e2e..79227a3567a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -51,7 +51,6 @@ public AllocateMappedFileService(DefaultMessageStore messageStore) { } public String getCachedMappedFilePath(long createOffset) { - AllocateRequest req = requestTable.get(UtilAll.offset2FileName(createOffset)); return req == null ? null : req.getFilePath(); }