diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java index 0277879b14b..c3485b2903f 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java @@ -15,11 +15,19 @@ */ package com.palantir.atlasdb.keyvalue.cassandra; +import java.util.Arrays; + +import org.apache.commons.lang3.RandomStringUtils; import org.junit.Assert; +import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager; +import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig; import com.palantir.atlasdb.containers.CassandraContainer; import com.palantir.atlasdb.containers.Containers; import com.palantir.atlasdb.keyvalue.api.KeyValueService; @@ -27,17 +35,45 @@ import com.palantir.atlasdb.protos.generated.TableMetadataPersistence; import com.palantir.atlasdb.sweep.AbstractSweepTaskRunnerTest; +@RunWith(Parameterized.class) public class CassandraKeyValueServiceSweepTaskRunnerIntegrationTest extends AbstractSweepTaskRunnerTest { @ClassRule public static final Containers CONTAINERS = new Containers( CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.class) .with(new CassandraContainer()); + @Parameterized.Parameter + public boolean useColumnBatchSize; + + @Parameterized.Parameters(name = "Use column batch size parameter = {0}") + public static Iterable parameters() { + return Arrays.asList(true, false); + } + + @Override protected KeyValueService getKeyValueService() { + CassandraKeyValueServiceConfig config = useColumnBatchSize + ? ImmutableCassandraKeyValueServiceConfig.copyOf(CassandraContainer.KVS_CONFIG) + .withTimestampsGetterBatchSize(10) + : CassandraContainer.KVS_CONFIG; + return CassandraKeyValueServiceImpl.create( - CassandraKeyValueServiceConfigManager.createSimpleManager( - CassandraContainer.KVS_CONFIG), CassandraContainer.LEADER_CONFIG); + CassandraKeyValueServiceConfigManager.createSimpleManager(config), CassandraContainer.LEADER_CONFIG); + } + + @Test + public void should_not_oom_when_there_are_many_large_values_to_sweep() { + Assume.assumeTrue("should_not_oom test will always fail if column batch size is not set!", useColumnBatchSize); + + createTable(TableMetadataPersistence.SweepStrategy.CONSERVATIVE); + + long numInsertions = 100; + insertMultipleValues(numInsertions); + + long sweepTimestamp = numInsertions + 1; + SweepResults results = completeSweep(sweepTimestamp); + Assert.assertEquals(numInsertions - 1, results.getStaleValuesDeleted()); } @Test @@ -53,4 +89,14 @@ public void should_return_values_for_multiple_columns_when_sweeping() { Assert.assertEquals(28, results.getStaleValuesDeleted()); } + private void insertMultipleValues(long numInsertions) { + for (int ts = 1; ts <= numInsertions; ts++) { + System.out.println("putting with ts = " + ts); + putIntoDefaultColumn("row", makeLongRandomString(), ts); + } + } + + private String makeLongRandomString() { + return RandomStringUtils.random(1_000_000); + } } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java deleted file mode 100644 index 4381551fe8e..00000000000 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra; - -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.mockito.Mockito; -import org.slf4j.Logger; - -import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager; -import com.palantir.atlasdb.containers.CassandraContainer; -import com.palantir.atlasdb.containers.Containers; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy; -import com.palantir.atlasdb.keyvalue.cassandra.paging.ImmutableCassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.SingleRowColumnPager; -import com.palantir.atlasdb.keyvalue.impl.TestDataBuilder; -import com.palantir.atlasdb.keyvalue.impl.TracingPrefsConfig; - -public class CellPagerIntegrationTest { - protected static final TableReference TEST_TABLE = TableReference.createFromFullyQualifiedName( - "cell_pager.test_table"); - - @ClassRule - public static final Containers CONTAINERS = new Containers(CellPagerIntegrationTest.class) - .with(new CassandraContainer()); - - private static CassandraKeyValueService kvs = null; - private CellPagerBatchSizingStrategy pageSizingStrategy = Mockito.mock(CellPagerBatchSizingStrategy.class); - private CellPager cellPager = null; - - @BeforeClass - public static void setUpKvs() { - kvs = CassandraKeyValueServiceImpl.create( - CassandraKeyValueServiceConfigManager.createSimpleManager(CassandraContainer.KVS_CONFIG), - CassandraContainer.LEADER_CONFIG, - Mockito.mock(Logger.class)); - } - - @Before - public void setUp() { - kvs.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); - kvs.truncateTable(TEST_TABLE); - TracingQueryRunner queryRunner = new TracingQueryRunner( - Mockito.mock(Logger.class), - new TracingPrefsConfig()); - SingleRowColumnPager singleRowPager = new SingleRowColumnPager(kvs.getClientPool(), queryRunner); - cellPager = new CellPager(singleRowPager, kvs.getClientPool(), queryRunner, pageSizingStrategy); - } - - @Test - public void testPaging() { - // Row 1: 5 cells - // Row 2: 1 cell - // Row 3: 2 cells - // Row 4: 1 cell - // Row 5: 1 cell - testDataBuilder() - .put(1, 1, 30L, "a") - .put(1, 1, 20L, "b") - .put(1, 1, 10L, "c") - .put(1, 2, 20L, "d") - .put(1, 2, 10L, "e") - .put(2, 3, 10L, "f") - .put(3, 1, 20L, "g") - .put(3, 1, 10L, "h") - .put(4, 2, 10L, "i") - .put(5, 2, 10L, "j") - .store(); - Mockito.doReturn(new CellPagerBatchSizingStrategy.PageSizes(3, 2)) - .when(pageSizingStrategy).computePageSizes(Mockito.anyInt(), Mockito.any()); - List> pages = ImmutableList.copyOf(cellPager.createCellIterator( - TEST_TABLE, PtBytes.EMPTY_BYTE_ARRAY, 2, ConsistencyLevel.ALL)); - assertEquals( - ImmutableList.of( - // incomplete part of row 1 fetched with get_range_slice - ImmutableList.of(val(1, 1, 30L, "a"), val(1, 1, 20L, "b")), - // first page of the remainder of row 1 fetched with get_slice - ImmutableList.of(val(1, 1, 10L, "c"), val(1, 2, 20L, "d")), - // second page of the remainder of row 1 fetched with get_slice - ImmutableList.of(val(1, 2, 10L, "e")), - // remaining rows already fetched with get_range_slice - ImmutableList.of(val(2, 3, 10L, "f"), val(3, 1, 20L, "g"), val(3, 1, 10L, "h")), - // the second get_range_slice - ImmutableList.of(val(4, 2, 10L, "i"), val(5, 2, 10L, "j"))), - pages); - } - - @Test - public void testStartRow() { - testDataBuilder() - .put(1, 1, 10L, "foo") - .put(2, 1, 10L, "bar") - .store(); - Mockito.doReturn(new CellPagerBatchSizingStrategy.PageSizes(3, 2)) - .when(pageSizingStrategy).computePageSizes(Mockito.anyInt(), Mockito.any()); - List> pages = ImmutableList.copyOf(cellPager.createCellIterator( - TEST_TABLE, TestDataBuilder.row(2), 2, ConsistencyLevel.ALL)); - assertEquals(ImmutableList.of(ImmutableList.of(val(2, 1, 10L, "bar"))), pages); - } - - private static CassandraRawCellValue val(int row, int col, long ts, String value) { - Column column = new Column(); - column.setName(CassandraKeyValueServices.makeCompositeBuffer(TestDataBuilder.row(col), ts)); - column.setValue(TestDataBuilder.value(value)); - column.setTimestamp(ts); - return ImmutableCassandraRawCellValue.builder() - .rowKey(TestDataBuilder.row(row)) - .column(column) - .build(); - } - - private TestDataBuilder testDataBuilder() { - return new TestDataBuilder(kvs, TEST_TABLE); - } - -} diff --git a/atlasdb-cassandra/build.gradle b/atlasdb-cassandra/build.gradle index 07e405d85c1..c47c2199f87 100644 --- a/atlasdb-cassandra/build.gradle +++ b/atlasdb-cassandra/build.gradle @@ -55,8 +55,6 @@ dependencies { processor group: 'org.immutables', name: 'value' processor 'com.google.auto.service:auto-service:1.0-rc2' processor project(":atlasdb-processors") - - explicitShadow 'com.palantir.patches.sourceforge:trove3:' + libVersions.trove } shadowJar { diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java index bc7c07a7791..329fe973569 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueService.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.keyvalue.cassandra; import com.palantir.atlasdb.keyvalue.api.KeyValueService; +import com.palantir.atlasdb.keyvalue.impl.GetCandidateCellsForSweepingShim; public interface CassandraKeyValueService extends KeyValueService { /** diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServices.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServices.java index f7f8d9d4a53..c4e640bfcde 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServices.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServices.java @@ -175,7 +175,7 @@ static String encodeAsHex(byte[] array) { return "0x" + PtBytes.encodeHexString(array); } - public static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) { + static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) { assert colName.length <= 1 << 16 : "Cannot use column names larger than 64KiB, was " + colName.length; ByteBuffer buffer = ByteBuffer @@ -218,7 +218,7 @@ static Pair decompose(ByteBuffer inputComposite) { * Convenience method to get the name buffer for the specified column and * decompose it into the name and timestamp. */ - public static Pair decomposeName(Column column) { + static Pair decomposeName(Column column) { ByteBuffer nameBuffer; if (column.isSetName()) { nameBuffer = column.bufferForName(); diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java index d129b1c0e4a..c159e51e247 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java @@ -27,6 +27,7 @@ import com.google.common.base.Supplier; import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.RangeRequest; +import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.RowResult; import com.palantir.atlasdb.keyvalue.cassandra.ResultsExtractor; import com.palantir.util.paging.AbstractPagingIterable; @@ -42,17 +43,17 @@ public class CassandraRangePagingIterable private final int batchHint; private final ColumnSelection selection; - private final RowRangeLoader rowRangeLoader; + private final RowGetter rowGetter; private final SlicePredicate slicePredicate; public CassandraRangePagingIterable( - RowRangeLoader rowRangeLoader, + RowGetter rowGetter, SlicePredicate slicePredicate, ColumnGetter columnGetter, RangeRequest rangeRequest, Supplier> resultsExtractor, long timestamp) { - this.rowRangeLoader = rowRangeLoader; + this.rowGetter = rowGetter; this.slicePredicate = slicePredicate; this.columnGetter = columnGetter; this.rangeRequest = rangeRequest; @@ -89,8 +90,8 @@ private TokenBackedBasicResultsPage, byte[]> getSinglePage(byte[] s } private List getRows(byte[] startKey) throws Exception { - KeyRange keyRange = KeyRanges.createKeyRange(startKey, rangeRequest.getEndExclusive(), batchHint); - return rowRangeLoader.getRows(keyRange, slicePredicate); + KeyRange keyRange = getKeyRange(startKey, rangeRequest.getEndExclusive()); + return rowGetter.getRows(keyRange); } private Map> getColumns(List firstPage) { @@ -114,4 +115,15 @@ private TokenBackedBasicResultsPage, byte[]> pageWithNoMoreResultsA return SimpleTokenBackedResultsPage.create(rangeRequest.getEndExclusive(), page.getResults(), false); } + private KeyRange getKeyRange(byte[] startKey, byte[] endExclusive) { + KeyRange keyRange = new KeyRange(batchHint); + keyRange.setStart_key(startKey); + if (endExclusive.length == 0) { + keyRange.setEnd_key(endExclusive); + } else { + // We need the previous name because this is inclusive, not exclusive + keyRange.setEnd_key(RangeRequests.previousLexicographicName(endExclusive)); + } + return keyRange; + } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java deleted file mode 100644 index bd616a242d4..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import org.apache.cassandra.thrift.Column; -import org.immutables.value.Value; - -@Value.Immutable -public interface CassandraRawCellValue { - - byte[] getRowKey(); - - Column getColumn(); - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java deleted file mode 100644 index 55b1ed72e53..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.KeySlice; -import org.apache.cassandra.thrift.SlicePredicate; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool; -import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy.PageSizes; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Limit; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Range; -import com.palantir.common.annotation.Output; - -/* - * A class for iterating uniformly through raw cells, i.e. (rowName, columnName, timestamp) triplets, in a given table - * in lexicographic order (with timestamps being ordered in reverse). - * - * By "uniformly", we mean that the size of each page we load at once is bounded in terms of the number of - * raw cells rather than the number of rows, and page boundaries don't coincide with row (partition) boundaries. - * - * Cassandra Thrift API doesn't provide a way to do this directly. So we have to resort to a "clever" hybrid approach: - * fetch blocks of row "heads" using the "get_range_slices" call with a column-per-row limit, and then page - * through the remainder of rows that exceed the fetched "head" limit using the "get_slice" call. - * - * Calling "get_range_slices" can be visualized as getting a "rectangle" of data from Cassandra, whose width and height - * are equal to SliceRange.count and KeyRange.count, respectively. Consider an example table: - * - * |<-w=4->| - * ._______. ___ - * Row 1 |a b c | ^ - * Row 2 |d e f g|h i j | - * Row 3 |k | h=5 - * Row 4 |l m n o| | - * Row 5 |p______| _v_ - * Row 6 q r - * Row 7 s t u - * Row 8 v w x y z - * - * Here, we set KeyRange.count = 5 and SliceRange.count = 4. - * Since 3 cells are returned for Row 1 while we requested 4 cells per row, we can conclude that Row 1 is fully fetched, - * and thus we can return it to the user. However, Row 2 is not fully fetched, so we have to page through its remainder. - * - * Overall, paging through the entire table in the picture looks like this: - * - * 1) Get the first rectangle of data: - * get_range_slices(KeyRange.start_key = [], KeyRange.count = 5, SliceRange.count = 4) - * -> returns rows 1-5 with at most 4 columns in each - * 2) Return Row 1 and the first four cells of Row 2 to the user - * 3) Get the remainder of Row 2: - * get_slice(key = "Row 2", SliceRange.start = , SliceRange.count = cellBatchHint) - * -> [ h i j ] - * 4) Return [ h i j ] to the user - * 5) Return Rows 3 and 4 to the user (they were already fetched in step 1) - * 6) Since we got exactly 4 cells for row 4, we don't really know whether there are more, so we have to attempt - * to get its remainder: - * get_slice(key = "Row 4", SliceRange.start = , SliceRange,count = cellBatchHint) - * -> [] - * 7) Return Row 5 to the user - * 8) Now we finished the first rectangle, so it's time to get another one: - * get_range_slices(KeyRange.start_key = , KeyRange.count = 5, SliceRange.count = 4) - * -> returns rows 6-8 with at most 4 columns in each - * 9) Return Row 6, Row 7 and the first four columns of Row 8 to the user - * 10) Get the remainder of Row 8: - * get_slice(key = "Row 8", SliceRange.start = , SliceRange,count = cellBatchHint) - * -> [ z ] - * 11) Return [ z ] to the user - * - * Note that getting the remainder of a row might require more than one call "get_slice" if the row is wide. - * - * We want the area of each rectangle to be roughly equal to "cellBatchHint" which is supplied by the user. In order to - * choose reasonable width and height of the rectangle given the area, we estimate the ditribution of row widths - * and try to make a somewhat intelligent decision based on that data (see CellPagerBatchSizingStrategy for details). - */ -public class CellPager { - private final SingleRowColumnPager singleRowPager; - private final CassandraClientPool clientPool; - private final TracingQueryRunner queryRunner; - private final CellPagerBatchSizingStrategy pageSizeStrategy; - - public CellPager(SingleRowColumnPager singleRowPager, - CassandraClientPool clientPool, - TracingQueryRunner queryRunner, - CellPagerBatchSizingStrategy pageSizeStrategy) { - this.singleRowPager = singleRowPager; - this.clientPool = clientPool; - this.queryRunner = queryRunner; - this.pageSizeStrategy = pageSizeStrategy; - } - - public Iterator> createCellIterator(TableReference tableRef, - byte[] startRowInclusive, - int cellBatchHint, - ConsistencyLevel consistencyLevel) { - Preconditions.checkNotNull(startRowInclusive, - "Use an empty byte array rather than null to start from the beginning of the table"); - Preconditions.checkArgument(cellBatchHint > 0, "cellBatchHint must be strictly positive"); - RowRangeLoader rowRangeLoader = new RowRangeLoader(clientPool, queryRunner, consistencyLevel, tableRef); - return new PageIterator(tableRef, rowRangeLoader, cellBatchHint, consistencyLevel, startRowInclusive); - } - - private class PageIterator extends AbstractIterator> { - private final TableReference tableRef; - private final RowRangeLoader rowRangeLoader; - private final int cellBatchHint; - private final ConsistencyLevel consistencyLevel; - - private byte[] nextRow; - - private final Queue>> cellsToReturn = new ArrayDeque<>(); - private final StatsAccumulator stats = new StatsAccumulator(); - private PageSizes pageSizes = null; - - PageIterator(TableReference tableRef, RowRangeLoader rowRangeLoader, int cellBatchHint, - ConsistencyLevel consistencyLevel, byte[] nextRow) { - this.tableRef = tableRef; - this.rowRangeLoader = rowRangeLoader; - this.cellBatchHint = cellBatchHint; - this.consistencyLevel = consistencyLevel; - this.nextRow = nextRow; - } - - @Override - protected List computeNext() { - while (true) { - if (cellsToReturn.isEmpty()) { - // We have no pre-loaded rows from the previous "rectangle", so we need to fetch more if we can - if (nextRow == null) { - return endOfData(); - } else { - // Get the next "rectangle" of data (step 8 in the example above) and try again - fetchNextRange(); - } - } else if (cellsToReturn.peek().hasNext()) { - // Scan through the remainder of the row (steps 3, 6 and 10 in the example above) - return cellsToReturn.peek().next(); - } else { - // We exhausted an iterator - pop it from the queue and try again - cellsToReturn.poll(); - } - } - } - - private void fetchNextRange() { - pageSizes = pageSizeStrategy.computePageSizes(cellBatchHint, stats); - KeyRange range = KeyRanges.createKeyRange(nextRow, PtBytes.EMPTY_BYTE_ARRAY, pageSizes.rowLimit); - SlicePredicate predicate = SlicePredicates.create(Range.ALL, Limit.of(pageSizes.columnPerRowLimit)); - List slices = rowRangeLoader.getRows(range, predicate); - if (slices.isEmpty()) { - nextRow = null; - } else { - cellsToReturn.addAll(splitFetchedRowsIntoTasks( - slices, - pageSizes.columnPerRowLimit, - (rowKey, lastSeenColumn) -> singleRowPager.createColumnIterator( - tableRef, rowKey, cellBatchHint, lastSeenColumn, consistencyLevel), - stats)); - computeNextStartRow(slices); - } - } - - private void computeNextStartRow(List slices) { - if (slices.size() < pageSizes.rowLimit) { - nextRow = null; - } else { - byte[] lastSeenRow = Iterables.getLast(slices).getKey(); - nextRow = RangeRequests.getNextStartRowUnlessTerminal( - false, RangeRequests.nextLexicographicName(lastSeenRow)); - } - } - } - - @VisibleForTesting - interface RowRemainderIteratorFactory { - Iterator> createIteratorForRemainderOfRow(byte[] rowKey, Column lastSeenColumn); - } - - @VisibleForTesting - static List>> splitFetchedRowsIntoTasks( - List slices, - int columnPerRowLimit, - RowRemainderIteratorFactory rowRemainderIteratorFactory, - @Output StatsAccumulator statsToAccumulate) { - // Split the returned slices into single partially fetched rows and contiguous runs of fully fetched rows - List>> ret = new ArrayList<>(); - List loadedRows = new ArrayList<>(); - for (KeySlice slice : slices) { - loadedRows.add(slice); - if (isFullyFetched(slice, columnPerRowLimit)) { - statsToAccumulate.add(slice.getColumnsSize()); - } else { - ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator()); - loadedRows.clear(); - // If the row was only partially fetched, we enqueue an iterator to page through - // the remainder of that single row. - Column lastSeenColumn = Iterables.getLast(slice.getColumns()).column; - Iterator> rawColumnIter = rowRemainderIteratorFactory - .createIteratorForRemainderOfRow(slice.getKey(), lastSeenColumn); - Iterator> statsUpdatingIter = new StatsUpdatingIterator( - rawColumnIter, statsToAccumulate, slice.getColumnsSize()); - ret.add(Iterators.transform( - statsUpdatingIter, cols -> columnsToCells(cols, slice.getKey()))); - } - } - if (!loadedRows.isEmpty()) { - ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator()); - } - return ret; - } - - private static boolean isFullyFetched(KeySlice slice, int columnPerRowLimit) { - return slice.getColumnsSize() < columnPerRowLimit; - } - - private static class StatsUpdatingIterator extends AbstractIterator> { - private final Iterator> delegate; - private final StatsAccumulator stats; - private long columnCount = 0; - - StatsUpdatingIterator(Iterator> delegate, StatsAccumulator stats, long columnCount) { - this.delegate = delegate; - this.stats = stats; - this.columnCount = columnCount; - } - - @Override - protected List computeNext() { - if (delegate.hasNext()) { - List cols = delegate.next(); - columnCount += cols.size(); - return cols; - } else { - stats.add(columnCount); - return endOfData(); - } - } - } - - private static List keySlicesToCells(List slices) { - List ret = new ArrayList<>(); - for (KeySlice slice : slices) { - for (ColumnOrSuperColumn col : slice.getColumns()) { - ret.add(ImmutableCassandraRawCellValue.builder() - .rowKey(slice.getKey()) - .column(col.getColumn()) - .build()); - } - } - return ret; - } - - private static List columnsToCells(List columns, byte[] rowKey) { - return Lists.transform(columns, col -> ImmutableCassandraRawCellValue.builder() - .rowKey(rowKey) - .column(col.getColumn()) - .build()); - } - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java deleted file mode 100644 index f69a3da0164..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -public class CellPagerBatchSizingStrategy { - public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) { - if (stats.count() == 0) { - // No data yet: go with a square - return getExactPageSize(Math.round(Math.sqrt(cellBatchHint)), cellBatchHint); - } else { - return computePageSizesAssumingNormalDistribution( - cellBatchHint, stats.mean(), stats.populationStandardDeviation()); - } - } - - /* Given the batch size in cells supplied by the user, we want to figure out the optimal (or at least decent) - * batching parameters for Cassandra. The `get_range_slices' call allows us to load a "rectangle" - * of cells at once: - * - * |<-w=4->| - * ._______. ___ - * Row 1 |x x x x| ^ (each 'x' is a (cell, ts) pair, i.e. a single "column" in Cassandra) - * Row 2 |x x | | - * Row 3 |x x x | h=5 - * Row 4 |x x x x|x x | - * Row 5 |x______| _v_ - * Row 6 x x - * Row 7 x x x x - * Row 8 x x x x x - * ... - * Row N x x - * - * Here, the height of the rectangle "h" is KeyRange.count and the width "w" is SliceRange.count. - * We want the area of our rectangle to be on the order of "cellBatchHint" supplied by the user, i.e. - * - * w * h ~ cellBatchHint (1) - * - * The question is how to choose "w" and "h" appropriately. If a row contains "w" or more columns (like rows 1 and - * 4 in the picture above), we need to make at least one extra round trip to retrieve that row individually - * using the "get_slice" call. Thus, setting "w" too low will result in extra round trips to retrieve the individual - * rows, while setting it too high would make "h" low (since "w" and "h" are inversely proportional as in (1)), - * resulting in more round trips to make progress "vertically". - * - * It makes sense to try to minimize the number of round trips to Cassandra. Let's assume that rows will be short - * enough to fit in "cellBatchHint", i.e. whenever we need to fetch the remainder of a row, we can do so with - * a single call to "get_slice" with SliceRange.count set to "cellBatchHint". (If the assumption doesn't hold - * and our rows are wide, then a particular choice of "w" and "h" doesn't have as much impact on the number of round - * trips -- we need to page through the individual rows anyway). - * - * Then for a table with N rows, the number of round trips is approximately equal to - * - * (N / h) + (# of rows with >= w columns) - * - * Let F(k) be the cumulative distribution function of the number of columns per row. Also, remember from (1) - * that "h" is on the order of "cellBatchHint / w". Hence the above expression is approximately equal to - * - * N * w / cellBatchHint + N * (1 - F(w - 1)) - * - * Minimizing this is equivalent to minimizing - * - * w / cellBatchHint - F(w - 1) (2) - * - * So if we had an adequate model of F, we could estimate its parameters and minimize (2) with respect to "w". - * The problem is that the nature of F can depend on the format of a particular table. For example, an append-only - * table with five named columns where all five columns are always written at once for every row, will have - * a degenerate distribution localized at 5; a single-column table with mutations might exhibit a Poisson-like - * behavior; while a table with dynamic columns can be shaped according to an arbitrary distribution depending - * on the user data. - * - * One other possibility is to compute a discrete approximation to the PDF/CDF instead of choosing a fixed family - * of distributions and estimating its parameters, but that seems both excessive and expensive. - * - * Pragmatically speaking, we want a formula like - * - * w = < estimated mean # columns per row > + 1 + g(sigma, cellBatchHint) (3) - * - * where g(sigma, cellBatchHint) is some term that grows with variance sigma^2. The term is necessary to efficiently - * handle both the "append-only fixed-column table" (zero variance) and the "partially dirty table" (high variance) - * cases. - * - * If we minimize (2) in the case of F being the normal distribution, we get (after doing all the calculus): - * - * w = mu + 1 + sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (4) - * - * where "mu" is the mean and "sigma" is the standard deviation. This actually looks like (3) for sufficiently - * small values of sigma, with - * - * g(sigma, cellBatchHint) = sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (5) - * - * Of course, a real table can't have normally distributed row widths (simply because the - * row width is a discrete quantity), but: - * - * - the normal distribution approximates the "clean narrow table" (low variance) case quite well, - * and is a half-decent approximation to the Poisson distribution (the "single-column dirty" case); - * - it's easy to estimate the parameters for it (mu and sigma), as well as to do the calculus exercise - * to get (4); - * - the numbers it produces actually seem reasonable. - * - * To illustrate the last point, here is a table of g(sigma, cellBatchHint) for some values of sigma. (The physical - * meaning of "g" is the number of extra columns to fetch per row in addition to the mean number of columns per row) - * - * cellBatchHint = 1000 cellBatchHint = 100 - * - * sigma | g(sigma, 1000) sigma | g(sigma, 100) - * --------+--------------- --------+-------------- - * 0 | 0.00 0 | 0.00 - * 0.5 | 1.82 0.5 | 1.48 - * 1 | 3.46 1 | 2.72 - * 2 | 6.51 2 | 4.89 - * 5 | 14.80 5 | 10.19 - * 10 | 27.15 10 | 16.64 - * 20 | 48.93 - * 50 | 101.90 - * 100 | 166.35 - */ - private static PageSizes computePageSizesAssumingNormalDistribution( - int cellBatchHint, - double columnsPerRowMean, - double columnsPerRowStdDev) { - double sigma = Math.min(columnsPerRowStdDev, cellBatchHint * 0.2); - // If sigma > cellBatchHint/sqrt(2*pi), then the formula doesn't work. - // At that point, variance in row length is so big that we can't do much: - // just use the minimum columnPerRowLimit to at least minimize the number of calls - // to get_range_slice. - double logArg = Math.min(1.0, SQRT_2_PI * sigma / cellBatchHint); - // "extraColumnsToFetch" is the value of "g" defined in (5) - // Mathematically, the limit of x*ln(x) is 0 as x->0, but numerically we need to compute the logarithm - // separately (unless we use a Taylor approximation to x*ln(x) instead, which we don't want), - // so we handle the case of small "s" separately to avoid the logarithm blow up at 0. - double extraColumnsToFetch = logArg < 1e-12 ? 0.0 : sigma * Math.sqrt(-2.0 * Math.log(logArg)); - return getExactPageSize(columnsPerRowMean + 1.0 + extraColumnsToFetch, cellBatchHint); - } - - private static PageSizes getExactPageSize(double desiredColumnPerRowLimit, int cellBatchHint) { - int approximateColumnPerRowLimit = Math.max(2, - Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit))); - int rowLimit = Math.max(1, cellBatchHint / approximateColumnPerRowLimit); - int finalColumnPerRowLimit = Math.max(2, cellBatchHint / rowLimit); - return new PageSizes(rowLimit, finalColumnPerRowLimit); - } - - public static class PageSizes { - final int rowLimit; - final int columnPerRowLimit; - - public PageSizes(int rowLimit, int columnPerRowLimit) { - this.rowLimit = rowLimit; - this.columnPerRowLimit = columnPerRowLimit; - } - } - - private static final double SQRT_2_PI = Math.sqrt(2.0 * Math.PI); - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java deleted file mode 100644 index 75642ebb411..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import org.apache.cassandra.thrift.KeyRange; - -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -public final class KeyRanges { - private KeyRanges() {} - - public static KeyRange createKeyRange(byte[] startKey, byte[] endExclusive, int limit) { - KeyRange keyRange = new KeyRange(limit); - keyRange.setStart_key(startKey); - if (endExclusive.length == 0) { - keyRange.setEnd_key(endExclusive); - } else { - // We need the previous name because this is inclusive, not exclusive - keyRange.setEnd_key(RangeRequests.previousLexicographicName(endExclusive)); - } - return keyRange; - } - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java similarity index 82% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java index 52edab589a4..e9a2feba7c3 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java @@ -25,7 +25,6 @@ import org.apache.cassandra.thrift.KeySlice; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.UnavailableException; -import org.apache.thrift.TException; import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.TableReference; @@ -34,21 +33,24 @@ import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; import com.palantir.common.base.FunctionCheckedException; -public class RowRangeLoader { +public class RowGetter { private CassandraClientPool clientPool; private TracingQueryRunner queryRunner; private ConsistencyLevel consistency; private TableReference tableRef; + private ColumnFetchMode fetchMode; - public RowRangeLoader( + public RowGetter( CassandraClientPool clientPool, TracingQueryRunner queryRunner, ConsistencyLevel consistency, - TableReference tableRef) { + TableReference tableRef, + ColumnFetchMode fetchMode) { this.clientPool = clientPool; this.queryRunner = queryRunner; this.consistency = consistency; this.tableRef = tableRef; + this.fetchMode = fetchMode; } public List getRows(KeyRange keyRange, SlicePredicate slicePredicate) { @@ -56,9 +58,9 @@ public List getRows(KeyRange keyRange, SlicePredicate slicePredicate) InetSocketAddress host = clientPool.getRandomHostForKey(keyRange.getStart_key()); return clientPool.runWithRetryOnHost( host, - new FunctionCheckedException, RuntimeException>() { + new FunctionCheckedException, Exception>() { @Override - public List apply(Cassandra.Client client) throws RuntimeException { + public List apply(Cassandra.Client client) throws Exception { try { return queryRunner.run(client, tableRef, () -> client.get_range_slices(colFam, slicePredicate, keyRange, consistency)); @@ -67,10 +69,8 @@ public List apply(Cassandra.Client client) throws RuntimeException { throw new InsufficientConsistencyException("This operation requires all Cassandra" + " nodes to be up and available.", e); } else { - throw new RuntimeException(e); + throw e; } - } catch (TException e) { - throw new RuntimeException(e); } } @@ -81,4 +81,14 @@ public String toString() { }); } + private SlicePredicate getSlicePredicate() { + SliceRange slice = new SliceRange( + ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY), + ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY), + false, + fetchMode.getColumnsToFetch()); + SlicePredicate predicate = new SlicePredicate(); + predicate.setSlice_range(slice); + return predicate; + } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java deleted file mode 100644 index ff00fdeb520..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; - -import javax.annotation.Nullable; - -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; -import org.apache.thrift.TException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServiceImpl; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; -import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; -import com.palantir.util.Pair; - -/** Iterates over raw Cassandra Columns of a single given row in batches. */ -public class SingleRowColumnPager { - private final CassandraClientPool clientPool; - private final TracingQueryRunner queryRunner; - - public SingleRowColumnPager(CassandraClientPool clientPool, TracingQueryRunner queryRunner) { - this.clientPool = clientPool; - this.queryRunner = queryRunner; - } - - public Iterator> createColumnIterator(TableReference tableRef, - byte[] rowKey, - int pageSize, - @Nullable Column startColumnExclusive, - ConsistencyLevel consistencyLevel) { - ColumnParent columnParent = new ColumnParent(CassandraKeyValueServiceImpl.internalTableName(tableRef)); - return new PageIterator( - tableRef, columnParent, ByteBuffer.wrap(rowKey), pageSize, consistencyLevel, startColumnExclusive); - } - - private class PageIterator extends AbstractIterator> { - private final TableReference tableRef; - private final ColumnParent columnParent; - private final ByteBuffer rowKey; - private final int pageSize; - private final ConsistencyLevel consistencyLevel; - private Column lastSeenColumn; - private boolean reachedEnd = false; - - PageIterator(TableReference tableRef, ColumnParent columnParent, ByteBuffer rowKey, int pageSize, - ConsistencyLevel consistencyLevel, Column lastSeenColumn) { - this.tableRef = tableRef; - this.columnParent = columnParent; - this.rowKey = rowKey; - this.pageSize = pageSize; - this.consistencyLevel = consistencyLevel; - this.lastSeenColumn = lastSeenColumn; - } - - @Override - protected List computeNext() { - if (reachedEnd) { - return endOfData(); - } else { - InetSocketAddress host = clientPool.getRandomHostForKey(rowKey.array()); - try { - List cells = clientPool.runWithRetryOnHost(host, this::getPage); - reachedEnd = cells.size() < pageSize; - if (cells.isEmpty()) { - return endOfData(); - } else { - lastSeenColumn = Iterables.getLast(cells).column; - return cells; - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - } - - private List getPage(Cassandra.Client client) throws TException { - Optional startColumn = getStartColumn(lastSeenColumn); - if (startColumn.isPresent()) { - SliceRange sliceRange = new SliceRange( - startColumn.get(), - ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY), - false, // reversed - pageSize); - SlicePredicate slicePred = new SlicePredicate(); - slicePred.setSlice_range(sliceRange); - return queryRunner.run(client, tableRef, - () -> client.get_slice(rowKey, columnParent, slicePred, consistencyLevel)); - } else { - return ImmutableList.of(); - } - } - } - - // Given the last seen column, compute the next start column - @VisibleForTesting - static Optional getStartColumn(@Nullable Column lastSeenColumn) { - if (lastSeenColumn == null) { - // An empty byte array means "start from the beginning of the row" - return Optional.of(ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY)); - } else { - // We need to "increment" the last seen (colName, ts) pair to the lexicographically next pair. - // Note that timestamps are stored in descending order. For example, the next pair for ("a", 5) - // will be ("a", 4). - Pair colNameAndTs = CassandraKeyValueServices.decomposeName(lastSeenColumn); - if (colNameAndTs.getRhSide() == Long.MIN_VALUE) { - // This is an edge case that will never happen in reality, but technically we have - // to handle it anyway. If the last seen timestamp equals minimum possible value of a 64-bit - // signed integer, we can't decrement it, so we have to carry over to the colName. - // E.g. ("a", Long.MIN_VALUE) -> ("b", Long.MAX_VALUE) - byte[] nextColName = RangeRequests.getNextStartRowUnlessTerminal(false, colNameAndTs.getLhSide()); - if (nextColName == null) { - // An even "edgier" case: in addition to reaching the minimum timestamp, we also reached - // the last possible colName, so there is nowhere to increment. - return Optional.empty(); - } else { - return Optional.of(CassandraKeyValueServices.makeCompositeBuffer(nextColName, Long.MAX_VALUE)); - } - } else { - // Timestamps are in descending order, so just decrement the timestamp to find the next column. - // E.g., ("a", 4) -> ("a", 3) - return Optional.of(CassandraKeyValueServices.makeCompositeBuffer( - colNameAndTs.getLhSide(), - colNameAndTs.getRhSide() - 1)); - } - } - } -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java deleted file mode 100644 index f53093a8edc..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -// TODO(gsheasby): Replace this with StatsAccumulator from Guava 20.0 if we ever bump the dependency. -public class StatsAccumulator { - private double mean = 0.0; - private double sumOfDeltaSquares = 0.0; - private long count = 0; - - long count() { - return count; - } - - void add(double value) { - count += 1; - double delta = value - mean; - mean += delta / count; - sumOfDeltaSquares += delta * (value - mean); - } - - double mean() { - return mean; - } - - double populationStandardDeviation() { - return Math.sqrt(Math.max(0.0, sumOfDeltaSquares) / count); - } -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java deleted file mode 100644 index f282334d195..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.sweep; - -import java.util.Iterator; -import java.util.List; -import java.util.Optional; - -import org.apache.cassandra.thrift.ConsistencyLevel; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; -import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; -import com.palantir.util.Pair; - -import gnu.trove.list.TLongList; -import gnu.trove.list.array.TLongArrayList; - -/* - * Simply use the CellPager to iterate over raw cells in a table and group the returned entries by the cell key. - */ -public class CassandraGetCandidateCellsForSweepingImpl { - - private final CellPager cellPager; - - public CassandraGetCandidateCellsForSweepingImpl(CellPager cellPager) { - this.cellPager = cellPager; - } - - public Iterator> getCandidateCellsForSweeping( - TableReference tableRef, - CandidateCellForSweepingRequest request, - ConsistencyLevel consistencyLevel) { - Iterator> rawIter = cellPager.createCellIterator( - tableRef, - request.startRowInclusive(), - request.batchSizeHint().orElse(AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT), - consistencyLevel); - return new CellGroupingIterator(rawIter, request); - } - - private static class CellGroupingIterator extends AbstractIterator> { - private final CandidateCellForSweepingRequest request; - - private final Iterator> rawIter; - private Cell currentCell = null; - private final TLongList currentTimestamps = new TLongArrayList(); - private boolean currentLatestValEmpty; - private long numCellTsPairsExamined = 0; - private boolean reachedEnd = false; - - CellGroupingIterator(Iterator> rawIter, CandidateCellForSweepingRequest request) { - this.rawIter = rawIter; - this.request = request; - } - - @Override - protected List computeNext() { - if (reachedEnd) { - return endOfData(); - } else { - List candidates = Lists.newArrayList(); - while (candidates.isEmpty() && rawIter.hasNext()) { - List cols = rawIter.next(); - for (CassandraRawCellValue col : cols) { - processColumn(col).ifPresent(candidates::add); - } - } - if (candidates.isEmpty()) { - reachedEnd = true; - if (!currentTimestamps.isEmpty()) { - // Since we reached the end of data, we know there can't be more timestamps for this cell, - // so create a candidate from them. - return ImmutableList.of(createCandidate()); - } else { - return endOfData(); - } - } else { - return candidates; - } - } - } - - private Optional processColumn(CassandraRawCellValue col) { - Optional maybeCandidate = Optional.empty(); - Pair colNameAndTs = CassandraKeyValueServices.decomposeName(col.getColumn()); - Cell cell = Cell.create(col.getRowKey(), colNameAndTs.getLhSide()); - if (!cell.equals(currentCell)) { - if (!currentTimestamps.isEmpty()) { - maybeCandidate = Optional.of(createCandidate()); - } - currentCell = cell; - } - long ts = colNameAndTs.getRhSide(); - if (ts < request.sweepTimestamp() && Longs.indexOf(request.timestampsToIgnore(), ts) < 0) { - if (currentTimestamps.isEmpty()) { - // Timestamps are in the decreasing order, so we pick the first timestamp below sweepTimestamp - // to check the value for emptiness - currentLatestValEmpty = request.shouldCheckIfLatestValueIsEmpty() - && col.getColumn().getValue().length == 0; - } - currentTimestamps.add(ts); - numCellTsPairsExamined += 1; - } - return maybeCandidate; - } - - private CandidateCellForSweeping createCandidate() { - currentTimestamps.reverse(); - long[] sortedTimestamps = currentTimestamps.toArray(); - currentTimestamps.clear(); - return ImmutableCandidateCellForSweeping.builder() - .cell(currentCell) - .sortedTimestamps(sortedTimestamps) - .isLatestValueEmpty(currentLatestValEmpty) - .numCellsTsPairsExamined(numCellTsPairsExamined) - .build(); - } - } - -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java deleted file mode 100644 index 1a0e936792c..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy.PageSizes; - -public class CellPagerBatchSizingStrategyTest { - - private final CellPagerBatchSizingStrategy strategy = new CellPagerBatchSizingStrategy(); - - @Test - public void testUseSquareIfNoStats() { - PageSizes pageSizes = strategy.computePageSizes(102, stats()); - assertEquals(10, pageSizes.columnPerRowLimit); - assertEquals(10, pageSizes.rowLimit); - } - - @Test - public void testZeroVariance() { - PageSizes pageSizes = strategy.computePageSizes(100, stats(4, 4, 4, 4, 4)); - assertEquals(5, pageSizes.columnPerRowLimit); - assertEquals(20, pageSizes.rowLimit); - } - - @Test - public void testNonZeroVariance() { - PageSizes pageSizes = strategy.computePageSizes(100, stats(4, 5, 4, 5, 4, 5)); - assertTrue(pageSizes.columnPerRowLimit > 5); - assertTrue(pageSizes.rowLimit < 20); - } - - @Test - public void testReturnReasonableResults() { - for (double v = 1; v <= 200; v += 0.1) { - PageSizes pageSizes = strategy.computePageSizes(100, stats(1, 1, 1, v, v, v)); - assertTrue(pageSizes.rowLimit > 0); - assertTrue(pageSizes.rowLimit <= 100); - assertTrue(pageSizes.columnPerRowLimit >= 2); - assertTrue(pageSizes.columnPerRowLimit <= 100); - assertTrue(pageSizes.rowLimit * pageSizes.columnPerRowLimit <= 100); - } - } - - private StatsAccumulator stats(double... measurements) { - StatsAccumulator stats = new StatsAccumulator(); - for (double x : measurements) { - stats.add(x); - } - return stats; - } - -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java deleted file mode 100644 index 2f975659cf5..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.KeySlice; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.primitives.Ints; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; - -public class CellPagerSplitFetchedRowsIntoTasksTest { - - private StatsAccumulator accumulator; - private List recordedRowWidths; - - @Before - public void mockStatsAccumulator() { - accumulator = Mockito.mock(StatsAccumulator.class); - recordedRowWidths = new ArrayList<>(); - Mockito.doAnswer(inv -> { - recordedRowWidths.add(inv.getArgumentAt(0, double.class)); - return null; - }).when(accumulator).add(Mockito.anyDouble()); - } - - @Test - public void testTwoFullyFetchedRows() { - // Row 100: 1 - // Row 200: 1 2 - List>> iterators = CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 1), keySlice(200, 2)), - 3, - (rowName, lastSeenColumn) -> { - Assert.fail("Didn't expect this to be called - there are no partially fetch rows"); - return null; - }, - accumulator); - assertEquals(1, iterators.size()); - List> onlyIterCopy = ImmutableList.copyOf(Iterables.getOnlyElement(iterators)); - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(100, 1), rawCellValue(200, 1), rawCellValue(200, 2))), - onlyIterCopy); - assertEquals(ImmutableList.of(1.0, 2.0), recordedRowWidths); - } - - @Test - public void testSinglePartiallyFetchedRow() { - // Row 100: 1 2 3 4 5 - List>> iterators = CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 3)), - 3, - (rowName, lastSeenColumn) -> { - assertArrayEquals(key(100), rowName); - assertEquals(column(3), lastSeenColumn); - return ImmutableList.>of( - ImmutableList.of(columnOrSuperColumn(4), columnOrSuperColumn(5))).iterator(); - }, - accumulator); - assertEquals(2, iterators.size()); - // The first portion of cells consists of the first three columns of our row - assertEquals( - ImmutableList.of(ImmutableList.of( - rawCellValue(100, 1), rawCellValue(100, 2), rawCellValue(100, 3))), - ImmutableList.copyOf(iterators.get(0))); - // The second portion of cells is the remainder of our row that we returned from - // the mock RowRemainderIteratorFactory. - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(100, 4), rawCellValue(100, 5))), - ImmutableList.copyOf(iterators.get(1))); - assertEquals(ImmutableList.of(5.0), recordedRowWidths); - } - - @Test - public void testPartiallyFetchedRowInTheMiddle() { - // Row 100: 1 - // Row 200: 1 2 3 4 5 - // Row 300: 1 2 - List>> iterators = CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 1), keySlice(200, 3), keySlice(300, 2)), - 3, - (rowName, lastSeenColumn) -> { - assertArrayEquals(key(200), rowName); - assertEquals(column(3), lastSeenColumn); - return ImmutableList.>of( - ImmutableList.of(columnOrSuperColumn(4), columnOrSuperColumn(5))).iterator(); - }, - accumulator); - assertEquals(3, iterators.size()); - // The first portion of cells includes Row 100 and the fetched part of Row 200 (cells 1, 2 and 3) - assertEquals( - ImmutableList.of(ImmutableList.of( - rawCellValue(100, 1), rawCellValue(200, 1), rawCellValue(200, 2), rawCellValue(200, 3))), - ImmutableList.copyOf(iterators.get(0))); - // The second portion of cells consists of the remainder of row 200 that we returned from - // the mock RowRemainderIteratorFactory. - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(200, 4), rawCellValue(200, 5))), - ImmutableList.copyOf(iterators.get(1))); - // The third (and last) portion of cells is Row 300 - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(300, 1), rawCellValue(300, 2))), - ImmutableList.copyOf(iterators.get(2))); - // We expect the widths of Rows 100 and 300 to be recorded first because they are fully fetched. - // The width of Row 200 should be recorded only after we exhaust the corresponding iterator. - assertEquals(ImmutableList.of(1.0, 2.0, 5.0), recordedRowWidths); - } - - private static KeySlice keySlice(int row, int size) { - List columns = new ArrayList<>(size); - for (int col = 1; col <= size; ++col) { - columns.add(columnOrSuperColumn(col)); - } - return new KeySlice(ByteBuffer.wrap(key(row)), columns); - } - - private static CassandraRawCellValue rawCellValue(int row, int col) { - return ImmutableCassandraRawCellValue.builder() - .rowKey(key(row)) - .column(column(col)) - .build(); - } - - private static ColumnOrSuperColumn columnOrSuperColumn(int col) { - ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); - cosc.setColumn(column(col)); - return cosc; - } - - private static Column column(int col) { - Column column = new Column(); - column.setName(CassandraKeyValueServices.makeCompositeBuffer(key(col), 1000L)); - column.setValue(new byte[] { 1, 2, 3 }); - column.setTimestamp(1000L); - return column; - } - - private static byte[] key(int number) { - return Ints.toByteArray(number); - } -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java deleted file mode 100644 index de042c042d5..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import org.apache.cassandra.thrift.KeyRange; -import org.junit.Test; - -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -public class KeyRangesTest { - @Test - public void createKeyRangeWithExplicitEnd() { - byte[] start = PtBytes.toBytes("start"); - byte[] end = PtBytes.toBytes("the-end"); - byte[] enc = RangeRequests.previousLexicographicName(end); - int limit = 7; - KeyRange keyRange = KeyRanges.createKeyRange(start, end, limit); - - assertArrayEquals(start, keyRange.getStart_key()); - assertArrayEquals(enc, keyRange.getEnd_key()); - assertEquals(limit, keyRange.getCount()); - } - - @Test - public void createKeyRangeWithEmptyEnd() { - byte[] start = PtBytes.toBytes("start"); - byte[] end = PtBytes.toBytes(""); - int limit = 7; - KeyRange keyRange = KeyRanges.createKeyRange(start, end, limit); - - assertArrayEquals(start, keyRange.getStart_key()); - assertArrayEquals(end, keyRange.getEnd_key()); - assertEquals(limit, keyRange.getCount()); - } -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java deleted file mode 100644 index 906b06eaf1b..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import java.util.Optional; - -import org.apache.cassandra.thrift.Column; -import org.junit.Test; - -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; - -public class SingleRowColumnPagerTest { - - @Test - public void startShouldBeEmptyWhenThereIsNoLastSeenColumn() { - assertEquals( - Optional.of(ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY)), - SingleRowColumnPager.getStartColumn(null)); - } - - @Test - public void startShouldBeOneBelowPreviousTimestamp() { - assertEquals( - Optional.of(CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, 999L)), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, 1000L)))); - } - - @Test - public void startShouldRollOverToNextColumnNameWhenMinimumTimestampReached() { - assertEquals( - Optional.of(CassandraKeyValueServices.makeCompositeBuffer( - RangeRequests.nextLexicographicName(new byte[] {1, 2, 3 }), Long.MAX_VALUE)), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, Long.MIN_VALUE)))); - } - - @Test - public void startShouldBeEmptyWhenLastPossiblePointReached() { - assertEquals( - Optional.empty(), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(lastPossibleColumnName(), Long.MIN_VALUE)))); - } - - private static byte[] lastPossibleColumnName() { - byte[] ret = new byte[Cell.MAX_NAME_LENGTH]; - for (int i = 0; i < Cell.MAX_NAME_LENGTH; ++i) { - ret[i] |= 0xff; - } - return ret; - } - -} diff --git a/atlasdb-cassandra/versions.lock b/atlasdb-cassandra/versions.lock index db8b1674a77..f35e750bd0b 100644 --- a/atlasdb-cassandra/versions.lock +++ b/atlasdb-cassandra/versions.lock @@ -215,10 +215,6 @@ "com.palantir.atlasdb:timestamp-impl": { "project": true }, - "com.palantir.patches.sourceforge:trove3": { - "locked": "3.0.3-p5", - "requested": "3.0.3-p5" - }, "com.palantir.remoting-api:ssl-config": { "locked": "1.1.0", "transitive": [ @@ -676,10 +672,6 @@ "com.palantir.atlasdb:timestamp-impl": { "project": true }, - "com.palantir.patches.sourceforge:trove3": { - "locked": "3.0.3-p5", - "requested": "3.0.3-p5" - }, "com.palantir.remoting-api:ssl-config": { "locked": "1.1.0", "transitive": [ diff --git a/atlasdb-cli-distribution/versions.lock b/atlasdb-cli-distribution/versions.lock index c65829b826f..56ce6323bdd 100644 --- a/atlasdb-cli-distribution/versions.lock +++ b/atlasdb-cli-distribution/versions.lock @@ -543,7 +543,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-cli/versions.lock b/atlasdb-cli/versions.lock index 301e37dc7d2..0fea1f47dca 100644 --- a/atlasdb-cli/versions.lock +++ b/atlasdb-cli/versions.lock @@ -467,7 +467,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] @@ -1325,7 +1324,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-console-distribution/versions.lock b/atlasdb-console-distribution/versions.lock index e9e99ecb743..722552a2b85 100644 --- a/atlasdb-console-distribution/versions.lock +++ b/atlasdb-console-distribution/versions.lock @@ -525,7 +525,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-container-test-utils/versions.lock b/atlasdb-container-test-utils/versions.lock index 02c20467907..7b6ee3ac104 100644 --- a/atlasdb-container-test-utils/versions.lock +++ b/atlasdb-container-test-utils/versions.lock @@ -277,12 +277,6 @@ "com.palantir.docker.proxy:docker-proxy-rule": { "locked": "0.3.0" }, - "com.palantir.patches.sourceforge:trove3": { - "locked": "3.0.3-p5", - "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra" - ] - }, "com.palantir.remoting-api:ssl-config": { "locked": "1.1.0", "transitive": [ @@ -871,12 +865,6 @@ "com.palantir.docker.proxy:docker-proxy-rule": { "locked": "0.3.0" }, - "com.palantir.patches.sourceforge:trove3": { - "locked": "3.0.3-p5", - "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra" - ] - }, "com.palantir.remoting-api:ssl-config": { "locked": "1.1.0", "transitive": [ diff --git a/atlasdb-dropwizard-bundle/versions.lock b/atlasdb-dropwizard-bundle/versions.lock index 4a20d748f60..ed64d39825f 100644 --- a/atlasdb-dropwizard-bundle/versions.lock +++ b/atlasdb-dropwizard-bundle/versions.lock @@ -498,7 +498,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] @@ -1806,7 +1805,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-ete-tests/versions.lock b/atlasdb-ete-tests/versions.lock index c03c9bccca2..4a6eb20b59a 100644 --- a/atlasdb-ete-tests/versions.lock +++ b/atlasdb-ete-tests/versions.lock @@ -523,7 +523,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] @@ -1928,7 +1927,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-perf/versions.lock b/atlasdb-perf/versions.lock index 99221e8d8b8..a35dfeaadba 100644 --- a/atlasdb-perf/versions.lock +++ b/atlasdb-perf/versions.lock @@ -551,7 +551,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] @@ -1591,7 +1590,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-service-server/versions.lock b/atlasdb-service-server/versions.lock index 3afc736e20d..5e5ac0e6de2 100644 --- a/atlasdb-service-server/versions.lock +++ b/atlasdb-service-server/versions.lock @@ -1570,7 +1570,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TestDataBuilder.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TestDataBuilder.java deleted file mode 100644 index 7e6a751a2e1..00000000000 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/TestDataBuilder.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.impl; - -import java.nio.charset.StandardCharsets; -import java.util.Map; - -import com.google.common.collect.Maps; -import com.google.common.primitives.Ints; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.KeyValueService; -import com.palantir.atlasdb.keyvalue.api.TableReference; - -public class TestDataBuilder { - private final KeyValueService kvs; - private final TableReference table; - - private Map> cellsByTimestamp = Maps.newHashMap(); - - public TestDataBuilder(KeyValueService kvs, TableReference table) { - this.kvs = kvs; - this.table = table; - } - - public TestDataBuilder put(int row, int col, long ts, String val) { - return put(row, col, ts, value(val)); - } - - public TestDataBuilder put(int row, int col, long ts) { - return put(row, col, ts, value("foobar")); - } - - public TestDataBuilder put(int row, int col, long ts, byte[] value) { - cellsByTimestamp.computeIfAbsent(ts, key -> Maps.newHashMap()) - .put(cell(row, col), value); - return this; - } - - public TestDataBuilder putEmpty(int row, int col, long ts) { - return put(row, col, ts, PtBytes.EMPTY_BYTE_ARRAY); - } - - public void store() { - for (Map.Entry> e : cellsByTimestamp.entrySet()) { - kvs.put(table, e.getValue(), e.getKey()); - } - } - - public static Cell cell(int rowNum, int colNum) { - return Cell.create(row(rowNum), row(colNum)); - } - - public static byte[] row(int rowNum) { - return Ints.toByteArray(rowNum); - } - - public static byte[] value(String valueStr) { - return valueStr.getBytes(StandardCharsets.UTF_8); - } -} diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTaskRunnerTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTaskRunnerTest.java index 6646347d3ef..f5bde10c5f6 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTaskRunnerTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/AbstractSweepTaskRunnerTest.java @@ -382,6 +382,7 @@ public void testSweepOnMixedCaseTable() { } @Test + @SuppressWarnings("unchecked") public void testSweepBatchesDownToDeleteBatchSize() { CellsSweeper cellsSweeper = Mockito.mock(CellsSweeper.class); SweepTaskRunner spiedSweepRunner = @@ -392,8 +393,9 @@ public void testSweepBatchesDownToDeleteBatchSize() { int deleteBatchSize = 1; List> sweptCells = runSweep(cellsSweeper, spiedSweepRunner, 8, 8, deleteBatchSize); - assertThat(sweptCells).allMatch(list -> list.size() <= 2 * deleteBatchSize); - assertThat(Iterables.concat(sweptCells)).containsExactlyElementsOf(SMALL_LIST_OF_CELLS); + + List> expectedCells = groupCells(SMALL_LIST_OF_CELLS, 2 * deleteBatchSize); + assertEquals(expectedCells, sweptCells); } @Test @@ -465,6 +467,17 @@ private List> runSweep(CellsSweeper cellsSweeper, SweepTaskRunner spi return sweptCells; } + private List> groupCells(List cells, int sizeOfEachGroup) { + List> groupedCells = Lists.newArrayList(); + + for (int i = 0; i < cells.size(); i += sizeOfEachGroup) { + int groupMax = Math.min(i + sizeOfEachGroup, cells.size()); + groupedCells.add(cells.subList(i, groupMax)); + } + + return groupedCells; + } + private void testSweepManyRows(SweepStrategy strategy) { createTable(strategy); putIntoDefaultColumn("foo", "bar1", 5); diff --git a/timelock-server-distribution/versions.lock b/timelock-server-distribution/versions.lock index 5c4902d0dbe..52d6b5d7a89 100644 --- a/timelock-server-distribution/versions.lock +++ b/timelock-server-distribution/versions.lock @@ -579,7 +579,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ] diff --git a/timelock-server/versions.lock b/timelock-server/versions.lock index 2cc5131a542..8e59169e6db 100644 --- a/timelock-server/versions.lock +++ b/timelock-server/versions.lock @@ -1733,7 +1733,6 @@ "com.palantir.patches.sourceforge:trove3": { "locked": "3.0.3-p5", "transitive": [ - "com.palantir.atlasdb:atlasdb-cassandra", "com.palantir.atlasdb:atlasdb-impl-shared", "com.palantir.atlasdb:lock-impl" ]