From b3d35be9744e630811e2e8609c7f70f022e7de18 Mon Sep 17 00:00:00 2001 From: xleoken Date: Mon, 23 Oct 2023 10:26:16 +0800 Subject: [PATCH] [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage ### What changes were proposed in this pull request? We need to delete md5 file init SimpleStateMachineStorage based on ratis-2.0.0, but the logic about cleanup md5 files already support after RATIS-1752, so we can optimize initialization. Remove `MasterStateMachineSuiteJ#testSnapshotCleanup`, it already test cleanup snapshots and md5 files in https://github.com/apache/ratis/blob/release-2.5.1/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java#L221
**links:** https://issues.apache.org/jira/browse/RATIS-1752 https://github.com/apache/ratis/blob/release-2.5.1/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java#L105 ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? local test. Closes #1966 from xleoken/patch. Authored-by: xleoken Signed-off-by: mingji --- .../master/clustermeta/ha/StateMachine.java | 111 +----------------- .../ha/MasterStateMachineSuiteJ.java | 98 ---------------- 2 files changed, 1 insertion(+), 208 deletions(-) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index 5b3582f6164..db537671e62 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -25,17 +25,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -47,10 +39,8 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; -import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.SnapshotInfo; -import org.apache.ratis.statemachine.SnapshotRetentionPolicy; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -59,7 +49,6 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.ExitUtils; -import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.MD5FileUtil; import org.slf4j.Logger; @@ -72,94 +61,7 @@ public class StateMachine extends BaseStateMachine { private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); - public static final Pattern MD5_REGEX = Pattern.compile("snapshot\\.(\\d+)_(\\d+)\\.md5"); - - private final SimpleStateMachineStorage storage = - new SimpleStateMachineStorage() { - /** - * we need to delete md5 file as the same time as snapshot file deleted, so we override the - * SimpleStateMachineStorage.cleanupOldSnapshots method, add delete md5 file action. - * - * @param snapshotRetentionPolicy snapshot retention policy - * @throws IOException - */ - @Override - public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) - throws IOException { - if (snapshotRetentionPolicy != null - && snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) { - List allSnapshotFiles = new ArrayList<>(); - List allMD5Files = new ArrayList<>(); - try (DirectoryStream stream = - Files.newDirectoryStream(SimpleStateMachineStorageUtil.getSmDir(this).toPath())) { - for (Path path : stream) { - if (filePatternMatches(SNAPSHOT_REGEX, allSnapshotFiles, path)) { - continue; - } else { - filePatternMatches(MD5_REGEX, allMD5Files, path); - } - } - } - // first step, cleanup old snapshot and md5 file - SingleFileSnapshotInfo snapshotInfo = - cleanupOldFiles( - allSnapshotFiles, - snapshotRetentionPolicy.getNumSnapshotsRetained(), - false, - null); - // second step, cleanup only old md5 file - cleanupOldFiles( - allMD5Files, snapshotRetentionPolicy.getNumSnapshotsRetained(), true, snapshotInfo); - } - } - - private boolean filePatternMatches( - Pattern pattern, List result, Path filePath) { - Matcher md5Matcher = pattern.matcher(filePath.getFileName().toString()); - if (md5Matcher.matches()) { - final long endIndex = Long.parseLong(md5Matcher.group(2)); - final long term = Long.parseLong(md5Matcher.group(1)); - final FileInfo fileInfo = new FileInfo(filePath, null); - result.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex)); - return true; - } - return false; - } - - private SingleFileSnapshotInfo cleanupOldFiles( - List inputFiles, - int retainedNum, - boolean onlyCleanupMD5Files, - SingleFileSnapshotInfo snapshotInfo) { - SingleFileSnapshotInfo result = null; - if (inputFiles.size() > retainedNum) { - inputFiles.sort(new RatisSnapshotFileComparator()); - List filesToBeCleaned = - inputFiles.subList(retainedNum, inputFiles.size()); - result = filesToBeCleaned.get(0); - for (SingleFileSnapshotInfo fileInfo : filesToBeCleaned) { - if ((null != snapshotInfo && (fileInfo.getIndex() >= snapshotInfo.getIndex()) - || (onlyCleanupMD5Files && null == snapshotInfo))) { - continue; - } - File file = fileInfo.getFile().getPath().toFile(); - if (onlyCleanupMD5Files) { - LOG.info("Deleting old md5 file at {}.", file.getAbsolutePath()); - FileUtils.deleteFileQuietly(file); - } else { - File md5File = new File(file.getAbsolutePath() + MD5FileUtil.MD5_SUFFIX); - LOG.info( - "Deleting old snapshot at {}, md5 file at {}.", - file.getAbsolutePath(), - md5File.getAbsolutePath()); - FileUtils.deleteFileQuietly(file); - FileUtils.deleteFileQuietly(md5File); - } - } - } - return result; - } - }; + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final HARaftServer masterRatisServer; private RaftGroupId raftGroupId; @@ -419,14 +321,3 @@ public StateMachineStorage getStateMachineStorage() { return this.storage; } } - -/** - * Compare snapshot files based on transaction indexes. Copy from - * org.apache.ratis.statemachine.impl.SnapshotFileComparator - */ -class RatisSnapshotFileComparator implements Comparator { - @Override - public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) { - return (int) (file2.getIndex() - file1.getIndex()); - } -} diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java index a136a1ea707..51f669a5333 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java @@ -19,21 +19,11 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import java.util.regex.Matcher; -import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.server.storage.StorageImplUtils; import org.apache.ratis.statemachine.SnapshotInfo; -import org.apache.ratis.statemachine.SnapshotRetentionPolicy; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil; import org.junit.Assert; import org.junit.Test; @@ -44,7 +34,6 @@ import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.quota.ResourceConsumption; import org.apache.celeborn.common.util.JavaUtils; -import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest; import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest; @@ -114,93 +103,6 @@ public void testTakeSnapshot() { Assert.assertEquals(1, latest.getFiles().size()); } - @Test - public void testSnapshotCleanup() throws IOException { - StateMachine stateMachine = ratisServer.getMasterStateMachine(); - SnapshotRetentionPolicy snapshotRetentionPolicy = - new SnapshotRetentionPolicy() { - @Override - public int getNumSnapshotsRetained() { - return 3; - } - }; - - File storageDir = Utils.createTempDir("./", "snapshot"); - - System.out.println(storageDir); - final RaftStorage storage = - StorageImplUtils.newRaftStorage(storageDir, null, RaftStorage.StartupOption.FORMAT, 100); - storage.initialize(); - SimpleStateMachineStorage simpleStateMachineStorage = - (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); - simpleStateMachineStorage.init(storage); - - List indices = new ArrayList<>(); - - // Create 5 snapshot files in storage dir. - for (int i = 0; i < 5; i++) { - final long term = ThreadLocalRandom.current().nextLong(3L, 10L); - final long index = ThreadLocalRandom.current().nextLong(100L, 1000L); - indices.add(index); - File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index); - snapshotFile.createNewFile(); - File md5File = new File(snapshotFile.getAbsolutePath() + ".md5"); - md5File.createNewFile(); - } - - // following 2 md5 files will be deleted - File snapshotFile1 = simpleStateMachineStorage.getSnapshotFile(1, 1); - File md5File1 = new File(snapshotFile1.getAbsolutePath() + ".md5"); - md5File1.createNewFile(); - File snapshotFile2 = simpleStateMachineStorage.getSnapshotFile(5, 2); - File md5File2 = new File(snapshotFile2.getAbsolutePath() + ".md5"); - md5File2.createNewFile(); - // this md5 file will not be deleted - File snapshotFile3 = simpleStateMachineStorage.getSnapshotFile(11, 1001); - File md5File3 = new File(snapshotFile3.getAbsolutePath() + ".md5"); - md5File3.createNewFile(); - - File stateMachineDir = SimpleStateMachineStorageUtil.getSmDir(simpleStateMachineStorage); - Assert.assertTrue(stateMachineDir.listFiles().length == 13); - simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); - File[] remainingFiles = stateMachineDir.listFiles(); - Assert.assertTrue(remainingFiles.length == 7); - - Collections.sort(indices); - Collections.reverse(indices); - List remainingIndices = indices.subList(0, 3); - // check snapshot file and its md5 file management - for (File file : remainingFiles) { - System.out.println(file.getName()); - Matcher matcher = SimpleStateMachineStorage.SNAPSHOT_REGEX.matcher(file.getName()); - if (matcher.matches()) { - Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2)))); - Assert.assertTrue(new File(file.getAbsolutePath() + ".md5").exists()); - } - } - - // Attempt to clean up again should not delete any more files. - simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); - remainingFiles = stateMachineDir.listFiles(); - Assert.assertTrue(remainingFiles.length == 7); - - // Test with Retention disabled. - // Create 2 snapshot files in storage dir. - for (int i = 0; i < 2; i++) { - final long term = ThreadLocalRandom.current().nextLong(10L); - final long index = ThreadLocalRandom.current().nextLong(1000L); - indices.add(index); - File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index); - snapshotFile.createNewFile(); - File md5File = new File(snapshotFile.getAbsolutePath() + ".md5"); - md5File.createNewFile(); - } - - simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() {}); - - Assert.assertTrue(stateMachineDir.listFiles().length == 11); - } - @Test public void testObjSerde() throws IOException, InterruptedException { CelebornConf conf = new CelebornConf();