Skip to content

Commit

Permalink
merge: #14795
Browse files Browse the repository at this point in the history
14795: [Backport stable/8.3] Fix empty checksum file/snapshot corruption r=npepinpe a=backport-action

# Description
Backport of #14778 to `stable/8.3`.

relates to #14699
original author: `@npepinpe`

Co-authored-by: Nicolas Pepin-Perreault <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe authored Oct 19, 2023
2 parents abd26f0 + e40a0be commit 5c143a7
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@v4
- name: Install strace tests
run: sudo apt-get -qq update && sudo apt-get install -y strace
- uses: ./.github/actions/setup-zeebe
with:
go: false
Expand Down
6 changes: 6 additions & 0 deletions snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-test-util</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public final class FileBasedSnapshotStore extends Actor
// instead of using the implicit sort order.
static final String METADATA_FILE_NAME = "zeebe.metadata";
// first is the metadata and the second the received snapshot count
private static final String RECEIVING_DIR_FORMAT = "%s-%d";
private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedSnapshotStore.class);
private static final String CHECKSUM_SUFFIX = ".checksum";
private static final String TMP_CHECKSUM_SUFFIX = ".tmp";
// the root snapshotsDirectory where all snapshots should be stored
private final Path snapshotsDirectory;
// the root snapshotsDirectory where pending snapshots should be stored
Expand Down Expand Up @@ -132,9 +132,7 @@ protected void onActorClosing() {

private FileBasedSnapshot loadLatestSnapshot(final Path snapshotDirectory) {
FileBasedSnapshot latestPersistedSnapshot = null;
try (final var stream =
Files.newDirectoryStream(
snapshotDirectory, p -> !p.getFileName().toString().endsWith(CHECKSUM_SUFFIX))) {
try (final var stream = Files.newDirectoryStream(snapshotDirectory, Files::isDirectory)) {
for (final var path : stream) {
final var snapshot = collectSnapshot(path);
if (snapshot != null) {
Expand Down Expand Up @@ -526,12 +524,14 @@ FileBasedSnapshot persistNewSnapshot(

try (final var ignored = snapshotMetrics.startPersistTimer()) {
// it's important to persist the checksum file only after the move is finished, since we use
// it
// as a marker file to guarantee the move was complete and not partial
// it as a marker file to guarantee the move was complete and not partial
final var destination = buildSnapshotDirectory(snapshotId);
final var checksumPath = buildSnapshotsChecksumPath(snapshotId);
final var tmpChecksumPath =
checksumPath.resolveSibling(checksumPath.getFileName().toString() + TMP_CHECKSUM_SUFFIX);
try {
SnapshotChecksum.persist(checksumPath, immutableChecksumsSFV);
SnapshotChecksum.persist(tmpChecksumPath, immutableChecksumsSFV);
FileUtil.moveDurably(tmpChecksumPath, checksumPath);
} catch (final IOException e) {
rollbackPartialSnapshot(destination);
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.MutableChecksumsSFV;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;

final class SnapshotChecksum {
Expand Down Expand Up @@ -62,8 +64,16 @@ private static boolean isNotMetadataFile(final Path file) {

public static void persist(final Path checksumPath, final ImmutableChecksumsSFV checksum)
throws IOException {
try (final var stream = new FileOutputStream(checksumPath.toFile())) {
checksum.write(stream);
// FileOutputStream#flush does nothing, so use a file channel to enforce it
try (final var channel =
FileChannel.open(
checksumPath,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING);
final var output = Channels.newOutputStream(channel)) {
checksum.write(output);
channel.force(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.test.util.STracer;
import io.camunda.zeebe.test.util.STracer.Syscall;
import io.camunda.zeebe.test.util.asserts.strace.FSyncTraceAssert;
import io.camunda.zeebe.test.util.asserts.strace.STracerAssert;
import io.camunda.zeebe.util.FileUtil;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
Expand All @@ -21,24 +26,26 @@
import java.util.zip.CRC32C;
import java.util.zip.Checksum;
import org.agrona.IoUtil;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;

public class SnapshotChecksumTest {
public final class SnapshotChecksumTest {

@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
private @TempDir Path temporaryFolder;

private Path singleFileSnapshot;
private Path multipleFileSnapshot;
private Path corruptedSnapshot;

@Before
public void setup() throws Exception {
singleFileSnapshot = temporaryFolder.newFolder().toPath();
multipleFileSnapshot = temporaryFolder.newFolder().toPath();
corruptedSnapshot = temporaryFolder.newFolder().toPath();
@BeforeEach
void setup() throws Exception {
singleFileSnapshot = createTempDir("single");
multipleFileSnapshot = createTempDir("multi");
corruptedSnapshot = createTempDir("corrupted");

createChunk(singleFileSnapshot, "file1.txt");

Expand All @@ -60,7 +67,7 @@ static void createChunk(final Path snapshot, final String chunkName) throws IOEx
}

@Test
public void shouldGenerateTheSameChecksumForOneFile() throws Exception {
void shouldGenerateTheSameChecksumForOneFile() throws Exception {
// given
final var expectedChecksum = SnapshotChecksum.calculate(singleFileSnapshot).getCombinedValue();

Expand All @@ -72,7 +79,7 @@ public void shouldGenerateTheSameChecksumForOneFile() throws Exception {
}

@Test
public void shouldGenerateDifferentChecksumWhenFileNameIsDifferent() throws Exception {
void shouldGenerateDifferentChecksumWhenFileNameIsDifferent() throws Exception {
// given
final var expectedChecksum = SnapshotChecksum.calculate(singleFileSnapshot).getCombinedValue();

Expand All @@ -85,7 +92,7 @@ public void shouldGenerateDifferentChecksumWhenFileNameIsDifferent() throws Exce
}

@Test
public void shouldGenerateTheSameChecksumForMultipleFiles() throws Exception {
void shouldGenerateTheSameChecksumForMultipleFiles() throws Exception {
// given
final var expectedChecksum =
SnapshotChecksum.calculate(multipleFileSnapshot).getCombinedValue();
Expand All @@ -98,7 +105,7 @@ public void shouldGenerateTheSameChecksumForMultipleFiles() throws Exception {
}

@Test
public void shouldGenerateDifferentChecksumForDifferentFiles() throws Exception {
void shouldGenerateDifferentChecksumForDifferentFiles() throws Exception {
// given
final var expectedChecksum = SnapshotChecksum.calculate(singleFileSnapshot).getCombinedValue();

Expand All @@ -110,7 +117,7 @@ public void shouldGenerateDifferentChecksumForDifferentFiles() throws Exception
}

@Test
public void shouldPersistChecksum() throws Exception {
void shouldPersistChecksum() throws Exception {
// given
final var expectedChecksum = SnapshotChecksum.calculate(multipleFileSnapshot);
final var checksumPath = multipleFileSnapshot.resolveSibling("checksum");
Expand All @@ -123,8 +130,32 @@ public void shouldPersistChecksum() throws Exception {
assertThat(actual.getCombinedValue()).isEqualTo(expectedChecksum.getCombinedValue());
}

@DisabledIfEnvironmentVariable(named = "GITHUB_ACTIONS", matches = "true")
@EnabledOnOs(OS.LINUX)
@Test
public void shouldDetectCorruptedSnapshot() throws IOException {
void shouldFlushOnPersist() throws Exception {
// given
final var traceFile = temporaryFolder.resolve("traceFile");
final var expectedChecksum = SnapshotChecksum.calculate(multipleFileSnapshot);
final var checksumPath = multipleFileSnapshot.resolveSibling("checksum");
final var tracer = STracer.traceFor(Syscall.FSYNC, traceFile);

// when
try (tracer) {
SnapshotChecksum.persist(checksumPath, expectedChecksum);
}

// then
STracerAssert.assertThat(tracer)
.fsyncTraces()
.hasSize(1)
.first(FSyncTraceAssert.factory())
.hasPath(checksumPath)
.isSuccessful();
}

@Test
void shouldDetectCorruptedSnapshot() throws IOException {
// given
final var expectedChecksum = SnapshotChecksum.calculate(corruptedSnapshot);
final var checksumPath = corruptedSnapshot.resolveSibling("checksum");
Expand All @@ -139,9 +170,9 @@ public void shouldDetectCorruptedSnapshot() throws IOException {
}

@Test
public void shouldCalculateSameChecksumOfLargeFile() throws IOException {
void shouldCalculateSameChecksumOfLargeFile() throws IOException {
// given
final var largeSnapshot = temporaryFolder.newFolder().toPath();
final var largeSnapshot = createTempDir("large");
final Path file = largeSnapshot.resolve("file");
final String largeData = "a".repeat(4 * IoUtil.BLOCK_SIZE + 100);
Files.writeString(file, largeData, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
Expand All @@ -159,12 +190,12 @@ public void shouldCalculateSameChecksumOfLargeFile() throws IOException {
}

@Test
public void shouldReadFormerSimpleChecksumFile() throws IOException {
void shouldReadFormerSimpleChecksumFile() throws IOException {
// given
final Path temp = temporaryFolder.newFolder().toPath();
final Path temp = createTempDir("temp");
final File tempFile = new File(temp.toFile(), "checksum");
final long expectedChecksum = 0xccaaffeeL;
try (final RandomAccessFile file = new RandomAccessFile(tempFile, "rw"); ) {
try (final RandomAccessFile file = new RandomAccessFile(tempFile, "rw")) {
file.writeLong(expectedChecksum);
}

Expand All @@ -177,9 +208,9 @@ public void shouldReadFormerSimpleChecksumFile() throws IOException {
}

@Test
public void shouldWriteTheNumberOfFiles() throws IOException {
void shouldWriteTheNumberOfFiles() throws IOException {
// given
final var folder = temporaryFolder.newFolder().toPath();
final var folder = createTempDir("folder");
createChunk(folder, "file1.txt");
createChunk(folder, "file2.txt");
createChunk(folder, "file3.txt");
Expand All @@ -195,9 +226,9 @@ public void shouldWriteTheNumberOfFiles() throws IOException {
}

@Test
public void shouldAddChecksumOfMetadataAtTheEnd() throws IOException {
void shouldAddChecksumOfMetadataAtTheEnd() throws IOException {
// given
final var folder = temporaryFolder.newFolder().toPath();
final var folder = createTempDir("folder");
createChunk(folder, "file1.txt");
createChunk(folder, "file2.txt");
createChunk(folder, "file3.txt");
Expand All @@ -216,4 +247,10 @@ public void shouldAddChecksumOfMetadataAtTheEnd() throws IOException {
assertThat(checksumCalculatedInSteps.getCombinedValue())
.isEqualTo(checksumCalculatedAtOnce.getCombinedValue());
}

private Path createTempDir(final String name) throws IOException {
final var path = temporaryFolder.resolve(name);
FileUtil.ensureDirectoryExists(path);
return path;
}
}
96 changes: 96 additions & 0 deletions test-util/src/main/java/io/camunda/zeebe/test/util/STracer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.test.util;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Test utility which attaches an external {@code strace} process to the current JVM and records
 * the calls to a single syscall into an output file.
 *
 * <p>The tracer is started via {@link #traceFor(Syscall, Path)} and must be closed to stop the
 * {@code strace} process; it is meant to be used with try-with-resources. Once closed, the
 * recorded calls can be read back with {@link #fSyncTraces()}.
 *
 * <p>NOTE(review): {@link #traceFor(Syscall, Path, boolean)} returns as soon as the {@code strace}
 * process is started; nothing here waits for it to finish attaching, so syscalls issued right
 * after the call may not be recorded - confirm whether callers need to account for this.
 */
public final class STracer implements AutoCloseable {
  private final Process process;
  private final Path outputFile;

  private STracer(final Process process, final Path outputFile) {
    this.process = process;
    this.outputFile = outputFile;
  }

  /**
   * Starts tracing the given syscall for the current process, without forwarding the output of
   * {@code strace} itself to this process.
   *
   * @param syscall the syscall to trace
   * @param outputFile the file into which {@code strace} writes the traced calls
   * @return a running tracer, which must be closed to stop tracing
   * @throws IOException if the {@code strace} process cannot be started
   */
  public static STracer traceFor(final Syscall syscall, final Path outputFile) throws IOException {
    return traceFor(syscall, outputFile, false);
  }

  /**
   * Starts tracing the given syscall for the current process.
   *
   * @param syscall the syscall to trace
   * @param outputFile the file into which {@code strace} writes the traced calls
   * @param debug if true, the standard streams of {@code strace} are inherited by this process,
   *     which makes its own diagnostics visible
   * @return a running tracer, which must be closed to stop tracing
   * @throws IOException if the {@code strace} process cannot be started
   */
  public static STracer traceFor(final Syscall syscall, final Path outputFile, final boolean debug)
      throws IOException {
    final var pid = ProcessHandle.current().pid();
    final var output = outputFile.toAbsolutePath().toString();
    final var builder =
        new ProcessBuilder()
            .command(
                "strace",
                // print the path associated with each file descriptor argument; required to parse
                // the path out of the trace later on
                "-y",
                // also trace the child threads/processes of the attached process
                "-f",
                "-e",
                "trace=" + syscall.id,
                "-o",
                output,
                "-p",
                String.valueOf(pid));

    if (debug) {
      builder.inheritIO();
    }

    return new STracer(builder.start(), outputFile);
  }

  /**
   * Stops the {@code strace} process and blocks until it has terminated.
   *
   * @throws Exception if the thread is interrupted while waiting for the process to terminate
   */
  @Override
  public void close() throws Exception {
    process.destroy();
    process.waitFor();
  }

  /**
   * Reads the output file and parses every line containing {@code fsync} as an fsync call.
   *
   * @return the parsed fsync calls, in the order in which they appear in the output file
   * @throws IOException if the output file cannot be read
   * @throws IllegalArgumentException if a line containing {@code fsync} does not have the format
   *     expected by {@link FSyncTrace#of(String)}
   */
  public List<FSyncTrace> fSyncTraces() throws IOException {
    try (final var reader = Files.newBufferedReader(outputFile)) {
      return reader.lines().filter(s -> s.contains("fsync")).map(FSyncTrace::of).toList();
    }
  }

  /**
   * A single traced fsync call.
   *
   * @param pid the first number on the trace line, i.e. the ID of the traced task
   * @param fd the file descriptor passed to fsync
   * @param path the path associated with the file descriptor
   * @param result the value returned by the call, which is negative when strace reports a failure
   */
  public record FSyncTrace(int pid, int fd, Path path, int result) {
    // The result may be negative: a failed call is printed as e.g.
    // `= -1 EIO (Input/output error)`, so both the sign and the trailing errno description have
    // to be accepted, otherwise a failed fsync could never be represented as a trace.
    private static final Pattern FSYNC_CALL =
        Pattern.compile(
            "^(?<pid>[0-9]+)\\s+fsync\\((?<fd>[0-9]+)\\<(?<path>.+?)\\>\\)\\s+=\\s+(?<result>-?[0-9]+)(?:\\s.*)?$");

    /**
     * Parses a single line of strace output into an fsync trace.
     *
     * @param straceLine a line of the form {@code PID fsync(FD<PATH>) = RESULT}, optionally
     *     followed by the errno name and description when the call failed
     * @return the parsed trace
     * @throws IllegalArgumentException if the line does not match the expected format
     */
    public static FSyncTrace of(final String straceLine) {
      final var matcher = FSYNC_CALL.matcher(straceLine);
      if (!matcher.find()) {
        throw new IllegalArgumentException(
            "Expected line to match format of 'PID fsync(FD<PATH>) = RESULT', but '%s' does not match"
                .formatted(straceLine));
      }

      final var pid = Integer.parseInt(matcher.group("pid"));
      final var fd = Integer.parseInt(matcher.group("fd"));
      final var path = Path.of(matcher.group("path"));
      final var result = Integer.parseInt(matcher.group("result"));

      return new FSyncTrace(pid, fd, path, result);
    }
  }

  /** The syscalls which can be traced. */
  public enum Syscall {
    FSYNC("fsync");

    // the name of the syscall as passed to the `trace=` expression of strace
    private final String id;

    Syscall(final String id) {
      this.id = id;
    }
  }
}
Loading

0 comments on commit 5c143a7

Please sign in to comment.