Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Add the getRowKeysInRange method to the KVS interface (#5236)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaretic authored Feb 3, 2021
1 parent 709edb0 commit d55de02
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,4 +646,21 @@ default boolean performanceIsSensitiveToTombstones() {
default boolean shouldTriggerCompactions() {
return false;
}

/**
* Returns a sorted list of row keys in the specified range.
*
* This method is not guaranteed to be implemented for all implementations of {@link KeyValueService}. It may be
* changed or removed at any time without warning.
*
* @deprecated if you wish to use this method, contact the atlasdb team for support
*
* @param tableRef table for which the request is made.
* @param startRow inclusive start of the row key range. Use empty byte array for unbounded.
* @param endRow inclusive end of the row key range. Use empty byte array for unbounded.
* @param maxResults the request only returns the first maxResults rows in range.
*/
@DoDelegate
@Deprecated
List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package com.palantir.atlasdb.keyvalue.cassandra;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.processors.AutoDelegate;
import java.util.List;

@AutoDelegate
public interface CassandraKeyValueService extends KeyValueService {
Expand All @@ -30,13 +28,4 @@ public interface CassandraKeyValueService extends KeyValueService {

@Override
boolean isInitialized();
/**
* Returns a sorted list of row keys in the specified range.
*
* @param tableRef table for which the request is made.
* @param startRow inclusive start of the row key range. Use empty byte array for unbounded.
* @param endRow inclusive end of the row key range. Use empty byte array for unbounded.
* @param maxResults the request only returns the first maxResults rows in range.
*/
List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults);
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public boolean shouldTriggerCompactions() {
return delegate1.shouldTriggerCompactions() || delegate2.shouldTriggerCompactions();
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
return delegate1.getRowKeysInRange(tableRef, startRow, endRow, maxResults);
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
return delegate1.getAsync(tableRef, timestampByCell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
Expand All @@ -52,6 +53,7 @@
import com.palantir.common.base.ClosableIterator;
import com.palantir.common.base.ClosableIterators;
import com.palantir.common.exception.TableMappingNotFoundException;
import com.palantir.conjure.java.lib.Bytes;
import com.palantir.util.paging.TokenBackedBasicResultsPage;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -70,6 +72,7 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -597,6 +600,26 @@ public ClusterAvailabilityStatus getClusterAvailabilityStatus() {
return ClusterAvailabilityStatus.ALL_AVAILABLE;
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
RangeRequest.Builder rangeRequest = RangeRequest.builder().startRowInclusive(startRow);
if (Arrays.equals(endRow, PtBytes.EMPTY_BYTE_ARRAY)) {
rangeRequest.endRowExclusive(PtBytes.EMPTY_BYTE_ARRAY);
} else {
rangeRequest.endRowExclusive(RangeRequests.nextLexicographicName(endRow));
}
try (ClosableIterator<RowResult<Value>> rowsWithColumns =
getRange(tableRef, rangeRequest.build(), Long.MAX_VALUE)) {
return rowsWithColumns.stream()
.map(RowResult::getRowName)
.map(Bytes::from)
.distinct()
.limit(maxResults)
.map(Bytes::asNewByteArray)
.collect(Collectors.toList());
}
}

private static IllegalArgumentException tableMappingException(TableReference tableReference) {
return new IllegalArgumentException(
new TableMappingNotFoundException("Table " + tableReference.getQualifiedName() + " does not exist"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,13 @@ public boolean shouldTriggerCompactions() {
return delegate.shouldTriggerCompactions();
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
return maybeLog(
() -> delegate.getRowKeysInRange(tableRef, startRow, endRow, maxResults),
logTimeAndTable("getRowKeysInRange", tableRef));
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
long startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.palantir.common.base.ClosableIterator;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.tracing.DetachedSpan;
import com.palantir.util.paging.TokenBackedBasicResultsPage;
import java.util.Collection;
Expand Down Expand Up @@ -427,6 +429,18 @@ public boolean shouldTriggerCompactions() {
return delegate().shouldTriggerCompactions();
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
try (CloseableTrace trace = startLocalTrace(
"getRowKeysInRange({}, {}, {}, {})",
LoggingArgs.safeTableOrPlaceholder(tableRef),
UnsafeArg.of("startRow", startRow),
UnsafeArg.of("endRow", endRow),
SafeArg.of("maxResults", maxResults))) {
return delegate().getRowKeysInRange(tableRef, startRow, endRow, maxResults);
}
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
DetachedSpan detachedSpan = DetachedSpan.start(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,11 @@ public boolean shouldTriggerCompactions() {
return true;
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
throw new UnsupportedOperationException("getRowKeysInRange is only supported for Cassandra.");
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
return Futures.immediateFuture(get(tableRef, timestampByCell));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ public boolean shouldTriggerCompactions() {
return delegate.shouldTriggerCompactions();
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
try {
return delegate().getRowKeysInRange(tableMapper.getMappedTableName(tableRef), startRow, endRow, maxResults);
} catch (TableMappingNotFoundException e) {
throw new IllegalArgumentException(e);
}
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ public boolean shouldTriggerCompactions() {
return delegates.stream().anyMatch(KeyValueService::shouldTriggerCompactions);
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
return getDelegate(tableRef).getRowKeysInRange(tableRef, startRow, endRow, maxResults);
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
return getDelegate(tableRef).getAsync(tableRef, timestampByCell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,11 @@ public ClusterAvailabilityStatus getClusterAvailabilityStatus() {
throw new UnsupportedOperationException("getClusterAvailabilityStatus has not been implemented for Jdbc KVS");
}

@Override
public List<byte[]> getRowKeysInRange(TableReference tableRef, byte[] startRow, byte[] endRow, int maxResults) {
throw new UnsupportedOperationException("getRowKeysInRange is only supported for Cassandra.");
}

@Override
public ListenableFuture<Map<Cell, Value>> getAsync(TableReference tableRef, Map<Cell, Long> timestampByCell) {
return Futures.immediateFuture(this.get(tableRef, timestampByCell));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private String getCell(Transaction txn, TableReference tableRef, String rowName,
return valueBytes != null ? PtBytes.toString(valueBytes) : null;
}

void putDirect(String rowName, String columnName, String value, long timestamp) {
protected void putDirect(String rowName, String columnName, String value, long timestamp) {
Cell cell = createCell(rowName, columnName);
byte[] valueBytes = PtBytes.toBytes(value);
keyValueService.put(TEST_TABLE, ImmutableMap.of(cell, valueBytes), timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,62 @@
*/
package com.palantir.atlasdb.keyvalue;

import static org.assertj.core.api.Assertions.assertThat;

import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.impl.TestResourceManager;
import com.palantir.atlasdb.transaction.impl.AbstractTransactionTest;
import org.junit.ClassRule;
import org.junit.Test;

public class MemoryTransactionTest extends AbstractTransactionTest {
@ClassRule
public static final TestResourceManager TRM = TestResourceManager.inMemory();

private static final byte[] ROW_1 = PtBytes.toBytes("row1");
private static final byte[] ZERO = new byte[0];

public MemoryTransactionTest() {
super(TRM, TRM);
}

@Test
public void testKeyValueRangeColumnSelectionEndInclusive() {
setup();
assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ROW_1, 9))
.containsExactly(ROW_1);
}

@Test
public void testKeyValueRangeColumnSelectionEntireTable() {
setup();
byte[] row1 = PtBytes.toBytes("row1");
assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ZERO, 9))
.containsExactly(row1, PtBytes.toBytes("row1a"), PtBytes.toBytes("row2"));
}

@Test
public void testKeyValueRangeColumnSelectionStartInclusive() {
setup();
byte[] row1 = PtBytes.toBytes("row1");
assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ROW_1, ZERO, 9))
.containsExactly(row1, PtBytes.toBytes("row1a"), PtBytes.toBytes("row2"));
}

@Test
public void testKeyValueRangeColumnSelectionMaxResults() {
setup();
byte[] row1 = PtBytes.toBytes("row1");
assertThat(keyValueService.getRowKeysInRange(TEST_TABLE, ZERO, ZERO, 2))
.containsExactly(row1, PtBytes.toBytes("row1a"));
}

private void setup() {
putDirect("row1", "col1", "v1", 0);
putDirect("row1", "col2", "v2", 2);
putDirect("row1", "col4", "v5", 3);
putDirect("row1a", "col4", "v5", 100);
putDirect("row2", "col2", "v3", 1);
putDirect("row2", "col4", "v4", 6);
}
}
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-5236.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Added the getRowKeysInRange method to the KVS interface, so we can access the Cassandra KVS implementation from a transaction manager. This may be removed at any time, and if you wish to use it, contact the atlasdb team for support.
links:
- https://github.com/palantir/atlasdb/pull/5236

0 comments on commit d55de02

Please sign in to comment.