Skip to content

Commit

Permalink
Merge pull request bisq-network#4804 from chimp1984/improve-persisten…
Browse files Browse the repository at this point in the history
…ce-manager

Improve persistence manager
  • Loading branch information
sqrrm authored Nov 14, 2020
2 parents 627e067 + 755ea4d commit c1287ac
Showing 1 changed file with 76 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

Expand Down Expand Up @@ -80,43 +81,59 @@ public class PersistenceManager<T extends PersistableEnvelope> {
///////////////////////////////////////////////////////////////////////////////////////////

public static final Map<String, PersistenceManager<?>> ALL_PERSISTENCE_MANAGERS = new HashMap<>();
public static boolean FLUSH_ALL_DATA_TO_DISK_CALLED = false;

// We don't know from which thread we are called so we map back to user thread

// We require being called only once from the global shutdown routine. As the shutdown routine has a timeout
// and error condition where we call the method as well beside the standard path and it could be that those
// alternative code paths call our method after it was called already, so it is a valid but rare case.
// We add a guard to prevent repeated calls.
public static void flushAllDataToDisk(ResultHandler completeHandler) {
log.info("Start flushAllDataToDisk at shutdown");
AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size());
// We don't know from which thread we are called so we map to user thread
UserThread.execute(() -> {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We got flushAllDataToDisk called again. This can happen in some rare cases. We ignore the repeated call.");
return;
}

if (openInstances.get() == 0) {
log.info("flushAllDataToDisk completed");
UserThread.execute(completeHandler::handleResult);
}
FLUSH_ALL_DATA_TO_DISK_CALLED = true;

log.info("Start flushAllDataToDisk at shutdown");
AtomicInteger openInstances = new AtomicInteger(ALL_PERSISTENCE_MANAGERS.size());

if (openInstances.get() == 0) {
log.info("No PersistenceManager instances have been created yet.");
completeHandler.handleResult();
}

new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> {
// For Priority.HIGH data we want to write to disk in any case to be on the safe side if we might have missed
// a requestPersistence call after an important state update. Those are usually rather small data stores.
// Otherwise we only persist if requestPersistence was called since the last persist call.
if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) {
// We don't know from which thread we are called so we map back to user thread when calling persistNow
UserThread.execute(() -> {
new HashSet<>(ALL_PERSISTENCE_MANAGERS.values()).forEach(persistenceManager -> {
// For Priority.HIGH data we want to write to disk in any case to be on the safe side if we might have missed
// a requestPersistence call after an important state update. Those are usually rather small data stores.
// Otherwise we only persist if requestPersistence was called since the last persist call.
if (persistenceManager.source.flushAtShutDown || persistenceManager.persistenceRequested) {
// We always get our completeHandler called even if exceptions happen. In case a file write fails
// we still call our shutdown and count down routine as the completeHandler is triggered in any case.

// We get our result handler called from the write thread so we map back to user thread.
persistenceManager.persistNow(() ->
onWriteCompleted(completeHandler, openInstances, persistenceManager));
});
} else {
onWriteCompleted(completeHandler, openInstances, persistenceManager);
}
UserThread.execute(() -> onWriteCompleted(completeHandler, openInstances, persistenceManager)));
} else {
onWriteCompleted(completeHandler, openInstances, persistenceManager);
}
});
});
}

// We get called always from user thread here.
private static void onWriteCompleted(ResultHandler completeHandler,
AtomicInteger openInstances,
PersistenceManager<?> persistenceManager) {
persistenceManager.shutdown();
if (openInstances.decrementAndGet() == 0) {
log.info("flushAllDataToDisk completed");
UserThread.execute(completeHandler::handleResult);
completeHandler.handleResult();
}

}


Expand Down Expand Up @@ -166,6 +183,7 @@ public enum Source {
@Nullable
private Timer timer;
private ExecutorService writeToDiskExecutor;
public final AtomicBoolean initCalled = new AtomicBoolean(false);


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -190,6 +208,29 @@ public void initialize(T persistable, Source source) {
}

public void initialize(T persistable, String fileName, Source source) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that initialize call.");
return;
}

if (ALL_PERSISTENCE_MANAGERS.containsKey(fileName)) {
RuntimeException runtimeException = new RuntimeException("We must not create multiple " +
"PersistenceManager instances for file " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}

if (initCalled.get()) {
RuntimeException runtimeException = new RuntimeException("We must not call initialize multiple times. " +
"PersistenceManager for file: " + fileName + ".");
// We want to get logged from where we have been called so lets print the stack trace.
runtimeException.printStackTrace();
throw runtimeException;
}

initCalled.set(true);

this.persistable = persistable;
this.fileName = fileName;
this.source = source;
Expand Down Expand Up @@ -233,6 +274,11 @@ public void readPersisted(Consumer<T> resultHandler, Runnable orElse) {
* @param orElse Called if no file exists or reading of file failed.
*/
public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that readPersisted call.");
return;
}

new Thread(() -> {
T persisted = getPersisted(fileName);
if (persisted != null) {
Expand All @@ -252,6 +298,11 @@ public T getPersisted() {

@Nullable
public T getPersisted(String fileName) {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that getPersisted call.");
return null;
}

File storageFile = new File(dir, fileName);
if (!storageFile.exists()) {
return null;
Expand Down Expand Up @@ -288,6 +339,11 @@ public T getPersisted(String fileName) {
///////////////////////////////////////////////////////////////////////////////////////////

public void requestPersistence() {
if (FLUSH_ALL_DATA_TO_DISK_CALLED) {
log.warn("We have started the shut down routine already. We ignore that requestPersistence call.");
return;
}

persistenceRequested = true;

// We write to disk with a delay to avoid frequent write operations. Depending on the priority those delays
Expand Down

0 comments on commit c1287ac

Please sign in to comment.