Skip to content

Commit

Permalink
Handle interruption for BerkeleyJE backend
Browse files Browse the repository at this point in the history
Fixes JanusGraph#2120

Signed-off-by: Pavel Ershov <[email protected]>
  • Loading branch information
mad committed Sep 16, 2023
1 parent 6681799 commit de6260c
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.janusgraph.diskstorage.BackendException;
Expand All @@ -50,6 +51,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.janusgraph.diskstorage.berkeleyje.BerkeleyJEStoreManager.convertThreadInterruptedException;

public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {

private static final Logger log = LoggerFactory.getLogger(BerkeleyJEKeyValueStore.class);
Expand All @@ -60,7 +63,7 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;
Expand Down Expand Up @@ -100,10 +103,16 @@ private static void closeCursor(StoreTransaction txh, Cursor cursor) {
((BerkeleyJETx) txh).closeCursor(cursor);
}

void reopen(final Database db) {
this.db = db;
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
if (isOpen) db.close();
} catch (ThreadInterruptedException ignored) {
// environment will be closed
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down Expand Up @@ -153,6 +162,7 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
final DatabaseEntry foundKey = keyStart.as(ENTRY_FACTORY);
final DatabaseEntry foundData = new DatabaseEntry();
final Cursor cursor = openCursor(txh);
final ReadOptions readOptions = getReadOptions(txh);

return new RecordIterator<KeyValueEntry>() {
private OperationStatus status;
Expand Down Expand Up @@ -182,9 +192,9 @@ private KeyValueEntry getNextEntry() {
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.SEARCH_GTE, readOptions);
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
status = get(Get.NEXT, readOptions);
}
if (status != OperationStatus.SUCCESS) {
break;
Expand All @@ -203,6 +213,16 @@ private KeyValueEntry getNextEntry() {
return null;
}

private OperationStatus get(Get get, ReadOptions readOptions) {
try {
return cursor.get(foundKey, foundData, get, readOptions) == null
? OperationStatus.NOTFOUND
: OperationStatus.SUCCESS;
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}
}

@Override
public void close() {
closeCursor(txh, cursor);
Expand Down Expand Up @@ -237,13 +257,17 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -261,6 +285,8 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
throw convertThreadInterruptedException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.janusgraph.diskstorage.berkeleyje;


import com.google.common.base.Preconditions;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
Expand All @@ -23,8 +22,10 @@
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
Expand All @@ -51,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -90,17 +92,14 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered

private final Map<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected AtomicReference<Environment> environment = new AtomicReference<>();
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize(configuration);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -117,7 +116,10 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private void initialize(Configuration configuration) throws BackendException {
int cachePercent = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
Expand All @@ -132,7 +134,7 @@ private void initialize(int cachePercent, final boolean sharedCache, final Cache
}

//Open the environment
environment = new Environment(directory, envConfig);
environment.set(new Environment(directory, envConfig));

} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
Expand All @@ -152,39 +154,56 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
try {
Transaction tx = null;
boolean interrupted = false;
do {
try {
Transaction tx = null;

Configuration effectiveCfg =
Configuration effectiveCfg =
new MergedConfiguration(txCfg.getCustomOptions(), getStorageConfig());

if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
if (transactional) {
TransactionConfig txnConfig = new TransactionConfig();
ConfigOption.getEnumValue(effectiveCfg.get(ISOLATION_LEVEL), IsolationLevel.class).configure(txnConfig);
tx = environment.get().beginTransaction(null, txnConfig);
} else {
if (txCfg instanceof TransactionConfiguration) {
if (!((TransactionConfiguration) txCfg).isSingleThreaded()) {
// Non-transactional cursors can't shared between threads, more info ThreadLocker.checkState
throw new PermanentBackendException("BerkeleyJE does not support non-transactional for multi threaded tx");
}
}
}
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}
BerkeleyJETx btx =
new BerkeleyJETx(
tx,
ConfigOption.getEnumValue(effectiveCfg.get(LOCK_MODE), LockMode.class),
ConfigOption.getEnumValue(effectiveCfg.get(CACHE_MODE), CacheMode.class),
txCfg);

if (log.isTraceEnabled()) {
log.trace("Berkeley tx created", new TransactionBegin(btx.toString()));
}

return btx;
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
return btx;
} catch (ThreadInterruptedException e) {
log.error("BerkeleyJE backend is interrupted! Try to recreate environment", e);
environment.get().close();
initialize(storageConfig);
for (Map.Entry<String, BerkeleyJEKeyValueStore> entry : stores.entrySet()) {
final String name = entry.getKey();
final BerkeleyJEKeyValueStore store = entry.getValue();
store.reopen(openDb(name));
}
if (!interrupted) {
interrupted = true;
} else {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
} while (true);
}

@Override
Expand All @@ -194,19 +213,8 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
return stores.get(name);
}
try {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

Database db = environment.openDatabase(null, name, dbConfig);

log.debug("Opened database {}", name);
Database db = openDb(name);
log.trace("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
Expand All @@ -216,6 +224,20 @@ public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException
}
}

private Database openDb(String name) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setReadOnly(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(transactional);
dbConfig.setKeyPrefixing(true);

if (batchLoading) {
dbConfig.setDeferredWrite(true);
}

return environment.get().openDatabase(null, name, dbConfig);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -266,7 +288,7 @@ public void close() throws BackendException {
//Ignore
}
try {
environment.close();
environment.get().close();
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not close BerkeleyJE database", e);
}
Expand All @@ -282,8 +304,8 @@ public void clearStorage() throws BackendException {
throw new IllegalStateException("Cannot delete store, since database is open: " + stores.keySet());
}

for (final String db : environment.getDatabaseNames()) {
environment.removeDatabase(NULL_TRANSACTION, db);
for (final String db : environment.get().getDatabaseNames()) {
environment.get().removeDatabase(NULL_TRANSACTION, db);
log.debug("Removed database {} (clearStorage)", db);
}
close();
Expand All @@ -292,7 +314,7 @@ public void clearStorage() throws BackendException {

@Override
public boolean exists() throws BackendException {
return !environment.getDatabaseNames().isEmpty();
return !environment.get().getDatabaseNames().isEmpty();
}

@Override
Expand Down Expand Up @@ -335,4 +357,10 @@ private TransactionBegin(String msg) {
super(msg);
}
}

static TraversalInterruptedException convertThreadInterruptedException(final ThreadInterruptedException e) {
final TraversalInterruptedException ex = new TraversalInterruptedException();
ex.initCause(e);
return ex;
}
}
Loading

1 comment on commit de6260c

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: de6260c Previous: ade0405 Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 14809.40436799877 ms/op 22286.726412919736 ms/op 0.66
org.janusgraph.GraphCentricQueryBenchmark.getVertices 1366.2336971544337 ms/op 1958.8027245756166 ms/op 0.70
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 219.2990697963768 ms/op 223.7761308695652 ms/op 0.98
org.janusgraph.MgmtOlapJobBenchmark.runReindex 462.9758427433333 ms/op 590.7720412917461 ms/op 0.78
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 382.932076412383 ms/op 504.0852440155036 ms/op 0.76
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 8997.87455100404 ms/op 15023.262050752626 ms/op 0.60
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 30922.56826872222 ms/op 50386.97165637 ms/op 0.61
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 32043.48246509405 ms/op 54643.32990924667 ms/op 0.59
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 61794.34378910001 ms/op 111581.5529258 ms/op 0.55
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 15280.04452038483 ms/op 25239.897441037225 ms/op 0.61
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 580.6126288690144 ms/op 871.065031951974 ms/op 0.67
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 7962.604027827262 ms/op 13445.342319330233 ms/op 0.59
org.janusgraph.CQLMultiQueryBenchmark.getNames 15231.144150024367 ms/op 24720.798408079998 ms/op 0.62
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 10274.717827099546 ms/op 19139.401842954125 ms/op 0.54
org.janusgraph.CQLMultiQueryBenchmark.getLabels 13475.43272307516 ms/op 21065.935363511977 ms/op 0.64
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 642.1220405227226 ms/op 902.4843701998306 ms/op 0.71
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 21463.948493273638 ms/op 32304.605784971427 ms/op 0.66
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 555.8038720789718 ms/op 840.8013422309493 ms/op 0.66
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 26384.057410603335 ms/op 49514.061807633334 ms/op 0.53
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 377.68287940528774 ms/op 562.9583588020965 ms/op 0.67
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 31027.295506834154 ms/op 53092.57219631174 ms/op 0.58
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 15208.48953423988 ms/op 24932.604541416746 ms/op 0.61
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 16210.963051734285 ms/op 26574.47173412619 ms/op 0.61
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 15161.730696858613 ms/op 25245.709355865558 ms/op 0.60

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.