Skip to content

Commit

Permalink
fix: handle BerkeleyJE DB interruption [tp-tests]
Browse files Browse the repository at this point in the history
Co-authored-by: Pavel Ershov <[email protected]>

Signed-off-by: Tiến Nguyễn Khắc <[email protected]>
  • Loading branch information
tien authored and li-boxuan committed May 14, 2024
1 parent a8a2a5d commit 90b9694
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KVQuery;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KeySelector;
Expand Down Expand Up @@ -60,10 +63,10 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private volatile Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;
private volatile boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
Expand All @@ -75,6 +78,11 @@ public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m
public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.getConfig();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -95,15 +103,24 @@ private Cursor openCursor(StoreTransaction txh) throws BackendException {
return ((BerkeleyJETx) txh).openCursor(db);
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
private static void closeCursor(StoreTransaction txh, Cursor cursor) throws BackendException {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

public void reopen(final Database db) {
this.db = db;
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -127,6 +144,11 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx
} else {
return null;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down Expand Up @@ -161,7 +183,11 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
@Override
public boolean hasNext() {
if (current == null) {
current = getNextEntry();
try {
current = getNextEntry();
} catch (BackendException e) {
throw new RuntimeException(e);
}
}
return current != null;
}
Expand All @@ -176,16 +202,26 @@ public KeyValueEntry next() {
return next;
}

private KeyValueEntry getNextEntry() {
private KeyValueEntry getNextEntry() throws BackendException {
if (status != null && status != OperationStatus.SUCCESS) {
return null;
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
try {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}

if (status != OperationStatus.SUCCESS) {
break;
}
Expand All @@ -205,7 +241,11 @@ private KeyValueEntry getNextEntry() {

@Override
public void close() {
closeCursor(txh, cursor);
try {
closeCursor(txh, cursor);
} catch (BackendException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down Expand Up @@ -237,13 +277,22 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -261,6 +310,11 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.common.LocalStoreManager;
import org.janusgraph.diskstorage.configuration.ConfigNamespace;
import org.janusgraph.diskstorage.configuration.ConfigOption;
Expand All @@ -48,9 +52,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -88,19 +93,16 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered
ConfigOption.Type.MASKABLE, String.class,
IsolationLevel.REPEATABLE_READ.toString(), disallowEmpty(String.class));

private final Map<String, BerkeleyJEKeyValueStore> stores;
private final ConcurrentMap<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected volatile Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();
stores = new ConcurrentHashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize();

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -111,14 +113,24 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.scanTxConfig(GraphDatabaseConfiguration.buildGraphConfiguration()
.set(ISOLATION_LEVEL, IsolationLevel.READ_UNCOMMITTED.toString())
)
.supportsInterruption(false)
.supportsInterruption(true)
.cellTTL(true)
.optimisticLocking(false)
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private synchronized void initialize() throws BackendException {
try {
if (environment != null && environment.isValid()) {
return;
}

close(true);

int cachePercent = storageConfig.get(JVM_CACHE);
boolean sharedCache = storageConfig.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(transactional);
Expand All @@ -131,15 +143,28 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
}

//Open the environment
// Open the environment
environment = new Environment(directory, envConfig);

// Reopen any existing DB connections
for (String storeName : stores.keySet()) {
openDatabase(storeName, true);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
}

}

private synchronized void reInitialize(DatabaseException exception) throws BackendException {
initialize();

if (exception instanceof ThreadInterruptedException) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(exception);
}
}

@Override
public StoreFeatures getFeatures() {
return features;
Expand All @@ -150,8 +175,7 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
throw new UnsupportedOperationException();
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retryEnvironmentFailure) throws BackendException {
try {
Transaction tx = null;

Expand Down Expand Up @@ -182,15 +206,27 @@ public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws B
}

return btx;
} catch (EnvironmentFailureException e) {
reInitialize(e);

if (retryEnvironmentFailure) {
return beginTransaction(txCfg, false);
}

throw new TemporaryBackendException("Could not start BerkeleyJE transaction", e);
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
return beginTransaction(txCfg, true);
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean retryEnvironmentFailure) throws BackendException {
Preconditions.checkNotNull(name);
if (stores.containsKey(name)) {
if (stores.containsKey(name) && !force) {
return stores.get(name);
}
try {
Expand All @@ -209,13 +245,34 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
if (stores.containsKey(name)) {
stores.get(name).reopen(db);
} else {
stores.put(name, store);
}
return store;
} catch (EnvironmentFailureException e) {
reInitialize(e);

if (retryEnvironmentFailure) {
return openDatabase(name, force, false);
}

throw new TemporaryBackendException("Could not open BerkeleyJE data store", e);
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not open BerkeleyJE data store", e);
}
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
return openDatabase(name, force, true);
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, false, true);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -252,18 +309,16 @@ void removeDatabase(BerkeleyJEKeyValueStore db) {
log.debug("Removed database {}", name);
}


@Override
public void close() throws BackendException {
public void close(boolean force) throws BackendException {
if (environment != null) {
if (!stores.isEmpty())
if (!force && !stores.isEmpty())
throw new IllegalStateException("Cannot shutdown manager since some databases are still open");
try {
// TODO this looks like a race condition
//Wait just a little bit before closing so that independent transaction threads can clean up.
Thread.sleep(30);
} catch (InterruptedException e) {
//Ignore
Thread.currentThread().interrupt();
}
try {
environment.close();
Expand All @@ -274,6 +329,11 @@ public void close() throws BackendException {

}

@Override
public void close() throws BackendException {
close(false);
}

private static final Transaction NULL_TRANSACTION = null;

@Override
Expand Down
Loading

0 comments on commit 90b9694

Please sign in to comment.