Skip to content

Commit

Permalink
Delete corrupted file to re-download from remote store (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#10891)

---------

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Oct 25, 2023
1 parent 91ac084 commit b5299f1
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4962,7 +4962,8 @@ private String copySegmentFiles(
return segmentNFile;
}

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
// Visible for testing
boolean localDirectoryContains(Directory localDirectory, String file, long checksum) throws IOException {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
return true;
Expand All @@ -4981,6 +4982,8 @@ private boolean localDirectoryContains(Directory localDirectory, String file, lo
logger.debug("File {} does not exist in local FS, downloading from remote store", file);
} catch (IOException e) {
logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file);
// For any other exception on reading checksum, we delete the file to re-download again
localDirectory.deleteFile(file);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -45,6 +46,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -163,11 +166,13 @@
import org.junit.Assert;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -4907,6 +4912,53 @@ public void testRecordsForceMerges() throws IOException {
closeShards(shard);
}

public void testLocalDirectoryContains() throws IOException {
IndexShard indexShard = newStartedShard(true);
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Integer.toString(i));
}
flushShard(indexShard);
indexShard.store().incRef();
Directory localDirectory = indexShard.store().directory();
Path shardPath = indexShard.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
Path tempDir = createTempDir();
for (String file : localDirectory.listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
boolean corrupted = randomBoolean();
long checksum = 0;
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
checksum = CodecUtil.retrieveChecksum(indexInput);
}
if (corrupted) {
Files.copy(shardPath.resolve(file), tempDir.resolve(file));
try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) {
CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8));
}
}
if (corrupted == false) {
assertTrue(indexShard.localDirectoryContains(localDirectory, file, checksum));
} else {
assertFalse(indexShard.localDirectoryContains(localDirectory, file, checksum));
assertFalse(Files.exists(shardPath.resolve(file)));
}
}
try (Stream<Path> files = Files.list(tempDir)) {
files.forEach(p -> {
try {
Files.copy(p, shardPath.resolve(p.getFileName()));
} catch (IOException e) {
// Ignore
}
});
}
FileSystemUtils.deleteSubDirectories(tempDir);
indexShard.store().decRef();
closeShards(indexShard);
}

private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker tracker) {
tracker.addUploadBytesStarted(30L);
tracker.addUploadBytesSucceeded(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static void corruptFile(Random random, Path... files) throws IOException
}
}

static void corruptAt(Path path, FileChannel channel, int position) throws IOException {
public static void corruptAt(Path path, FileChannel channel, int position) throws IOException {
// read
channel.position(position);
long filePointer = channel.position();
Expand Down

0 comments on commit b5299f1

Please sign in to comment.