From 9587c74523172e8e861907e5e7f8175355bc7fe3 Mon Sep 17 00:00:00 2001 From: gmaretic Date: Thu, 29 Sep 2022 13:53:02 +0100 Subject: [PATCH] [TTS] InMemory implementation of MCAS (#6243) --- .../api/CheckAndSetCompatibility.java | 21 ++ .../atlasdb/keyvalue/api/KeyValueService.java | 2 + .../api/CheckAndSetCompatibilityTest.java | 80 ++++--- .../CassandraKeyValueServiceImpl.java | 2 + .../impl/AbstractKeyValueService.java | 1 + .../impl/InMemoryKeyValueService.java | 215 ++++++++++++------ .../impl/AbstractKeyValueServiceTest.java | 132 +++++++++++ changelog/@unreleased/pr-6243.v2.yml | 5 + 8 files changed, 368 insertions(+), 90 deletions(-) create mode 100644 changelog/@unreleased/pr-6243.v2.yml diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibility.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibility.java index 7cc70193afe..c064b810340 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibility.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibility.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.keyvalue.api; +import com.palantir.logsafe.Preconditions; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -31,6 +32,11 @@ public interface CheckAndSetCompatibility { */ boolean supportsCheckAndSetOperations(); + @Value.Default + default boolean supportsMultiCheckAndSetOperations() { + return false; + } + /** * If false, there are no guarantees that a {@link CheckAndSetException#getActualValues()} or * {@link KeyAlreadyExistsException#getExistingKeys()} from exceptions thrown by the the {@link KeyValueService} @@ -53,6 +59,13 @@ public interface CheckAndSetCompatibility { */ boolean consistentOnFailure(); + @Value.Check + default void cannotOnlySupportMultiCheckAndSet() { + Preconditions.checkArgument( + supportsCheckAndSetOperations() || !supportsMultiCheckAndSetOperations(), + "Support for MultiCAS implies support for CAS."); + } + static CheckAndSetCompatibility intersect(Stream compatibilities) { Set presentCompatibilities = compatibilities.collect(Collectors.toSet()); @@ -61,10 +74,13 @@ static CheckAndSetCompatibility intersect(Stream compa if (!supported) { return Unsupported.INSTANCE; } + boolean multiCas = + presentCompatibilities.stream().allMatch(CheckAndSetCompatibility::supportsMultiCheckAndSetOperations); boolean detail = presentCompatibilities.stream().allMatch(CheckAndSetCompatibility::supportsDetailOnFailure); boolean consistency = presentCompatibilities.stream().allMatch(CheckAndSetCompatibility::consistentOnFailure); return supportedBuilder() + .supportsMultiCheckAndSetOperations(multiCas) .supportsDetailOnFailure(detail) .consistentOnFailure(consistency) .build(); @@ -86,6 +102,11 @@ public boolean supportsCheckAndSetOperations() { return false; } + @Override + public boolean supportsMultiCheckAndSetOperations() { + return false; + } + @Override public boolean supportsDetailOnFailure() { throw new UnsupportedOperationException( diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java index 6988272dd2c..91f514670b9 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java @@ -317,6 +317,8 @@ default boolean supportsCheckAndSet() { * In this case, you can be sure that all your cells have been updated to their new values. * In case of failure, there are no guarantees that the operation was not partially applied but the * implementations may offer such a guarantee. + * Reads concurrent with this operation may see a partially applied update that later succeeds, though + * implementations may offer stronger guarantees. * * If a {@link MultiCheckAndSetException} is thrown, it is likely that the values stored in the cells were not as * you expected. diff --git a/atlasdb-api/src/test/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibilityTest.java b/atlasdb-api/src/test/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibilityTest.java index 1ba85d33b22..9db07a648a0 100644 --- a/atlasdb-api/src/test/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibilityTest.java +++ b/atlasdb-api/src/test/java/com/palantir/atlasdb/keyvalue/api/CheckAndSetCompatibilityTest.java @@ -23,21 +23,32 @@ import org.junit.Test; public class CheckAndSetCompatibilityTest { - private static final CheckAndSetCompatibility SUPPORTS_DETAIL_NOT_CONSISTENT_ON_FAILURE = - CheckAndSetCompatibility.supportedBuilder() - .supportsDetailOnFailure(true) - .consistentOnFailure(false) - .build(); - private static final CheckAndSetCompatibility NO_DETAIL_CONSISTENT_ON_FAILURE = - CheckAndSetCompatibility.supportedBuilder() - .supportsDetailOnFailure(false) - .consistentOnFailure(true) - .build(); - private static final CheckAndSetCompatibility SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE = - CheckAndSetCompatibility.supportedBuilder() - .supportsDetailOnFailure(true) - .consistentOnFailure(true) - .build(); + private static final CheckAndSetCompatibility DETAIL_ONLY = CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(false) + .supportsDetailOnFailure(true) + .consistentOnFailure(false) + .build(); + private static final CheckAndSetCompatibility CONSISTENT_ONLY = CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(false) + .supportsDetailOnFailure(false) + .consistentOnFailure(true) + .build(); + private static final CheckAndSetCompatibility MCAS_ONLY = CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(true) + .supportsDetailOnFailure(false) + .consistentOnFailure(false) + .build(); + private static final CheckAndSetCompatibility DETAIL_AND_CONSISTENT = CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(false) + .supportsDetailOnFailure(true) + .consistentOnFailure(true) + .build(); + + private static final CheckAndSetCompatibility MCAS_DETAIL_CONSISTENT = CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(true) + .supportsDetailOnFailure(true) + .consistentOnFailure(true) + .build(); @Test public void checkingDetailSupportedOnUnsupportedThrows() { @@ -59,9 +70,16 @@ public void unsupportedDoesNotSupportCheckAndSetOperations() { .isFalse(); } + @Test + public void unsupportedDoesNotSupportMultiCheckAndSetOperations() { + assertThat(CheckAndSetCompatibility.unsupported().supportsMultiCheckAndSetOperations()) + .isFalse(); + } + @Test public void supportedBuilderSupportsCheckAndSetOperations() { assertThat(CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(false) .supportsDetailOnFailure(false) .consistentOnFailure(false) .build() @@ -73,44 +91,53 @@ public void supportedBuilderSupportsCheckAndSetOperations() { public void intersectReturnsLeastRestrictiveWhenNoCompatibilitiesProvided() { CheckAndSetCompatibility intersection = intersectCompatibility(); assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isTrue(); assertThat(intersection.supportsDetailOnFailure()).isTrue(); assertThat(intersection.consistentOnFailure()).isTrue(); } @Test - public void intersectAppliesToBothProperties() { + public void intersectAppliesToAllProperties() { CheckAndSetCompatibility intersection = intersectCompatibility( - SUPPORTS_DETAIL_NOT_CONSISTENT_ON_FAILURE, - NO_DETAIL_CONSISTENT_ON_FAILURE, - SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE); + DETAIL_ONLY, CONSISTENT_ONLY, MCAS_ONLY, DETAIL_AND_CONSISTENT, MCAS_DETAIL_CONSISTENT); assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isFalse(); assertThat(intersection.supportsDetailOnFailure()).isFalse(); assertThat(intersection.consistentOnFailure()).isFalse(); } @Test public void intersectReturnsDetailSupportedWhenAllSupport() { - CheckAndSetCompatibility intersection = intersectCompatibility( - SUPPORTS_DETAIL_NOT_CONSISTENT_ON_FAILURE, SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE); + CheckAndSetCompatibility intersection = intersectCompatibility(DETAIL_ONLY, DETAIL_AND_CONSISTENT); assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isFalse(); assertThat(intersection.supportsDetailOnFailure()).isTrue(); assertThat(intersection.consistentOnFailure()).isFalse(); } @Test public void intersectReturnsConsistentWhenAllConsistent() { - CheckAndSetCompatibility intersection = - intersectCompatibility(SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE, NO_DETAIL_CONSISTENT_ON_FAILURE); + CheckAndSetCompatibility intersection = intersectCompatibility(DETAIL_AND_CONSISTENT, CONSISTENT_ONLY); assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isFalse(); assertThat(intersection.supportsDetailOnFailure()).isFalse(); assertThat(intersection.consistentOnFailure()).isTrue(); } + @Test + public void intersectReturnsMultiCheckAndSetOperationsSupportedWhenAllSupport() { + CheckAndSetCompatibility intersection = intersectCompatibility(MCAS_ONLY, MCAS_DETAIL_CONSISTENT); + assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsDetailOnFailure()).isFalse(); + assertThat(intersection.consistentOnFailure()).isFalse(); + } + @Test public void intersectDoesNotRestrictUnnecessarily() { - CheckAndSetCompatibility intersection = - intersectCompatibility(SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE, SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE); + CheckAndSetCompatibility intersection = intersectCompatibility(MCAS_DETAIL_CONSISTENT, MCAS_DETAIL_CONSISTENT); assertThat(intersection.supportsCheckAndSetOperations()).isTrue(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isTrue(); assertThat(intersection.supportsDetailOnFailure()).isTrue(); assertThat(intersection.consistentOnFailure()).isTrue(); } @@ -118,8 +145,9 @@ public void intersectDoesNotRestrictUnnecessarily() { @Test public void intersectWithUnsupportedIsUnsupported() { CheckAndSetCompatibility intersection = - intersectCompatibility(SUPPORTS_DETAIL_CONSISTENT_ON_FAILURE, CheckAndSetCompatibility.unsupported()); + intersectCompatibility(DETAIL_AND_CONSISTENT, CheckAndSetCompatibility.unsupported()); assertThat(intersection.supportsCheckAndSetOperations()).isFalse(); + assertThat(intersection.supportsMultiCheckAndSetOperations()).isFalse(); } private CheckAndSetCompatibility intersectCompatibility(CheckAndSetCompatibility... compatibilities) { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java index d34cdca5d29..6a7543320a7 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java @@ -1936,6 +1936,7 @@ private static Column prepareColumnForPutUnlessExists(Map.Entry in @Override public CheckAndSetCompatibility getCheckAndSetCompatibility() { return CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(true) .supportsDetailOnFailure(true) .consistentOnFailure(false) .build(); @@ -1980,6 +1981,7 @@ public void checkAndSet(final CheckAndSetRequest request) throws CheckAndSetExce * In this case, you can be sure that all your cells have been updated to their new values. * If the old cells initially did not have the values you expected, none of the cells will be updated and * {@link MultiCheckAndSetException} will be thrown. + * Reads concurrent with this operation will not see a partial update. * * Another thing to note is that the check operation will **only be performed on values of cells that are declared * in the set of expected values** i.e. the check operation DOES NOT take updates into account. diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java index ae7218f5d26..d54e6451e87 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java @@ -84,6 +84,7 @@ protected static ExecutorService createThreadPool(String threadNamePrefix, int _ @Override public CheckAndSetCompatibility getCheckAndSetCompatibility() { return CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(false) .consistentOnFailure(true) .supportsDetailOnFailure(false) .build(); diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java index 84b066a0810..c361150b255 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java @@ -52,6 +52,7 @@ import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.api.TimestampRangeDelete; import com.palantir.atlasdb.keyvalue.api.Value; +import com.palantir.atlasdb.logging.LoggingArgs; import com.palantir.common.annotation.Output; import com.palantir.common.base.ClosableIterator; import com.palantir.common.base.ClosableIterators; @@ -76,6 +77,11 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -107,8 +113,7 @@ public InMemoryKeyValueService(boolean createTablesAutomatically, ExecutorServic public Map getRows( TableReference tableRef, Iterable rows, ColumnSelection columnSelection, long timestamp) { Map result = new HashMap<>(); - ConcurrentSkipListMap table = getTableMap(tableRef).entries; - + ConcurrentNavigableMap table = getTableMap(tableRef).entries; for (byte[] row : rows) { Cell rowBegin = Cells.createSmallestCellForRow(row); Cell rowEnd = Cells.createLargestCellForRow(row); @@ -153,8 +158,8 @@ private void getLatestVersionOfCell( @Override public Map get(TableReference tableRef, Map timestampByCell) { - ConcurrentSkipListMap table = getTableMap(tableRef).entries; Map result = new HashMap<>(); + ConcurrentNavigableMap table = getTableMap(tableRef).entries; for (Map.Entry e : timestampByCell.entrySet()) { Cell cell = e.getKey(); Map.Entry lastEntry = table.lowerEntry(new Key(cell, e.getValue())); @@ -429,26 +434,28 @@ public void setOnce(TableReference tableRef, Map values) { private void putInternal( TableReference tableRef, Collection> values, OverwriteBehaviour overwriteBehaviour) { - Table table = getTableMap(tableRef); - List knownSuccessfullyCommittedKeys = new ArrayList<>(); - for (Map.Entry entry : values) { - byte[] contents = entry.getValue().getContents(); - long timestamp = entry.getValue().getTimestamp(); - - Key key = getKey(table, entry.getKey(), timestamp); - if (overwriteBehaviour == OverwriteBehaviour.OVERWRITE) { - table.entries.put(key, copyOf(contents)); - } else { - byte[] oldContents = putIfAbsent(table, key, contents); - if (shouldThrow(overwriteBehaviour, contents, oldContents)) { - throw new KeyAlreadyExistsException( - "We already have a value for this timestamp", - ImmutableList.of(entry.getKey()), - knownSuccessfullyCommittedKeys); + runWithLockForWrites(tableRef, table -> { + List knownSuccessfullyCommittedKeys = new ArrayList<>(); + for (Map.Entry entry : values) { + byte[] contents = entry.getValue().getContents(); + long timestamp = entry.getValue().getTimestamp(); + + Key key = getKey(table, entry.getKey(), timestamp); + if (overwriteBehaviour == OverwriteBehaviour.OVERWRITE) { + table.put(key, copyOf(contents)); + } else { + byte[] oldContents = putIfAbsent(table, key, contents); + if (shouldThrow(overwriteBehaviour, contents, oldContents)) { + throw new KeyAlreadyExistsException( + "We already have a value for this timestamp", + ImmutableList.of(entry.getKey()), + knownSuccessfullyCommittedKeys); + } + knownSuccessfullyCommittedKeys.add(entry.getKey()); } - knownSuccessfullyCommittedKeys.add(entry.getKey()); } - } + return null; + }); } private boolean shouldThrow(OverwriteBehaviour overwriteBehaviour, byte[] contents, byte[] oldContents) { @@ -471,6 +478,7 @@ private enum OverwriteBehaviour { public CheckAndSetCompatibility getCheckAndSetCompatibility() { // We advertise inconsistency on failure, for the purposes of test re-usability. return CheckAndSetCompatibility.supportedBuilder() + .supportsMultiCheckAndSetOperations(true) .supportsDetailOnFailure(true) .consistentOnFailure(false) .build(); @@ -479,43 +487,83 @@ public CheckAndSetCompatibility getCheckAndSetCompatibility() { @Override public void checkAndSet(CheckAndSetRequest request) throws CheckAndSetException { TableReference tableRef = request.table(); - Table table = getTableMap(tableRef); - Cell cell = request.cell(); - Optional oldValue = request.oldValue(); - byte[] contents = request.newValue(); - - Key key = getKey(table, cell, AtlasDbConstants.TRANSACTION_TS); - if (oldValue.isPresent()) { - byte[] storedValue = table.entries.get(key); - boolean succeeded = Arrays.equals(storedValue, oldValue.get()) - && table.entries.replace(key, storedValue, copyOf(contents)); - if (!succeeded) { - byte[] actual = table.entries.get(key); // Re-fetch, something may have happened between get and replace - throwCheckAndSetException(cell, tableRef, oldValue.get(), actual); - } - } else { - byte[] oldContents = putIfAbsent(table, key, contents); - if (oldContents != null) { - throwCheckAndSetException(cell, tableRef, null, oldContents); + runWithLockForWrites(tableRef, table -> { + Cell cell = request.cell(); + Optional oldValue = request.oldValue(); + byte[] contents = request.newValue(); + + Key key = getKey(table, cell, AtlasDbConstants.TRANSACTION_TS); + if (oldValue.isPresent()) { + byte[] storedValue = table.get(key); + boolean succeeded = + Arrays.equals(storedValue, oldValue.get()) && table.replace(key, storedValue, copyOf(contents)); + if (!succeeded) { + byte[] actual = table.get(key); // Re-fetch, something may have happened between get and replace + throwCheckAndSetException(cell, tableRef, oldValue.get(), actual); + } + } else { + byte[] oldContents = putIfAbsent(table, key, contents); + if (oldContents != null) { + throwCheckAndSetException(cell, tableRef, null, oldContents); + } } - } + return null; + }); } + /** + * This operation ensures atomicity by locking down the table. This ensures that there are no conflicts with other + * writes, and that reads see a consistent view of the world. The exception are reads that return iterators, which + * may see a partially applied multiCAS. + * + * In case of failure, updates will not be partially applied. + * Reads concurrent with this operation may see a partially applied update. + */ @Override public void multiCheckAndSet(MultiCheckAndSetRequest multiCheckAndSetRequest) throws MultiCheckAndSetException { - throw new UnsupportedOperationException("InMemoryKvs does not support multi-checkAndSet operation!"); + TableReference tableRef = multiCheckAndSetRequest.tableRef(); + runWithExclusiveLock(tableRef, tableEntry -> { + Map actualValues = new HashMap<>(); + boolean success = true; + for (Map.Entry entry : + multiCheckAndSetRequest.expected().entrySet()) { + Cell cell = entry.getKey(); + Key key = getKey(tableEntry, cell, AtlasDbConstants.TRANSACTION_TS); + byte[] actual = tableEntry.entries.get(key); + if (actual != null) { + actualValues.put(cell, actual); + } + success &= Arrays.equals(actual, entry.getValue()); + } + if (!success) { + throw new MultiCheckAndSetException( + LoggingArgs.tableRef(tableRef), + multiCheckAndSetRequest.rowName(), + multiCheckAndSetRequest.expected(), + actualValues); + } + putInternal( + tableRef, + KeyValueServices.toConstantTimestampValues( + multiCheckAndSetRequest.updates().entrySet(), AtlasDbConstants.TRANSACTION_TS), + OverwriteBehaviour.OVERWRITE); + }); } // Returns the existing contents, if any, and null otherwise - private byte[] putIfAbsent(Table table, Key key, final byte[] contents) { - return table.entries.putIfAbsent(key, copyOf(contents)); + private byte[] putIfAbsent(ConcurrentSkipListMap table, Key key, final byte[] contents) { + return table.putIfAbsent(key, copyOf(contents)); } private Key getKey(Table table, Cell cell, long timestamp) { + return getKey(table.entries, cell, timestamp); + } + + private Key getKey(ConcurrentSkipListMap table, Cell cell, long timestamp) { byte[] row = cell.getRowName(); byte[] col = cell.getColumnName(); - Key nextKey = table.entries.ceilingKey(new Key(row, ArrayUtils.EMPTY_BYTE_ARRAY, Long.MIN_VALUE)); + Key nextKey = table.ceilingKey(new Key(row, ArrayUtils.EMPTY_BYTE_ARRAY, Long.MIN_VALUE)); if (nextKey != null && nextKey.matchesRow(row)) { // Save memory by sharing rows. row = nextKey.row; @@ -530,25 +578,29 @@ private void throwCheckAndSetException(Cell cell, TableReference tableRef, byte[ @Override public void delete(TableReference tableRef, Multimap keys) { - ConcurrentSkipListMap table = getTableMap(tableRef).entries; - for (Map.Entry e : keys.entries()) { - table.remove(new Key(e.getKey(), e.getValue())); - } + runWithLockForWrites(tableRef, table -> { + for (Map.Entry e : keys.entries()) { + table.remove(new Key(e.getKey(), e.getValue())); + } + return null; + }); } @Override public void deleteAllTimestamps(TableReference tableRef, Map deletes) { - ConcurrentSkipListMap table = getTableMap(tableRef).entries; - deletes.forEach((cell, delete) -> table.subMap( - new Key(cell, delete.minTimestampToDelete()), true, - new Key(cell, delete.maxTimestampToDelete()), true) - .clear()); + runWithLockForWrites(tableRef, table -> { + deletes.forEach((cell, delete) -> table.subMap( + new Key(cell, delete.minTimestampToDelete()), true, + new Key(cell, delete.maxTimestampToDelete()), true) + .clear()); + return null; + }); } @Override public Multimap getAllTimestamps(TableReference tableRef, Set cells, long ts) { Multimap multimap = HashMultimap.create(); - ConcurrentSkipListMap table = getTableMap(tableRef).entries; + ConcurrentNavigableMap table = getTableMap(tableRef).entries; for (Cell key : cells) { for (Key entry : table.subMap(new Key(key, Long.MIN_VALUE), new Key(key, ts)).keySet()) { @@ -571,12 +623,14 @@ public void truncateTables(final Set tableRefs) { @Override public void truncateTable(TableReference tableRef) { - Table table = tables.get(tableRef); - if (table != null) { - table.entries.clear(); - } else { - throw tableMappingException(tableRef); - } + runWithLockForWrites(tableRef, table -> { + if (table != null) { + table.clear(); + } else { + throw tableMappingException(tableRef); + } + return null; + }); } @Override @@ -613,6 +667,7 @@ private byte[] copyOf(byte[] contents) { static class Table { final ConcurrentSkipListMap entries; + final ReadWriteLock lock = new ReentrantReadWriteLock(true); Table() { this.entries = new ConcurrentSkipListMap<>(); @@ -632,10 +687,12 @@ private Table getTableMap(TableReference tableRef) { @Override public void addGarbageCollectionSentinelValues(TableReference tableRef, Iterable cells) { - ConcurrentSkipListMap table = getTableMap(tableRef).entries; - for (Cell cell : cells) { - table.put(new Key(cell, Value.INVALID_VALUE_TIMESTAMP), ArrayUtils.EMPTY_BYTE_ARRAY); - } + runWithLockForWrites(tableRef, table -> { + for (Cell cell : cells) { + table.put(new Key(cell, Value.INVALID_VALUE_TIMESTAMP), ArrayUtils.EMPTY_BYTE_ARRAY); + } + return null; + }); } @Override @@ -678,6 +735,36 @@ public ListenableFuture> getAsync(TableReference tableRef, Map< return Futures.immediateFuture(get(tableRef, timestampByCell)); } + /** + * This takes out a reentrant read lock. It is only exclusive specifically with + * {@link #multiCheckAndSet(MultiCheckAndSetRequest)}. + */ + private T runWithLockForWrites(TableReference tableRef, Function, T> task) { + Table tableEntry = getTableMap(tableRef); + Lock lock = tableEntry.lock.readLock(); + lock.lock(); + try { + return task.apply(tableEntry.entries); + } finally { + lock.unlock(); + } + } + + /** + * This takes out a reentrant write lock. This ensures that {@link #multiCheckAndSet(MultiCheckAndSetRequest)} + * does not contend with other writes. + */ + private void runWithExclusiveLock(TableReference tableRef, Consumer task) { + Table table = getTableMap(tableRef); + Lock lock = table.lock.writeLock(); + lock.lock(); + try { + task.accept(table); + } finally { + lock.unlock(); + } + } + private static class Key implements Comparable { private final byte[] row; private final byte[] col; diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueServiceTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueServiceTest.java index 05ed3b1abc4..02ac33e6316 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueServiceTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueServiceTest.java @@ -45,6 +45,8 @@ import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.api.MultiCheckAndSetException; +import com.palantir.atlasdb.keyvalue.api.MultiCheckAndSetRequest; import com.palantir.atlasdb.keyvalue.api.RangeRequest; import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.RowColumnRangeIterator; @@ -59,6 +61,7 @@ import com.palantir.atlasdb.table.description.TableMetadata; import com.palantir.atlasdb.table.description.ValueType; import com.palantir.common.base.ClosableIterator; +import com.palantir.common.streams.KeyedStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -74,6 +77,7 @@ import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.LongStream; import org.apache.commons.lang3.ArrayUtils; import org.assertj.core.data.MapEntry; @@ -1926,11 +1930,139 @@ public void compactingShouldNotFail() { keyValueService.compactInternally(TEST_TABLE); } + @SuppressWarnings("JavadocStyleCheck") + /** + * 0->0 ---- 2->2 ---- 4->4 + * => + * 0,1,2,3,4 -> update + */ + @Test + public void multiCheckAndSetSuccessTest() { + assumeTrue(keyValueService.getCheckAndSetCompatibility().supportsMultiCheckAndSetOperations()); + keyValueService.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); + + List cells = IntStream.range(0, 5) + .mapToObj(AbstractKeyValueServiceTest::cell) + .collect(Collectors.toList()); + byte[] update = PtBytes.toBytes("update"); + ImmutableMap actualMap = ImmutableMap.of( + cells.get(0), cells.get(0).getColumnName(), + cells.get(2), cells.get(2).getColumnName(), + cells.get(4), cells.get(4).getColumnName()); + Map updateMap = cells.stream().collect(Collectors.toMap(x -> x, _ignore -> update)); + + keyValueService.putUnlessExists(TEST_TABLE, actualMap); + + MultiCheckAndSetRequest request = MultiCheckAndSetRequest.builder() + .tableRef(TEST_TABLE) + .rowName(cells.get(0).getRowName()) + .expected(actualMap) + .updates(updateMap) + .build(); + keyValueService.multiCheckAndSet(request); + assertThat(keyValueService.get(TEST_TABLE, cells.stream().collect(Collectors.toMap(x -> x, _ignore -> 1L)))) + .containsExactlyInAnyOrderEntriesOf(KeyedStream.stream(updateMap) + .map(val -> Value.create(val, 0L)) + .collectToMap()); + } + @Test public void clusterAvailabilityStatusShouldBeAllAvailable() { assertThat(keyValueService.getClusterAvailabilityStatus()).isEqualTo(ClusterAvailabilityStatus.ALL_AVAILABLE); } + @SuppressWarnings("JavadocStyleCheck") + /** + * 0->0 1->2 2->2 3->2 4->4 + * fail check for + * 0->0 1->1 2->2 3->3 4->4 + */ + @Test + public void multiCheckAndSetFailureOnMismatchTest() { + assumeTrue(keyValueService.getCheckAndSetCompatibility().supportsMultiCheckAndSetOperations()); + keyValueService.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); + + List cells = IntStream.range(0, 5) + .mapToObj(AbstractKeyValueServiceTest::cell) + .collect(Collectors.toList()); + byte[] update = PtBytes.toBytes("update"); + ImmutableMap actualMap = ImmutableMap.of( + cells.get(0), cells.get(0).getColumnName(), + cells.get(1), cells.get(2).getColumnName(), + cells.get(2), cells.get(2).getColumnName(), + cells.get(3), cells.get(2).getColumnName(), + cells.get(4), cells.get(4).getColumnName()); + Map expectedMap = cells.stream().collect(Collectors.toMap(x -> x, Cell::getColumnName)); + Map updateMap = cells.stream().collect(Collectors.toMap(x -> x, _ignore -> update)); + + keyValueService.putUnlessExists(TEST_TABLE, actualMap); + + MultiCheckAndSetRequest request = MultiCheckAndSetRequest.builder() + .tableRef(TEST_TABLE) + .rowName(cells.get(0).getRowName()) + .expected(expectedMap) + .updates(updateMap) + .build(); + + assertThatThrownBy(() -> keyValueService.multiCheckAndSet(request)) + .isInstanceOf(MultiCheckAndSetException.class) + .satisfies(exception -> { + MultiCheckAndSetException mcasException = (MultiCheckAndSetException) exception; + assertThat(mcasException.getActualValues()).containsExactlyInAnyOrderEntriesOf(actualMap); + }); + assertThat(keyValueService.get(TEST_TABLE, cells.stream().collect(Collectors.toMap(x -> x, _ignore -> 1L)))) + .containsExactlyInAnyOrderEntriesOf(KeyedStream.stream(actualMap) + .map(val -> Value.create(val, 0L)) + .collectToMap()); + } + + @SuppressWarnings("JavadocStyleCheck") + /** + * 0->0 ---- 2->2 ---- 4->4 + * fail check for + * 0->0 1->1 2->2 3->3 4->4 + */ + @Test + public void multiCheckAndSetFailureOnAbsentTest() { + assumeTrue(keyValueService.getCheckAndSetCompatibility().supportsMultiCheckAndSetOperations()); + keyValueService.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); + + List cells = IntStream.range(0, 5) + .mapToObj(AbstractKeyValueServiceTest::cell) + .collect(Collectors.toList()); + byte[] update = PtBytes.toBytes("update"); + ImmutableMap actualMap = ImmutableMap.of( + cells.get(0), cells.get(0).getColumnName(), + cells.get(2), cells.get(2).getColumnName(), + cells.get(4), cells.get(4).getColumnName()); + Map expectedMap = cells.stream().collect(Collectors.toMap(x -> x, Cell::getColumnName)); + Map updateMap = cells.stream().collect(Collectors.toMap(x -> x, _ignore -> update)); + + keyValueService.putUnlessExists(TEST_TABLE, actualMap); + + MultiCheckAndSetRequest request = MultiCheckAndSetRequest.builder() + .tableRef(TEST_TABLE) + .rowName(cells.get(0).getRowName()) + .expected(expectedMap) + .updates(updateMap) + .build(); + + assertThatThrownBy(() -> keyValueService.multiCheckAndSet(request)) + .isInstanceOf(MultiCheckAndSetException.class) + .satisfies(exception -> { + MultiCheckAndSetException mcasException = (MultiCheckAndSetException) exception; + assertThat(mcasException.getActualValues()).containsExactlyInAnyOrderEntriesOf(actualMap); + }); + assertThat(keyValueService.get(TEST_TABLE, cells.stream().collect(Collectors.toMap(x -> x, _ignore -> 1L)))) + .containsExactlyInAnyOrderEntriesOf(KeyedStream.stream(actualMap) + .map(val -> Value.create(val, 0L)) + .collectToMap()); + } + + private static Cell cell(int index) { + return Cell.create(PtBytes.toBytes("row"), PtBytes.toBytes(index)); + } + protected static byte[] row(int number) { return PtBytes.toBytes("row" + number); } diff --git a/changelog/@unreleased/pr-6243.v2.yml b/changelog/@unreleased/pr-6243.v2.yml new file mode 100644 index 00000000000..6a9086d485b --- /dev/null +++ b/changelog/@unreleased/pr-6243.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: '[TTS] InMemory implementation of MCAS' + links: + - https://github.com/palantir/atlasdb/pull/6243