Skip to content

Commit

Permalink
[CELEBORN-1030] Improve the logic of delete md5 files when initializi…
Browse files Browse the repository at this point in the history
…ng SimpleStateMachineStorage

### What changes were proposed in this pull request?

We need to delete md5 file init SimpleStateMachineStorage based on ratis-2.0.0, but the logic about cleanup md5 files already support after RATIS-1752, so we can optimize initialization.

Remove `MasterStateMachineSuiteJ#testSnapshotCleanup`, it already test cleanup snapshots and md5 files in
https://github.com/apache/ratis/blob/release-2.5.1/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java#L221

<br>

**links:**

https://issues.apache.org/jira/browse/RATIS-1752

https://github.com/apache/ratis/blob/release-2.5.1/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java#L105

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

local test.

Closes apache#1966 from xleoken/patch.

Authored-by: xleoken <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
xleoken authored and FMX committed Oct 23, 2023
1 parent f1b48bd commit b3d35be
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,9 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -47,10 +39,8 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
Expand All @@ -59,7 +49,6 @@
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
Expand All @@ -72,94 +61,7 @@
public class StateMachine extends BaseStateMachine {
private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);

public static final Pattern MD5_REGEX = Pattern.compile("snapshot\\.(\\d+)_(\\d+)\\.md5");

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage() {
/**
* we need to delete md5 file as the same time as snapshot file deleted, so we override the
* SimpleStateMachineStorage.cleanupOldSnapshots method, add delete md5 file action.
*
* @param snapshotRetentionPolicy snapshot retention policy
* @throws IOException
*/
@Override
public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
throws IOException {
if (snapshotRetentionPolicy != null
&& snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) {
List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
List<SingleFileSnapshotInfo> allMD5Files = new ArrayList<>();
try (DirectoryStream<Path> stream =
Files.newDirectoryStream(SimpleStateMachineStorageUtil.getSmDir(this).toPath())) {
for (Path path : stream) {
if (filePatternMatches(SNAPSHOT_REGEX, allSnapshotFiles, path)) {
continue;
} else {
filePatternMatches(MD5_REGEX, allMD5Files, path);
}
}
}
// first step, cleanup old snapshot and md5 file
SingleFileSnapshotInfo snapshotInfo =
cleanupOldFiles(
allSnapshotFiles,
snapshotRetentionPolicy.getNumSnapshotsRetained(),
false,
null);
// second step, cleanup only old md5 file
cleanupOldFiles(
allMD5Files, snapshotRetentionPolicy.getNumSnapshotsRetained(), true, snapshotInfo);
}
}

private boolean filePatternMatches(
Pattern pattern, List<SingleFileSnapshotInfo> result, Path filePath) {
Matcher md5Matcher = pattern.matcher(filePath.getFileName().toString());
if (md5Matcher.matches()) {
final long endIndex = Long.parseLong(md5Matcher.group(2));
final long term = Long.parseLong(md5Matcher.group(1));
final FileInfo fileInfo = new FileInfo(filePath, null);
result.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex));
return true;
}
return false;
}

private SingleFileSnapshotInfo cleanupOldFiles(
List<SingleFileSnapshotInfo> inputFiles,
int retainedNum,
boolean onlyCleanupMD5Files,
SingleFileSnapshotInfo snapshotInfo) {
SingleFileSnapshotInfo result = null;
if (inputFiles.size() > retainedNum) {
inputFiles.sort(new RatisSnapshotFileComparator());
List<SingleFileSnapshotInfo> filesToBeCleaned =
inputFiles.subList(retainedNum, inputFiles.size());
result = filesToBeCleaned.get(0);
for (SingleFileSnapshotInfo fileInfo : filesToBeCleaned) {
if ((null != snapshotInfo && (fileInfo.getIndex() >= snapshotInfo.getIndex())
|| (onlyCleanupMD5Files && null == snapshotInfo))) {
continue;
}
File file = fileInfo.getFile().getPath().toFile();
if (onlyCleanupMD5Files) {
LOG.info("Deleting old md5 file at {}.", file.getAbsolutePath());
FileUtils.deleteFileQuietly(file);
} else {
File md5File = new File(file.getAbsolutePath() + MD5FileUtil.MD5_SUFFIX);
LOG.info(
"Deleting old snapshot at {}, md5 file at {}.",
file.getAbsolutePath(),
md5File.getAbsolutePath());
FileUtils.deleteFileQuietly(file);
FileUtils.deleteFileQuietly(md5File);
}
}
}
return result;
}
};
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();

