diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/locking/consistentkey/ConsistentKeyLocker.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/locking/consistentkey/ConsistentKeyLocker.java index 1e3f28edf1..39ff0a35d6 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/locking/consistentkey/ConsistentKeyLocker.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/locking/consistentkey/ConsistentKeyLocker.java @@ -382,13 +382,20 @@ private WriteResult tryWriteLockOnce(StaticBuffer key, StaticBuffer del, StoreTr final Timer writeTimer = times.getTimer().start(); StaticBuffer newLockCol = serializer.toLockCol(writeTimer.getStartTime(), rid, times); Entry newLockEntry = StaticArrayEntry.of(newLockCol, zeroBuf); + StoreTransaction newTx = null; try { - final StoreTransaction newTx = overrideTimestamp(txh, writeTimer.getStartTime()); + newTx = overrideTimestamp(txh, writeTimer.getStartTime()); + store.mutate(key, Collections.singletonList(newLockEntry), null == del ? KeyColumnValueStore.NO_DELETIONS : Collections.singletonList(del), newTx); + + newTx.commit(); + newTx = null; } catch (BackendException e) { log.debug("Lock write attempt failed with exception", e); t = e; + } finally { + rollbackIfNotNull(newTx); } writeTimer.stop(); @@ -398,11 +405,18 @@ private WriteResult tryWriteLockOnce(StaticBuffer key, StaticBuffer del, StoreTr private WriteResult tryDeleteLockOnce(StaticBuffer key, StaticBuffer col, StoreTransaction txh) { Throwable t = null; final Timer delTimer = times.getTimer().start(); + StoreTransaction newTx = null; try { - final StoreTransaction newTx = overrideTimestamp(txh, delTimer.getStartTime()); + newTx = overrideTimestamp(txh, delTimer.getStartTime()); + store.mutate(key, ImmutableList.of(), Collections.singletonList(col), newTx); + + newTx.commit(); + newTx = null; } catch (BackendException e) { t = e; + } finally { + rollbackIfNotNull(newTx); } delTimer.stop(); @@ -533,9 +547,13 @@ private void checkSeniority(KeyColumn target, ConsistentKeyLockStatus ls, protected void deleteSingleLock(KeyColumn kc, ConsistentKeyLockStatus ls, StoreTransaction tx) { List deletions = ImmutableList.of(serializer.toLockCol(ls.getWriteTimestamp(), rid, times)); for (int i = 0; i < lockRetryCount; i++) { + StoreTransaction newTx = null; try { - StoreTransaction newTx = overrideTimestamp(tx, times.getTime()); + newTx = overrideTimestamp(tx, times.getTime()); store.mutate(serializer.toLockKey(kc.getKey(), kc.getColumn()), ImmutableList.of(), deletions, newTx); + + newTx.commit(); + newTx = null; return; } catch (TemporaryBackendException e) { log.warn("Temporary storage exception while deleting lock", e); @@ -543,6 +561,8 @@ protected void deleteSingleLock(KeyColumn kc, ConsistentKeyLockStatus ls, StoreT } catch (BackendException e) { log.error("Storage exception while deleting lock", e); return; // give up on this lock + } finally { + rollbackIfNotNull(newTx); } } } @@ -554,6 +574,21 @@ private StoreTransaction overrideTimestamp(final StoreTransaction tx, return manager.beginTransaction(newCfg); } + private void rollbackIfNotNull(StoreTransaction tx) { + if (tx != null) { + try { + if (log.isDebugEnabled()) { + log.debug("Transaction is still open! Rolling back: " + tx.toString(), new Throwable()); + } + + tx.rollback(); + } catch (Throwable excp) { + log.error("Failed to rollback transaction " + tx.toString() + ". The transaction may be leaked.", excp); + } + } + + } + private static class WriteResult { private final Duration duration; private final Instant writeTimestamp; diff --git a/janusgraph-test/src/test/java/org/janusgraph/diskstorage/locking/ConsistentKeyLockerTest.java b/janusgraph-test/src/test/java/org/janusgraph/diskstorage/locking/ConsistentKeyLockerTest.java index d216baafd6..04784d8a86 100644 --- a/janusgraph-test/src/test/java/org/janusgraph/diskstorage/locking/ConsistentKeyLockerTest.java +++ b/janusgraph-test/src/test/java/org/janusgraph/diskstorage/locking/ConsistentKeyLockerTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; +import org.easymock.IAnswer; import org.janusgraph.diskstorage.locking.consistentkey.*; import org.janusgraph.diskstorage.util.time.Timer; @@ -77,10 +78,10 @@ public class ConsistentKeyLockerTest { private final StaticBuffer otherLockRid = new StaticArrayBuffer(new byte[]{(byte) 64}); private final StaticBuffer defaultLockVal = BufferUtil.getIntBuffer(0); // maybe refactor... - private StoreTransaction defaultTx; + private TestTrxImpl defaultTx; private Configuration defaultTxCustomOpts; - private StoreTransaction otherTx; + private TestTrxImpl otherTx; private Configuration otherTxCustomOpts; private final Duration defaultWaitNS = Duration.ofNanos(100 * 1000 * 1000); @@ -109,27 +110,35 @@ public void setupMocks() throws BackendException, NoSuchMethodException, Securit manager = relaxedCtrl.createMock(StoreManager.class); - defaultTx = relaxedCtrl.createMock(StoreTransaction.class); BaseTransactionConfig defaultTxCfg = relaxedCtrl.createMock(BaseTransactionConfig.class); + defaultTx = new TestTrxImpl(defaultTxCfg); defaultTxCustomOpts = relaxedCtrl.createMock(Configuration.class); - expect(defaultTx.getConfiguration()).andReturn(defaultTxCfg).anyTimes(); expect(defaultTxCfg.getGroupName()).andReturn("default").anyTimes(); expect(defaultTxCfg.getCustomOptions()).andReturn(defaultTxCustomOpts).anyTimes(); final Comparator defaultTxCfgChecker = (actual, ignored) -> actual.getCustomOptions() == defaultTxCustomOpts ? 0 : -1; expect(manager.beginTransaction(cmp(null, defaultTxCfgChecker, LogicalOperator.EQUAL))) - .andReturn(defaultTx).anyTimes(); + .andAnswer(new IAnswer() { + @Override + public StoreTransaction answer() throws Throwable { + return defaultTx.open(); + } + }).anyTimes(); - otherTx = relaxedCtrl.createMock(StoreTransaction.class); BaseTransactionConfig otherTxCfg = relaxedCtrl.createMock(BaseTransactionConfig.class); + otherTx = new TestTrxImpl(otherTxCfg); otherTxCustomOpts = relaxedCtrl.createMock(Configuration.class); - expect(otherTx.getConfiguration()).andReturn(otherTxCfg).anyTimes(); expect(otherTxCfg.getGroupName()).andReturn("other").anyTimes(); expect(otherTxCfg.getCustomOptions()).andReturn(otherTxCustomOpts).anyTimes(); final Comparator otherTxCfgChecker = (actual, ignored) -> actual.getCustomOptions() == otherTxCustomOpts ? 0 : -1; expect(manager.beginTransaction(cmp(null, otherTxCfgChecker, LogicalOperator.EQUAL))) - .andReturn(otherTx).anyTimes(); + .andAnswer(new IAnswer() { + @Override + public StoreTransaction answer() throws Throwable { + return otherTx.open(); + } + }).anyTimes(); /* @@ -164,6 +173,9 @@ public void setupMocks() throws BackendException, NoSuchMethodException, Securit public void verifyMocks() { ctrl.verify(); relaxedCtrl.verify(); + + assertFalse(defaultTx.isOpen(), "Transaction leak found: openCount=" + defaultTx.getOpenCount() + ", commitCount=" + defaultTx.getCommitCount() + ", rollbackCount=" + defaultTx.getRollbackCount()); + assertFalse(otherTx.isOpen(), "Transaction leak found: openCount=" + otherTx.getOpenCount() + ", commitCount=" + otherTx.getCommitCount() + ", rollbackCount=" + otherTx.getRollbackCount()); } /** @@ -1202,4 +1214,46 @@ public long getTime(Instant timestamp) { return timestamp.getEpochSecond() * 1000000000L + timestamp.getNano(); } } + + private static class TestTrxImpl implements StoreTransaction { + private final BaseTransactionConfig trxConfig; + private int openCount = 0; + private int commitCount = 0; + private int rollbackCount = 0; + + public TestTrxImpl(BaseTransactionConfig trxConfig) { + this.trxConfig = trxConfig; + } + + public boolean isOpen() { + return openCount > (commitCount + rollbackCount); + } + + public int getOpenCount() { return openCount; } + + public int getCommitCount() { return commitCount; } + + public int getRollbackCount() { return rollbackCount; } + + public StoreTransaction open() { + openCount++; + + return this; + } + + @Override + public BaseTransactionConfig getConfiguration() { + return trxConfig; + } + + @Override + public void commit() throws BackendException { + commitCount++; + } + + @Override + public void rollback() throws BackendException { + rollbackCount++; + } + } }