diff --git a/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java index 3c043420adb..39f1a6e2106 100644 --- a/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java @@ -21,8 +21,7 @@ import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; @@ -40,25 +39,19 @@ public AtomicFileWriter(Path destinationPath, rollingFile = rollingFileWriter.getFilePath().toFile(); } - public synchronized void write(byte[] data) { - try { - CountDownLatch countDownLatch = rollingFileWriter.write(data); - boolean isSuccess = countDownLatch.await(45, TimeUnit.SECONDS); - if (!isSuccess) { - throw new AtomicFileWriteFailedException("Async atomic file write timeout triggered after 45 seconds."); - } - - isSuccess = rollingFile.renameTo(activeFile); - if (!isSuccess) { - throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file."); - } - - File tmpFile = activeFile; - activeFile = rollingFile; - rollingFile = tmpFile; + public synchronized CompletableFuture write(byte[] data) { + return rollingFileWriter.write(data) + .thenRunAsync(this::swapActiveAndRollingFile); + } - } catch (InterruptedException e) { - log.error("AtomicFileWriter got interrupted during write.", e); + private void swapActiveAndRollingFile() { + var isSuccess = rollingFile.renameTo(activeFile); + if (!isSuccess) { + throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file."); } + + File tmpFile = activeFile; + activeFile = rollingFile; + rollingFile = tmpFile; } } diff --git a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java index 10c0e45b2ab..7b96e218f2d 100644 --- a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java @@ -19,7 +19,7 @@ import java.nio.file.Path; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -32,35 +32,39 @@ public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeR this.writeRequestScheduler = writeRequestScheduler; } - public CountDownLatch write(byte[] data) { - CountDownLatch writeFinished = new CountDownLatch(1); - scheduleAsyncWrite(data, 0, data.length, writeFinished); - return writeFinished; + public CompletableFuture write(byte[] data) { + CompletableFuture completableFuture = new CompletableFuture<>(); + scheduleAsyncWrite(data, 0, data.length, completableFuture); + return completableFuture; } public Path getFilePath() { return asyncWriter.getFilePath(); } - private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) { + private void scheduleAsyncWrite(byte[] data, int offset, int size, CompletableFuture completableFuture) { asyncWriter.write(data, offset) - .thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler); + .thenAcceptAsync(writeUntilEndAsync(data, offset, size, completableFuture), writeRequestScheduler) + .exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + return null; + }); } private Consumer writeUntilEndAsync(byte[] data, int offset, int totalBytes, - CountDownLatch writeFinished) { + CompletableFuture completableFuture) { return writtenBytes -> { if (writtenBytes == totalBytes) { - writeFinished.countDown(); + completableFuture.complete(null); return; } int remainingBytes = totalBytes - writtenBytes; if (remainingBytes > 0) { int newOffset = offset + writtenBytes; - scheduleAsyncWrite(data, newOffset, remainingBytes, writeFinished); + scheduleAsyncWrite(data, newOffset, remainingBytes, completableFuture); } }; } diff --git a/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java index d6610b8b3a5..872d9187e7a 100644 --- a/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java @@ -21,8 +21,13 @@ import java.nio.file.Path; import java.io.File; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -32,9 +37,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -45,13 +51,10 @@ public class AtomicFileWriterTests { private PersistenceFileWriter persistenceFileWriter; @Mock private File rollingFile = mock(File.class); - @Mock - private CountDownLatch countDownLatch; private AtomicFileWriter atomicFileWriter; @BeforeEach void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) { - doReturn(countDownLatch).when(persistenceFileWriter).write(any()); doReturn(rollingFile).when(rollingFilePath).toFile(); doReturn(rollingFilePath).when(persistenceFileWriter).getFilePath(); @@ -60,25 +63,49 @@ void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) { } @Test - void triggerFileWriteTimeout() throws InterruptedException { - doReturn(false).when(countDownLatch).await(anyLong(), any()); - assertThrows(AtomicFileWriteFailedException.class, - () -> atomicFileWriter.write(DATA)); + void writeFails() throws ExecutionException, InterruptedException, TimeoutException { + var ioException = new IOException(); + doReturn(CompletableFuture.failedFuture(ioException)) + .when(persistenceFileWriter).write(any()); + + CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1); + atomicFileWriter.write(DATA) + .exceptionally(throwable -> { + assertThat(throwable.getCause(), is(ioException)); + exceptionTriggeredLatch.countDown(); + return null; + }) + .get(30, TimeUnit.SECONDS); + + assertThat(exceptionTriggeredLatch.getCount(), is(0L)); } @Test - void renameFailure() throws InterruptedException { - doReturn(true).when(countDownLatch).await(anyLong(), any()); + void renameFailure() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(CompletableFuture.completedFuture(null)) + .when(persistenceFileWriter).write(any()); + doReturn(false).when(rollingFile).renameTo(any()); - assertThrows(AtomicFileWriteFailedException.class, - () -> atomicFileWriter.write(DATA)); + CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1); + atomicFileWriter.write(DATA) + .exceptionally(throwable -> { + assertThat(throwable.getCause(), instanceOf(AtomicFileWriteFailedException.class)); + exceptionTriggeredLatch.countDown(); + return null; + }) + .get(30, TimeUnit.SECONDS); + + assertThat(exceptionTriggeredLatch.getCount(), is(0L)); } @Test - void write() throws InterruptedException { - doReturn(true).when(countDownLatch).await(anyLong(), any()); + void write() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(CompletableFuture.completedFuture(null)) + .when(persistenceFileWriter).write(any()); + doReturn(true).when(rollingFile).renameTo(any()); - atomicFileWriter.write(DATA); + + atomicFileWriter.write(DATA).get(30, TimeUnit.SECONDS); } } diff --git a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java index ffcd0fd3742..348f4b06a98 100644 --- a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java @@ -17,9 +17,11 @@ package bisq.persistence; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -30,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; @@ -57,40 +57,37 @@ static void teardown() { } @Test - void writeInOneGo() throws InterruptedException { + void writeInOneGo() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(DATA.length)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(1)).write(any(), anyInt()); } @Test - void writeInTwoPhases() throws InterruptedException { + void writeInTwoPhases() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(25), completedFuture(75)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(2)).write(any(), anyInt()); } @Test - void writeInFivePhases() throws InterruptedException { + void writeInFivePhases() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(10), completedFuture(20), completedFuture(30), completedFuture(15), completedFuture(25)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(5)).write(any(), anyInt()); } }