private final HARaftServer masterRatisServer;
private RaftGroupId raftGroupId;
Expand Down Expand Up @@ -419,14 +321,3 @@ public StateMachineStorage getStateMachineStorage() {
return this.storage;
}
}

/**
* Compare snapshot files based on transaction indexes. Copy from
* org.apache.ratis.statemachine.impl.SnapshotFileComparator
*/
class RatisSnapshotFileComparator implements Comparator<SingleFileSnapshotInfo> {
@Override
public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) {
return (int) (file2.getIndex() - file1.getIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;

import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.StorageImplUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -44,7 +34,6 @@
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
Expand Down Expand Up @@ -114,93 +103,6 @@ public void testTakeSnapshot() {
Assert.assertEquals(1, latest.getFiles().size());
}

@Test
public void testSnapshotCleanup() throws IOException {
StateMachine stateMachine = ratisServer.getMasterStateMachine();
SnapshotRetentionPolicy snapshotRetentionPolicy =
new SnapshotRetentionPolicy() {
@Override
public int getNumSnapshotsRetained() {
return 3;
}
};

File storageDir = Utils.createTempDir("./", "snapshot");

System.out.println(storageDir);
final RaftStorage storage =
StorageImplUtils.newRaftStorage(storageDir, null, RaftStorage.StartupOption.FORMAT, 100);
storage.initialize();
SimpleStateMachineStorage simpleStateMachineStorage =
(SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
simpleStateMachineStorage.init(storage);

List<Long> indices = new ArrayList<>();

// Create 5 snapshot files in storage dir.
for (int i = 0; i < 5; i++) {
final long term = ThreadLocalRandom.current().nextLong(3L, 10L);
final long index = ThreadLocalRandom.current().nextLong(100L, 1000L);
indices.add(index);
File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
snapshotFile.createNewFile();
File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
md5File.createNewFile();
}

// following 2 md5 files will be deleted
File snapshotFile1 = simpleStateMachineStorage.getSnapshotFile(1, 1);
File md5File1 = new File(snapshotFile1.getAbsolutePath() + ".md5");
md5File1.createNewFile();
File snapshotFile2 = simpleStateMachineStorage.getSnapshotFile(5, 2);
File md5File2 = new File(snapshotFile2.getAbsolutePath() + ".md5");
md5File2.createNewFile();
// this md5 file will not be deleted
File snapshotFile3 = simpleStateMachineStorage.getSnapshotFile(11, 1001);
File md5File3 = new File(snapshotFile3.getAbsolutePath() + ".md5");
md5File3.createNewFile();

File stateMachineDir = SimpleStateMachineStorageUtil.getSmDir(simpleStateMachineStorage);
Assert.assertTrue(stateMachineDir.listFiles().length == 13);
simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
File[] remainingFiles = stateMachineDir.listFiles();
Assert.assertTrue(remainingFiles.length == 7);

Collections.sort(indices);
Collections.reverse(indices);
List<Long> remainingIndices = indices.subList(0, 3);
// check snapshot file and its md5 file management
for (File file : remainingFiles) {
System.out.println(file.getName());
Matcher matcher = SimpleStateMachineStorage.SNAPSHOT_REGEX.matcher(file.getName());
if (matcher.matches()) {
Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2))));
Assert.assertTrue(new File(file.getAbsolutePath() + ".md5").exists());
}
}

// Attempt to clean up again should not delete any more files.
simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
remainingFiles = stateMachineDir.listFiles();
Assert.assertTrue(remainingFiles.length == 7);

// Test with Retention disabled.
// Create 2 snapshot files in storage dir.
for (int i = 0; i < 2; i++) {
final long term = ThreadLocalRandom.current().nextLong(10L);
final long index = ThreadLocalRandom.current().nextLong(1000L);
indices.add(index);
File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
snapshotFile.createNewFile();
File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
md5File.createNewFile();
}

simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() {});

Assert.assertTrue(stateMachineDir.listFiles().length == 11);
}

@Test
public void testObjSerde() throws IOException, InterruptedException {
CelebornConf conf = new CelebornConf();
Expand Down

0 comments on commit b3d35be

Please sign in to comment.