diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/KeyValueServiceConfig.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/KeyValueServiceConfig.java index 51a2e9865df..729a281e558 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/KeyValueServiceConfig.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/spi/KeyValueServiceConfig.java @@ -42,4 +42,5 @@ public interface KeyValueServiceConfig { default int defaultGetRangesConcurrency() { return Math.min(8, concurrentGetRangesThreadPoolSize() / 2); } + } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraGetCandidateCellsForSweepingTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraGetCandidateCellsForSweepingTest.java index c0bab648c98..2128c86bddf 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraGetCandidateCellsForSweepingTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraGetCandidateCellsForSweepingTest.java @@ -16,13 +16,18 @@ package com.palantir.atlasdb.keyvalue.cassandra; +import static org.assertj.core.api.Assertions.assertThat; + import org.junit.ClassRule; +import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager; import com.palantir.atlasdb.containers.CassandraContainer; import com.palantir.atlasdb.containers.Containers; +import com.palantir.atlasdb.encoding.PtBytes; +import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.impl.AbstractGetCandidateCellsForSweepingTest; @@ -38,4 +43,29 @@ protected KeyValueService createKeyValueService() { CassandraContainer.LEADER_CONFIG, Mockito.mock(Logger.class)); } + + @Test + 
public void returnCandidateIfPossiblyUncommittedTimestamp() { + new TestDataBuilder().put(1, 1, 10L).store(); + assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 1))) + .containsExactly(ImmutableCandidateCellForSweeping.builder() + .cell(cell(1, 1)) + .sortedTimestamps(new long[] { 10L }) + .isLatestValueEmpty(false) + .numCellsTsPairsExamined(1) + .build()); + } + + @Test + public void returnCandidateIfTwoCommittedTimestamps() { + new TestDataBuilder().put(1, 1, 10L).put(1, 1, 20L).store(); + assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 1))) + .containsExactly(ImmutableCandidateCellForSweeping.builder() + .cell(cell(1, 1)) + .sortedTimestamps(new long[] { 10L, 20L }) + .isLatestValueEmpty(false) + .numCellsTsPairsExamined(2) + .build()); + } + } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java index 0277879b14b..c3485b2903f 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.java @@ -15,11 +15,19 @@ */ package com.palantir.atlasdb.keyvalue.cassandra; +import java.util.Arrays; + +import org.apache.commons.lang3.RandomStringUtils; import org.junit.Assert; +import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager; +import 
com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig; import com.palantir.atlasdb.containers.CassandraContainer; import com.palantir.atlasdb.containers.Containers; import com.palantir.atlasdb.keyvalue.api.KeyValueService; @@ -27,17 +35,45 @@ import com.palantir.atlasdb.protos.generated.TableMetadataPersistence; import com.palantir.atlasdb.sweep.AbstractSweepTaskRunnerTest; +@RunWith(Parameterized.class) public class CassandraKeyValueServiceSweepTaskRunnerIntegrationTest extends AbstractSweepTaskRunnerTest { @ClassRule public static final Containers CONTAINERS = new Containers( CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.class) .with(new CassandraContainer()); + @Parameterized.Parameter + public boolean useColumnBatchSize; + + @Parameterized.Parameters(name = "Use column batch size parameter = {0}") + public static Iterable parameters() { + return Arrays.asList(true, false); + } + + @Override protected KeyValueService getKeyValueService() { + CassandraKeyValueServiceConfig config = useColumnBatchSize + ? 
ImmutableCassandraKeyValueServiceConfig.copyOf(CassandraContainer.KVS_CONFIG) + .withTimestampsGetterBatchSize(10) + : CassandraContainer.KVS_CONFIG; + return CassandraKeyValueServiceImpl.create( - CassandraKeyValueServiceConfigManager.createSimpleManager( - CassandraContainer.KVS_CONFIG), CassandraContainer.LEADER_CONFIG); + CassandraKeyValueServiceConfigManager.createSimpleManager(config), CassandraContainer.LEADER_CONFIG); + } + + @Test + public void should_not_oom_when_there_are_many_large_values_to_sweep() { + Assume.assumeTrue("should_not_oom test will always fail if column batch size is not set!", useColumnBatchSize); + + createTable(TableMetadataPersistence.SweepStrategy.CONSERVATIVE); + + long numInsertions = 100; + insertMultipleValues(numInsertions); + + long sweepTimestamp = numInsertions + 1; + SweepResults results = completeSweep(sweepTimestamp); + Assert.assertEquals(numInsertions - 1, results.getStaleValuesDeleted()); } @Test @@ -53,4 +89,14 @@ public void should_return_values_for_multiple_columns_when_sweeping() { Assert.assertEquals(28, results.getStaleValuesDeleted()); } + private void insertMultipleValues(long numInsertions) { + for (int ts = 1; ts <= numInsertions; ts++) { + System.out.println("putting with ts = " + ts); + putIntoDefaultColumn("row", makeLongRandomString(), ts); + } + } + + private String makeLongRandomString() { + return RandomStringUtils.random(1_000_000); + } } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java deleted file mode 100644 index 4381551fe8e..00000000000 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CellPagerIntegrationTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. 
- * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra; - -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.mockito.Mockito; -import org.slf4j.Logger; - -import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager; -import com.palantir.atlasdb.containers.CassandraContainer; -import com.palantir.atlasdb.containers.Containers; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy; -import com.palantir.atlasdb.keyvalue.cassandra.paging.ImmutableCassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.SingleRowColumnPager; -import com.palantir.atlasdb.keyvalue.impl.TestDataBuilder; -import com.palantir.atlasdb.keyvalue.impl.TracingPrefsConfig; - -public class CellPagerIntegrationTest { - protected static final TableReference TEST_TABLE = TableReference.createFromFullyQualifiedName( 
- "cell_pager.test_table"); - - @ClassRule - public static final Containers CONTAINERS = new Containers(CellPagerIntegrationTest.class) - .with(new CassandraContainer()); - - private static CassandraKeyValueService kvs = null; - private CellPagerBatchSizingStrategy pageSizingStrategy = Mockito.mock(CellPagerBatchSizingStrategy.class); - private CellPager cellPager = null; - - @BeforeClass - public static void setUpKvs() { - kvs = CassandraKeyValueServiceImpl.create( - CassandraKeyValueServiceConfigManager.createSimpleManager(CassandraContainer.KVS_CONFIG), - CassandraContainer.LEADER_CONFIG, - Mockito.mock(Logger.class)); - } - - @Before - public void setUp() { - kvs.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); - kvs.truncateTable(TEST_TABLE); - TracingQueryRunner queryRunner = new TracingQueryRunner( - Mockito.mock(Logger.class), - new TracingPrefsConfig()); - SingleRowColumnPager singleRowPager = new SingleRowColumnPager(kvs.getClientPool(), queryRunner); - cellPager = new CellPager(singleRowPager, kvs.getClientPool(), queryRunner, pageSizingStrategy); - } - - @Test - public void testPaging() { - // Row 1: 5 cells - // Row 2: 1 cell - // Row 3: 2 cells - // Row 4: 1 cell - // Row 5: 1 cell - testDataBuilder() - .put(1, 1, 30L, "a") - .put(1, 1, 20L, "b") - .put(1, 1, 10L, "c") - .put(1, 2, 20L, "d") - .put(1, 2, 10L, "e") - .put(2, 3, 10L, "f") - .put(3, 1, 20L, "g") - .put(3, 1, 10L, "h") - .put(4, 2, 10L, "i") - .put(5, 2, 10L, "j") - .store(); - Mockito.doReturn(new CellPagerBatchSizingStrategy.PageSizes(3, 2)) - .when(pageSizingStrategy).computePageSizes(Mockito.anyInt(), Mockito.any()); - List> pages = ImmutableList.copyOf(cellPager.createCellIterator( - TEST_TABLE, PtBytes.EMPTY_BYTE_ARRAY, 2, ConsistencyLevel.ALL)); - assertEquals( - ImmutableList.of( - // incomplete part of row 1 fetched with get_range_slice - ImmutableList.of(val(1, 1, 30L, "a"), val(1, 1, 20L, "b")), - // first page of the remainder of row 1 fetched with get_slice - 
ImmutableList.of(val(1, 1, 10L, "c"), val(1, 2, 20L, "d")), - // second page of the remainder of row 1 fetched with get_slice - ImmutableList.of(val(1, 2, 10L, "e")), - // remaining rows already fetched with get_range_slice - ImmutableList.of(val(2, 3, 10L, "f"), val(3, 1, 20L, "g"), val(3, 1, 10L, "h")), - // the second get_range_slice - ImmutableList.of(val(4, 2, 10L, "i"), val(5, 2, 10L, "j"))), - pages); - } - - @Test - public void testStartRow() { - testDataBuilder() - .put(1, 1, 10L, "foo") - .put(2, 1, 10L, "bar") - .store(); - Mockito.doReturn(new CellPagerBatchSizingStrategy.PageSizes(3, 2)) - .when(pageSizingStrategy).computePageSizes(Mockito.anyInt(), Mockito.any()); - List> pages = ImmutableList.copyOf(cellPager.createCellIterator( - TEST_TABLE, TestDataBuilder.row(2), 2, ConsistencyLevel.ALL)); - assertEquals(ImmutableList.of(ImmutableList.of(val(2, 1, 10L, "bar"))), pages); - } - - private static CassandraRawCellValue val(int row, int col, long ts, String value) { - Column column = new Column(); - column.setName(CassandraKeyValueServices.makeCompositeBuffer(TestDataBuilder.row(col), ts)); - column.setValue(TestDataBuilder.value(value)); - column.setTimestamp(ts); - return ImmutableCassandraRawCellValue.builder() - .rowKey(TestDataBuilder.row(row)) - .column(column) - .build(); - } - - private TestDataBuilder testDataBuilder() { - return new TestDataBuilder(kvs, TEST_TABLE); - } - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java index bba583054f6..ae81bdceafd 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceImpl.java @@ -107,19 +107,16 @@ import 
com.palantir.atlasdb.keyvalue.cassandra.jmx.CassandraJmxCompaction; import com.palantir.atlasdb.keyvalue.cassandra.jmx.CassandraJmxCompactionManager; import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRangePagingIterable; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy; import com.palantir.atlasdb.keyvalue.cassandra.paging.ColumnGetter; import com.palantir.atlasdb.keyvalue.cassandra.paging.CqlColumnGetter; -import com.palantir.atlasdb.keyvalue.cassandra.paging.RowRangeLoader; -import com.palantir.atlasdb.keyvalue.cassandra.paging.SingleRowColumnPager; +import com.palantir.atlasdb.keyvalue.cassandra.paging.RowGetter; import com.palantir.atlasdb.keyvalue.cassandra.paging.ThriftColumnGetter; -import com.palantir.atlasdb.keyvalue.cassandra.sweep.CassandraGetCandidateCellsForSweepingImpl; import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates; import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Limit; import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Range; import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService; import com.palantir.atlasdb.keyvalue.impl.Cells; +import com.palantir.atlasdb.keyvalue.impl.GetCandidateCellsForSweepingShim; import com.palantir.atlasdb.keyvalue.impl.KeyValueServices; import com.palantir.atlasdb.keyvalue.impl.LocalRowColumnRangeIterator; import com.palantir.atlasdb.logging.LoggingArgs; @@ -196,9 +193,7 @@ public void close() { protected final CassandraKeyValueServiceConfigManager configManager; private final Optional compactionManager; - - @VisibleForTesting - final CassandraClientPool clientPool; + private final CassandraClientPool clientPool; private SchemaMutationLock schemaMutationLock; private final Optional leaderConfig; @@ -214,8 +209,6 @@ public void close() { private final TracingQueryRunner queryRunner; private final CassandraTables cassandraTables; - private 
final CassandraGetCandidateCellsForSweepingImpl getCandidateCellsForSweepingImpl; - private final InitializingWrapper wrapper = new InitializingWrapper(); public static CassandraKeyValueService create( @@ -272,11 +265,6 @@ protected CassandraKeyValueServiceImpl(Logger log, this.queryRunner = new TracingQueryRunner(log, tracingPrefs); this.cassandraTables = new CassandraTables(clientPool, configManager); - - SingleRowColumnPager singleRowPager = new SingleRowColumnPager(clientPool, queryRunner); - CellPager cellPager = new CellPager( - singleRowPager, clientPool, queryRunner, new CellPagerBatchSizingStrategy()); - this.getCandidateCellsForSweepingImpl = new CassandraGetCandidateCellsForSweepingImpl(cellPager); } @Override @@ -1452,8 +1440,7 @@ public ClosableIterator>> getRangeOfTimestamps( @Override public ClosableIterator> getCandidateCellsForSweeping(TableReference tableRef, CandidateCellForSweepingRequest request) { - return ClosableIterators.wrap( - getCandidateCellsForSweepingImpl.getCandidateCellsForSweeping(tableRef, request, deleteConsistency)); + return new GetCandidateCellsForSweepingShim(this).getCandidateCellsForSweeping(tableRef, request); } private ClosableIterator>> getTimestampsInBatchesWithPageCreator( @@ -1463,18 +1450,12 @@ private ClosableIterator>> getTimestampsInBatchesWithPageCre long timestamp, ConsistencyLevel consistency) { SlicePredicate predicate = SlicePredicates.create(Range.ALL, Limit.ONE); - - RowRangeLoader rowRangeLoader = new RowRangeLoader(clientPool, queryRunner, consistency, tableRef); + RowGetter rowGetter = new RowGetter(clientPool, queryRunner, consistency, tableRef); CqlExecutor cqlExecutor = new CqlExecutor(clientPool, consistency); ColumnGetter columnGetter = new CqlColumnGetter(cqlExecutor, tableRef, columnBatchSize); - return getRangeWithPageCreator( - rowRangeLoader, - predicate, - columnGetter, - rangeRequest, - TimestampExtractor::new, + return getRangeWithPageCreator(rowGetter, predicate, columnGetter, rangeRequest, 
TimestampExtractor::new, timestamp); } @@ -1493,15 +1474,14 @@ private ClosableIterator> getRangeWithPageCreator( // each column. note that if no columns are specified, it's a special case that means all columns predicate = SlicePredicates.create(Range.ALL, Limit.NO_LIMIT); } - RowRangeLoader rowRangeLoader = new RowRangeLoader(clientPool, queryRunner, consistency, tableRef); + RowGetter rowGetter = new RowGetter(clientPool, queryRunner, consistency, tableRef); ColumnGetter columnGetter = new ThriftColumnGetter(); - return getRangeWithPageCreator(rowRangeLoader, predicate, columnGetter, rangeRequest, resultsExtractor, - startTs); + return getRangeWithPageCreator(rowGetter, predicate, columnGetter, rangeRequest, resultsExtractor, startTs); } private ClosableIterator> getRangeWithPageCreator( - RowRangeLoader rowRangeLoader, + RowGetter rowGetter, SlicePredicate slicePredicate, ColumnGetter columnGetter, RangeRequest rangeRequest, @@ -1515,7 +1495,7 @@ private ClosableIterator> getRangeWithPageCreator( } CassandraRangePagingIterable rowResults = new CassandraRangePagingIterable<>( - rowRangeLoader, + rowGetter, slicePredicate, columnGetter, rangeRequest, diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java index d129b1c0e4a..4540d168251 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRangePagingIterable.java @@ -27,6 +27,7 @@ import com.google.common.base.Supplier; import com.palantir.atlasdb.keyvalue.api.ColumnSelection; import com.palantir.atlasdb.keyvalue.api.RangeRequest; +import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.RowResult; import 
com.palantir.atlasdb.keyvalue.cassandra.ResultsExtractor; import com.palantir.util.paging.AbstractPagingIterable; @@ -42,17 +43,17 @@ public class CassandraRangePagingIterable private final int batchHint; private final ColumnSelection selection; - private final RowRangeLoader rowRangeLoader; + private final RowGetter rowGetter; private final SlicePredicate slicePredicate; public CassandraRangePagingIterable( - RowRangeLoader rowRangeLoader, + RowGetter rowGetter, SlicePredicate slicePredicate, ColumnGetter columnGetter, RangeRequest rangeRequest, Supplier> resultsExtractor, long timestamp) { - this.rowRangeLoader = rowRangeLoader; + this.rowGetter = rowGetter; this.slicePredicate = slicePredicate; this.columnGetter = columnGetter; this.rangeRequest = rangeRequest; @@ -89,8 +90,8 @@ private TokenBackedBasicResultsPage, byte[]> getSinglePage(byte[] s } private List getRows(byte[] startKey) throws Exception { - KeyRange keyRange = KeyRanges.createKeyRange(startKey, rangeRequest.getEndExclusive(), batchHint); - return rowRangeLoader.getRows(keyRange, slicePredicate); + KeyRange keyRange = getKeyRange(startKey, rangeRequest.getEndExclusive()); + return rowGetter.getRows(keyRange, slicePredicate); } private Map> getColumns(List firstPage) { @@ -114,4 +115,15 @@ private TokenBackedBasicResultsPage, byte[]> pageWithNoMoreResultsA return SimpleTokenBackedResultsPage.create(rangeRequest.getEndExclusive(), page.getResults(), false); } + private KeyRange getKeyRange(byte[] startKey, byte[] endExclusive) { + KeyRange keyRange = new KeyRange(batchHint); + keyRange.setStart_key(startKey); + if (endExclusive.length == 0) { + keyRange.setEnd_key(endExclusive); + } else { + // We need the previous name because this is inclusive, not exclusive + keyRange.setEnd_key(RangeRequests.previousLexicographicName(endExclusive)); + } + return keyRange; + } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java 
b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java deleted file mode 100644 index bd616a242d4..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CassandraRawCellValue.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import org.apache.cassandra.thrift.Column; -import org.immutables.value.Value; - -@Value.Immutable -public interface CassandraRawCellValue { - - byte[] getRowKey(); - - Column getColumn(); - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java deleted file mode 100644 index 55b1ed72e53..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPager.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.KeySlice; -import org.apache.cassandra.thrift.SlicePredicate; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool; -import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy.PageSizes; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Limit; -import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates.Range; -import com.palantir.common.annotation.Output; - -/* - * A class for iterating uniformly through raw cells, 
i.e. (rowName, columnName, timestamp) triplets, in a given table - * in lexicographic order (with timestamps being ordered in reverse). - * - * By "uniformly", we mean that the size of each page we load at once is bounded in terms of the number of - * raw cells rather than the number of rows, and page boundaries don't coincide with row (partition) boundaries. - * - * Cassandra Thrift API doesn't provide a way to do this directly. So we have to resort to a "clever" hybrid approach: - * fetch blocks of row "heads" using the "get_range_slices" call with a column-per-row limit, and then page - * through the remainder of rows that exceed the fetched "head" limit using the "get_slice" call. - * - * Calling "get_range_slices" can be visualized as getting a "rectangle" of data from Cassandra, whose width and height - * are equal to SliceRange.count and KeyRange.count, respectively. Consider an example table: - * - * |<-w=4->| - * ._______. ___ - * Row 1 |a b c | ^ - * Row 2 |d e f g|h i j | - * Row 3 |k | h=5 - * Row 4 |l m n o| | - * Row 5 |p______| _v_ - * Row 6 q r - * Row 7 s t u - * Row 8 v w x y z - * - * Here, we set KeyRange.count = 5 and SliceRange.count = 4. - * Since 3 cells are returned for Row 1 while we requested 4 cells per row, we can conclude that Row 1 is fully fetched, - * and thus we can return it to the user. However, Row 2 is not fully fetched, so we have to page through its remainder. 
- * - * Overall, paging through the entire table in the picture looks like this: - * - * 1) Get the first rectangle of data: - * get_range_slices(KeyRange.start_key = [], KeyRange.count = 5, SliceRange.count = 4) - * -> returns rows 1-5 with at most 4 columns in each - * 2) Return Row 1 and the first four cells of Row 2 to the user - * 3) Get the remainder of Row 2: - * get_slice(key = "Row 2", SliceRange.start = , SliceRange.count = cellBatchHint) - * -> [ h i j ] - * 4) Return [ h i j ] to the user - * 5) Return Rows 3 and 4 to the user (they were already fetched in step 1) - * 6) Since we got exactly 4 cells for row 4, we don't really know whether there are more, so we have to attempt - * to get its remainder: - * get_slice(key = "Row 4", SliceRange.start = , SliceRange,count = cellBatchHint) - * -> [] - * 7) Return Row 5 to the user - * 8) Now we finished the first rectangle, so it's time to get another one: - * get_range_slices(KeyRange.start_key = , KeyRange.count = 5, SliceRange.count = 4) - * -> returns rows 6-8 with at most 4 columns in each - * 9) Return Row 6, Row 7 and the first four columns of Row 8 to the user - * 10) Get the remainder of Row 8: - * get_slice(key = "Row 8", SliceRange.start = , SliceRange,count = cellBatchHint) - * -> [ z ] - * 11) Return [ z ] to the user - * - * Note that getting the remainder of a row might require more than one call "get_slice" if the row is wide. - * - * We want the area of each rectangle to be roughly equal to "cellBatchHint" which is supplied by the user. In order to - * choose reasonable width and height of the rectangle given the area, we estimate the ditribution of row widths - * and try to make a somewhat intelligent decision based on that data (see CellPagerBatchSizingStrategy for details). 
- */ -public class CellPager { - private final SingleRowColumnPager singleRowPager; - private final CassandraClientPool clientPool; - private final TracingQueryRunner queryRunner; - private final CellPagerBatchSizingStrategy pageSizeStrategy; - - public CellPager(SingleRowColumnPager singleRowPager, - CassandraClientPool clientPool, - TracingQueryRunner queryRunner, - CellPagerBatchSizingStrategy pageSizeStrategy) { - this.singleRowPager = singleRowPager; - this.clientPool = clientPool; - this.queryRunner = queryRunner; - this.pageSizeStrategy = pageSizeStrategy; - } - - public Iterator> createCellIterator(TableReference tableRef, - byte[] startRowInclusive, - int cellBatchHint, - ConsistencyLevel consistencyLevel) { - Preconditions.checkNotNull(startRowInclusive, - "Use an empty byte array rather than null to start from the beginning of the table"); - Preconditions.checkArgument(cellBatchHint > 0, "cellBatchHint must be strictly positive"); - RowRangeLoader rowRangeLoader = new RowRangeLoader(clientPool, queryRunner, consistencyLevel, tableRef); - return new PageIterator(tableRef, rowRangeLoader, cellBatchHint, consistencyLevel, startRowInclusive); - } - - private class PageIterator extends AbstractIterator> { - private final TableReference tableRef; - private final RowRangeLoader rowRangeLoader; - private final int cellBatchHint; - private final ConsistencyLevel consistencyLevel; - - private byte[] nextRow; - - private final Queue>> cellsToReturn = new ArrayDeque<>(); - private final StatsAccumulator stats = new StatsAccumulator(); - private PageSizes pageSizes = null; - - PageIterator(TableReference tableRef, RowRangeLoader rowRangeLoader, int cellBatchHint, - ConsistencyLevel consistencyLevel, byte[] nextRow) { - this.tableRef = tableRef; - this.rowRangeLoader = rowRangeLoader; - this.cellBatchHint = cellBatchHint; - this.consistencyLevel = consistencyLevel; - this.nextRow = nextRow; - } - - @Override - protected List computeNext() { - while (true) { - if 
(cellsToReturn.isEmpty()) { - // We have no pre-loaded rows from the previous "rectangle", so we need to fetch more if we can - if (nextRow == null) { - return endOfData(); - } else { - // Get the next "rectangle" of data (step 8 in the example above) and try again - fetchNextRange(); - } - } else if (cellsToReturn.peek().hasNext()) { - // Scan through the remainder of the row (steps 3, 6 and 10 in the example above) - return cellsToReturn.peek().next(); - } else { - // We exhausted an iterator - pop it from the queue and try again - cellsToReturn.poll(); - } - } - } - - private void fetchNextRange() { - pageSizes = pageSizeStrategy.computePageSizes(cellBatchHint, stats); - KeyRange range = KeyRanges.createKeyRange(nextRow, PtBytes.EMPTY_BYTE_ARRAY, pageSizes.rowLimit); - SlicePredicate predicate = SlicePredicates.create(Range.ALL, Limit.of(pageSizes.columnPerRowLimit)); - List slices = rowRangeLoader.getRows(range, predicate); - if (slices.isEmpty()) { - nextRow = null; - } else { - cellsToReturn.addAll(splitFetchedRowsIntoTasks( - slices, - pageSizes.columnPerRowLimit, - (rowKey, lastSeenColumn) -> singleRowPager.createColumnIterator( - tableRef, rowKey, cellBatchHint, lastSeenColumn, consistencyLevel), - stats)); - computeNextStartRow(slices); - } - } - - private void computeNextStartRow(List slices) { - if (slices.size() < pageSizes.rowLimit) { - nextRow = null; - } else { - byte[] lastSeenRow = Iterables.getLast(slices).getKey(); - nextRow = RangeRequests.getNextStartRowUnlessTerminal( - false, RangeRequests.nextLexicographicName(lastSeenRow)); - } - } - } - - @VisibleForTesting - interface RowRemainderIteratorFactory { - Iterator> createIteratorForRemainderOfRow(byte[] rowKey, Column lastSeenColumn); - } - - @VisibleForTesting - static List>> splitFetchedRowsIntoTasks( - List slices, - int columnPerRowLimit, - RowRemainderIteratorFactory rowRemainderIteratorFactory, - @Output StatsAccumulator statsToAccumulate) { - // Split the returned slices into single 
partially fetched rows and contiguous runs of fully fetched rows - List>> ret = new ArrayList<>(); - List loadedRows = new ArrayList<>(); - for (KeySlice slice : slices) { - loadedRows.add(slice); - if (isFullyFetched(slice, columnPerRowLimit)) { - statsToAccumulate.add(slice.getColumnsSize()); - } else { - ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator()); - loadedRows.clear(); - // If the row was only partially fetched, we enqueue an iterator to page through - // the remainder of that single row. - Column lastSeenColumn = Iterables.getLast(slice.getColumns()).column; - Iterator> rawColumnIter = rowRemainderIteratorFactory - .createIteratorForRemainderOfRow(slice.getKey(), lastSeenColumn); - Iterator> statsUpdatingIter = new StatsUpdatingIterator( - rawColumnIter, statsToAccumulate, slice.getColumnsSize()); - ret.add(Iterators.transform( - statsUpdatingIter, cols -> columnsToCells(cols, slice.getKey()))); - } - } - if (!loadedRows.isEmpty()) { - ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator()); - } - return ret; - } - - private static boolean isFullyFetched(KeySlice slice, int columnPerRowLimit) { - return slice.getColumnsSize() < columnPerRowLimit; - } - - private static class StatsUpdatingIterator extends AbstractIterator> { - private final Iterator> delegate; - private final StatsAccumulator stats; - private long columnCount = 0; - - StatsUpdatingIterator(Iterator> delegate, StatsAccumulator stats, long columnCount) { - this.delegate = delegate; - this.stats = stats; - this.columnCount = columnCount; - } - - @Override - protected List computeNext() { - if (delegate.hasNext()) { - List cols = delegate.next(); - columnCount += cols.size(); - return cols; - } else { - stats.add(columnCount); - return endOfData(); - } - } - } - - private static List keySlicesToCells(List slices) { - List ret = new ArrayList<>(); - for (KeySlice slice : slices) { - for (ColumnOrSuperColumn col : slice.getColumns()) { - 
ret.add(ImmutableCassandraRawCellValue.builder() - .rowKey(slice.getKey()) - .column(col.getColumn()) - .build()); - } - } - return ret; - } - - private static List columnsToCells(List columns, byte[] rowKey) { - return Lists.transform(columns, col -> ImmutableCassandraRawCellValue.builder() - .rowKey(rowKey) - .column(col.getColumn()) - .build()); - } - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java deleted file mode 100644 index f69a3da0164..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategy.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -public class CellPagerBatchSizingStrategy { - public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) { - if (stats.count() == 0) { - // No data yet: go with a square - return getExactPageSize(Math.round(Math.sqrt(cellBatchHint)), cellBatchHint); - } else { - return computePageSizesAssumingNormalDistribution( - cellBatchHint, stats.mean(), stats.populationStandardDeviation()); - } - } - - /* Given the batch size in cells supplied by the user, we want to figure out the optimal (or at least decent) - * batching parameters for Cassandra. The `get_range_slices' call allows us to load a "rectangle" - * of cells at once: - * - * |<-w=4->| - * ._______. ___ - * Row 1 |x x x x| ^ (each 'x' is a (cell, ts) pair, i.e. a single "column" in Cassandra) - * Row 2 |x x | | - * Row 3 |x x x | h=5 - * Row 4 |x x x x|x x | - * Row 5 |x______| _v_ - * Row 6 x x - * Row 7 x x x x - * Row 8 x x x x x - * ... - * Row N x x - * - * Here, the height of the rectangle "h" is KeyRange.count and the width "w" is SliceRange.count. - * We want the area of our rectangle to be on the order of "cellBatchHint" supplied by the user, i.e. - * - * w * h ~ cellBatchHint (1) - * - * The question is how to choose "w" and "h" appropriately. If a row contains "w" or more columns (like rows 1 and - * 4 in the picture above), we need to make at least one extra round trip to retrieve that row individually - * using the "get_slice" call. Thus, setting "w" too low will result in extra round trips to retrieve the individual - * rows, while setting it too high would make "h" low (since "w" and "h" are inversely proportional as in (1)), - * resulting in more round trips to make progress "vertically". - * - * It makes sense to try to minimize the number of round trips to Cassandra. Let's assume that rows will be short - * enough to fit in "cellBatchHint", i.e. 
whenever we need to fetch the remainder of a row, we can do so with - * a single call to "get_slice" with SliceRange.count set to "cellBatchHint". (If the assumption doesn't hold - * and our rows are wide, then a particular choice of "w" and "h" doesn't have as much impact on the number of round - * trips -- we need to page through the individual rows anyway). - * - * Then for a table with N rows, the number of round trips is approximately equal to - * - * (N / h) + (# of rows with >= w columns) - * - * Let F(k) be the cumulative distribution function of the number of columns per row. Also, remember from (1) - * that "h" is on the order of "cellBatchHint / w". Hence the above expression is approximately equal to - * - * N * w / cellBatchHint + N * (1 - F(w - 1)) - * - * Minimizing this is equivalent to minimizing - * - * w / cellBatchHint - F(w - 1) (2) - * - * So if we had an adequate model of F, we could estimate its parameters and minimize (2) with respect to "w". - * The problem is that the nature of F can depend on the format of a particular table. For example, an append-only - * table with five named columns where all five columns are always written at once for every row, will have - * a degenerate distribution localized at 5; a single-column table with mutations might exhibit a Poisson-like - * behavior; while a table with dynamic columns can be shaped according to an arbitrary distribution depending - * on the user data. - * - * One other possibility is to compute a discrete approximation to the PDF/CDF instead of choosing a fixed family - * of distributions and estimating its parameters, but that seems both excessive and expensive. - * - * Pragmatically speaking, we want a formula like - * - * w = < estimated mean # columns per row > + 1 + g(sigma, cellBatchHint) (3) - * - * where g(sigma, cellBatchHint) is some term that grows with variance sigma^2. 
The term is necessary to efficiently - * handle both the "append-only fixed-column table" (zero variance) and the "partially dirty table" (high variance) - * cases. - * - * If we minimize (2) in the case of F being the normal distribution, we get (after doing all the calculus): - * - * w = mu + 1 + sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (4) - * - * where "mu" is the mean and "sigma" is the standard deviation. This actually looks like (3) for sufficiently - * small values of sigma, with - * - * g(sigma, cellBatchHint) = sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (5) - * - * Of course, a real table can't have normally distributed row widths (simply because the - * row width is a discrete quantity), but: - * - * - the normal distribution approximates the "clean narrow table" (low variance) case quite well, - * and is a half-decent approximation to the Poisson distribution (the "single-column dirty" case); - * - it's easy to estimate the parameters for it (mu and sigma), as well as to do the calculus exercise - * to get (4); - * - the numbers it produces actually seem reasonable. - * - * To illustrate the last point, here is a table of g(sigma, cellBatchHint) for some values of sigma. 
(The physical - * meaning of "g" is the number of extra columns to fetch per row in addition to the mean number of columns per row) - * - * cellBatchHint = 1000 cellBatchHint = 100 - * - * sigma | g(sigma, 1000) sigma | g(sigma, 100) - * --------+--------------- --------+-------------- - * 0 | 0.00 0 | 0.00 - * 0.5 | 1.82 0.5 | 1.48 - * 1 | 3.46 1 | 2.72 - * 2 | 6.51 2 | 4.89 - * 5 | 14.80 5 | 10.19 - * 10 | 27.15 10 | 16.64 - * 20 | 48.93 - * 50 | 101.90 - * 100 | 166.35 - */ - private static PageSizes computePageSizesAssumingNormalDistribution( - int cellBatchHint, - double columnsPerRowMean, - double columnsPerRowStdDev) { - double sigma = Math.min(columnsPerRowStdDev, cellBatchHint * 0.2); - // If sigma > cellBatchHint/sqrt(2*pi), then the formula doesn't work. - // At that point, variance in row length is so big that we can't do much: - // just use the minimum columnPerRowLimit to at least minimize the number of calls - // to get_range_slice. - double logArg = Math.min(1.0, SQRT_2_PI * sigma / cellBatchHint); - // "extraColumnsToFetch" is the value of "g" defined in (5) - // Mathematically, the limit of x*ln(x) is 0 as x->0, but numerically we need to compute the logarithm - // separately (unless we use a Taylor approximation to x*ln(x) instead, which we don't want), - // so we handle the case of small "s" separately to avoid the logarithm blow up at 0. - double extraColumnsToFetch = logArg < 1e-12 ? 
0.0 : sigma * Math.sqrt(-2.0 * Math.log(logArg)); - return getExactPageSize(columnsPerRowMean + 1.0 + extraColumnsToFetch, cellBatchHint); - } - - private static PageSizes getExactPageSize(double desiredColumnPerRowLimit, int cellBatchHint) { - int approximateColumnPerRowLimit = Math.max(2, - Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit))); - int rowLimit = Math.max(1, cellBatchHint / approximateColumnPerRowLimit); - int finalColumnPerRowLimit = Math.max(2, cellBatchHint / rowLimit); - return new PageSizes(rowLimit, finalColumnPerRowLimit); - } - - public static class PageSizes { - final int rowLimit; - final int columnPerRowLimit; - - public PageSizes(int rowLimit, int columnPerRowLimit) { - this.rowLimit = rowLimit; - this.columnPerRowLimit = columnPerRowLimit; - } - } - - private static final double SQRT_2_PI = Math.sqrt(2.0 * Math.PI); - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java deleted file mode 100644 index 75642ebb411..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRanges.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import org.apache.cassandra.thrift.KeyRange; - -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -public final class KeyRanges { - private KeyRanges() {} - - public static KeyRange createKeyRange(byte[] startKey, byte[] endExclusive, int limit) { - KeyRange keyRange = new KeyRange(limit); - keyRange.setStart_key(startKey); - if (endExclusive.length == 0) { - keyRange.setEnd_key(endExclusive); - } else { - // We need the previous name because this is inclusive, not exclusive - keyRange.setEnd_key(RangeRequests.previousLexicographicName(endExclusive)); - } - return keyRange; - } - -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java similarity index 90% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java index 52edab589a4..6dcf07a1ca3 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowRangeLoader.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/RowGetter.java @@ -25,7 +25,6 @@ import org.apache.cassandra.thrift.KeySlice; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.UnavailableException; -import org.apache.thrift.TException; import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.TableReference; @@ -34,13 +33,13 @@ import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; import com.palantir.common.base.FunctionCheckedException; -public class RowRangeLoader { +public class RowGetter { private CassandraClientPool clientPool; private TracingQueryRunner queryRunner; private ConsistencyLevel 
consistency; private TableReference tableRef; - public RowRangeLoader( + public RowGetter( CassandraClientPool clientPool, TracingQueryRunner queryRunner, ConsistencyLevel consistency, @@ -51,14 +50,14 @@ public RowRangeLoader( this.tableRef = tableRef; } - public List getRows(KeyRange keyRange, SlicePredicate slicePredicate) { + public List getRows(KeyRange keyRange, SlicePredicate slicePredicate) throws Exception { ColumnParent colFam = new ColumnParent(CassandraKeyValueServiceImpl.internalTableName(tableRef)); InetSocketAddress host = clientPool.getRandomHostForKey(keyRange.getStart_key()); return clientPool.runWithRetryOnHost( host, - new FunctionCheckedException, RuntimeException>() { + new FunctionCheckedException, Exception>() { @Override - public List apply(Cassandra.Client client) throws RuntimeException { + public List apply(Cassandra.Client client) throws Exception { try { return queryRunner.run(client, tableRef, () -> client.get_range_slices(colFam, slicePredicate, keyRange, consistency)); @@ -67,10 +66,8 @@ public List apply(Cassandra.Client client) throws RuntimeException { throw new InsufficientConsistencyException("This operation requires all Cassandra" + " nodes to be up and available.", e); } else { - throw new RuntimeException(e); + throw e; } - } catch (TException e) { - throw new RuntimeException(e); } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java deleted file mode 100644 index ff00fdeb520..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPager.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; - -import javax.annotation.Nullable; - -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; -import org.apache.thrift.TException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServiceImpl; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; -import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner; -import com.palantir.util.Pair; - -/** Iterates over raw Cassandra Columns of a single given row in batches. 
*/ -public class SingleRowColumnPager { - private final CassandraClientPool clientPool; - private final TracingQueryRunner queryRunner; - - public SingleRowColumnPager(CassandraClientPool clientPool, TracingQueryRunner queryRunner) { - this.clientPool = clientPool; - this.queryRunner = queryRunner; - } - - public Iterator> createColumnIterator(TableReference tableRef, - byte[] rowKey, - int pageSize, - @Nullable Column startColumnExclusive, - ConsistencyLevel consistencyLevel) { - ColumnParent columnParent = new ColumnParent(CassandraKeyValueServiceImpl.internalTableName(tableRef)); - return new PageIterator( - tableRef, columnParent, ByteBuffer.wrap(rowKey), pageSize, consistencyLevel, startColumnExclusive); - } - - private class PageIterator extends AbstractIterator> { - private final TableReference tableRef; - private final ColumnParent columnParent; - private final ByteBuffer rowKey; - private final int pageSize; - private final ConsistencyLevel consistencyLevel; - private Column lastSeenColumn; - private boolean reachedEnd = false; - - PageIterator(TableReference tableRef, ColumnParent columnParent, ByteBuffer rowKey, int pageSize, - ConsistencyLevel consistencyLevel, Column lastSeenColumn) { - this.tableRef = tableRef; - this.columnParent = columnParent; - this.rowKey = rowKey; - this.pageSize = pageSize; - this.consistencyLevel = consistencyLevel; - this.lastSeenColumn = lastSeenColumn; - } - - @Override - protected List computeNext() { - if (reachedEnd) { - return endOfData(); - } else { - InetSocketAddress host = clientPool.getRandomHostForKey(rowKey.array()); - try { - List cells = clientPool.runWithRetryOnHost(host, this::getPage); - reachedEnd = cells.size() < pageSize; - if (cells.isEmpty()) { - return endOfData(); - } else { - lastSeenColumn = Iterables.getLast(cells).column; - return cells; - } - } catch (TException e) { - throw new RuntimeException(e); - } - } - } - - private List getPage(Cassandra.Client client) throws TException { - Optional 
startColumn = getStartColumn(lastSeenColumn); - if (startColumn.isPresent()) { - SliceRange sliceRange = new SliceRange( - startColumn.get(), - ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY), - false, // reversed - pageSize); - SlicePredicate slicePred = new SlicePredicate(); - slicePred.setSlice_range(sliceRange); - return queryRunner.run(client, tableRef, - () -> client.get_slice(rowKey, columnParent, slicePred, consistencyLevel)); - } else { - return ImmutableList.of(); - } - } - } - - // Given the last seen column, compute the next start column - @VisibleForTesting - static Optional getStartColumn(@Nullable Column lastSeenColumn) { - if (lastSeenColumn == null) { - // An empty byte array means "start from the beginning of the row" - return Optional.of(ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY)); - } else { - // We need to "increment" the last seen (colName, ts) pair to the lexicographically next pair. - // Note that timestamps are stored in descending order. For example, the next pair for ("a", 5) - // will be ("a", 4). - Pair colNameAndTs = CassandraKeyValueServices.decomposeName(lastSeenColumn); - if (colNameAndTs.getRhSide() == Long.MIN_VALUE) { - // This is an edge case that will never happen in reality, but technically we have - // to handle it anyway. If the last seen timestamp equals minimum possible value of a 64-bit - // signed integer, we can't decrement it, so we have to carry over to the colName. - // E.g. ("a", Long.MIN_VALUE) -> ("b", Long.MAX_VALUE) - byte[] nextColName = RangeRequests.getNextStartRowUnlessTerminal(false, colNameAndTs.getLhSide()); - if (nextColName == null) { - // An even "edgier" case: in addition to reaching the minimum timestamp, we also reached - // the last possible colName, so there is nowhere to increment. 
- return Optional.empty(); - } else { - return Optional.of(CassandraKeyValueServices.makeCompositeBuffer(nextColName, Long.MAX_VALUE)); - } - } else { - // Timestamps are in descending order, so just decrement the timestamp to find the next column. - // E.g., ("a", 4) -> ("a", 3) - return Optional.of(CassandraKeyValueServices.makeCompositeBuffer( - colNameAndTs.getLhSide(), - colNameAndTs.getRhSide() - 1)); - } - } - } -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java deleted file mode 100644 index f53093a8edc..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/paging/StatsAccumulator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -// TODO(gsheasby): Replace this with StatsAccumulator from Guava 20.0 if we ever bump the dependency. 
-public class StatsAccumulator { - private double mean = 0.0; - private double sumOfDeltaSquares = 0.0; - private long count = 0; - - long count() { - return count; - } - - void add(double value) { - count += 1; - double delta = value - mean; - mean += delta / count; - sumOfDeltaSquares += delta * (value - mean); - } - - double mean() { - return mean; - } - - double populationStandardDeviation() { - return Math.sqrt(Math.max(0.0, sumOfDeltaSquares) / count); - } -} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java deleted file mode 100644 index f282334d195..00000000000 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.palantir.atlasdb.keyvalue.cassandra.sweep; - -import java.util.Iterator; -import java.util.List; -import java.util.Optional; - -import org.apache.cassandra.thrift.ConsistencyLevel; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; -import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue; -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; -import com.palantir.util.Pair; - -import gnu.trove.list.TLongList; -import gnu.trove.list.array.TLongArrayList; - -/* - * Simply use the CellPager to iterate over raw cells in a table and group the returned entries by the cell key. 
- */ -public class CassandraGetCandidateCellsForSweepingImpl { - - private final CellPager cellPager; - - public CassandraGetCandidateCellsForSweepingImpl(CellPager cellPager) { - this.cellPager = cellPager; - } - - public Iterator> getCandidateCellsForSweeping( - TableReference tableRef, - CandidateCellForSweepingRequest request, - ConsistencyLevel consistencyLevel) { - Iterator> rawIter = cellPager.createCellIterator( - tableRef, - request.startRowInclusive(), - request.batchSizeHint().orElse(AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT), - consistencyLevel); - return new CellGroupingIterator(rawIter, request); - } - - private static class CellGroupingIterator extends AbstractIterator> { - private final CandidateCellForSweepingRequest request; - - private final Iterator> rawIter; - private Cell currentCell = null; - private final TLongList currentTimestamps = new TLongArrayList(); - private boolean currentLatestValEmpty; - private long numCellTsPairsExamined = 0; - private boolean reachedEnd = false; - - CellGroupingIterator(Iterator> rawIter, CandidateCellForSweepingRequest request) { - this.rawIter = rawIter; - this.request = request; - } - - @Override - protected List computeNext() { - if (reachedEnd) { - return endOfData(); - } else { - List candidates = Lists.newArrayList(); - while (candidates.isEmpty() && rawIter.hasNext()) { - List cols = rawIter.next(); - for (CassandraRawCellValue col : cols) { - processColumn(col).ifPresent(candidates::add); - } - } - if (candidates.isEmpty()) { - reachedEnd = true; - if (!currentTimestamps.isEmpty()) { - // Since we reached the end of data, we know there can't be more timestamps for this cell, - // so create a candidate from them. 
- return ImmutableList.of(createCandidate()); - } else { - return endOfData(); - } - } else { - return candidates; - } - } - } - - private Optional processColumn(CassandraRawCellValue col) { - Optional maybeCandidate = Optional.empty(); - Pair colNameAndTs = CassandraKeyValueServices.decomposeName(col.getColumn()); - Cell cell = Cell.create(col.getRowKey(), colNameAndTs.getLhSide()); - if (!cell.equals(currentCell)) { - if (!currentTimestamps.isEmpty()) { - maybeCandidate = Optional.of(createCandidate()); - } - currentCell = cell; - } - long ts = colNameAndTs.getRhSide(); - if (ts < request.sweepTimestamp() && Longs.indexOf(request.timestampsToIgnore(), ts) < 0) { - if (currentTimestamps.isEmpty()) { - // Timestamps are in the decreasing order, so we pick the first timestamp below sweepTimestamp - // to check the value for emptiness - currentLatestValEmpty = request.shouldCheckIfLatestValueIsEmpty() - && col.getColumn().getValue().length == 0; - } - currentTimestamps.add(ts); - numCellTsPairsExamined += 1; - } - return maybeCandidate; - } - - private CandidateCellForSweeping createCandidate() { - currentTimestamps.reverse(); - long[] sortedTimestamps = currentTimestamps.toArray(); - currentTimestamps.clear(); - return ImmutableCandidateCellForSweeping.builder() - .cell(currentCell) - .sortedTimestamps(sortedTimestamps) - .isLatestValueEmpty(currentLatestValEmpty) - .numCellsTsPairsExamined(numCellTsPairsExamined) - .build(); - } - } - -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java deleted file mode 100644 index 1a0e936792c..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerBatchSizingStrategyTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. 
- * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy.PageSizes; - -public class CellPagerBatchSizingStrategyTest { - - private final CellPagerBatchSizingStrategy strategy = new CellPagerBatchSizingStrategy(); - - @Test - public void testUseSquareIfNoStats() { - PageSizes pageSizes = strategy.computePageSizes(102, stats()); - assertEquals(10, pageSizes.columnPerRowLimit); - assertEquals(10, pageSizes.rowLimit); - } - - @Test - public void testZeroVariance() { - PageSizes pageSizes = strategy.computePageSizes(100, stats(4, 4, 4, 4, 4)); - assertEquals(5, pageSizes.columnPerRowLimit); - assertEquals(20, pageSizes.rowLimit); - } - - @Test - public void testNonZeroVariance() { - PageSizes pageSizes = strategy.computePageSizes(100, stats(4, 5, 4, 5, 4, 5)); - assertTrue(pageSizes.columnPerRowLimit > 5); - assertTrue(pageSizes.rowLimit < 20); - } - - @Test - public void testReturnReasonableResults() { - for (double v = 1; v <= 200; v += 0.1) { - PageSizes pageSizes = strategy.computePageSizes(100, stats(1, 1, 1, v, v, v)); - assertTrue(pageSizes.rowLimit > 0); - assertTrue(pageSizes.rowLimit <= 100); - assertTrue(pageSizes.columnPerRowLimit >= 2); - assertTrue(pageSizes.columnPerRowLimit <= 100); - 
assertTrue(pageSizes.rowLimit * pageSizes.columnPerRowLimit <= 100); - } - } - - private StatsAccumulator stats(double... measurements) { - StatsAccumulator stats = new StatsAccumulator(); - for (double x : measurements) { - stats.add(x); - } - return stats; - } - -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java deleted file mode 100644 index 2f975659cf5..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/CellPagerSplitFetchedRowsIntoTasksTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.KeySlice; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.primitives.Ints; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; - -public class CellPagerSplitFetchedRowsIntoTasksTest { - - private StatsAccumulator accumulator; - private List recordedRowWidths; - - @Before - public void mockStatsAccumulator() { - accumulator = Mockito.mock(StatsAccumulator.class); - recordedRowWidths = new ArrayList<>(); - Mockito.doAnswer(inv -> { - recordedRowWidths.add(inv.getArgumentAt(0, double.class)); - return null; - }).when(accumulator).add(Mockito.anyDouble()); - } - - @Test - public void testTwoFullyFetchedRows() { - // Row 100: 1 - // Row 200: 1 2 - List>> iterators = CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 1), keySlice(200, 2)), - 3, - (rowName, lastSeenColumn) -> { - Assert.fail("Didn't expect this to be called - there are no partially fetch rows"); - return null; - }, - accumulator); - assertEquals(1, iterators.size()); - List> onlyIterCopy = ImmutableList.copyOf(Iterables.getOnlyElement(iterators)); - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(100, 1), rawCellValue(200, 1), rawCellValue(200, 2))), - onlyIterCopy); - assertEquals(ImmutableList.of(1.0, 2.0), recordedRowWidths); - } - - @Test - public void testSinglePartiallyFetchedRow() { - // Row 100: 1 2 3 4 5 - List>> iterators = 
CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 3)), - 3, - (rowName, lastSeenColumn) -> { - assertArrayEquals(key(100), rowName); - assertEquals(column(3), lastSeenColumn); - return ImmutableList.>of( - ImmutableList.of(columnOrSuperColumn(4), columnOrSuperColumn(5))).iterator(); - }, - accumulator); - assertEquals(2, iterators.size()); - // The first portion of cells consists of the first three columns of our row - assertEquals( - ImmutableList.of(ImmutableList.of( - rawCellValue(100, 1), rawCellValue(100, 2), rawCellValue(100, 3))), - ImmutableList.copyOf(iterators.get(0))); - // The second portion of cells is the remainder of our row that we returned from - // the mock RowRemainderIteratorFactory. - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(100, 4), rawCellValue(100, 5))), - ImmutableList.copyOf(iterators.get(1))); - assertEquals(ImmutableList.of(5.0), recordedRowWidths); - } - - @Test - public void testPartiallyFetchedRowInTheMiddle() { - // Row 100: 1 - // Row 200: 1 2 3 4 5 - // Row 300: 1 2 - List>> iterators = CellPager.splitFetchedRowsIntoTasks( - ImmutableList.of(keySlice(100, 1), keySlice(200, 3), keySlice(300, 2)), - 3, - (rowName, lastSeenColumn) -> { - assertArrayEquals(key(200), rowName); - assertEquals(column(3), lastSeenColumn); - return ImmutableList.>of( - ImmutableList.of(columnOrSuperColumn(4), columnOrSuperColumn(5))).iterator(); - }, - accumulator); - assertEquals(3, iterators.size()); - // The first portion of cells includes Row 100 and the fetched part of Row 200 (cells 1, 2 and 3) - assertEquals( - ImmutableList.of(ImmutableList.of( - rawCellValue(100, 1), rawCellValue(200, 1), rawCellValue(200, 2), rawCellValue(200, 3))), - ImmutableList.copyOf(iterators.get(0))); - // The second portion of cells consists of the remainder of row 200 that we returned from - // the mock RowRemainderIteratorFactory. 
- assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(200, 4), rawCellValue(200, 5))), - ImmutableList.copyOf(iterators.get(1))); - // The third (and last) portion of cells is Row 300 - assertEquals( - ImmutableList.of(ImmutableList.of(rawCellValue(300, 1), rawCellValue(300, 2))), - ImmutableList.copyOf(iterators.get(2))); - // We expect the widths of Rows 100 and 300 to be recorded first because they are fully fetched. - // The width of Row 200 should be recorded only after we exhaust the corresponding iterator. - assertEquals(ImmutableList.of(1.0, 2.0, 5.0), recordedRowWidths); - } - - private static KeySlice keySlice(int row, int size) { - List columns = new ArrayList<>(size); - for (int col = 1; col <= size; ++col) { - columns.add(columnOrSuperColumn(col)); - } - return new KeySlice(ByteBuffer.wrap(key(row)), columns); - } - - private static CassandraRawCellValue rawCellValue(int row, int col) { - return ImmutableCassandraRawCellValue.builder() - .rowKey(key(row)) - .column(column(col)) - .build(); - } - - private static ColumnOrSuperColumn columnOrSuperColumn(int col) { - ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); - cosc.setColumn(column(col)); - return cosc; - } - - private static Column column(int col) { - Column column = new Column(); - column.setName(CassandraKeyValueServices.makeCompositeBuffer(key(col), 1000L)); - column.setValue(new byte[] { 1, 2, 3 }); - column.setTimestamp(1000L); - return column; - } - - private static byte[] key(int number) { - return Ints.toByteArray(number); - } -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java deleted file mode 100644 index de042c042d5..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/KeyRangesTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. 
All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import org.apache.cassandra.thrift.KeyRange; -import org.junit.Test; - -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -public class KeyRangesTest { - @Test - public void createKeyRangeWithExplicitEnd() { - byte[] start = PtBytes.toBytes("start"); - byte[] end = PtBytes.toBytes("the-end"); - byte[] enc = RangeRequests.previousLexicographicName(end); - int limit = 7; - KeyRange keyRange = KeyRanges.createKeyRange(start, end, limit); - - assertArrayEquals(start, keyRange.getStart_key()); - assertArrayEquals(enc, keyRange.getEnd_key()); - assertEquals(limit, keyRange.getCount()); - } - - @Test - public void createKeyRangeWithEmptyEnd() { - byte[] start = PtBytes.toBytes("start"); - byte[] end = PtBytes.toBytes(""); - int limit = 7; - KeyRange keyRange = KeyRanges.createKeyRange(start, end, limit); - - assertArrayEquals(start, keyRange.getStart_key()); - assertArrayEquals(end, keyRange.getEnd_key()); - assertEquals(limit, keyRange.getCount()); - } -} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java deleted 
file mode 100644 index 906b06eaf1b..00000000000 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/paging/SingleRowColumnPagerTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.cassandra.paging; - -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import java.util.Optional; - -import org.apache.cassandra.thrift.Column; -import org.junit.Test; - -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; - -public class SingleRowColumnPagerTest { - - @Test - public void startShouldBeEmptyWhenThereIsNoLastSeenColumn() { - assertEquals( - Optional.of(ByteBuffer.wrap(PtBytes.EMPTY_BYTE_ARRAY)), - SingleRowColumnPager.getStartColumn(null)); - } - - @Test - public void startShouldBeOneBelowPreviousTimestamp() { - assertEquals( - Optional.of(CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, 999L)), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, 1000L)))); - } - - @Test - public void startShouldRollOverToNextColumnNameWhenMinimumTimestampReached() { - assertEquals( - 
Optional.of(CassandraKeyValueServices.makeCompositeBuffer( - RangeRequests.nextLexicographicName(new byte[] {1, 2, 3 }), Long.MAX_VALUE)), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(new byte[] { 1, 2, 3 }, Long.MIN_VALUE)))); - } - - @Test - public void startShouldBeEmptyWhenLastPossiblePointReached() { - assertEquals( - Optional.empty(), - SingleRowColumnPager.getStartColumn(new Column( - CassandraKeyValueServices.makeCompositeBuffer(lastPossibleColumnName(), Long.MIN_VALUE)))); - } - - private static byte[] lastPossibleColumnName() { - byte[] ret = new byte[Cell.MAX_NAME_LENGTH]; - for (int i = 0; i < Cell.MAX_NAME_LENGTH; ++i) { - ret[i] |= 0xff; - } - return ret; - } - -} diff --git a/atlasdb-cli/src/main/java/com/palantir/atlasdb/cli/command/SweepCommand.java b/atlasdb-cli/src/main/java/com/palantir/atlasdb/cli/command/SweepCommand.java index 4de1dda41ae..2e89e17e7a8 100644 --- a/atlasdb-cli/src/main/java/com/palantir/atlasdb/cli/command/SweepCommand.java +++ b/atlasdb-cli/src/main/java/com/palantir/atlasdb/cli/command/SweepCommand.java @@ -98,7 +98,7 @@ public class SweepCommand extends SingleBackendCommand { @Option(name = {"--candidate-batch-hint"}, description = "Approximate number of candidate (cell, timestamp) pairs to load at once (default: " - + AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT + ")") + + AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_MINIMUM + ")") Integer candidateBatchHint; @Option(name = {"--read-limit"}, @@ -238,7 +238,7 @@ private SweepBatchConfig getSweepBatchConfig() { .candidateBatchSize(chooseBestValue( candidateBatchHint, batchSize, - AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT)) + AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_MINIMUM)) .deleteBatchSize(deleteBatchHint) .build(); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/AtlasDbConstants.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/AtlasDbConstants.java index 
07c0530926e..3ce91b2cf0d 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/AtlasDbConstants.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/AtlasDbConstants.java @@ -109,7 +109,9 @@ private AtlasDbConstants() { public static final long DEFAULT_SWEEP_PAUSE_MILLIS = 5 * 1000; public static final long DEFAULT_SWEEP_PERSISTENT_LOCK_WAIT_MILLIS = 30_000L; public static final int DEFAULT_SWEEP_DELETE_BATCH_HINT = 1_000; - public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT = 1024; + public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_CASSANDRA = 1; + public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_NON_CASSANDRA = 1024; + public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_MINIMUM = 1; public static final int DEFAULT_SWEEP_READ_LIMIT = 1_000; public static final int DEFAULT_STREAM_IN_MEMORY_THRESHOLD = 4 * 1024 * 1024; diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java index 681d4e382a0..3468844cd61 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java @@ -61,7 +61,8 @@ public ClosableIterator> getCandidateCellsForSwee CandidateCellForSweepingRequest request) { RangeRequest range = RangeRequest.builder() .startRowInclusive(request.startRowInclusive()) - .batchHint(request.batchSizeHint().orElse(AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT)) + .batchHint(request.batchSizeHint().orElse( + AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_NON_CASSANDRA)) .build(); try (ReleasableCloseable>> valueResults = new ReleasableCloseable<>( getValues(tableRef, range, request.sweepTimestamp(), request.shouldCheckIfLatestValueIsEmpty())); diff --git 
a/atlasdb-commons/src/main/java/com/palantir/async/initializer/AsyncInitializer.java b/atlasdb-commons/src/main/java/com/palantir/async/initializer/AsyncInitializer.java index 43642527a29..4bfa6a17e28 100644 --- a/atlasdb-commons/src/main/java/com/palantir/async/initializer/AsyncInitializer.java +++ b/atlasdb-commons/src/main/java/com/palantir/async/initializer/AsyncInitializer.java @@ -71,6 +71,7 @@ private void tryInitializationLoop() { SafeArg.of("className", getInitializingClassName()), SafeArg.of("numberOfAttempts", numberOfInitializationAttempts), SafeArg.of("initializationDuration", System.currentTimeMillis() - initializationStartTime)); + singleThreadedExecutor.shutdown(); } catch (Throwable throwable) { log.info("Failed to initialize {} on the attempt {}", SafeArg.of("className", getInitializingClassName()), diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/SweepConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/SweepConfig.java index 6997eaf44a1..4bd5ca8e4d9 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/SweepConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/SweepConfig.java @@ -16,6 +16,8 @@ package com.palantir.atlasdb.config; +import java.util.Optional; + import org.immutables.value.Value; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; @@ -25,7 +27,7 @@ @JsonDeserialize(as = ImmutableSweepConfig.class) @JsonSerialize(as = ImmutableSweepConfig.class) @Value.Immutable -public class SweepConfig { +public abstract class SweepConfig { /** * If true, a background thread will periodically delete cells that * have been overwritten or deleted. This differs from scrubbing @@ -57,10 +59,7 @@ public Integer readLimit() { /** * The target number of candidate (cell, timestamp) pairs to load per batch while sweeping. 
*/ - @Value.Default - public Integer candidateBatchHint() { - return AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT; - } + public abstract Optional candidateBatchHint(); /** * The target number of (cell, timestamp) pairs to delete at once while sweeping. @@ -75,7 +74,6 @@ public static SweepConfig defaultSweepConfig() { .enabled(AtlasDbConstants.DEFAULT_ENABLE_SWEEP) .pauseMillis(AtlasDbConstants.DEFAULT_SWEEP_PAUSE_MILLIS) .readLimit(AtlasDbConstants.DEFAULT_SWEEP_READ_LIMIT) - .candidateBatchHint(AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT) .deleteBatchHint(AtlasDbConstants.DEFAULT_SWEEP_DELETE_BATCH_HINT) .build(); } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index 1c46ebebdbd..394ab91dd6b 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -37,6 +37,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; +import com.palantir.async.initializer.AsyncInitializer; +import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.cleaner.Cleaner; import com.palantir.atlasdb.cleaner.CleanupFollower; import com.palantir.atlasdb.cleaner.DefaultCleanerBuilder; @@ -68,6 +70,7 @@ import com.palantir.atlasdb.persistentlock.PersistentLockService; import com.palantir.atlasdb.schema.generated.SweepTableFactory; import com.palantir.atlasdb.spi.AtlasDbFactory; +import com.palantir.atlasdb.spi.KeyValueServiceConfig; import com.palantir.atlasdb.sweep.BackgroundSweeperImpl; import com.palantir.atlasdb.sweep.BackgroundSweeperPerformanceLogger; import com.palantir.atlasdb.sweep.CellsSweeper; @@ -361,7 +364,7 @@ SerializableTransactionManager serializable() { conflictManager, 
sweepStrategyManager, cleaner, - initializer, + () -> areTransactionManagerInitializationPrerequisitesSatisfied(initializer, lockAndTimestampServices), allowHiddenTableAccess(), () -> runtimeConfigSupplier.get().transaction().getLockAcquireTimeoutMillis(), config.keyValueService().concurrentGetRangesThreadPoolSize(), @@ -372,7 +375,9 @@ SerializableTransactionManager serializable() { PersistentLockManager persistentLockManager = new PersistentLockManager( persistentLockService, config.getSweepPersistentLockWaitMillis()); - initializeSweepEndpointAndBackgroundProcess(runtimeConfigSupplier, + initializeSweepEndpointAndBackgroundProcess( + config, + runtimeConfigSupplier, registrar(), kvs, transactionService, @@ -384,6 +389,17 @@ SerializableTransactionManager serializable() { return transactionManager; } + private static boolean areTransactionManagerInitializationPrerequisitesSatisfied( + AsyncInitializer initializer, + LockAndTimestampServices lockAndTimestampServices) { + return initializer.isInitialized() && timeLockMigrationCompleteIfNeeded(lockAndTimestampServices); + } + + @VisibleForTesting + static boolean timeLockMigrationCompleteIfNeeded(LockAndTimestampServices lockAndTimestampServices) { + return lockAndTimestampServices.migrator().map(AsyncInitializer::isInitialized).orElse(true); + } + private static void checkInstallConfig(AtlasDbConfig config) { if (config.getSweepBatchSize() != null || config.getSweepCellBatchSize() != null @@ -396,6 +412,7 @@ private static void checkInstallConfig(AtlasDbConfig config) { } private static void initializeSweepEndpointAndBackgroundProcess( + AtlasDbConfig config, Supplier runtimeConfigSupplier, Consumer env, KeyValueService kvs, @@ -418,7 +435,8 @@ private static void initializeSweepEndpointAndBackgroundProcess( cellsSweeper); BackgroundSweeperPerformanceLogger sweepPerfLogger = new NoOpBackgroundSweeperPerformanceLogger(); com.google.common.base.Supplier sweepBatchConfig = () -> - 
getSweepBatchConfig(runtimeConfigSupplier.get().sweep()); + getSweepBatchConfig(runtimeConfigSupplier.get().sweep(), config.keyValueService()); + SweepMetrics sweepMetrics = new SweepMetrics(); SpecificTableSweeper specificTableSweeper = initializeSweepEndpoint( @@ -460,14 +478,23 @@ private static SpecificTableSweeper initializeSweepEndpoint( return specificTableSweeper; } - private static SweepBatchConfig getSweepBatchConfig(SweepConfig sweepConfig) { + private static SweepBatchConfig getSweepBatchConfig(SweepConfig sweepConfig, KeyValueServiceConfig kvsConfig) { return ImmutableSweepBatchConfig.builder() .maxCellTsPairsToExamine(sweepConfig.readLimit()) - .candidateBatchSize(sweepConfig.candidateBatchHint()) + .candidateBatchSize(sweepConfig.candidateBatchHint() + .orElse(getDefaultSweepCandidateBatchHint(kvsConfig))) .deleteBatchSize(sweepConfig.deleteBatchHint()) .build(); } + // TODO(nziebart): this is a hack until we fix cassandra's getCandidateCellsForSweep to behave like the other KVSs + private static int getDefaultSweepCandidateBatchHint(KeyValueServiceConfig kvsConfig) { + if (kvsConfig.type().equals("cassandra")) { + return AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_CASSANDRA; + } + return AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT_NON_CASSANDRA; + } + private static PersistentLockService createAndRegisterPersistentLockService( KeyValueService kvs, Consumer env, @@ -576,8 +603,12 @@ private static LockAndTimestampServices createRawServicesFromTimeLock( String resolvedClient = OptionalResolver.resolve(clientConfig.client(), config.namespace()); TimeLockClientConfig timeLockClientConfig = TimeLockClientConfigs.copyWithClient(config.timelock().get(), resolvedClient); - TimeLockMigrator.create(timeLockClientConfig, invalidator, userAgent).migrate(); - return createNamespacedRawRemoteServices(timeLockClientConfig, userAgent); + TimeLockMigrator migrator = + TimeLockMigrator.create(timeLockClientConfig, invalidator, userAgent, 
config.initializeAsync()); + migrator.migrate(); // This can proceed async if config.initializeAsync() was set + return ImmutableLockAndTimestampServices.copyOf( + createNamespacedRawRemoteServices(timeLockClientConfig, userAgent)) + .withMigrator(migrator); } private static LockAndTimestampServices createNamespacedRawRemoteServices( @@ -723,5 +754,6 @@ public interface LockAndTimestampServices { LockService lock(); TimestampService timestamp(); TimelockService timelock(); + Optional migrator(); } } diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/startup/TimeLockMigrator.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/startup/TimeLockMigrator.java index 97b937b036a..96fd1dc9203 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/startup/TimeLockMigrator.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/startup/TimeLockMigrator.java @@ -15,6 +15,8 @@ */ package com.palantir.atlasdb.factory.startup; +import com.palantir.async.initializer.AsyncInitializer; +import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.config.ServerListConfig; import com.palantir.atlasdb.config.TimeLockClientConfig; import com.palantir.atlasdb.factory.ServiceCreator; @@ -22,31 +24,45 @@ import com.palantir.timestamp.TimestampManagementService; import com.palantir.timestamp.TimestampStoreInvalidator; -public class TimeLockMigrator { +@SuppressWarnings("FinalClass") +public class TimeLockMigrator extends AsyncInitializer { private final TimestampStoreInvalidator source; private final TimestampManagementService destination; + private final boolean initializeAsync; - public TimeLockMigrator(TimestampStoreInvalidator source, TimestampManagementService destination) { + private TimeLockMigrator( + TimestampStoreInvalidator source, + TimestampManagementService destination, + boolean initializeAsync) { this.source = source; this.destination = destination; + this.initializeAsync = initializeAsync; } public 
static TimeLockMigrator create( TimeLockClientConfig config, TimestampStoreInvalidator invalidator, String userAgent) { + return create(config, invalidator, userAgent, AtlasDbConstants.DEFAULT_INITIALIZE_ASYNC); + } + + public static TimeLockMigrator create( + TimeLockClientConfig config, + TimestampStoreInvalidator invalidator, + String userAgent, + boolean initializeAsync) { TimestampManagementService remoteTimestampManagementService = createRemoteManagementService(config, userAgent); - return new TimeLockMigrator(invalidator, remoteTimestampManagementService); + return new TimeLockMigrator(invalidator, remoteTimestampManagementService, initializeAsync); } /** * Migration works as follows: * 1. Ping the destination Timelock Server. If this fails, throw. * 2. Backup and invalidate the Timestamp Bound, returning TS. - * At this point, the database should contain an invalidated timestamp bound, plus a backup bound of TS. + * At this point, the database should contain an invalidated timestamp bound, plus a backup bound of TS. * 3. Fast-forward the destination to TS. - * + *

* The purpose of step 1 is largely to handle cases where users accidentally mis-configure their AtlasDB clients to * attempt to talk to Timelock; we want to ensure there's a legitimate Timelock Server present before doing the * invalidation. In the event of a failure between steps 2 and 3, rerunning this method is safe, because @@ -54,8 +70,21 @@ public static TimeLockMigrator create( * is unreadable. */ @Idempotent - @SuppressWarnings("CheckReturnValue") // errorprone doesn't pick up "when=NEVER" public void migrate() { + initialize(initializeAsync); + } + + private static TimestampManagementService createRemoteManagementService( + TimeLockClientConfig timelockConfig, + String userAgent) { + ServerListConfig serverListConfig = timelockConfig.toNamespacedServerList(); + return new ServiceCreator<>(TimestampManagementService.class, userAgent) + .apply(serverListConfig); + } + + @Override + @SuppressWarnings({"CheckReturnValue", "ResultOfMethodCallIgnored"}) // errorprone doesn't pick up "when=NEVER" + protected synchronized void tryInitialize() { try { destination.ping(); } catch (Exception e) { @@ -65,11 +94,9 @@ public void migrate() { destination.fastForwardTimestamp(currentTimestamp); } - private static TimestampManagementService createRemoteManagementService( - TimeLockClientConfig timelockConfig, - String userAgent) { - ServerListConfig serverListConfig = timelockConfig.toNamespacedServerList(); - return new ServiceCreator<>(TimestampManagementService.class, userAgent) - .apply(serverListConfig); + @Override + protected String getInitializingClassName() { + return TimeLockMigrator.class.getSimpleName(); } } + diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java index ba429a0b972..8d7f2aabcd5 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java +++ 
b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java @@ -18,7 +18,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -67,6 +69,7 @@ import com.palantir.atlasdb.config.ImmutableTimestampClientConfig; import com.palantir.atlasdb.config.ServerListConfig; import com.palantir.atlasdb.config.TimeLockClientConfig; +import com.palantir.atlasdb.factory.startup.TimeLockMigrator; import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.util.MetricsRule; @@ -122,6 +125,10 @@ public class TransactionManagersTest { = "/" + CLIENT + "/timestamp-management/fast-forward?currentTimestamp=" + EMBEDDED_BOUND; private static final MappingBuilder TIMELOCK_FF_MAPPING = post(urlEqualTo(TIMELOCK_FF_PATH)); + private final TimeLockMigrator migrator = mock(TimeLockMigrator.class); + private final TransactionManagers.LockAndTimestampServices lockAndTimestampServices = mock( + TransactionManagers.LockAndTimestampServices.class); + private int availablePort; private TimeLockClientConfig mockClientConfig; private ServerListConfig rawRemoteServerConfig; @@ -223,11 +230,11 @@ public void remoteCallsStillMadeIfPingableLeader404s() throws IOException, Inter setUpForRemoteServices(); setupLeaderBlockInConfig(); - TransactionManagers.LockAndTimestampServices lockAndTimestampServices = getLockAndTimestampServices(); + TransactionManagers.LockAndTimestampServices lockAndTimestamp = getLockAndTimestampServices(); availableServer.verify(getRequestedFor(urlMatching(LEADER_UUID_PATH))); - 
lockAndTimestampServices.timelock().getFreshTimestamp(); - lockAndTimestampServices.lock().currentTimeMillis(); + lockAndTimestamp.timelock().getFreshTimestamp(); + lockAndTimestamp.lock().currentTimeMillis(); availableServer.verify(postRequestedFor(urlMatching(TIMESTAMP_PATH)) .withHeader(USER_AGENT_HEADER, WireMock.equalTo(USER_AGENT))); @@ -240,11 +247,11 @@ public void remoteCallsElidedIfTalkingToLocalServer() throws IOException, Interr setUpForLocalServices(); setupLeaderBlockInConfig(); - TransactionManagers.LockAndTimestampServices lockAndTimestampServices = getLockAndTimestampServices(); + TransactionManagers.LockAndTimestampServices lockAndTimestamp = getLockAndTimestampServices(); availableServer.verify(getRequestedFor(urlMatching(LEADER_UUID_PATH))); - lockAndTimestampServices.timelock().getFreshTimestamp(); - lockAndTimestampServices.lock().currentTimeMillis(); + lockAndTimestamp.timelock().getFreshTimestamp(); + lockAndTimestamp.lock().currentTimeMillis(); availableServer.verify(0, postRequestedFor(urlMatching(TIMESTAMP_PATH)) .withHeader(USER_AGENT_HEADER, WireMock.equalTo(USER_AGENT))); @@ -362,13 +369,36 @@ public void metricsAreReportedExactlyOnceWhenUsingTimelockServiceWithRequestBatc TIMELOCK_SERVICE_CURRENT_TIME_METRIC); } + @Test + public void timeLockMigrationReportsReadyIfMigrationDone() { + when(migrator.isInitialized()).thenReturn(true); + when(lockAndTimestampServices.migrator()).thenReturn(Optional.of(migrator)); + + assertTrue(TransactionManagers.timeLockMigrationCompleteIfNeeded(lockAndTimestampServices)); + } + + @Test + public void timeLockMigrationReportsNotReadyIfMigrationNotDone() { + when(migrator.isInitialized()).thenReturn(false); + when(lockAndTimestampServices.migrator()).thenReturn(Optional.of(migrator)); + + assertFalse(TransactionManagers.timeLockMigrationCompleteIfNeeded(lockAndTimestampServices)); + } + + @Test + public void timeLockMigrationReportsReadyIfMigrationNotNeeded() { + 
when(lockAndTimestampServices.migrator()).thenReturn(Optional.empty()); + + assertTrue(TransactionManagers.timeLockMigrationCompleteIfNeeded(lockAndTimestampServices)); + } + private void assertThatTimeAndLockMetricsAreRecorded(String timestampMetric, String lockMetric) { assertThat(metricsRule.metrics().timer(timestampMetric).getCount(), is(equalTo(0L))); assertThat(metricsRule.metrics().timer(lockMetric).getCount(), is(equalTo(0L))); - TransactionManagers.LockAndTimestampServices lockAndTimestampServices = getLockAndTimestampServices(); - lockAndTimestampServices.timelock().getFreshTimestamp(); - lockAndTimestampServices.timelock().currentTimeMillis(); + TransactionManagers.LockAndTimestampServices lockAndTimestamp = getLockAndTimestampServices(); + lockAndTimestamp.timelock().getFreshTimestamp(); + lockAndTimestamp.timelock().currentTimeMillis(); assertThat(metricsRule.metrics().timer(timestampMetric).getCount(), is(equalTo(1L))); assertThat(metricsRule.metrics().timer(lockMetric).getCount(), is(equalTo(1L))); @@ -426,7 +456,7 @@ private void verifyUserAgentOnTimelockTimestampAndLockRequests() { } private void verifyUserAgentOnTimestampAndLockRequests(String timestampPath, String lockPath) { - TransactionManagers.LockAndTimestampServices lockAndTimestampServices = + TransactionManagers.LockAndTimestampServices lockAndTimestamp = TransactionManagers.createLockAndTimestampServices( config, () -> ImmutableTimestampClientConfig.of(false), @@ -435,8 +465,8 @@ private void verifyUserAgentOnTimestampAndLockRequests(String timestampPath, Str InMemoryTimestampService::new, invalidator, USER_AGENT); - lockAndTimestampServices.timelock().getFreshTimestamp(); - lockAndTimestampServices.timelock().currentTimeMillis(); + lockAndTimestamp.timelock().getFreshTimestamp(); + lockAndTimestamp.timelock().currentTimeMillis(); availableServer.verify(postRequestedFor(urlMatching(timestampPath)) .withHeader(USER_AGENT_HEADER, WireMock.equalTo(USER_AGENT))); diff --git 
a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/startup/TimeLockMigratorTest.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/startup/TimeLockMigratorTest.java index e7043618bb4..4cce918b8ae 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/startup/TimeLockMigratorTest.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/startup/TimeLockMigratorTest.java @@ -29,13 +29,20 @@ import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.github.tomakehurst.wiremock.client.MappingBuilder; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import com.jayway.awaitility.Awaitility; import com.palantir.atlasdb.config.ImmutableServerListConfig; import com.palantir.atlasdb.config.ImmutableTimeLockClientConfig; import com.palantir.atlasdb.config.ServerListConfig; @@ -50,6 +57,7 @@ public class TimeLockMigratorTest { private static final String PING_ENDPOINT = "/testClient/timestamp-management/ping"; private static final MappingBuilder TEST_MAPPING = post(urlEqualTo(TEST_ENDPOINT)); private static final MappingBuilder PING_MAPPING = get(urlEqualTo(PING_ENDPOINT)); + private static final String SCENARIO = "scenario"; private static final String USER_AGENT = "user-agent (123456789)"; @@ -67,7 +75,10 @@ public void setUp() { wireMockRule.stubFor(PING_MAPPING.willReturn(aResponse() .withStatus(200) .withBody(TimestampManagementService.PING_RESPONSE) - .withHeader("Content-Type", "text/plain"))); + .withHeader("Content-Type", "text/plain")) 
+ .inScenario(SCENARIO) + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo(Scenario.STARTED)); String serverUri = String.format("http://%s:%s", WireMockConfiguration.DEFAULT_BIND_ADDRESS, @@ -108,4 +119,60 @@ public void migrationDoesNotProceedIfInvalidationFails() { assertThatThrownBy(migrator::migrate).isInstanceOf(IllegalStateException.class); wireMockRule.verify(0, postRequestedFor(urlEqualTo(TEST_ENDPOINT))); } + + /** Verifies that migration still completes when the first TimeLock ping fails. The ping endpoint is stubbed to answer 500 while the scenario is in its STARTED state and 204 once it has moved to the nowSucceeding state. After migrate(), the test polls isInitialized() for up to 30 seconds, then checks that the ping endpoint was requested, that backupAndInvalidate() ran exactly once, and that the test endpoint received a POST. NOTE(review): the trailing true passed to TimeLockMigrator.create presumably selects asynchronous initialization - confirm against the factory signature. */ @Test + public void asyncMigrationProceedsIfTimeLockInitiallyUnavailable() throws InterruptedException { + String nowSucceeding = "nowSucceeding"; + + wireMockRule.stubFor(PING_MAPPING.inScenario(SCENARIO) + .whenScenarioStateIs(Scenario.STARTED) + .willReturn(aResponse().withStatus(500)) + .willSetStateTo(nowSucceeding)); + + wireMockRule.stubFor(PING_MAPPING.inScenario(SCENARIO) + .whenScenarioStateIs(nowSucceeding) + .willReturn(aResponse().withStatus(204))); + + wireMockRule.stubFor(TEST_MAPPING.willReturn(aResponse().withStatus(204))); + + TimeLockMigrator migrator = TimeLockMigrator.create(timelockConfig, invalidator, USER_AGENT, true); + migrator.migrate(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(migrator::isInitialized); + + wireMockRule.verify(getRequestedFor(urlEqualTo(PING_ENDPOINT))); + verify(invalidator, times(1)).backupAndInvalidate(); + wireMockRule.verify(postRequestedFor(urlEqualTo(TEST_ENDPOINT))); + } + + /** Verifies that migration still completes when the invalidator fails on its first call. The mocked backupAndInvalidate() throws IllegalStateException once and returns BACKUP_TIMESTAMP on later calls. After migrate(), the test polls isInitialized() for up to 30 seconds, then checks that the ping endpoint was requested, that backupAndInvalidate() ran exactly twice, and that the test endpoint received a POST. */ @Test + public void asyncMigrationProceedsIfInvalidatorInitiallyUnavailable() throws InterruptedException { + when(invalidator.backupAndInvalidate()) + .thenAnswer(new Answer() { + private AtomicBoolean shouldFail = new AtomicBoolean(true); + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + /* getAndSet(false) yields true only on the first invocation, so only that call throws. */ if (shouldFail.getAndSet(false)) { + throw new IllegalStateException("not ready yet"); + } + return BACKUP_TIMESTAMP; + } + }); + + wireMockRule.stubFor(TEST_MAPPING.willReturn(aResponse().withStatus(204))); + TimeLockMigrator migrator = 
TimeLockMigrator.create(timelockConfig, invalidator, USER_AGENT, true); + migrator.migrate(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(migrator::isInitialized); + + wireMockRule.verify(getRequestedFor(urlEqualTo(PING_ENDPOINT))); + /* Two calls expected: the stubbed failure above plus the call that returned BACKUP_TIMESTAMP. */ verify(invalidator, times(2)).backupAndInvalidate(); + wireMockRule.verify(postRequestedFor(urlEqualTo(TEST_ENDPOINT))); + } } diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java index 1232ca80545..98a37f554ce 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java +++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java @@ -20,6 +20,7 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.cleaner.Cleaner; import com.palantir.atlasdb.cleaner.CleanupFollower; import com.palantir.atlasdb.cleaner.DefaultCleanerBuilder; @@ -96,6 +97,7 @@ public SerializableTransactionManager provideTransactionManager(ServicesConfig c sweepStrategyManager, cleaner, config.allowAccessToHiddenTables(), + () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(), config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(), config.atlasDbConfig().getTimestampCacheSize()); diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java index b73da347ec1..6c453294ab8 100644 --- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java +++ 
b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java @@ -20,6 +20,7 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.cleaner.Cleaner; import com.palantir.atlasdb.cleaner.CleanupFollower; import com.palantir.atlasdb.cleaner.DefaultCleanerBuilder; @@ -102,6 +103,7 @@ public SerializableTransactionManager provideTransactionManager(ServicesConfig c sweepStrategyManager, cleaner, config.allowAccessToHiddenTables(), + () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(), config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(), config.atlasDbConfig().getTimestampCacheSize()); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java index 0a63a7015ef..58938d57af9 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/memory/InMemoryAtlasDbFactory.java @@ -170,7 +170,7 @@ private static SerializableTransactionManager createInMemoryTransactionManagerIn client, ImmutableList.of(follower), transactionService).buildCleaner(); - SerializableTransactionManager ret = new SerializableTransactionManager( + SerializableTransactionManager ret = SerializableTransactionManager.createForTest( keyValueService, ts, client, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java index 3e0eaf5a4d3..e78ff36f31f 100644 --- 
a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java @@ -18,7 +18,6 @@ import java.util.Optional; import com.google.common.base.Supplier; -import com.palantir.async.initializer.AsyncInitializer; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.cleaner.Cleaner; import com.palantir.atlasdb.keyvalue.api.KeyValueService; @@ -39,12 +38,12 @@ public class SerializableTransactionManager extends SnapshotTransactionManager { public static class InitializeCheckingWrapper extends AutoDelegate_SerializableTransactionManager { private final SerializableTransactionManager manager; - private final AsyncInitializer prerequisite; + private final Supplier initializationPrerequisite; public InitializeCheckingWrapper(SerializableTransactionManager manager, - AsyncInitializer prerequisite) { + Supplier initializationPrerequisite) { this.manager = manager; - this.prerequisite = prerequisite; + this.initializationPrerequisite = initializationPrerequisite; } @Override @@ -66,7 +65,7 @@ public boolean isInitialized() { && manager.getTimelockService().isInitialized() && manager.getTimestampService().isInitialized() && manager.getCleaner().isInitialized() - && prerequisite.isInitialized(); + && initializationPrerequisite.get(); } @Override @@ -85,7 +84,7 @@ public void registerClosingCallback(Runnable closingCallback) { * use the delegate instead. 
*/ protected SerializableTransactionManager() { - this(null, null, null, null, null, null, null, null, null, 1, 1, 1); + this(null, null, null, null, null, null, null, null, false, null, 1, 1, 1); } public static SerializableTransactionManager create(KeyValueService keyValueService, @@ -96,7 +95,7 @@ public static SerializableTransactionManager create(KeyValueService keyValueServ ConflictDetectionManager conflictDetectionManager, SweepStrategyManager sweepStrategyManager, Cleaner cleaner, - AsyncInitializer initializer, + Supplier initializationPrerequisite, boolean allowHiddenTableAccess, Supplier lockAcquireTimeoutMs, int concurrentGetRangesThreadPoolSize, @@ -118,11 +117,12 @@ public static SerializableTransactionManager create(KeyValueService keyValueServ defaultGetRangesConcurrency, timestampCacheSize); - return initializeAsync ? new InitializeCheckingWrapper(serializableTransactionManager, initializer) + return initializeAsync + ? new InitializeCheckingWrapper(serializableTransactionManager, initializationPrerequisite) : serializableTransactionManager; } - public SerializableTransactionManager(KeyValueService keyValueService, + public static SerializableTransactionManager createForTest(KeyValueService keyValueService, TimestampService timestampService, LockClient lockClient, LockService lockService, @@ -134,9 +134,8 @@ public SerializableTransactionManager(KeyValueService keyValueService, int concurrentGetRangesThreadPoolSize, int defaultGetRangesConcurrency, long timestampCacheSize) { - this(keyValueService, - timestampService, - lockClient, + return new SerializableTransactionManager(keyValueService, + new LegacyTimelockService(timestampService, lockService, lockClient), lockService, transactionService, constraintModeSupplier, @@ -144,11 +143,17 @@ public SerializableTransactionManager(KeyValueService keyValueService, sweepStrategyManager, cleaner, false, + () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, 
concurrentGetRangesThreadPoolSize, defaultGetRangesConcurrency, timestampCacheSize); } + /** + * @deprecated Use {@link SerializableTransactionManager#create} to create this class. + */ + @Deprecated + // Used by internal product. public SerializableTransactionManager(KeyValueService keyValueService, TimestampService timestampService, LockClient lockClient, @@ -172,39 +177,13 @@ public SerializableTransactionManager(KeyValueService keyValueService, sweepStrategyManager, cleaner, allowHiddenTableAccess, - concurrentGetRangesThreadPoolSize, - defaultGetRangesConcurrency, - timestampCacheSize); - } - - public SerializableTransactionManager(KeyValueService keyValueService, - TimelockService timelockService, - LockService lockService, - TransactionService transactionService, - Supplier constraintModeSupplier, - ConflictDetectionManager conflictDetectionManager, - SweepStrategyManager sweepStrategyManager, - Cleaner cleaner, - boolean allowHiddenTableAccess, - int concurrentGetRangesThreadPoolSize, - int defaultGetRangesConcurrency, - long timestampCacheSize) { - this( - keyValueService, - timelockService, - lockService, - transactionService, - constraintModeSupplier, - conflictDetectionManager, - sweepStrategyManager, - cleaner, - allowHiddenTableAccess, () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, concurrentGetRangesThreadPoolSize, defaultGetRangesConcurrency, timestampCacheSize); } + // Canonical constructor. 
public SerializableTransactionManager(KeyValueService keyValueService, TimelockService timelockService, LockService lockService, diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java index 95e89cd88f9..58baed8fe20 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java @@ -50,7 +50,7 @@ public void setUp() { null, // conflictDetectionManager null, // sweepStrategyManager mockCleaner, - mockInitializer, + mockInitializer::isInitialized, false, // allowHiddenTableAccess () -> 1L, // lockAcquireTimeout TransactionTestConstants.GET_RANGES_THREAD_POOL_SIZE, @@ -110,7 +110,7 @@ public void synchronouslyInitializedManagerIsInitializedEvenIfKvsIsNot() { null, // conflictDetectionManager null, // sweepStrategyManager mockCleaner, - mockInitializer, + mockInitializer::isInitialized, false, // allowHiddenTableAccess () -> 1L, // lockAcquireTimeoutMs TransactionTestConstants.GET_RANGES_THREAD_POOL_SIZE, diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java index ecbc0ffb7bf..dc97b99474f 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java @@ -190,7 +190,7 @@ private void doTestLargerTable(boolean checkIfLatestValueIsEmpty) { candidates.stream().map(CandidateCellForSweeping::cell).collect(Collectors.toList())); } - private List 
getAllCandidates(CandidateCellForSweepingRequest request) { + protected List getAllCandidates(CandidateCellForSweepingRequest request) { try (ClosableIterator> iter = kvs.getCandidateCellsForSweeping(TEST_TABLE, request)) { return ImmutableList.copyOf( diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java index fd07187e673..207314fbf47 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTestUtils.java @@ -67,8 +67,8 @@ public static LockAwareTransactionManager setupTxManager(KeyValueService kvs, AtlasDbConstraintCheckingMode.NO_CONSTRAINT_CHECKING; ConflictDetectionManager cdm = ConflictDetectionManagers.createWithoutWarmingCache(kvs); Cleaner cleaner = new NoOpCleaner(); - LockAwareTransactionManager txManager = new SerializableTransactionManager( - kvs, tsService, lockClient, lockService, txService, constraints, cdm, ssm, cleaner, false, + LockAwareTransactionManager txManager = SerializableTransactionManager.createForTest( + kvs, tsService, lockClient, lockService, txService, constraints, cdm, ssm, cleaner, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java index 65f1986e616..5faf6e2a6a2 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java @@ -66,7 +66,7 @@ public abstract class 
AbstractSerializableTransactionTest extends AbstractTransa @Override protected TransactionManager getManager() { - return new SerializableTransactionManager( + return SerializableTransactionManager.createForTest( keyValueService, timestampService, lockClient, diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java index 8fd69e160b2..1a2e167314a 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java @@ -31,50 +31,55 @@ import com.palantir.atlasdb.transaction.service.TransactionService; import com.palantir.lock.LockClient; import com.palantir.lock.LockService; +import com.palantir.lock.impl.LegacyTimelockService; import com.palantir.timestamp.TimestampService; public class TestTransactionManagerImpl extends SerializableTransactionManager implements TestTransactionManager { private final Map conflictHandlerOverrides = new HashMap<>(); + @SuppressWarnings("Indentation") // Checkstyle complains about lambda in constructor. 
public TestTransactionManagerImpl(KeyValueService keyValueService, - TimestampService timestampService, - LockClient lockClient, - LockService lockService, - TransactionService transactionService, - ConflictDetectionManager conflictDetectionManager, - SweepStrategyManager sweepStrategyManager) { + TimestampService timestampService, + LockClient lockClient, + LockService lockService, + TransactionService transactionService, + ConflictDetectionManager conflictDetectionManager, + SweepStrategyManager sweepStrategyManager) { super( createAssertKeyValue(keyValueService, lockService), - timestampService, - lockClient, + new LegacyTimelockService(timestampService, lockService, lockClient), lockService, transactionService, Suppliers.ofInstance(AtlasDbConstraintCheckingMode.FULL_CONSTRAINT_CHECKING_THROWS_EXCEPTIONS), conflictDetectionManager, sweepStrategyManager, NoOpCleaner.INSTANCE, + false, + () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); } + @SuppressWarnings("Indentation") // Checkstyle complains about lambda in constructor. 
public TestTransactionManagerImpl(KeyValueService keyValueService, - TimestampService timestampService, - LockClient lockClient, - LockService lockService, - TransactionService transactionService, - AtlasDbConstraintCheckingMode constraintCheckingMode) { + TimestampService timestampService, + LockClient lockClient, + LockService lockService, + TransactionService transactionService, + AtlasDbConstraintCheckingMode constraintCheckingMode) { super( createAssertKeyValue(keyValueService, lockService), - timestampService, - lockClient, + new LegacyTimelockService(timestampService, lockService, lockClient), lockService, transactionService, Suppliers.ofInstance(constraintCheckingMode), ConflictDetectionManagers.createWithoutWarmingCache(keyValueService), SweepStrategyManagers.createDefault(keyValueService), NoOpCleaner.INSTANCE, + false, + () -> AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java index 555ebc81669..8a87f13f77e 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java @@ -84,7 +84,7 @@ public void shouldNotRunTaskReadOnlyWithClosedTransactionManager() throws Except public void shouldNotMakeRemoteCallsInAReadonlyTransactionIfNoWorkIsDone() { TimestampService mockTimestampService = mock(TimestampService.class); LockService mockLockService = mock(LockService.class); - TransactionManager txnManagerWithMocks = new SerializableTransactionManager(getKeyValueService(), + TransactionManager txnManagerWithMocks = 
SerializableTransactionManager.createForTest(getKeyValueService(), mockTimestampService, LockClient.of("foo"), mockLockService, transactionService, () -> AtlasDbConstraintCheckingMode.FULL_CONSTRAINT_CHECKING_THROWS_EXCEPTIONS, conflictDetectionManager, sweepStrategyManager, NoOpCleaner.INSTANCE, diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java index 12b50c53e26..58291da8386 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/keyvalue/impl/TableTasksTest.java @@ -73,8 +73,8 @@ public void setup() { ConflictDetectionManager cdm = ConflictDetectionManagers.createWithoutWarmingCache(kvs); SweepStrategyManager ssm = SweepStrategyManagers.createDefault(kvs); Cleaner cleaner = new NoOpCleaner(); - SerializableTransactionManager transactionManager = new SerializableTransactionManager( - kvs, tsService, lockClient, lockService, txService, constraints, cdm, ssm, cleaner, false, + SerializableTransactionManager transactionManager = SerializableTransactionManager.createForTest( + kvs, tsService, lockClient, lockService, txService, constraints, cdm, ssm, cleaner, AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE, AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY, AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); diff --git a/docs/source/cluster_management/sweep.rst b/docs/source/cluster_management/sweep.rst index 644dc9d3504..1412b702832 100644 --- a/docs/source/cluster_management/sweep.rst +++ b/docs/source/cluster_management/sweep.rst @@ -60,7 +60,7 @@ Note that some of these parameters are just used as a hint. Sweep dynamically mo ``enabled``, "Only specified in config", "true", "Whether the background sweeper should run." 
``readLimit``, ``maxCellTsPairsToExamine``, "1,000", "Target number of (cell, timestamp) pairs to examine in a single run." - ``candidateBatchHint``, ``candidateBatchSize``, "1", "Target number of candidate (cell, timestamp) pairs to load at once. Decrease this if sweep fails to complete (for example if the sweep job or the underlying KVS runs out of memory). Increasing it may improve sweep performance." + ``candidateBatchHint``, ``candidateBatchSize``, "1 (Cassandra); 1024 (all other KVSs)", "Target number of candidate (cell, timestamp) pairs to load at once. Decrease this if sweep fails to complete (for example if the sweep job or the underlying KVS runs out of memory). Increasing it may improve sweep performance." ``deleteBatchHint``, ``deleteBatchSize``, "1,000", "Target number of (cell, timestamp) pairs to delete in a single batch. Decrease if sweep cannot progress past a large row or a large cell. Increasing it may improve sweep performance." ``pauseMillis``, "Only specified in config", "5000 ms", "Wait time between row batches. Set this if you want to use less shared DB resources, for example if you run sweep during user-facing hours." diff --git a/docs/source/release_notes/release-notes.rst b/docs/source/release_notes/release-notes.rst index f77fb78d6ff..588b48b381e 100644 --- a/docs/source/release_notes/release-notes.rst +++ b/docs/source/release_notes/release-notes.rst @@ -56,6 +56,17 @@ develop The existing ``create`` methods are deprecated and will be removed by November 15th, 2017. (`Pull Request `__) + * - |fixed| + - Async Initialization now works with TimeLock Server. + Previously, for Cassandra we would attempt to immediately migrate the timestamp bound from Cassandra to TimeLock on startup, which would fail if either of them was unavailable. + For DBKVS or other key-value services, we would attempt to ping TimeLock on startup, which would fail if TimeLock was unavailable (though the KVS need not be available). 
+ (`Pull Request `__) + + * - |fixed| + - ``AsyncInitializer`` now shuts down its executor after initialization has completed. + Previously, the executor service wasn't shut down, which could lead to the initializer thread hanging around unnecessarily. + (`Pull Request `__) + * - |fixed| - TimeLock Server's ``ClockSkewMonitor`` now attempts to contact all other nodes in the TimeLock cluster, even in the presence of remoting exceptions or clock skews. Previously, we would stop querying nodes once we encountered a remoting exception or detected clock skew. @@ -113,6 +124,11 @@ develop a TransactionManager, utilize the helpers in TransactionManagers to have this done for you. (`Pull Request `__) + * - |devbreak| + - Simplify and annotate the constructors for ``SerializableTransactionManager``. This should make the code of that class more maintainable. + If you used one of the deleted or deprecated constructors, use the static ``create`` method. + (`Pull Request `__) + .. <<<<------------------------------------------------------------------------------------------------------------->>>> ======= @@ -149,7 +165,7 @@ v0.61.0 - Change * - |improved| - - Sweep is now more efficient on Cassandra, Postgres and Oracle. + - Sweep is now more efficient on Postgres and Oracle. (`Pull Request `__) * - |improved|