Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve translog corruption detection #42744

Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public long getLastModifiedTime() throws IOException {
}

/**
* Reads a single opertation from the given location.
* Reads a single operation from the given location.
*/
Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
Expand Down Expand Up @@ -106,52 +107,57 @@ private static int headerSizeInBytes(int version, int uuidLength) {
* Read a translog header from the given path and file channel
*/
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
try {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
path.toString());
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
if (version == VERSION_CHECKPOINTS) {
throw new IllegalStateException("pre-6.3 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(
path.toString(),
"UUID length can't be larger than the translog");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
if (version == VERSION_CHECKPOINTS) {
throw new IllegalStateException("pre-6.3 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(path.toString(), "UUID length can't be larger than the translog");
}
if (uuidLen <= 0) {
throw new TranslogCorruptedException(path.toString(), "UUID length must be positive");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + uuid +
" this translog file belongs to a different translog");
" this translog file belongs to a different translog");
}
// Read the primary term
assert version == VERSION_PRIMARY_TERM;
final long primaryTerm = in.readLong();
// Verify the checksum
Translog.verifyChecksum(in);
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";

final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
// Read the primary term
assert version == VERSION_PRIMARY_TERM;
final long primaryTerm = in.readLong();
// Verify the checksum
Translog.verifyChecksum(in);
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";

final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
}

private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Translog.Operation next() throws IOException {
return null;
}

protected Translog.Operation readOperation() throws IOException {
private Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
Expand All @@ -93,15 +93,19 @@ public long sizeInBytes() {
* reads an operation at the given position into the given buffer.
*/
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
try {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog truncated", e);
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec

private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
// perform clean check of translog instead of corrupted marker file
boolean clean = true;
try {
final Path translogPath = shardPath.resolveTranslog();
final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
Expand All @@ -184,18 +183,19 @@ private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws
try (Translog translog = new Translog(translogConfig, translogUUID,
translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm);
Translog.Snapshot snapshot = translog.newSnapshot()) {
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
while (snapshot.next() != null) {
// just iterate over snapshot
}
}
return true;
} catch (TranslogCorruptedException e) {
clean = false;
return false;
}
return clean;
}

/** Write a checkpoint file to the given location with the given generation */
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
private static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint)
throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
Expand Down Expand Up @@ -234,7 +234,7 @@ private String deletingFilesDetails(Path translogPath, Set<Path> files) {
}

/** Return a Set of all files in a given directory */
public static Set<Path> filesInDirectory(Path directory) throws IOException {
private static Set<Path> filesInDirectory(Path directory) throws IOException {
Set<Path> files = new TreeSet<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
for (Path file : stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -67,9 +68,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {

private ShardId shardId;
private ShardRouting routing;
private Path dataDir;
private Environment environment;
private Settings settings;
private ShardPath shardPath;
private IndexMetaData indexMetaData;
private IndexShard indexShard;
Expand All @@ -86,7 +85,7 @@ public void setup() throws IOException {
routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE);

dataDir = createTempDir();
final Path dataDir = createTempDir();

environment =
TestEnvironment.newEnvironment(Settings.builder()
Expand All @@ -95,7 +94,7 @@ public void setup() throws IOException {

// create same directory structure as prod does
Files.createDirectories(dataDir);
settings = Settings.builder()
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
Expand Down Expand Up @@ -216,7 +215,7 @@ public void testCorruptedTranslog() throws Exception {
// close shard
closeShards(indexShard);

TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));

// test corrupted shard
final IndexShard corruptedShard = reopenIndexShard(true);
Expand Down Expand Up @@ -282,7 +281,7 @@ public void testCorruptedBothIndexAndTranslog() throws Exception {
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
closeShards(corruptedShard);
}
TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));

final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
final MockTerminal t = new MockTerminal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;

/**
* Helpers for testing translog.
*/
public class TestTranslog {
static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");

public static void corruptRandomTranslogFile(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
for (Path translogDir : translogDirs) {
Expand All @@ -65,12 +67,11 @@ public static void corruptRandomTranslogFile(Logger logger, Random random, Colle

/**
* Corrupts random translog file (translog-N.tlog) from the given translog directory.
*
* @return a translog file which has been corrupted.
*/
public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration)
throws IOException {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
Expand All @@ -81,41 +82,51 @@ public static Path corruptRandomTranslogFile(Logger logger, Random random, Path
}
}
}
assertThat(candidates, is(not(empty())));
assertThat("no translog files found in " + translogDir, candidates, is(not(empty())));

Path corruptedFile = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, corruptedFile);
return corruptedFile;
}


static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);

// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}",
fileToCorrupt, filePointer, Integer.toHexString(oldValue),
Integer.toHexString(newValue), fileToCorrupt);
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
final long fileSize = Files.size(fileToCorrupt);
assertThat("cannot corrupt empty file " + fileToCorrupt, fileSize, greaterThan(0L));

try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);

if (random.nextBoolean()) {
// read
fileChannel.position(corruptPosition);
assertThat(fileChannel.position(), equalTo(corruptPosition));
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
fileChannel.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue;
do {
newValue = (byte) random.nextInt(0x100);
} while (newValue == oldValue);
bb.put(0, newValue);

// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("--> corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
} else {
logger.info("--> truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
fileChannel.truncate(corruptPosition);
}
}
}

/**
* Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog.
*/
public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
Expand Down
Loading