Skip to content

Commit

Permalink
Disabled translog fsync in remote store path (opensearch-project#8288)
Browse files Browse the repository at this point in the history
Signed-off-by: Vikas Bansal <[email protected]>
  • Loading branch information
vikasvb90 authored Jul 11, 2023
1 parent a39f60f commit 3c1bca4
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,14 @@ public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint
}
}

public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint) throws IOException {
public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint, boolean fsync) throws IOException {
byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint);
Channels.writeToChannel(bytes, fileChannel, 0);
// no need to force metadata, file size stays the same and we did the full fsync
// when we first created the file, so the directory entry doesn't change as well
fileChannel.force(false);
if (fsync == true) {
// no need to force metadata, file size stays the same and we did the full fsync
// when we first created the file, so the directory entry doesn't change as well
fileChannel.force(false);
}
}

private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private static void tryReportOldVersionError(final Path path, final FileChannel
/**
* Writes this header with the latest format into the file channel
*/
void write(final FileChannel channel) throws IOException {
void write(final FileChannel channel, boolean fsync) throws IOException {
// This output is intentionally not closed because closing it will close the FileChannel.
@SuppressWarnings({ "IOResourceOpenedButNotSafelyClosed", "resource" })
final BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(
Expand All @@ -229,7 +229,9 @@ void write(final FileChannel channel) throws IOException {
// Checksum header
out.writeInt((int) out.getChecksum());
out.flush();
channel.force(true);
if (fsync == true) {
channel.force(true);
}
assert channel.position() == headerSizeInBytes : "Header is not fully written; header size ["
+ headerSizeInBytes
+ "], channel position ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {

private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

private final Boolean remoteTranslogEnabled;

private TranslogWriter(
final ShardId shardId,
final Checkpoint initialCheckpoint,
Expand All @@ -123,7 +125,8 @@ private TranslogWriter(
TranslogHeader header,
final TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays
final BigArrays bigArrays,
Boolean remoteTranslogEnabled
) throws IOException {
super(initialCheckpoint.generation, channel, path, header);
assert initialCheckpoint.offset == channel.position() : "initial checkpoint offset ["
Expand All @@ -148,6 +151,7 @@ private TranslogWriter(
this.bigArrays = bigArrays;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
this.remoteTranslogEnabled = remoteTranslogEnabled;
}

public static TranslogWriter create(
Expand All @@ -174,14 +178,14 @@ public static TranslogWriter create(
try {
checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE);
final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
header.write(channel);
header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled));
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(
header.sizeInBytes(),
fileGeneration,
initialGlobalCheckpoint,
initialMinTranslogGen
);
writeCheckpoint(checkpointChannel, checkpointFile, checkpoint);
writeCheckpoint(checkpointChannel, checkpointFile, checkpoint, remoteTranslogEnabled);
final LongSupplier writerGlobalCheckpointSupplier;
if (Assertions.ENABLED) {
writerGlobalCheckpointSupplier = () -> {
Expand Down Expand Up @@ -209,7 +213,8 @@ public static TranslogWriter create(
header,
tragedy,
persistedSequenceNumberConsumer,
bigArrays
bigArrays,
remoteTranslogEnabled
);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
Expand Down Expand Up @@ -508,8 +513,10 @@ final boolean syncUpTo(long offset) throws IOException {
// now do the actual fsync outside of the synchronized block such that
// we can continue writing to the buffer etc.
try {
channel.force(false);
writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync);
if (!Boolean.TRUE.equals(remoteTranslogEnabled)) {
channel.force(false);
}
writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync, remoteTranslogEnabled);
} catch (final Exception ex) {
closeWithTragicEvent(ex);
throw ex;
Expand Down Expand Up @@ -603,9 +610,13 @@ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOExcept
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
}

private static void writeCheckpoint(final FileChannel fileChannel, final Path checkpointFile, final Checkpoint checkpoint)
throws IOException {
Checkpoint.write(fileChannel, checkpointFile, checkpoint);
private static void writeCheckpoint(
final FileChannel fileChannel,
final Path checkpointFile,
final Checkpoint checkpoint,
final Boolean remoteTranslogEnabled
) throws IOException {
Checkpoint.write(fileChannel, checkpointFile, checkpoint, !Boolean.TRUE.equals(remoteTranslogEnabled));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private static void writeEmptyCheckpoint(Path filename, int translogLength, long
private static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) {
TranslogHeader header = new TranslogHeader(translogUUID, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
header.write(fc);
header.write(fc, true);
return header.sizeInBytes();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,91 @@ ChannelFactory getChannelFactory() {
}
}

public void testTranslogWriterFsyncedWithLocalTranslog() throws IOException {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB)
);

final Set<Long> persistedSeqNos = new HashSet<>();
final AtomicInteger translogFsyncCalls = new AtomicInteger();
final AtomicInteger checkpointFsyncCalls = new AtomicInteger();

final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = new FilterFileChannel(delegate) {
@Override
public void force(boolean metaData) throws IOException {
checkpointFsyncCalls.incrementAndGet();
}
};
} else {
channel = new FilterFileChannel(delegate) {

@Override
public void force(boolean metaData) throws IOException {
translogFsyncCalls.incrementAndGet();
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};

String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
channelFactory,
primaryTerm.get()
);

try (
Translog translog = new LocalTranslog(
config,
translogUUID,
new DefaultTranslogDeletionPolicy(-1, -1, 0),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add
) {
@Override
ChannelFactory getChannelFactory() {
return channelFactory;
}
}
) {
TranslogWriter writer = translog.getCurrent();
byte[] bytes = new byte[256];
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
writer.sync();
assertEquals(4, checkpointFsyncCalls.get());
assertEquals(3, translogFsyncCalls.get());
// Sequence numbers are marked as persisted after sync
assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L));
}
}

public void testTranslogWriterDoesNotBlockAddsOnWrite() throws IOException, InterruptedException {
Path tempDir = createTempDir();
final TranslogConfig config = getTranslogConfig(tempDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private TranslogConfig getTranslogConfig(final Path path) {
// only randomize between nog age retention and a long one, so failures will have a chance of reproducing
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
return getTranslogConfig(path, settings);
Expand Down Expand Up @@ -1261,6 +1262,95 @@ ChannelFactory getChannelFactory() {
}
}

public void testTranslogWriterFsyncDisabledInRemoteFsTranslog() throws IOException {
Path tempDir = createTempDir();
final TranslogConfig temp = getTranslogConfig(tempDir);
final TranslogConfig config = new TranslogConfig(
temp.getShardId(),
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB)
);

final Set<Long> persistedSeqNos = new HashSet<>();
final AtomicInteger translogFsyncCalls = new AtomicInteger();
final AtomicInteger checkpointFsyncCalls = new AtomicInteger();

final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel delegate = FileChannel.open(file, openOption);
boolean success = false;
try {
// don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");

final FileChannel channel;
if (isCkpFile) {
channel = new FilterFileChannel(delegate) {
@Override
public void force(boolean metaData) throws IOException {
checkpointFsyncCalls.incrementAndGet();
}
};
} else {
channel = new FilterFileChannel(delegate) {

@Override
public void force(boolean metaData) throws IOException {
translogFsyncCalls.incrementAndGet();
}
};
}
success = true;
return channel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(delegate);
}
}
};

String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
channelFactory,
primaryTerm.get()
);

try (
Translog translog = new RemoteFsTranslog(
config,
translogUUID,
new DefaultTranslogDeletionPolicy(-1, -1, 0),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add,
repository,
threadPool,
() -> Boolean.TRUE
) {
@Override
ChannelFactory getChannelFactory() {
return channelFactory;
}
}
) {
TranslogWriter writer = translog.getCurrent();
byte[] bytes = new byte[256];
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
writer.sync();
// Fsync is still enabled during empty translog creation.
assertEquals(2, checkpointFsyncCalls.get());
assertEquals(1, translogFsyncCalls.get());
// Sequence numbers are marked as persisted after sync
assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L));
}
}

public void testCloseIntoReader() throws IOException {
try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
final int numOps = randomIntBetween(8, 128);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testCurrentHeaderVersion() throws Exception {
final long generation = randomNonNegativeLong();
final Path translogFile = createTempDir().resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel);
outHeader.write(channel, true);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
Expand Down Expand Up @@ -165,7 +165,7 @@ public void testCorruptTranslogHeader() throws Exception {
final Path translogLocation = createTempDir();
final Path translogFile = translogLocation.resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel);
outHeader.write(channel, true);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
TestTranslog.corruptFile(logger, random(), translogFile, false);
Expand Down

0 comments on commit 3c1bca4

Please sign in to comment.