From 95fb4dd20d513a23f8c00f28dd905a112c73a49a Mon Sep 17 00:00:00 2001 From: Greg Bonik Date: Fri, 13 Oct 2017 16:54:38 -0700 Subject: [PATCH 01/13] DbKvs sweep refactors to address last PR comments As dxiao pointed out, CandidatePagingState does too many things. The solution is to make CellTsPairLoader return an iterator and move all paging logic inside that iterator, and remove CellTsPairLoaderFactory. Then we replace CandidatePagingState with CandidateGroupingIterator, which now doesn't have any paging logic and only takes care of grouping (cell, ts) pairs by cell. Also remove CandidatePageJoiningIterator because it's not used anymore. We could have done this earlier but I forgot. Plan for the future: - Since we gave up on the in-database filtering idea, we can replace KVS.getCandidateCellsForSweeping() with a simpler call like getCellTsPairs() which would do what CellTsPairLoader does now. - Implement the new call for the remaining KVS's. We need at least the InMemoryKVS. We should decide the destiny of JdbcKVS and CqlKVS. - Remove all remaining usages of getRangeOfTimestamps(). I think that's basically deleteRange() on Cassandra. - Remove KVS.getRangeOfTimestamps()! [no release notes] --- .../atlasdb/keyvalue/dbkvs/impl/DbKvs.java | 12 +- ...ctory.java => OracleCellTsPageLoader.java} | 154 ++++++++---- .../CandidatePageJoiningIterator.java | 65 ----- ...ory.java => PostgresCellTsPageLoader.java} | 122 +++++---- .../impl/sweep/CandidateGroupingIterator.java | 120 +++++++++ .../impl/sweep/CandidatePagingState.java | 198 --------------- ...LoaderFactory.java => CellTsPairInfo.java} | 17 +- .../dbkvs/impl/sweep/CellTsPairLoader.java | 19 +- .../DbKvsGetCandidateCellsForSweeping.java | 45 +--- .../CandidatePageJoiningIteratorTest.java | 103 -------- .../sweep/CandidateGroupingIteratorTest.java | 124 ++++++++++ .../impl/sweep/CandidatePagingStateTest.java | 232 ------------------ 12 files changed, 456 insertions(+), 755 deletions(-) rename atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/{OracleCellTsPageLoaderFactory.java => OracleCellTsPageLoader.java} (63%) delete mode 100644 atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java rename atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/{PostgresCellTsPageLoaderFactory.java => PostgresCellTsPageLoader.java} (60%) create mode 100644 atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java delete mode 100644 atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java rename atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/{CellTsPairLoaderFactory.java => CellTsPairInfo.java} (63%) delete mode 100644 atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java create mode 100644 atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java delete mode 100644 atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index ef7302e2f2e..69e9b49a8c3 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -95,16 +95,16 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.BatchingTaskRunner; import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ImmediateSingleBatchTaskRunner; import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleOverflowValueLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbkvsVersionException; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresPrefixedTableNames; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRanges; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping; import com.palantir.atlasdb.keyvalue.dbkvs.util.DbKvsPartitioners; import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService; @@ -182,7 +182,7 @@ private static DbKvs createPostgres(ExecutorService executor, PostgresPrefixedTableNames prefixedTableNames = new PostgresPrefixedTableNames(config); DbTableFactory tableFactory = new PostgresDbTableFactory(config, prefixedTableNames); TableMetadataCache tableMetadataCache = new TableMetadataCache(tableFactory); - CellTsPairLoaderFactory cellTsPairLoaderFactory = new PostgresCellTsPageLoaderFactory( + CellTsPairLoader cellTsPairLoader = new PostgresCellTsPageLoader( prefixedTableNames, connections); return new DbKvs( executor, @@ -192,7 +192,7 @@ private static DbKvs createPostgres(ExecutorService executor, new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()), (conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache), - new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory)); + new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader)); } private static DbKvs createOracle(ExecutorService executor, @@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor, OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter); DbKvsGetRange getRange = new OracleGetRange( connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig); - CellTsPairLoaderFactory cellTsPairLoaderFactory = new OracleCellTsPageLoaderFactory( + CellTsPairLoader cellTsPairLoaderFactory = new OracleCellTsPageLoader( connections, tableNameGetter, valueStyleCache, oracleDdlConfig); return new DbKvs( executor, diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java similarity index 63% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 40ccfd29b73..75de961591a 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -17,9 +17,17 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.OracleDdlConfig; import com.palantir.atlasdb.keyvalue.dbkvs.OracleTableNameGetter; @@ -29,10 +37,8 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyle; import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyleCache; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException; import com.palantir.nexus.db.DBType; @@ -73,7 +79,7 @@ // predicate and '(col_name > ? OR ts >= ?)' as the filter predicate. Once we exhaust all results // in the row, we switch back to the "normal" mode, starting from the beginning of the lexicographically // next row. -public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory { +public class OracleCellTsPageLoader implements CellTsPairLoader { private final SqlConnectionSupplier connectionPool; private final OracleTableNameGetter tableNameGetter; private final TableValueStyleCache valueStyleCache; @@ -81,10 +87,10 @@ public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory { private static final int DEFAULT_BATCH_SIZE = 1000; - public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool, - OracleTableNameGetter tableNameGetter, - TableValueStyleCache valueStyleCache, - OracleDdlConfig config) { + public OracleCellTsPageLoader(SqlConnectionSupplier connectionPool, + OracleTableNameGetter tableNameGetter, + TableValueStyleCache valueStyleCache, + OracleDdlConfig config) { this.connectionPool = connectionPool; this.tableNameGetter = tableNameGetter; this.valueStyleCache = valueStyleCache; @@ -92,43 +98,64 @@ public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool, } @Override - public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) { + public Iterator> createPageIterator(TableReference tableRef, + CandidateCellForSweepingRequest request) { TableDetails tableDetails = getTableDetailsUsingNewConnection(tableRef); - return new Loader(connectionPool, request, tableDetails, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)); + return new PageIterator( + connectionPool, + request, + tableDetails, + Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)), + request.startRowInclusive()); } - private static class Loader implements CellTsPairLoader { - private final CandidateCellForSweepingRequest request; - private final TableDetails tableDetails; - private final int sqlRowLimit; - private final SqlConnectionSupplier connectionPool; + private static class PageIterator implements Iterator> { + final SqlConnectionSupplier connectionPool; + final CandidateCellForSweepingRequest request; + final TableDetails tableDetails; + final int sqlRowLimit; + + byte[] startRowInclusive; + byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + @Nullable Long startTsInclusive = null; + long cellTsPairsAlreadyExaminedInCurrentRow = 0L; + boolean reachedEnd = false; - Loader(SqlConnectionSupplier connectionPool, - CandidateCellForSweepingRequest request, - TableDetails tableDetails, - int sqlRowLimit) { + PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, + TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) { this.connectionPool = connectionPool; this.request = request; this.tableDetails = tableDetails; this.sqlRowLimit = sqlRowLimit; + this.startRowInclusive = startRowInclusive; + } + + @Override + public boolean hasNext() { + return !reachedEnd; } + // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. + // As a downside, our iterator can return an empty page at the end. + // However, we can just filter out empty pages later. @Override - public Page loadNextPage(StartingPosition startInclusive, - long cellsAlreadyExaminedInStartingRow) { - boolean singleRow = shouldScanSingleRow(cellsAlreadyExaminedInStartingRow); - List cellTsPairs = loadPage(startInclusive, singleRow); - return new Page(cellTsPairs, singleRow, cellTsPairs.size() < sqlRowLimit); + public List next() { + Preconditions.checkArgument(hasNext()); + boolean singleRow = shouldScanSingleRow(); + List cellTsPairs = loadPage(singleRow); + updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs); + computeNextStartPosition(cellTsPairs, singleRow); + return cellTsPairs; } - private boolean shouldScanSingleRow(long cellsAlreadyExaminedInCurrentRow) { + private boolean shouldScanSingleRow() { // The idea is that we don't want to throw away more than N database rows in order to get N results. // This only matters for tables with wide rows. We could tweak this. - return cellsAlreadyExaminedInCurrentRow > sqlRowLimit; + return cellTsPairsAlreadyExaminedInCurrentRow > sqlRowLimit; } - private List loadPage(StartingPosition startInclusive, boolean singleRow) { - FullQuery query = getQuery(startInclusive, singleRow); + private List loadPage(boolean singleRow) { + FullQuery query = getQuery(singleRow); try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); AgnosticLightResultSet resultSet = executeQuery(conns.get(), query)) { List ret = new ArrayList<>(); @@ -143,14 +170,7 @@ private List loadPage(StartingPosition startInclusive, boolean s } } - private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) { - return conn.selectLightResultSetUnregisteredQueryWithFetchSize( - query.getQuery(), - sqlRowLimit, - query.getArgs()); - } - - private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { + private FullQuery getQuery(boolean singleRow) { String pkIndex = PrimaryKeyConstraintNames.get(tableDetails.shortName); FullQuery.Builder queryBuilder = FullQuery.builder() .append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING */ ") @@ -166,7 +186,7 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { .append(" FROM ").append(tableDetails.shortName).append(" t") .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); - appendRangePredicates(startingPos, singleRow, queryBuilder); + appendRangePredicates(singleRow, queryBuilder); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(") WHERE rownum <= ").append(sqlRowLimit) @@ -174,18 +194,18 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { .build(); } - private void appendRangePredicates(StartingPosition startingPos, boolean singleRow, FullQuery.Builder builder) { + private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) { if (singleRow) { - builder.append(" AND row_name = ?", startingPos.rowName); - if (startingPos.colName.length > 0) { - builder.append(" AND col_name >= ?", startingPos.colName); - if (startingPos.timestamp != null) { - builder.append(" AND (col_name > ? OR ts >= ?)", startingPos.colName, startingPos.timestamp); + builder.append(" AND row_name = ?", startRowInclusive); + if (startColInclusive.length > 0) { + builder.append(" AND col_name >= ?", startColInclusive); + if (startTsInclusive != null) { + builder.append(" AND (col_name > ? OR ts >= ?)", startColInclusive, startTsInclusive); } } } else { RangePredicateHelper.create(false, DBType.ORACLE, builder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); } } @@ -199,6 +219,51 @@ private void appendEmptyValueFlagSelector(FullQuery.Builder builder) { } builder.append(" THEN 1 ELSE 0 END AS empty_val"); } + + private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) { + return conn.selectLightResultSetUnregisteredQueryWithFetchSize( + query.getQuery(), + sqlRowLimit, + query.getArgs()); + } + + private void updateCountOfExaminedCellTsPairsInCurrentRow(List results) { + byte[] currentRow = startRowInclusive; + for (CellTsPairInfo cellTs : results) { + if (!Arrays.equals(currentRow, cellTs.rowName)) { + currentRow = cellTs.rowName; + cellTsPairsAlreadyExaminedInCurrentRow = 0L; + } + cellTsPairsAlreadyExaminedInCurrentRow += 1; + } + } + + private void computeNextStartPosition(List results, boolean scannedSingleRow) { + if (results.size() < sqlRowLimit) { + if (scannedSingleRow) { + // If we scanned a single row and reached the end, we just restart + // from the lexicographically next row + byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, startRowInclusive); + if (nextRow == null) { + reachedEnd = true; + } else { + startRowInclusive = nextRow; + startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + startTsInclusive = null; + cellTsPairsAlreadyExaminedInCurrentRow = 0L; + } + } else { + reachedEnd = true; + } + } else { + CellTsPairInfo lastResult = Iterables.getLast(results); + Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); + startRowInclusive = lastResult.rowName; + startColInclusive = lastResult.colName; + startTsInclusive = lastResult.ts + 1; + } + } + } private static class TableDetails { @@ -209,7 +274,6 @@ private static class TableDetails { this.shortName = shortName; this.hasOverflow = hasOverflow; } - } private TableDetails getTableDetailsUsingNewConnection(TableReference tableRef) { diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java deleted file mode 100644 index 0918e8b0700..00000000000 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres; - -import java.util.Iterator; -import java.util.List; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Lists; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; - -public class CandidatePageJoiningIterator extends AbstractIterator> { - private final Iterator> sourceIterator; - private final int minPageSize; - - public CandidatePageJoiningIterator(Iterator> sourceIterator, int minPageSize) { - this.sourceIterator = sourceIterator; - this.minPageSize = minPageSize; - } - - @Override - protected List computeNext() { - if (!sourceIterator.hasNext()) { - return endOfData(); - } else { - List page = sourceIterator.next(); - int pageSize = getSizeOfPage(page); - if (pageSize >= minPageSize) { - return page; - } else { - List ret = Lists.newArrayList(); - int retSize = pageSize; - ret.addAll(page); - // The order is important: calling hasNext() is expensive, - // so we should check first if we reached the desired page size - while (retSize < minPageSize && sourceIterator.hasNext()) { - List nextPage = sourceIterator.next(); - retSize += getSizeOfPage(nextPage); - ret.addAll(nextPage); - } - return ret.isEmpty() ? endOfData() : ret; - } - } - } - - private static int getSizeOfPage(List page) { - // If a cell doesn't have any timestamps (i.e., is not a candidate), we don't want count - // it as zero because it still takes memory. Hence max(1, ...). - return page.stream().mapToInt(c -> Math.max(1, c.sortedTimestamps().length)).sum(); - } -} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java similarity index 60% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index d469a2b3a30..629a01abd3a 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -18,8 +18,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionSupplier; @@ -27,94 +33,108 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.FullQuery; import com.palantir.atlasdb.keyvalue.dbkvs.impl.SqlConnectionSupplier; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.nexus.db.DBType; import com.palantir.nexus.db.sql.AgnosticLightResultRow; import com.palantir.nexus.db.sql.AgnosticLightResultSet; -public class PostgresCellTsPageLoaderFactory implements CellTsPairLoaderFactory { +public class PostgresCellTsPageLoader implements CellTsPairLoader { private final PostgresPrefixedTableNames prefixedTableNames; private final SqlConnectionSupplier connectionPool; private static final int DEFAULT_BATCH_SIZE = 1000; - public PostgresCellTsPageLoaderFactory(PostgresPrefixedTableNames prefixedTableNames, + public PostgresCellTsPageLoader(PostgresPrefixedTableNames prefixedTableNames, SqlConnectionSupplier connectionPool) { this.prefixedTableNames = prefixedTableNames; this.connectionPool = connectionPool; } @Override - public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) { - return new Loader(connectionPool, + public Iterator> createPageIterator(TableReference tableRef, + CandidateCellForSweepingRequest request) { + return new PageIterator( + connectionPool, request, - request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE), + Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)), DbKvs.internalTableName(tableRef), - prefixedTableNames.get(tableRef)); + prefixedTableNames.get(tableRef), + request.startRowInclusive()); } - private static class Loader implements CellTsPairLoader { + private static class PageIterator implements Iterator> { final SqlConnectionSupplier connectionPool; final CandidateCellForSweepingRequest request; final int sqlRowLimit; final String tableName; final String prefixedTableName; - Loader(SqlConnectionSupplier connectionPool, - CandidateCellForSweepingRequest request, - int sqlRowLimit, - String tableName, - String prefixedTableName) { + byte[] startRowInclusive; + byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + @Nullable Long startTsInclusive = null; + boolean reachedEnd = false; + + PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, int sqlRowLimit, + String tableName, String prefixedTableName, byte[] startRowInclusive) { this.connectionPool = connectionPool; this.request = request; this.sqlRowLimit = sqlRowLimit; this.tableName = tableName; this.prefixedTableName = prefixedTableName; + this.startRowInclusive = startRowInclusive; } @Override - public Page loadNextPage(StartingPosition startInclusive, - long cellsAlreadyExaminedInStartingRow) { - try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); - AgnosticLightResultSet resultSet = selectNextPage(conns, startInclusive)) { - List cellTsPairs = readResultSet(resultSet); - return new Page(cellTsPairs, false, cellTsPairs.size() < sqlRowLimit); - } + public boolean hasNext() { + return !reachedEnd; } - private AgnosticLightResultSet selectNextPage(ConnectionSupplier conns, StartingPosition startingPosition) { - FullQuery query = getQuery(startingPosition); - return conns.get().selectLightResultSetUnregisteredQuery(query.getQuery(), query.getArgs()); + // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. + // As a downside, our iterator can return an empty page at the end. + // However, we can just filter out empty pages later. + @Override + public List next() { + Preconditions.checkState(hasNext()); + List cellTsPairs = loadNextPage(); + computeNextStartPosition(cellTsPairs); + return cellTsPairs; } - private List readResultSet(AgnosticLightResultSet resultSet) { - List ret = new ArrayList<>(); - for (AgnosticLightResultRow row : resultSet) { - byte[] rowName = row.getBytes("row_name"); - byte[] colName = row.getBytes("col_name"); - if (request.shouldCheckIfLatestValueIsEmpty()) { - long[] sortedTimestamps = castAndSortTimestamps((Object[]) row.getArray("timestamps")); - boolean isLatestValEmpty = row.getBoolean("latest_val_empty"); - for (int i = 0; i < sortedTimestamps.length - 1; ++i) { - ret.add(new CandidatePagingState.CellTsPairInfo(rowName, colName, sortedTimestamps[i], false)); + private List loadNextPage() { + try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); + AgnosticLightResultSet resultSet = selectNextPage(conns)) { + List ret = new ArrayList<>(); + for (AgnosticLightResultRow row : resultSet) { + byte[] rowName = row.getBytes("row_name"); + byte[] colName = row.getBytes("col_name"); + if (request.shouldCheckIfLatestValueIsEmpty()) { + long[] sortedTimestamps = castAndSortTimestamps((Object[]) row.getArray("timestamps")); + boolean isLatestValEmpty = row.getBoolean("latest_val_empty"); + for (int i = 0; i < sortedTimestamps.length - 1; ++i) { + ret.add(new CellTsPairInfo(rowName, colName, sortedTimestamps[i], false)); + } + // For the maximum timestamp, we know whether its value is empty or not, + // so we handle it separately + ret.add(new CellTsPairInfo( + rowName, colName, sortedTimestamps[sortedTimestamps.length - 1], isLatestValEmpty)); + } else { + long ts = row.getLong("ts"); + ret.add(new CellTsPairInfo(rowName, colName, ts, false)); } - // For the maximum timestamp, we know whether its value is empty or not, so we handle it separately - ret.add(new CandidatePagingState.CellTsPairInfo( - rowName, colName, sortedTimestamps[sortedTimestamps.length - 1], isLatestValEmpty)); - } else { - long ts = row.getLong("ts"); - ret.add(new CandidatePagingState.CellTsPairInfo(rowName, colName, ts, false)); } + return ret; } - return ret; } - private FullQuery getQuery(StartingPosition startingPos) { + private AgnosticLightResultSet selectNextPage(ConnectionSupplier conns) { + FullQuery query = getQuery(); + return conns.get().selectLightResultSetUnregisteredQuery(query.getQuery(), query.getArgs()); + } + + private FullQuery getQuery() { if (request.shouldCheckIfLatestValueIsEmpty()) { FullQuery.Builder queryBuilder = FullQuery.builder() .append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING_THOROUGH(").append(tableName).append(") */") @@ -129,7 +149,7 @@ private FullQuery getQuery(StartingPosition startingPos) { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) @@ -151,13 +171,25 @@ private FullQuery getQuery(StartingPosition startingPos) { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) .build(); } } + + private void computeNextStartPosition(List results) { + if (results.size() < sqlRowLimit) { + reachedEnd = true; + } else { + CellTsPairInfo lastResult = Iterables.getLast(results); + Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); + startRowInclusive = lastResult.rowName; + startColInclusive = lastResult.colName; + startTsInclusive = lastResult.ts + 1; + } + } } // Postgres doesn't guarantee the order of results of ARRAY_AGG, so we sort the timestamps ourselves. diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java new file mode 100644 index 00000000000..35fa17a9171 --- /dev/null +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java @@ -0,0 +1,120 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import com.google.common.base.Preconditions; +import com.palantir.atlasdb.encoding.PtBytes; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; + +import gnu.trove.list.TLongList; +import gnu.trove.list.array.TLongArrayList; + +public final class CandidateGroupingIterator implements Iterator> { + private final Iterator> cellTsIterator; + + private byte[] currentRowName = PtBytes.EMPTY_BYTE_ARRAY; + private byte[] currentColName = PtBytes.EMPTY_BYTE_ARRAY; + private final TLongList currentCellTimestamps = new TLongArrayList(); + private boolean currentIsLatestValueEmpty = false; + private long cellTsPairsExamined = 0L; + + private CandidateGroupingIterator(Iterator> cellTsIterator) { + this.cellTsIterator = cellTsIterator; + } + + /** The 'cellTsIterator' is expected to return (cell, ts) pairs in strict lexicographically increasing order. + **/ + public static Iterator> create(Iterator> cellTsIterator) { + return new CandidateGroupingIterator(cellTsIterator); + } + + @Override + public boolean hasNext() { + return cellTsIterator.hasNext(); + } + + @Override + public List next() { + Preconditions.checkState(hasNext()); + List cellTsBatch = cellTsIterator.next(); + List candidates = new ArrayList<>(); + for (CellTsPairInfo cellTs : cellTsBatch) { + checkCurrentCellAndUpdateIfNecessary(cellTs).ifPresent(candidates::add); + if (currentCellTimestamps.size() > 0) { + // We expect the timestamps in ascending order. This check costs us a few CPU cycles + // but it's worth it for paranoia reasons - mistaking a cell with data for an empty one + // can cause data corruption. + Preconditions.checkArgument(cellTs.ts > currentCellTimestamps.get(currentCellTimestamps.size() - 1), + "Timestamps for each cell must be fed in strictly increasing order"); + } + updateStateAfterSingleCellTsPairProcessed(cellTs); + } + if (!cellTsIterator.hasNext()) { + getCurrentCandidate().ifPresent(candidates::add); + } + return candidates; + } + + private void updateStateAfterSingleCellTsPairProcessed(CellTsPairInfo cellTs) { + currentIsLatestValueEmpty = cellTs.hasEmptyValue; + currentCellTimestamps.add(cellTs.ts); + cellTsPairsExamined += 1; + } + + private Optional checkCurrentCellAndUpdateIfNecessary(CellTsPairInfo cellTs) { + if (isCurrentCell(cellTs)) { + return Optional.empty(); + } else { + Optional candidate = getCurrentCandidate(); + updateStateForNewCell(cellTs); + return candidate; + } + } + + private boolean isCurrentCell(CellTsPairInfo cellTs) { + return Arrays.equals(currentRowName, cellTs.rowName) && Arrays.equals(currentColName, cellTs.colName); + } + + private Optional getCurrentCandidate() { + if (currentCellTimestamps.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(ImmutableCandidateCellForSweeping.builder() + .cell(Cell.create(currentRowName, currentColName)) + .sortedTimestamps(currentCellTimestamps.toArray()) + .isLatestValueEmpty(currentIsLatestValueEmpty) + .numCellsTsPairsExamined(cellTsPairsExamined) + .build()); + } + } + + private void updateStateForNewCell(CellTsPairInfo cell) { + currentCellTimestamps.clear(); + currentRowName = cell.rowName; + currentColName = cell.colName; + currentIsLatestValueEmpty = false; + } + +} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java deleted file mode 100644 index 724cf818a91..00000000000 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -import javax.annotation.Nullable; - -import com.google.common.base.Preconditions; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -import gnu.trove.list.TLongList; -import gnu.trove.list.array.TLongArrayList; - -public final class CandidatePagingState { - private byte[] currentRowName; - private byte[] currentColName = PtBytes.EMPTY_BYTE_ARRAY; - private Long maxSeenTimestampForCurrentCell = null; - private final TLongList currentCellTimestamps = new TLongArrayList(); - private boolean currentIsLatestValueEmpty = false; - private long cellTsPairsExamined = 0L; - private long cellTsPairsExaminedInCurrentRow = 0L; - private boolean reachedEnd = false; - - private CandidatePagingState(byte[] currentRowName) { - this.currentRowName = currentRowName; - } - - public static CandidatePagingState create(byte[] startRowInclusive) { - return new CandidatePagingState(startRowInclusive); - } - - /** - * Semantically different from Cassandra's CellWithTimestamp, because: - * (a) colName can be an empty byte array (indicating starting from the beginning of the row) - * (b) timestamp is Nullable. - */ - public static class StartingPosition { - public final byte[] rowName; - public final byte[] colName; - @Nullable public final Long timestamp; - - public StartingPosition(byte[] rowName, byte[] colName, Long timestamp) { - this.rowName = rowName; - this.colName = colName; - this.timestamp = timestamp; - } - } - - public static class CellTsPairInfo { - public final byte[] rowName; - public final byte[] colName; - public final long ts; - public final boolean isEmptyValue; - - public CellTsPairInfo(byte[] rowName, byte[] colName, long ts, boolean isEmptyValue) { - this.rowName = rowName; - this.colName = colName; - this.ts = ts; - this.isEmptyValue = isEmptyValue; - } - } - - /** The caller is expected to feed (cell, ts) pairs in strict lexicographically increasing order, - * even across several batches. - **/ - public List processBatch(List cellTsPairs, boolean reachedEndOfResults) { - List candidates = new ArrayList<>(); - for (CellTsPairInfo cellTs : cellTsPairs) { - checkCurrentCellAndUpdateIfNecessary(cellTs).ifPresent(candidates::add); - if (maxSeenTimestampForCurrentCell != null) { - // We expect the timestamps in ascending order. This check costs us a few CPU cycles - // but it's worth it for paranoia reasons - mistaking a cell with data for an empty one - // can cause data corruption. - Preconditions.checkArgument(cellTs.ts > maxSeenTimestampForCurrentCell, - "Timestamps for each cell must be fed in strictly increasing order"); - } - updateStateAfterSingleCellProcessed(cellTs); - } - if (reachedEndOfResults) { - getCurrentCandidate().ifPresent(candidates::add); - reachedEnd = true; - } - return candidates; - } - - private Optional checkCurrentCellAndUpdateIfNecessary(CellTsPairInfo cellTs) { - boolean sameRow = Arrays.equals(currentRowName, cellTs.rowName); - boolean sameCol = Arrays.equals(currentColName, cellTs.colName); - if (!sameRow) { - cellTsPairsExaminedInCurrentRow = 0L; - } - - if (sameRow && sameCol) { - return Optional.empty(); - } else { - Optional candidate = getCurrentCandidate(); - updateStateForNewCell(cellTs); - return candidate; - } - } - - private Optional getCurrentCandidate() { - if (currentCellTimestamps.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(currentRowName, currentColName)) - .sortedTimestamps(currentCellTimestamps.toArray()) - .isLatestValueEmpty(currentIsLatestValueEmpty) - .numCellsTsPairsExamined(cellTsPairsExamined) - .build()); - } - } - - private void updateStateForNewCell(CellTsPairInfo cell) { - currentCellTimestamps.clear(); - maxSeenTimestampForCurrentCell = null; - currentRowName = cell.rowName; - currentColName = cell.colName; - currentIsLatestValueEmpty = false; - } - - private void updateStateAfterSingleCellProcessed(CellTsPairInfo cellTs) { - maxSeenTimestampForCurrentCell = cellTs.ts; - currentIsLatestValueEmpty = cellTs.isEmptyValue; - currentCellTimestamps.add(cellTs.ts); - cellTsPairsExamined += 1; - cellTsPairsExaminedInCurrentRow += 1; - } - - public Optional getNextStartingPosition() { - if (reachedEnd) { - return Optional.empty(); - } else { - return Optional.of(new StartingPosition(currentRowName, currentColName, getNextStartTimestamp())); - } - } - - @Nullable - private Long getNextStartTimestamp() { - if (maxSeenTimestampForCurrentCell == null) { - return null; - } else { - // This can never happen because we request 'WHERE ts < ?', where '?' is some 'long' value. - // So if the timestamp is strictly less than another 'long', it can not be Long.MAX_VALUE. - // But we check anyway for general paranoia reasons. - Preconditions.checkState(maxSeenTimestampForCurrentCell != Long.MAX_VALUE, - "Timestamps must be strictly less than Long.MAX_VALUE"); - return maxSeenTimestampForCurrentCell + 1; - } - } - - public long getCellTsPairsExaminedInCurrentRow() { - return cellTsPairsExaminedInCurrentRow; - } - - public void restartFromNextRow() { - @Nullable byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, currentRowName); - if (nextRow == null) { - reachedEnd = true; - } else { - updateStateForNewRow(nextRow); - } - } - - private void updateStateForNewRow(byte[] nextRow) { - currentRowName = nextRow; - currentColName = PtBytes.EMPTY_BYTE_ARRAY; - maxSeenTimestampForCurrentCell = null; - currentCellTimestamps.clear(); - currentIsLatestValueEmpty = false; - cellTsPairsExaminedInCurrentRow = 0L; - reachedEnd = false; - } - -} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java similarity index 63% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java index bb3fd522558..77ddd6f6fb7 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java @@ -16,11 +16,16 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; -import com.palantir.atlasdb.keyvalue.api.TableReference; - -public interface CellTsPairLoaderFactory { - - CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request); +public class CellTsPairInfo { + public final byte[] rowName; + public final byte[] colName; + public final long ts; + public final boolean hasEmptyValue; + public CellTsPairInfo(byte[] rowName, byte[] colName, long ts, boolean hasEmptyValue) { + this.rowName = rowName; + this.colName = colName; + this.ts = ts; + this.hasEmptyValue = hasEmptyValue; + } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java index a66de224d1f..0cf02ed6216 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java @@ -16,25 +16,14 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; +import java.util.Iterator; import java.util.List; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.TableReference; public interface CellTsPairLoader { - class Page { - public final List entries; - public final boolean scannedSingleRow; - public final boolean reachedEnd; - - public Page(List entries, boolean scannedSingleRow, boolean reachedEnd) { - this.entries = entries; - this.scannedSingleRow = scannedSingleRow; - this.reachedEnd = reachedEnd; - } - } - - Page loadNextPage(StartingPosition startInclusive, long cellsAlreadyExaminedInStartingRow); + Iterator> createPageIterator(TableReference tableRef, CandidateCellForSweepingRequest request); } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java index baae915db89..5107b951b00 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java @@ -18,61 +18,26 @@ import java.util.Iterator; import java.util.List; -import java.util.Optional; -import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; public class DbKvsGetCandidateCellsForSweeping { - private final CellTsPairLoaderFactory cellTsPairLoaderFactory; + private final CellTsPairLoader cellTsPairLoaderFactory; - public DbKvsGetCandidateCellsForSweeping(CellTsPairLoaderFactory cellTsPairLoaderFactory) { + public DbKvsGetCandidateCellsForSweeping(CellTsPairLoader cellTsPairLoaderFactory) { this.cellTsPairLoaderFactory = cellTsPairLoaderFactory; } public Iterator> getCandidateCellsForSweeping( TableReference tableRef, CandidateCellForSweepingRequest request) { - CellTsPairLoader loader = cellTsPairLoaderFactory.createCellTsLoader(tableRef, request); - CandidatePagingState state = CandidatePagingState.create(request.startRowInclusive()); - return Iterators.filter(new PageIterator(loader, state), page -> !page.isEmpty()); + Iterator> cellTsIter = cellTsPairLoaderFactory.createPageIterator(tableRef, request); + Iterator> rawIter = CandidateGroupingIterator.create(cellTsIter); + return Iterators.filter(rawIter, page -> !page.isEmpty()); } - private class PageIterator extends AbstractIterator> { - private final CellTsPairLoader pageLoader; - private final CandidatePagingState state; - - PageIterator(CellTsPairLoader pageLoader, CandidatePagingState state) { - this.pageLoader = pageLoader; - this.state = state; - } - - @Override - protected List computeNext() { - Optional startingPosition = state.getNextStartingPosition(); - if (startingPosition.isPresent()) { - return loadNextPage(startingPosition.get()); - } else { - return endOfData(); - } - } - - private List loadNextPage(StartingPosition startingPosition) { - CellTsPairLoader.Page page = pageLoader.loadNextPage( - startingPosition, state.getCellTsPairsExaminedInCurrentRow()); - List candidates = state.processBatch( - page.entries, page.reachedEnd); - if (page.reachedEnd && page.scannedSingleRow) { - // If we reached the end of results in the single-row mode, it doesn't mean that we reached - // the end of the table. We need to restart from the next row in the normal mode. - state.restartFromNextRow(); - } - return candidates; - } - } } diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java deleted file mode 100644 index 977f1a58db1..00000000000 --- a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.junit.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Ints; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; - -public class CandidatePageJoiningIteratorTest { - - @Test - public void emptySequenceStaysEmpty() { - // nothing -> nothing - assertTrue(transform(2, ImmutableList.of()).isEmpty()); - } - - @Test - public void oneEmptyListBecomesEmptySequence() { - // [] -> nothing - assertTrue(transform(2, ImmutableList.of(ImmutableList.of())).isEmpty()); - } - - @Test - public void manyEmptyListsBecomeEmptySequence() { - // [], [], [] -> nothing - assertTrue( - transform(2, ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())).isEmpty()); - } - - @Test - public void pagesThatAreBigEnoughArePreservedAsIs() { - // min page size 2: [2], [2] -> [2], [2] - assertEquals( - ImmutableList.of(ImmutableList.of(candidate(100, 2)), ImmutableList.of(candidate(110, 2))), - transform(2, - ImmutableList.of(ImmutableList.of(candidate(100, 2)), ImmutableList.of(candidate(110, 2))))); - } - - @Test - public void smallPagesAreJoined() { - // min page size 2: [1], [], [2], [1] -> [1, 2], [1] - assertEquals( - ImmutableList.of( - ImmutableList.of(candidate(100, 1), candidate(110, 2)), - ImmutableList.of(candidate(120, 1))), - transform(2, ImmutableList.of( - ImmutableList.of(candidate(100, 1)), - ImmutableList.of(), - ImmutableList.of(candidate(110, 2)), - ImmutableList.of(candidate(120, 1))))); - } - - @Test - public void nonCandidateCellsCountAsOne() { - // min page size 2: [0], [0], [0], -> [0, 0], [0] - assertEquals( - ImmutableList.of( - ImmutableList.of(candidate(100, 0), candidate(110, 0)), - ImmutableList.of(candidate(120, 0))), - transform(2, ImmutableList.of( - ImmutableList.of(candidate(100, 0)), - ImmutableList.of(candidate(110, 0)), - ImmutableList.of(candidate(120, 0))))); - } - - private static List> transform(int minPageSize, - List> inputPages) { - return ImmutableList.copyOf(new CandidatePageJoiningIterator(inputPages.iterator(), minPageSize)); - } - - private static CandidateCellForSweeping candidate(int rowName, int numTs) { - return ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(Ints.toByteArray(rowName), Ints.toByteArray(1))) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(123) - .sortedTimestamps(new long[numTs]) - .build(); - } - -} diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java new file mode 100644 index 00000000000..cbea9cad34c --- /dev/null +++ b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; + +public class CandidateGroupingIteratorTest { + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void emptyInput() { + assertThat(group(ImmutableList.of()), empty()); + } + + @Test + public void singleCellInOnePage() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "x", 20L)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, false, 10L, 20L))))); + } + + @Test + public void singleCellSpanningTwoPages() { + assertThat( + group(ImmutableList.of( + ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "x", 20L)), + ImmutableList.of(cellTs("a", "x", 30L)))), + equalTo(ImmutableList.of( + ImmutableList.of(), + ImmutableList.of(candidate("a", "x", 3L, false, 10L, 20L, 30L))))); + } + + @Test + public void severalCellsInSinglePage() { + assertThat( + group(ImmutableList.of( + ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "y", 10L), cellTs("b", "y", 10L)))), + equalTo(ImmutableList.of( + ImmutableList.of( + candidate("a", "x", 1L, false, 10L), + candidate("a", "y", 2L, false, 10L), + candidate("b", "y", 3L, false, 10L))))); + } + + @Test + public void latestValueEmpty() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L, false), cellTs("a", "x", 20L, true)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, true, 10L, 20L))))); + } + + @Test + public void nonLatestValueEmpty() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L, true), cellTs("a", "x", 20L, false)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, false, 10L, 20L))))); + } + + @Test + public void throwIfTimestampsAreOutOfOrder() { + thrown.expectMessage("Timestamps for each cell must be fed in strictly increasing order"); + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 20L), cellTs("a", "x", 10L)))); + } + + private static CandidateCellForSweeping candidate(String rowName, + String colName, + long numCellTsPairsExamined, + boolean latestValEmpty, + long... ts) { + return ImmutableCandidateCellForSweeping.builder() + .cell(Cell.create(bytes(rowName), bytes(colName))) + .isLatestValueEmpty(latestValEmpty) + .numCellsTsPairsExamined(numCellTsPairsExamined) + .sortedTimestamps(ts) + .build(); + } + + private List> group(List> cellTsPairBatches) { + return ImmutableList.copyOf(CandidateGroupingIterator.create(cellTsPairBatches.iterator())); + } + + private static CellTsPairInfo cellTs(String rowName, String colName, long ts) { + return cellTs(rowName, colName, ts, false); + } + + private static CellTsPairInfo cellTs(String rowName, String colName, long ts, boolean emptyVal) { + return new CellTsPairInfo(bytes(rowName), bytes(colName), ts, emptyVal); + } + + private static byte[] bytes(String string) { + return string.getBytes(StandardCharsets.UTF_8); + } + +} diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java deleted file mode 100644 index e4f2b4cc73a..00000000000 --- a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Optional; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; - -public class CandidatePagingStateTest { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void initialStartingPositionFromBeginningOfTable() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void initialStartingPositionFromSpecificRow() { - CandidatePagingState state = CandidatePagingState.create(bytes("foo")); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(bytes("foo"), pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void startingPositionForSecondBatch() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), false); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(bytes("a"), pos.get().rowName); - assertArrayEquals(bytes("b"), pos.get().colName); - assertEquals((Long) 21L, pos.get().timestamp); - } - - @Test - public void noNextStartingPositionAfterReachedEnd() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 10L)), true); - Optional pos = state.getNextStartingPosition(); - assertFalse(pos.isPresent()); - } - - @Test - public void reachedEndAfterScanningSingleCell() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - - // We got 2 (cell, ts) pairs with the same cell and reached the end, so we expect a candidate. - List candidates = state.processBatch( - ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), true); - assertThat(candidates, contains(candidate("a", "b", false, 2L, 10L, 20L))); - } - - @Test - public void singleCellSpanningTwoBatches() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - - // We got 2 (cell, ts) pairs, both belonging to the same cell, and have not reached the end yet. - // This means that we can't form a candidate yet - there could be more timestamps for that cell - // in the next batch - List firstBatch = state.processBatch( - ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), false); - assertThat(firstBatch, empty()); - - List secondBatch = state.processBatch(ImmutableList.of(cellTs("a", "b", 30L)), true); - assertThat(secondBatch, contains(candidate("a", "b", false, 3L, 10L, 20L, 30L))); - } - - @Test - public void severalCellsInOneBatch() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "y", 10L), cellTs("b", "y", 10L)), true); - assertThat(batch, contains( - candidate("a", "x", false, 1L, 10L), - candidate("a", "y", false, 2L, 10L), - candidate("b", "y", false, 3L, 10L))); - } - - @Test - public void testLatestValueEmpty() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L, false), cellTs("a", "x", 20L, true)), true); - // Since the latest timestamp has the "isEmptyValue" flag set to true, we expect the candidate to - // indicate that the latest value is empty - assertThat(batch, contains(candidate("a", "x", true, 2L, 10L, 20L))); - } - - @Test - public void testNonLatestValueEmpty() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L, true), cellTs("a", "x", 20L, false)), true); - // Since the latest timestamp has the "isEmptyValue" flag set to false, we expect the candidate to - // indicate that the latest value is not empty - assertThat(batch, contains(candidate("a", "x", false, 2L, 10L, 20L))); - } - - @Test - public void restartFromNextRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "x", 10L)), true); - state.restartFromNextRow(); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - // [ 'a', 0 ] is the lexicographically next key after [ 'a' ] - assertArrayEquals(new byte[] { 'a', 0 }, pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void restartFromNextRowWhenThereIsNoNextRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - byte[] rowName = RangeRequests.getLastRowName(); - CellTsPairInfo cellTs = new CellTsPairInfo(rowName, bytes("a"), 10L, false); - state.processBatch(ImmutableList.of(cellTs), false); - state.restartFromNextRow(); - // There is no next row if the current row is lexicographically last - assertFalse(state.getNextStartingPosition().isPresent()); - } - - @Test - public void cellTsPairsExaminedInCurrentRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of( - cellTs("a", "x", 10L), cellTs("b", "y", 10L), cellTs("b", "y", 20L), cellTs("b", "z", 30L)), - false); - // Three (cell, ts) pairs in row "b" so far: (b, y, 10), (b, y, 20), (b, z, 30) - assertEquals(3, state.getCellTsPairsExaminedInCurrentRow()); - - state.processBatch(ImmutableList.of(cellTs("b", "z", 40L)), false); - // Added (b, z, 40), so should be 4 total now - assertEquals(4, state.getCellTsPairsExaminedInCurrentRow()); - - state.processBatch(ImmutableList.of(cellTs("c", "z", 40L)), false); - // Started a new row "c", and only examined one entry - assertEquals(1, state.getCellTsPairsExaminedInCurrentRow()); - - state.restartFromNextRow(); - // Nothing examined yet if starting from a fresh row - assertEquals(0, state.getCellTsPairsExaminedInCurrentRow()); - } - - @Test - public void throwIfGotRepeatingTimestamps() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 20L)), false); - thrown.expectMessage("Timestamps for each cell must be fed in strictly increasing order"); - state.processBatch(ImmutableList.of(cellTs("a", "b", 20L)), false); - } - - @Test - public void throwIfTimestampIsMaxLong() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", Long.MAX_VALUE)), false); - thrown.expectMessage("Timestamps must be strictly less than Long.MAX_VALUE"); - state.getNextStartingPosition(); - } - - private static CellTsPairInfo cellTs(String row, String col, long ts) { - return cellTs(row, col, ts, false); - } - - private static CellTsPairInfo cellTs(String row, String col, long ts, boolean emptyVal) { - return new CellTsPairInfo(bytes(row), bytes(col), ts, emptyVal); - } - - private static CandidateCellForSweeping candidate(String row, - String col, - boolean emptyLatestVal, - long numCellTsPairsExamined, - long... sortedTimestamps) { - return ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(bytes(row), bytes(col))) - .isLatestValueEmpty(emptyLatestVal) - .numCellsTsPairsExamined(numCellTsPairsExamined) - .sortedTimestamps(sortedTimestamps) - .build(); - } - - private static byte[] bytes(String string) { - return string.getBytes(StandardCharsets.UTF_8); - } - -} From 5d2d93dac73e86ddb1502d22e6c6897cd421291f Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:13:10 +0100 Subject: [PATCH 02/13] Renames --- .../com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java | 4 ++-- .../impl/sweep/DbKvsGetCandidateCellsForSweeping.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index 69e9b49a8c3..6ac0e31a03e 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor, OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter); DbKvsGetRange getRange = new OracleGetRange( connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig); - CellTsPairLoader cellTsPairLoaderFactory = new OracleCellTsPageLoader( + CellTsPairLoader cellTsPageLoader = new OracleCellTsPageLoader( connections, tableNameGetter, valueStyleCache, oracleDdlConfig); return new DbKvs( executor, @@ -214,7 +214,7 @@ private static DbKvs createOracle(ExecutorService executor, new ImmediateSingleBatchTaskRunner(), overflowValueLoader, getRange, - new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory)); + new DbKvsGetCandidateCellsForSweeping(cellTsPageLoader)); } private DbKvs(ExecutorService executor, diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java index 5107b951b00..2923c58f392 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java @@ -26,16 +26,16 @@ public class DbKvsGetCandidateCellsForSweeping { - private final CellTsPairLoader cellTsPairLoaderFactory; + private final CellTsPairLoader cellTsPairLoader; - public DbKvsGetCandidateCellsForSweeping(CellTsPairLoader cellTsPairLoaderFactory) { - this.cellTsPairLoaderFactory = cellTsPairLoaderFactory; + public DbKvsGetCandidateCellsForSweeping(CellTsPairLoader cellTsPairLoader) { + this.cellTsPairLoader = cellTsPairLoader; } public Iterator> getCandidateCellsForSweeping( TableReference tableRef, CandidateCellForSweepingRequest request) { - Iterator> cellTsIter = cellTsPairLoaderFactory.createPageIterator(tableRef, request); + Iterator> cellTsIter = cellTsPairLoader.createPageIterator(tableRef, request); Iterator> rawIter = CandidateGroupingIterator.create(cellTsIter); return Iterators.filter(rawIter, page -> !page.isEmpty()); } From 9ea48bb2ea18fa0427851569fe021e69f0731fb1 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:14:26 +0100 Subject: [PATCH 03/13] Create OracleCellTsPageLoader.PageIterator.Token class --- .../impl/oracle/OracleCellTsPageLoader.java | 67 +++++++++++++------ 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 75de961591a..d926e6157f6 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -115,11 +115,20 @@ private static class PageIterator implements Iterator> { final TableDetails tableDetails; final int sqlRowLimit; - byte[] startRowInclusive; - byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - @Nullable Long startTsInclusive = null; + private class Token { + byte[] startRowInclusive; + byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + @Nullable + Long startTsInclusive = null; + boolean reachedEnd = false; + + Token(byte[] startRowInclusive) { + this.startRowInclusive = startRowInclusive; + } + } + + Token token; long cellTsPairsAlreadyExaminedInCurrentRow = 0L; - boolean reachedEnd = false; PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) { @@ -127,12 +136,12 @@ private static class PageIterator implements Iterator> { this.request = request; this.tableDetails = tableDetails; this.sqlRowLimit = sqlRowLimit; - this.startRowInclusive = startRowInclusive; + this.token = new Token(startRowInclusive); } @Override public boolean hasNext() { - return !reachedEnd; + return !token.reachedEnd; } // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. @@ -196,16 +205,22 @@ private FullQuery getQuery(boolean singleRow) { private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) { if (singleRow) { - builder.append(" AND row_name = ?", startRowInclusive); - if (startColInclusive.length > 0) { - builder.append(" AND col_name >= ?", startColInclusive); - if (startTsInclusive != null) { - builder.append(" AND (col_name > ? OR ts >= ?)", startColInclusive, startTsInclusive); + builder.append(" AND row_name = ?", token.startRowInclusive); + if (token.startColInclusive.length > 0) { + builder.append(" AND col_name >= ?", token.startColInclusive); + if (token.startTsInclusive != null) { + builder.append(" AND (col_name > ? OR ts >= ?)", + token.startColInclusive, + token.startTsInclusive); } } } else { RangePredicateHelper.create(false, DBType.ORACLE, builder) - .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); + .startCellTsInclusive( + // TODO can we reuse a value type here? + token.startRowInclusive, + token.startColInclusive, + token.startTsInclusive); } } @@ -228,7 +243,7 @@ private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) } private void updateCountOfExaminedCellTsPairsInCurrentRow(List results) { - byte[] currentRow = startRowInclusive; + byte[] currentRow = token.startRowInclusive; for (CellTsPairInfo cellTs : results) { if (!Arrays.equals(currentRow, cellTs.rowName)) { currentRow = cellTs.rowName; @@ -243,27 +258,35 @@ private void computeNextStartPosition(List results, boolean scan if (scannedSingleRow) { // If we scanned a single row and reached the end, we just restart // from the lexicographically next row - byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, startRowInclusive); + byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive); if (nextRow == null) { - reachedEnd = true; + token.reachedEnd = true; } else { - startRowInclusive = nextRow; - startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - startTsInclusive = null; + newRow(nextRow); cellTsPairsAlreadyExaminedInCurrentRow = 0L; } } else { - reachedEnd = true; + token.reachedEnd = true; } } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - startRowInclusive = lastResult.rowName; - startColInclusive = lastResult.colName; - startTsInclusive = lastResult.ts + 1; + continueRow(lastResult); } } + private void continueRow(CellTsPairInfo lastResult) { + token.startRowInclusive = lastResult.rowName; + token.startColInclusive = lastResult.colName; + token.startTsInclusive = lastResult.ts + 1; + } + + private void newRow(byte[] nextRow) { + token.startRowInclusive = nextRow; + token.startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + token.startTsInclusive = null; + } + } private static class TableDetails { From 5c7c0e30b5b12662439230ad32a56bd5443a7691 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:27:21 +0100 Subject: [PATCH 04/13] Create PostgresCellTsPageLaoder.PageIterator.Token --- .../postgres/PostgresCellTsPageLoader.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index 629a01abd3a..d373afbfea4 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -72,10 +72,19 @@ private static class PageIterator implements Iterator> { final String tableName; final String prefixedTableName; - byte[] startRowInclusive; - byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - @Nullable Long startTsInclusive = null; - boolean reachedEnd = false; + private class Token { + byte[] startRowInclusive; + byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + @Nullable + Long startTsInclusive = null; + boolean reachedEnd = false; + + Token(byte[] startRowInclusive) { + this.startRowInclusive = startRowInclusive; + } + } + + Token token; PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, int sqlRowLimit, String tableName, String prefixedTableName, byte[] startRowInclusive) { @@ -84,12 +93,12 @@ private static class PageIterator implements Iterator> { this.sqlRowLimit = sqlRowLimit; this.tableName = tableName; this.prefixedTableName = prefixedTableName; - this.startRowInclusive = startRowInclusive; + this.token = new Token(startRowInclusive); } @Override public boolean hasNext() { - return !reachedEnd; + return !token.reachedEnd; } // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. @@ -149,7 +158,7 @@ private FullQuery getQuery() { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); + .startCellTsInclusive(token.startRowInclusive, token.startColInclusive, token.startTsInclusive); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) @@ -171,7 +180,7 @@ private FullQuery getQuery() { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive); + .startCellTsInclusive(token.startRowInclusive, token.startColInclusive, token.startTsInclusive); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) @@ -181,13 +190,13 @@ private FullQuery getQuery() { private void computeNextStartPosition(List results) { if (results.size() < sqlRowLimit) { - reachedEnd = true; + token.reachedEnd = true; } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - startRowInclusive = lastResult.rowName; - startColInclusive = lastResult.colName; - startTsInclusive = lastResult.ts + 1; + token.startRowInclusive = lastResult.rowName; + token.startColInclusive = lastResult.colName; + token.startTsInclusive = lastResult.ts + 1; } } } From cbbfff03570df3bc58670cacde24a484eb1c6b0a Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:27:41 +0100 Subject: [PATCH 05/13] Fix Precondition having-next-ness is a state, not an argument --- .../keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index d926e6157f6..59688cfaef0 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -149,7 +149,7 @@ public boolean hasNext() { // However, we can just filter out empty pages later. @Override public List next() { - Preconditions.checkArgument(hasNext()); + Preconditions.checkState(hasNext()); boolean singleRow = shouldScanSingleRow(); List cellTsPairs = loadPage(singleRow); updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs); From e6aa802f0166b0a89037998305e7e315697b71ca Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:34:12 +0100 Subject: [PATCH 06/13] One Token To Rull Them All Well... two, actually. This one is a CellTsPairToken. We already had a Token class in DbKvs :-/ --- .../impl/oracle/OracleCellTsPageLoader.java | 19 ++--------- .../postgres/PostgresCellTsPageLoader.java | 20 ++--------- .../dbkvs/impl/sweep/CellTsPairToken.java | 33 +++++++++++++++++++ 3 files changed, 39 insertions(+), 33 deletions(-) create mode 100644 atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 59688cfaef0..18bc016f2d3 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -21,8 +21,6 @@ import java.util.Iterator; import java.util.List; -import javax.annotation.Nullable; - import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.palantir.atlasdb.encoding.PtBytes; @@ -39,6 +37,7 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException; import com.palantir.nexus.db.DBType; @@ -115,19 +114,7 @@ private static class PageIterator implements Iterator> { final TableDetails tableDetails; final int sqlRowLimit; - private class Token { - byte[] startRowInclusive; - byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - @Nullable - Long startTsInclusive = null; - boolean reachedEnd = false; - - Token(byte[] startRowInclusive) { - this.startRowInclusive = startRowInclusive; - } - } - - Token token; + CellTsPairToken token; long cellTsPairsAlreadyExaminedInCurrentRow = 0L; PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, @@ -136,7 +123,7 @@ private class Token { this.request = request; this.tableDetails = tableDetails; this.sqlRowLimit = sqlRowLimit; - this.token = new Token(startRowInclusive); + this.token = new CellTsPairToken(startRowInclusive); } @Override diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index d373afbfea4..8f79ff4d61a 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -21,11 +21,8 @@ import java.util.Iterator; import java.util.List; -import javax.annotation.Nullable; - import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionSupplier; @@ -35,6 +32,7 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.nexus.db.DBType; import com.palantir.nexus.db.sql.AgnosticLightResultRow; @@ -72,19 +70,7 @@ private static class PageIterator implements Iterator> { final String tableName; final String prefixedTableName; - private class Token { - byte[] startRowInclusive; - byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - @Nullable - Long startTsInclusive = null; - boolean reachedEnd = false; - - Token(byte[] startRowInclusive) { - this.startRowInclusive = startRowInclusive; - } - } - - Token token; + CellTsPairToken token; PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, int sqlRowLimit, String tableName, String prefixedTableName, byte[] startRowInclusive) { @@ -93,7 +79,7 @@ private class Token { this.sqlRowLimit = sqlRowLimit; this.tableName = tableName; this.prefixedTableName = prefixedTableName; - this.token = new Token(startRowInclusive); + this.token = new CellTsPairToken(startRowInclusive); } @Override diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java new file mode 100644 index 00000000000..8f8173abe4c --- /dev/null +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import javax.annotation.Nullable; + +import com.palantir.atlasdb.encoding.PtBytes; + +public class CellTsPairToken { + public byte[] startRowInclusive; + public byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; + @Nullable + public Long startTsInclusive = null; + public boolean reachedEnd = false; + + public CellTsPairToken(byte[] startRowInclusive) { + this.startRowInclusive = startRowInclusive; + } +} From f5de0c23dbb9e4754b438594480bd495e453a950 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:47:50 +0100 Subject: [PATCH 07/13] Make CellTsPairToken final; replace modification with factory methods --- .../impl/oracle/OracleCellTsPageLoader.java | 23 ++++---------- .../postgres/PostgresCellTsPageLoader.java | 8 ++--- .../dbkvs/impl/sweep/CellTsPairToken.java | 30 +++++++++++++++---- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 18bc016f2d3..04ed02e4f20 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.TableReference; @@ -123,7 +122,7 @@ private static class PageIterator implements Iterator> { this.request = request; this.tableDetails = tableDetails; this.sqlRowLimit = sqlRowLimit; - this.token = new CellTsPairToken(startRowInclusive); + this.token = CellTsPairToken.startRow(startRowInclusive); } @Override @@ -247,33 +246,21 @@ private void computeNextStartPosition(List results, boolean scan // from the lexicographically next row byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive); if (nextRow == null) { - token.reachedEnd = true; + token = CellTsPairToken.end(); } else { - newRow(nextRow); + token = CellTsPairToken.startRow(nextRow); cellTsPairsAlreadyExaminedInCurrentRow = 0L; } } else { - token.reachedEnd = true; + token = CellTsPairToken.end(); } } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - continueRow(lastResult); + token = CellTsPairToken.continueRow(lastResult); } } - private void continueRow(CellTsPairInfo lastResult) { - token.startRowInclusive = lastResult.rowName; - token.startColInclusive = lastResult.colName; - token.startTsInclusive = lastResult.ts + 1; - } - - private void newRow(byte[] nextRow) { - token.startRowInclusive = nextRow; - token.startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; - token.startTsInclusive = null; - } - } private static class TableDetails { diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index 8f79ff4d61a..62e417fad85 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -79,7 +79,7 @@ private static class PageIterator implements Iterator> { this.sqlRowLimit = sqlRowLimit; this.tableName = tableName; this.prefixedTableName = prefixedTableName; - this.token = new CellTsPairToken(startRowInclusive); + this.token = CellTsPairToken.startRow(startRowInclusive); } @Override @@ -176,13 +176,11 @@ private FullQuery getQuery() { private void computeNextStartPosition(List results) { if (results.size() < sqlRowLimit) { - token.reachedEnd = true; + token = CellTsPairToken.end(); } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - token.startRowInclusive = lastResult.rowName; - token.startColInclusive = lastResult.colName; - token.startTsInclusive = lastResult.ts + 1; + token = CellTsPairToken.continueRow(lastResult); } } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java index 8f8173abe4c..afebe95192d 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java @@ -20,14 +20,32 @@ import com.palantir.atlasdb.encoding.PtBytes; -public class CellTsPairToken { - public byte[] startRowInclusive; - public byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY; +public final class CellTsPairToken { + public final byte[] startRowInclusive; + public final byte[] startColInclusive; @Nullable - public Long startTsInclusive = null; - public boolean reachedEnd = false; + public final Long startTsInclusive; + public final boolean reachedEnd; - public CellTsPairToken(byte[] startRowInclusive) { + private CellTsPairToken(byte[] startRowInclusive, + byte[] startColInclusive, + Long startTsInclusive, + boolean reachedEnd) { this.startRowInclusive = startRowInclusive; + this.startColInclusive = startColInclusive; + this.startTsInclusive = startTsInclusive; + this.reachedEnd = reachedEnd; + } + + public static CellTsPairToken startRow(byte[] startRowInclusive) { + return new CellTsPairToken(startRowInclusive, PtBytes.EMPTY_BYTE_ARRAY, null, false); + } + + public static CellTsPairToken continueRow(CellTsPairInfo lastResult) { + return new CellTsPairToken(lastResult.rowName, lastResult.colName, lastResult.ts + 1, false); + } + + public static CellTsPairToken end() { + return new CellTsPairToken(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, null, true); } } From 33627f9d3e484d39054951040a3cbdfc477b14a9 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 16:53:58 +0100 Subject: [PATCH 08/13] Have computeNextStartPosition return a token --- .../dbkvs/impl/oracle/OracleCellTsPageLoader.java | 12 ++++++------ .../impl/postgres/PostgresCellTsPageLoader.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 04ed02e4f20..d544687e823 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -139,7 +139,7 @@ public List next() { boolean singleRow = shouldScanSingleRow(); List cellTsPairs = loadPage(singleRow); updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs); - computeNextStartPosition(cellTsPairs, singleRow); + token = computeNextStartPosition(cellTsPairs, singleRow); return cellTsPairs; } @@ -239,25 +239,25 @@ private void updateCountOfExaminedCellTsPairsInCurrentRow(List r } } - private void computeNextStartPosition(List results, boolean scannedSingleRow) { + private CellTsPairToken computeNextStartPosition(List results, boolean scannedSingleRow) { if (results.size() < sqlRowLimit) { if (scannedSingleRow) { // If we scanned a single row and reached the end, we just restart // from the lexicographically next row byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive); if (nextRow == null) { - token = CellTsPairToken.end(); + return CellTsPairToken.end(); } else { - token = CellTsPairToken.startRow(nextRow); cellTsPairsAlreadyExaminedInCurrentRow = 0L; + return CellTsPairToken.startRow(nextRow); } } else { - token = CellTsPairToken.end(); + return CellTsPairToken.end(); } } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - token = CellTsPairToken.continueRow(lastResult); + return CellTsPairToken.continueRow(lastResult); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index 62e417fad85..cf95ccb8f5c 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -94,7 +94,7 @@ public boolean hasNext() { public List next() { Preconditions.checkState(hasNext()); List cellTsPairs = loadNextPage(); - computeNextStartPosition(cellTsPairs); + token = computeNextStartPosition(cellTsPairs); return cellTsPairs; } @@ -174,13 +174,13 @@ private FullQuery getQuery() { } } - private void computeNextStartPosition(List results) { + private CellTsPairToken computeNextStartPosition(List results) { if (results.size() < sqlRowLimit) { - token = CellTsPairToken.end(); + return CellTsPairToken.end(); } else { CellTsPairInfo lastResult = Iterables.getLast(results); Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); - token = CellTsPairToken.continueRow(lastResult); + return CellTsPairToken.continueRow(lastResult); } } } From 05ff84bc35d61062266f2ef471dd7f1200907166 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 17:03:13 +0100 Subject: [PATCH 09/13] Remove the TODO I don't think we can do it nicely --- .../keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index d544687e823..49ff1f51ef6 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -203,7 +203,6 @@ private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) } else { RangePredicateHelper.create(false, DBType.ORACLE, builder) .startCellTsInclusive( - // TODO can we reuse a value type here? token.startRowInclusive, token.startColInclusive, token.startTsInclusive); From 6c3c05a25f3752e8054e0b0667ee3af97cb4c597 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 17:18:11 +0100 Subject: [PATCH 10/13] empty commit From 117277ae729f3a2672bf0acb782de6ef85faa31f Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 18:55:11 +0100 Subject: [PATCH 11/13] Nits * Move duplicated precondition to CellTsPairToken and add error message * Renames --- .../keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java | 1 - .../dbkvs/impl/postgres/PostgresCellTsPageLoader.java | 7 +++---- .../atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java | 3 +++ 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 49ff1f51ef6..2e4dafe18d6 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -255,7 +255,6 @@ private CellTsPairToken computeNextStartPosition(List results, b } } else { CellTsPairInfo lastResult = Iterables.getLast(results); - Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); return CellTsPairToken.continueRow(lastResult); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index cf95ccb8f5c..ee744fc08f7 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -125,11 +125,11 @@ private List loadNextPage() { } private AgnosticLightResultSet selectNextPage(ConnectionSupplier conns) { - FullQuery query = getQuery(); - return conns.get().selectLightResultSetUnregisteredQuery(query.getQuery(), query.getArgs()); + FullQuery fullQuery = getFullQuery(); + return conns.get().selectLightResultSetUnregisteredQuery(fullQuery.getQuery(), fullQuery.getArgs()); } - private FullQuery getQuery() { + private FullQuery getFullQuery() { if (request.shouldCheckIfLatestValueIsEmpty()) { FullQuery.Builder queryBuilder = FullQuery.builder() .append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING_THOROUGH(").append(tableName).append(") */") @@ -179,7 +179,6 @@ private CellTsPairToken computeNextStartPosition(List results) { return CellTsPairToken.end(); } else { CellTsPairInfo lastResult = Iterables.getLast(results); - Preconditions.checkState(lastResult.ts != Long.MAX_VALUE); return CellTsPairToken.continueRow(lastResult); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java index afebe95192d..b1fe86a3c77 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java @@ -18,6 +18,7 @@ import javax.annotation.Nullable; +import com.google.common.base.Preconditions; import com.palantir.atlasdb.encoding.PtBytes; public final class CellTsPairToken { @@ -42,6 +43,8 @@ public static CellTsPairToken startRow(byte[] startRowInclusive) { } public static CellTsPairToken continueRow(CellTsPairInfo lastResult) { + Preconditions.checkState(lastResult.ts != Long.MAX_VALUE, "Illegal timestamp MAX_VALUE"); + return new CellTsPairToken(lastResult.rowName, lastResult.colName, lastResult.ts + 1, false); } From 751f3ae265b64c0b32e94192dde4b10fd7a39723 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 17 Oct 2017 19:02:06 +0100 Subject: [PATCH 12/13] Make CellTsPairToken immutable --- .../impl/oracle/OracleCellTsPageLoader.java | 24 +++++----- .../postgres/PostgresCellTsPageLoader.java | 10 ++-- .../dbkvs/impl/sweep/CellTsPairToken.java | 48 ++++++++++++------- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 2e4dafe18d6..0b408dc73f7 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -127,7 +127,7 @@ private static class PageIterator implements Iterator> { @Override public boolean hasNext() { - return !token.reachedEnd; + return !token.reachedEnd(); } // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. @@ -191,21 +191,21 @@ private FullQuery getQuery(boolean singleRow) { private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) { if (singleRow) { - builder.append(" AND row_name = ?", token.startRowInclusive); - if (token.startColInclusive.length > 0) { - builder.append(" AND col_name >= ?", token.startColInclusive); - if (token.startTsInclusive != null) { + builder.append(" AND row_name = ?", token.startRowInclusive()); + if (token.startColInclusive().length > 0) { + builder.append(" AND col_name >= ?", token.startColInclusive()); + if (token.startTsInclusive() != null) { builder.append(" AND (col_name > ? OR ts >= ?)", - token.startColInclusive, - token.startTsInclusive); + token.startColInclusive(), + token.startTsInclusive()); } } } else { RangePredicateHelper.create(false, DBType.ORACLE, builder) .startCellTsInclusive( - token.startRowInclusive, - token.startColInclusive, - token.startTsInclusive); + token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); } } @@ -228,7 +228,7 @@ private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) } private void updateCountOfExaminedCellTsPairsInCurrentRow(List results) { - byte[] currentRow = token.startRowInclusive; + byte[] currentRow = token.startRowInclusive(); for (CellTsPairInfo cellTs : results) { if (!Arrays.equals(currentRow, cellTs.rowName)) { currentRow = cellTs.rowName; @@ -243,7 +243,7 @@ private CellTsPairToken computeNextStartPosition(List results, b if (scannedSingleRow) { // If we scanned a single row and reached the end, we just restart // from the lexicographically next row - byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive); + byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive()); if (nextRow == null) { return CellTsPairToken.end(); } else { diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index ee744fc08f7..83fb4454870 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -84,7 +84,7 @@ private static class PageIterator implements Iterator> { @Override public boolean hasNext() { - return !token.reachedEnd; + return !token.reachedEnd(); } // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. @@ -144,7 +144,9 @@ private FullQuery getFullQuery() { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(token.startRowInclusive, token.startColInclusive, token.startTsInclusive); + .startCellTsInclusive(token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) @@ -166,7 +168,9 @@ private FullQuery getFullQuery() { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(token.startRowInclusive, token.startColInclusive, token.startTsInclusive); + .startCellTsInclusive(token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java index b1fe86a3c77..7668f86f295 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java @@ -18,37 +18,51 @@ import javax.annotation.Nullable; +import org.immutables.value.Value; + import com.google.common.base.Preconditions; import com.palantir.atlasdb.encoding.PtBytes; -public final class CellTsPairToken { - public final byte[] startRowInclusive; - public final byte[] startColInclusive; +@Value.Immutable +public abstract class CellTsPairToken { + public abstract byte[] startRowInclusive(); + + @Value.Default + public byte[] startColInclusive() { + return PtBytes.EMPTY_BYTE_ARRAY; + } + @Nullable - public final Long startTsInclusive; - public final boolean reachedEnd; - - private CellTsPairToken(byte[] startRowInclusive, - byte[] startColInclusive, - Long startTsInclusive, - boolean reachedEnd) { - this.startRowInclusive = startRowInclusive; - this.startColInclusive = startColInclusive; - this.startTsInclusive = startTsInclusive; - this.reachedEnd = reachedEnd; + @Value.Default + public Long startTsInclusive() { + return null; + } + + @Value.Default + public boolean reachedEnd() { + return false; } public static CellTsPairToken startRow(byte[] startRowInclusive) { - return new CellTsPairToken(startRowInclusive, PtBytes.EMPTY_BYTE_ARRAY, null, false); + return ImmutableCellTsPairToken.builder() + .startRowInclusive(startRowInclusive) + .build(); } public static CellTsPairToken continueRow(CellTsPairInfo lastResult) { Preconditions.checkState(lastResult.ts != Long.MAX_VALUE, "Illegal timestamp MAX_VALUE"); - return new CellTsPairToken(lastResult.rowName, lastResult.colName, lastResult.ts + 1, false); + return ImmutableCellTsPairToken.builder() + .startRowInclusive(lastResult.rowName) + .startColInclusive(lastResult.colName) + .startTsInclusive(lastResult.ts + 1) + .build(); } public static CellTsPairToken end() { - return new CellTsPairToken(PtBytes.EMPTY_BYTE_ARRAY, PtBytes.EMPTY_BYTE_ARRAY, null, true); + return ImmutableCellTsPairToken.builder() + .startRowInclusive(PtBytes.EMPTY_BYTE_ARRAY) + .reachedEnd(true) + .build(); } } From 34dfa39d26c23b19021879a112b14d8a2c0ff57b Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Wed, 18 Oct 2017 11:18:11 +0100 Subject: [PATCH 13/13] Sort timestamps array, just in case --- .../dbkvs/impl/sweep/CandidateGroupingIteratorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java index cbea9cad34c..8ae67dbf219 100644 --- a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java +++ b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertThat; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import org.junit.Rule; @@ -97,6 +98,7 @@ private static CandidateCellForSweeping candidate(String rowName, long numCellTsPairsExamined, boolean latestValEmpty, long... ts) { + Arrays.sort(ts); return ImmutableCandidateCellForSweeping.builder() .cell(Cell.create(bytes(rowName), bytes(colName))) .isLatestValueEmpty(latestValEmpty)