diff --git a/common/src/main/java/bisq/common/persistence/PersistenceManager.java b/common/src/main/java/bisq/common/persistence/PersistenceManager.java index c377c69a478..2f3c74f3e47 100644 --- a/common/src/main/java/bisq/common/persistence/PersistenceManager.java +++ b/common/src/main/java/bisq/common/persistence/PersistenceManager.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -80,43 +81,59 @@ public class PersistenceManager { /////////////////////////////////////////////////////////////////////////////////////////// public static final Map> ALL_PERSISTENCE_MANAGERS = new HashMap<>(); + public static boolean FLUSH_ALL_DATA_TO_DISK_CALLED = false; - // We don't know from which thread we are called so we map back to user thread + + // We require being called only once from the global shutdown routine. As the shutdown routine has a timeout + // and error condition where we call the method as well beside the standard path and it could be that those + // alternative code paths call our method after it was called already, so it is a valid but rare case. + // We add a guard to prevent repeated calls. public static void flushAllDataToDisk(ResultHandler completeHandler) { - log.info("Start flushAllDataToDisk at shutdown"); - AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size()); + // We don't know from which thread we are called so we map to user thread + UserThread.execute(() -> { + if (FLUSH_ALL_DATA_TO_DISK_CALLED) { + log.warn("We got flushAllDataToDisk called again. This can happen in some rare cases. We ignore the repeated call."); + return; + } - if (openInstances.get() == 0) { - log.info("flushAllDataToDisk completed"); - UserThread.execute(completeHandler::handleResult); - } + FLUSH_ALL_DATA_TO_DISK_CALLED = true; + + log.info("Start flushAllDataToDisk at shutdown"); + AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size()); + + if (openInstances.get() == 0) { + log.info("No PersistenceManager instances have been created yet."); + completeHandler.handleResult(); + } - new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> { - // For Priority.HIGH data we want to write to disk in any case to be on the safe side if we might have missed - // a requestPersistence call after an important state update. Those are usually rather small data stores. - // Otherwise we only persist if requestPersistence was called since the last persist call. - if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) { - // We don't know from which thread we are called so we map back to user thread when calling persistNow - UserThread.execute(() -> { + new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> { + // For Priority.HIGH data we want to write to disk in any case to be on the safe side if we might have missed + // a requestPersistence call after an important state update. Those are usually rather small data stores. + // Otherwise we only persist if requestPersistence was called since the last persist call. + if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) { // We always get our completeHandler called even if exceptions happen. In case a file write fails // we still call our shutdown and count down routine as the completeHandler is triggered in any case. + + // We get our result handler called from the write thread so we map back to user thread. persistenceManager.persistNow(() -> - onWriteCompleted(completeHandler, openInstances, persistenceManager)); - }); - } else { - onWriteCompleted(completeHandler, openInstances, persistenceManager); - } + UserThread.execute(() -> onWriteCompleted(completeHandler, openInstances, persistenceManager))); + } else { + onWriteCompleted(completeHandler, openInstances, persistenceManager); + } + }); }); } + // We get called always from user thread here. private static void onWriteCompleted(ResultHandler completeHandler, AtomicInteger openInstances, PersistenceManager persistenceManager) { persistenceManager.shutdown(); if (openInstances.decrementAndGet() == 0) { log.info("flushAllDataToDisk completed"); - UserThread.execute(completeHandler::handleResult); + completeHandler.handleResult(); } + } @@ -166,6 +183,7 @@ public enum Source { @Nullable private Timer timer; private ExecutorService writeToDiskExecutor; + public final AtomicBoolean initCalled = new AtomicBoolean(false); /////////////////////////////////////////////////////////////////////////////////////////// @@ -190,6 +208,29 @@ public void initialize(T persistable, Source source) { } public void initialize(T persistable, String fileName, Source source) { + if (FLUSH_ALL_DATA_TO_DISK_CALLED) { + log.warn("We have started the shut down routine already. We ignore that initialize call."); + return; + } + + if (ALL_PERSISTENCE_MANAGERS.containsKey(fileName)) { + RuntimeException runtimeException = new RuntimeException("We must not create multiple " + + "PersistenceManager instances for file " + fileName + "."); + // We want to get logged from where we have been called so lets print the stack trace. + runtimeException.printStackTrace(); + throw runtimeException; + } + + if (initCalled.get()) { + RuntimeException runtimeException = new RuntimeException("We must not call initialize multiple times. " + + "PersistenceManager for file: " + fileName + "."); + // We want to get logged from where we have been called so lets print the stack trace. + runtimeException.printStackTrace(); + throw runtimeException; + } + + initCalled.set(true); + this.persistable = persistable; this.fileName = fileName; this.source = source; @@ -233,6 +274,11 @@ public void readPersisted(Consumer resultHandler, Runnable orElse) { * @param orElse Called if no file exists or reading of file failed. */ public void readPersisted(String fileName, Consumer resultHandler, Runnable orElse) { + if (FLUSH_ALL_DATA_TO_DISK_CALLED) { + log.warn("We have started the shut down routine already. We ignore that readPersisted call."); + return; + } + new Thread(() -> { T persisted = getPersisted(fileName); if (persisted != null) { @@ -252,6 +298,11 @@ public T getPersisted() { @Nullable public T getPersisted(String fileName) { + if (FLUSH_ALL_DATA_TO_DISK_CALLED) { + log.warn("We have started the shut down routine already. We ignore that getPersisted call."); + return null; + } + File storageFile = new File(dir, fileName); if (!storageFile.exists()) { return null; @@ -288,6 +339,11 @@ public T getPersisted(String fileName) { /////////////////////////////////////////////////////////////////////////////////////////// public void requestPersistence() { + if (FLUSH_ALL_DATA_TO_DISK_CALLED) { + log.warn("We have started the shut down routine already. We ignore that requestPersistence call."); + return; + } + persistenceRequested = true; // We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays