From d55de02ca6f05d1a00aeb52d9448bb004a4e9fc3 Mon Sep 17 00:00:00 2001 From: gmaretic Date: Wed, 3 Feb 2021 19:00:58 +0000 Subject: [PATCH] Add the getRowKeysInRange method to the KVS interface (#5236) --- .../atlasdb/keyvalue/api/KeyValueService.java | 17 +++++++ .../cassandra/CassandraKeyValueService.java | 11 ----- .../impl/DualWriteKeyValueService.java | 5 ++ .../impl/InMemoryKeyValueService.java | 23 +++++++++ .../impl/ProfilingKeyValueService.java | 7 +++ .../keyvalue/impl/TracingKeyValueService.java | 14 ++++++ .../atlasdb/keyvalue/dbkvs/impl/DbKvs.java | 5 ++ .../impl/TableRemappingKeyValueService.java | 9 ++++ .../impl/TableSplittingKeyValueService.java | 5 ++ .../keyvalue/jdbc/JdbcKeyValueService.java | 5 ++ .../impl/TransactionTestSetup.java | 2 +- .../keyvalue/MemoryTransactionTest.java | 47 +++++++++++++++++++ changelog/@unreleased/pr-5236.v2.yml | 5 ++ 13 files changed, 143 insertions(+), 12 deletions(-) create mode 100644 changelog/@unreleased/pr-5236.v2.yml diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java index 05d4b09aec2..4e0d93e61d5 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java @@ -646,4 +646,21 @@ default boolean performanceIsSensitiveToTombstones() { default boolean shouldTriggerCompactions() { return false; } + + /** + * Returns a sorted list of row keys in the specified range. + * + * This method is not guaranteed to be implemented for all implementations of {@link KeyValueService}. It may be + * changed or removed at any time without warning. + * + * @param tableRef table for which the request is made. + * @param startRow inclusive start of the row key range. Use empty byte array for unbounded.
+ * @param endRow inclusive end of the row key range. Use empty byte array for unbounded. + * @param maxResults the request only returns the first maxResults rows in range. + * @return the row keys in the requested range in sorted order, at most {@code maxResults} of them. + * @deprecated if you wish to use this method, contact the atlasdb team for support + */ + @DoDelegate + @Deprecated + List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults); } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java index 576169eee0f..980b9997279 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java @@ -16,9 +16,7 @@ package com.palantir.atlasdb.keyvalue.cassandra; import com.palantir.atlasdb.keyvalue.api.KeyValueService; -import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.processors.AutoDelegate; -import java.util.List; @AutoDelegate public interface CassandraKeyValueService extends KeyValueService { @@ -30,13 +28,4 @@ public interface CassandraKeyValueService extends KeyValueService { @Override boolean isInitialized(); - /** - * Returns a sorted list of row keys in the specified range. - * - * @param tableRef table for which the request is made. - * @param startRow inclusive start of the row key range. Use empty byte array for unbounded. - * @param endRow inclusive end of the row key range. Use empty byte array for unbounded. - * @param maxResults the request only returns the first maxResults rows in range.
- */ - List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DualWriteKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DualWriteKeyValueService.java index 38fb8fa6e57..34ee2980680 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DualWriteKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/DualWriteKeyValueService.java @@ -293,6 +293,11 @@ public boolean shouldTriggerCompactions() { return delegate1.shouldTriggerCompactions() || delegate2.shouldTriggerCompactions(); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + return delegate1.getRowKeysInRange(tableRef, startRow, endRow, maxResults); + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { return delegate1.getAsync(tableRef, timestampByCell); diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java index 6f5b7608c2b..05c1f06ea32 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/InMemoryKeyValueService.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.palantir.atlasdb.AtlasDbConstants; +import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; @@ -52,6 +53,7 @@ import com.palantir.common.base.ClosableIterator; import 
com.palantir.common.base.ClosableIterators; import com.palantir.common.exception.TableMappingNotFoundException; +import com.palantir.conjure.java.lib.Bytes; import com.palantir.util.paging.TokenBackedBasicResultsPage; import java.util.ArrayList; import java.util.Arrays; @@ -70,6 +72,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.lang3.ArrayUtils; @@ -597,6 +600,26 @@ public ClusterAvailabilityStatus getClusterAvailabilityStatus() { return ClusterAvailabilityStatus.ALL_AVAILABLE; } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + RangeRequest.Builder rangeRequest = RangeRequest.builder().startRowInclusive(startRow); + if (Arrays.equals(endRow, PtBytes.EMPTY_BYTE_ARRAY)) { + rangeRequest.endRowExclusive(PtBytes.EMPTY_BYTE_ARRAY); + } else { + rangeRequest.endRowExclusive(RangeRequests.nextLexicographicName(endRow)); + } + try (ClosableIterator> rowsWithColumns = + getRange(tableRef, rangeRequest.build(), Long.MAX_VALUE)) { + return rowsWithColumns.stream() + .map(RowResult::getRowName) + .map(Bytes::from) + .distinct() + .limit(maxResults) + .map(Bytes::asNewByteArray) + .collect(Collectors.toList()); + } + } + private static IllegalArgumentException tableMappingException(TableReference tableReference) { return new IllegalArgumentException( new TableMappingNotFoundException("Table " + tableReference.getQualifiedName() + " does not exist")); diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/ProfilingKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/ProfilingKeyValueService.java index 2f664bc99a3..645e7cf0782 100644 --- 
a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/ProfilingKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/ProfilingKeyValueService.java @@ -507,6 +507,13 @@ public boolean shouldTriggerCompactions() { return delegate.shouldTriggerCompactions(); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + return maybeLog( + () -> delegate.getRowKeysInRange(tableRef, startRow, endRow, maxResults), + logTimeAndTable("getRowKeysInRange", tableRef)); + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { long startTime = System.currentTimeMillis(); diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/TracingKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/TracingKeyValueService.java index 8a6c43740ac..f0fe1d54f68 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/TracingKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/TracingKeyValueService.java @@ -45,6 +45,8 @@ import com.palantir.common.base.ClosableIterator; import com.palantir.common.concurrent.PTExecutors; import com.palantir.logsafe.Preconditions; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.UnsafeArg; import com.palantir.tracing.DetachedSpan; import com.palantir.util.paging.TokenBackedBasicResultsPage; import java.util.Collection; @@ -427,6 +429,18 @@ public boolean shouldTriggerCompactions() { return delegate().shouldTriggerCompactions(); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + try (CloseableTrace trace = startLocalTrace( + "getRowKeysInRange({}, {}, {}, {})", + LoggingArgs.safeTableOrPlaceholder(tableRef), + UnsafeArg.of("startRow", startRow), + UnsafeArg.of("endRow", endRow), + SafeArg.of("maxResults", maxResults))) 
{ + return delegate().getRowKeysInRange(tableRef, startRow, endRow, maxResults); + } + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { DetachedSpan detachedSpan = DetachedSpan.start(String.format( diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index 66328f9b462..cb6301bdb2d 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -1207,6 +1207,11 @@ public boolean shouldTriggerCompactions() { return true; } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + throw new UnsupportedOperationException("getRowKeysInRange is only supported for Cassandra."); + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { return Futures.immediateFuture(get(tableRef, timestampByCell)); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableRemappingKeyValueService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableRemappingKeyValueService.java index b6d1bd42033..cd9c41d6f31 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableRemappingKeyValueService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableRemappingKeyValueService.java @@ -437,6 +437,15 @@ public boolean shouldTriggerCompactions() { return delegate.shouldTriggerCompactions(); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + try { + return delegate().getRowKeysInRange(tableMapper.getMappedTableName(tableRef), startRow, endRow, maxResults); + } catch (TableMappingNotFoundException e) { + throw new 
IllegalArgumentException(e); + } + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { try { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableSplittingKeyValueService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableSplittingKeyValueService.java index f6619f42e8d..43b3be56688 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableSplittingKeyValueService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TableSplittingKeyValueService.java @@ -387,6 +387,11 @@ public boolean shouldTriggerCompactions() { return delegates.stream().anyMatch(KeyValueService::shouldTriggerCompactions); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + return getDelegate(tableRef).getRowKeysInRange(tableRef, startRow, endRow, maxResults); + } + @Override public ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { return getDelegate(tableRef).getAsync(tableRef, timestampByCell); diff --git a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcKeyValueService.java b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcKeyValueService.java index c223627fc78..bdc4805a8e3 100644 --- a/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcKeyValueService.java +++ b/atlasdb-jdbc/src/main/java/com/palantir/atlasdb/keyvalue/jdbc/JdbcKeyValueService.java @@ -1007,6 +1007,11 @@ public ClusterAvailabilityStatus getClusterAvailabilityStatus() { throw new UnsupportedOperationException("getClusterAvailabilityStatus has not been implemented for Jdbc KVS"); } + @Override + public List getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) { + throw new UnsupportedOperationException("getRowKeysInRange is only supported for Cassandra."); + } + @Override public 
ListenableFuture> getAsync(TableReference tableRef, Map timestampByCell) { return Futures.immediateFuture(this.get(tableRef, timestampByCell)); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java index 4c5fd6606f0..1f19f4024ff 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionTestSetup.java @@ -224,7 +224,7 @@ private String getCell(Transaction txn, TableReference tableRef, String rowName, return valueBytes != null ? PtBytes.toString(valueBytes) : null; } - void putDirect(String rowName, String columnName, String value, long timestamp) { + protected void putDirect(String rowName, String columnName, String value, long timestamp) { Cell cell = createCell(rowName, columnName); byte[] valueBytes = PtBytes.toBytes(value); keyValueService.put(TEST_TABLE, ImmutableMap.of(cell, valueBytes), timestamp); diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/MemoryTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/MemoryTransactionTest.java index 0ecedc5aec5..333c78402ff 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/MemoryTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/MemoryTransactionTest.java @@ -15,15 +15,62 @@ */ package com.palantir.atlasdb.keyvalue; +import static org.assertj.core.api.Assertions.assertThat; + +import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.impl.TestResourceManager; import com.palantir.atlasdb.transaction.impl.AbstractTransactionTest; import org.junit.ClassRule; +import org.junit.Test; public class MemoryTransactionTest extends AbstractTransactionTest { @ClassRule public 
static final TestResourceManager TRM = TestResourceManager.inMemory(); + private static final byte[] ROW_1 = PtBytes.toBytes("row1"); + private static final byte[] ZERO = new byte[0]; + public MemoryTransactionTest() { super(TRM, TRM); } + + @Test + public void testKeyValueRangeColumnSelectionEndInclusive() { + setup(); + assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ROW_1, 9)) + .containsExactly(ROW_1); + } + + @Test + public void testKeyValueRangeColumnSelectionEntireTable() { + setup(); + byte[] row1 = PtBytes.toBytes("row1"); + assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ZERO, 9)) + .containsExactly(row1, PtBytes.toBytes("row1a"), PtBytes.toBytes("row2")); + } + + @Test + public void testKeyValueRangeColumnSelectionStartInclusive() { + setup(); + byte[] row1 = PtBytes.toBytes("row1"); + assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ROW_1, ZERO, 9)) + .containsExactly(row1, PtBytes.toBytes("row1a"), PtBytes.toBytes("row2")); + } + + @Test + public void testKeyValueRangeColumnSelectionMaxResults() { + setup(); + byte[] row1 = PtBytes.toBytes("row1"); + assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ZERO, 2)) + .containsExactly(row1, PtBytes.toBytes("row1a")); + } + + private void setup() { + putDirect("row1", "col1", "v1", 0); + putDirect("row1", "col2", "v2", 2); + putDirect("row1", "col4", "v5", 3); + putDirect("row1a", "col4", "v5", 100); + putDirect("row2", "col2", "v3", 1); + putDirect("row2", "col4", "v4", 6); + } } diff --git a/changelog/@unreleased/pr-5236.v2.yml b/changelog/@unreleased/pr-5236.v2.yml new file mode 100644 index 00000000000..848fa9bf2a9 --- /dev/null +++ b/changelog/@unreleased/pr-5236.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: Added the getRowKeysInRange method to the KVS interface, so we can access the Cassandra KVS implementation from a transaction manager. 
This may be removed at any time, and if you wish to use it, contact the atlasdb team for support. + links: + - https://github.com/palantir/atlasdb/pull/5236