Skip to content

Commit

Permalink
Improve translog corruption detection (#42980)
Browse files Browse the repository at this point in the history
Today we test for translog corruption by incrementing a byte by 1 somewhere in
a file, and verify that this leads to a `TranslogCorruptionException`.
However, we rely on _all_ corruptions leading to this exception in the
`RemoveCorruptedShardDataCommand`: this command fails if a translog file
corruption leads to a different kind of exception, and `EOFException` and
`NegativeArraySizeException` are both possible. This commit strengthens the
translog corruption detection tests by simulating the following:

- a random value is written
- the file is truncated

It also makes sure that we return a `TranslogCorruptionException` in all such
cases.

Fixes #42661
Backport of #42744
  • Loading branch information
DaveCTurner authored Jun 7, 2019
1 parent 479a1ee commit 5bc0dfc
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public long getLastModifiedTime() throws IOException {
}

/**
* Reads a single opertation from the given location.
* Reads a single operation from the given location.
*/
Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
Expand Down Expand Up @@ -108,56 +109,61 @@ private static int headerSizeInBytes(int version, int uuidLength) {
* Read a translog header from the given path and file channel
*/
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
try {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in =
new BufferedChecksumStreamInput(
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
path.toString());
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(
path.toString(),
"UUID length can't be larger than the translog");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
throw new TranslogCorruptedException(path.toString(), "UUID length can't be larger than the translog");
}
if (uuidLen <= 0) {
throw new TranslogCorruptedException(path.toString(), "UUID length must be positive");
}
final BytesRef uuid = new BytesRef(uuidLen);
uuid.length = uuidLen;
in.read(uuid.bytes, uuid.offset, uuid.length);
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + uuid +
" this translog file belongs to a different translog");
}
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
// Verify the checksum
if (version >= VERSION_PRIMARY_TERM) {
Translog.verifyChecksum(in);
" this translog file belongs to a different translog");
}
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
// Verify the checksum
if (version >= VERSION_PRIMARY_TERM) {
Translog.verifyChecksum(in);
}
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";

final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]";

final int headerSizeInBytes = headerSizeInBytes(version, uuid.length);
assert channel.position() == headerSizeInBytes :
"Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]";
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
}

private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Translog.Operation next() throws IOException {
return null;
}

protected Translog.Operation readOperation() throws IOException {
private Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
Expand All @@ -93,15 +93,19 @@ public long sizeInBytes() {
* reads an operation at the given position into the given buffer.
*/
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
try {
if (position >= length) {
throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" +
getGeneration() + "], path: [" + path + "]");
}
if (position < getFirstOperationOffset()) {
throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" +
getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog truncated", e);
}
Channels.readFromFileChannelWithEofException(channel, position, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec

private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException {
// perform clean check of translog instead of corrupted marker file
boolean clean = true;
try {
final Path translogPath = shardPath.resolveTranslog();
final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
Expand All @@ -184,18 +183,19 @@ private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws
try (Translog translog = new Translog(translogConfig, translogUUID,
translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm);
Translog.Snapshot snapshot = translog.newSnapshot()) {
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
while (snapshot.next() != null) {
// just iterate over snapshot
}
}
return true;
} catch (TranslogCorruptedException e) {
clean = false;
return false;
}
return clean;
}

/** Write a checkpoint file to the given location with the given generation */
static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException {
private static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint)
throws IOException {
Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
globalCheckpoint, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
Expand Down Expand Up @@ -234,7 +234,7 @@ private String deletingFilesDetails(Path translogPath, Set<Path> files) {
}

/** Return a Set of all files in a given directory */
public static Set<Path> filesInDirectory(Path directory) throws IOException {
private static Set<Path> filesInDirectory(Path directory) throws IOException {
Set<Path> files = new TreeSet<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
for (Path file : stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -67,9 +68,7 @@ public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase {

private ShardId shardId;
private ShardRouting routing;
private Path dataDir;
private Environment environment;
private Settings settings;
private ShardPath shardPath;
private IndexMetaData indexMetaData;
private IndexShard indexShard;
Expand All @@ -86,7 +85,7 @@ public void setup() throws IOException {
routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE);

dataDir = createTempDir();
final Path dataDir = createTempDir();

environment =
TestEnvironment.newEnvironment(Settings.builder()
Expand All @@ -96,7 +95,7 @@ public void setup() throws IOException {
// create same directory structure as prod does
final Path path = NodeEnvironment.resolveNodePath(dataDir, 0);
Files.createDirectories(path);
settings = Settings.builder()
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
Expand Down Expand Up @@ -217,7 +216,7 @@ public void testCorruptedTranslog() throws Exception {
// close shard
closeShards(indexShard);

TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));

// test corrupted shard
final IndexShard corruptedShard = reopenIndexShard(true);
Expand Down Expand Up @@ -283,7 +282,7 @@ public void testCorruptedBothIndexAndTranslog() throws Exception {
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
closeShards(corruptedShard);
}
TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath));
TestTranslog.corruptRandomTranslogFile(logger, random(), Collections.singletonList(translogPath));

final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand();
final MockTerminal t = new MockTerminal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;

/**
* Helpers for testing translog.
*/
public class TestTranslog {
static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");
private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog");

public static void corruptRandomTranslogFile(Logger logger, Random random, Collection<Path> translogDirs) throws IOException {
for (Path translogDir : translogDirs) {
Expand All @@ -65,12 +67,11 @@ public static void corruptRandomTranslogFile(Logger logger, Random random, Colle

/**
* Corrupts random translog file (translog-N.tlog) from the given translog directory.
*
* @return a translog file which has been corrupted.
*/
public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration)
throws IOException {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
if (Files.isRegularFile(item)) {
Expand All @@ -81,41 +82,51 @@ public static Path corruptRandomTranslogFile(Logger logger, Random random, Path
}
}
}
assertThat(candidates, is(not(empty())));
assertThat("no translog files found in " + translogDir, candidates, is(not(empty())));

Path corruptedFile = RandomPicks.randomFrom(random, candidates);
corruptFile(logger, random, corruptedFile);
return corruptedFile;
}


static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);

// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}",
fileToCorrupt, filePointer, Integer.toHexString(oldValue),
Integer.toHexString(newValue), fileToCorrupt);
static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException {
final long fileSize = Files.size(fileToCorrupt);
assertThat("cannot corrupt empty file " + fileToCorrupt, fileSize, greaterThan(0L));

try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);

if (random.nextBoolean()) {
// read
fileChannel.position(corruptPosition);
assertThat(fileChannel.position(), equalTo(corruptPosition));
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
fileChannel.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue;
do {
newValue = (byte) random.nextInt(0x100);
} while (newValue == oldValue);
bb.put(0, newValue);

// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("--> corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
} else {
logger.info("--> truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
fileChannel.truncate(corruptPosition);
}
}
}

/**
* Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog.
*/
public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
Expand Down
Loading

0 comments on commit 5bc0dfc

Please sign in to comment.