diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index ef7302e2f2e..6ac0e31a03e 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -95,16 +95,16 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.BatchingTaskRunner; import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ImmediateSingleBatchTaskRunner; import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleOverflowValueLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbkvsVersionException; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresPrefixedTableNames; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRange; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRanges; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping; import com.palantir.atlasdb.keyvalue.dbkvs.util.DbKvsPartitioners; import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService; @@ -182,7 +182,7 @@ private static DbKvs createPostgres(ExecutorService executor, PostgresPrefixedTableNames prefixedTableNames = new PostgresPrefixedTableNames(config); DbTableFactory tableFactory = new PostgresDbTableFactory(config, prefixedTableNames); TableMetadataCache tableMetadataCache = new TableMetadataCache(tableFactory); - CellTsPairLoaderFactory cellTsPairLoaderFactory = new PostgresCellTsPageLoaderFactory( + CellTsPairLoader cellTsPairLoader = new PostgresCellTsPageLoader( prefixedTableNames, connections); return new DbKvs( executor, @@ -192,7 +192,7 @@ private static DbKvs createPostgres(ExecutorService executor, new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()), (conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache), - new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory)); + new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader)); } private static DbKvs createOracle(ExecutorService executor, @@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor, OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter); DbKvsGetRange getRange = new OracleGetRange( connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig); - CellTsPairLoaderFactory cellTsPairLoaderFactory = new OracleCellTsPageLoaderFactory( + CellTsPairLoader cellTsPageLoader = new OracleCellTsPageLoader( connections, tableNameGetter, valueStyleCache, oracleDdlConfig); return new DbKvs( executor, @@ -214,7 +214,7 @@ private static DbKvs createOracle(ExecutorService executor, new ImmediateSingleBatchTaskRunner(), overflowValueLoader, getRange, - new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory)); + new DbKvsGetCandidateCellsForSweeping(cellTsPageLoader)); } private DbKvs(ExecutorService executor, diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java similarity index 63% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java index 40ccfd29b73..0b408dc73f7 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/oracle/OracleCellTsPageLoader.java @@ -17,9 +17,14 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.RangeRequests; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.OracleDdlConfig; import com.palantir.atlasdb.keyvalue.dbkvs.OracleTableNameGetter; @@ -29,10 +34,9 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyle; import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyleCache; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException; import com.palantir.nexus.db.DBType; @@ -73,7 +77,7 @@ // predicate and '(col_name > ? OR ts >= ?)' as the filter predicate. Once we exhaust all results // in the row, we switch back to the "normal" mode, starting from the beginning of the lexicographically // next row. -public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory { +public class OracleCellTsPageLoader implements CellTsPairLoader { private final SqlConnectionSupplier connectionPool; private final OracleTableNameGetter tableNameGetter; private final TableValueStyleCache valueStyleCache; @@ -81,10 +85,10 @@ public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory { private static final int DEFAULT_BATCH_SIZE = 1000; - public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool, - OracleTableNameGetter tableNameGetter, - TableValueStyleCache valueStyleCache, - OracleDdlConfig config) { + public OracleCellTsPageLoader(SqlConnectionSupplier connectionPool, + OracleTableNameGetter tableNameGetter, + TableValueStyleCache valueStyleCache, + OracleDdlConfig config) { this.connectionPool = connectionPool; this.tableNameGetter = tableNameGetter; this.valueStyleCache = valueStyleCache; @@ -92,43 +96,61 @@ public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool, } @Override - public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) { + public Iterator> createPageIterator(TableReference tableRef, + CandidateCellForSweepingRequest request) { TableDetails tableDetails = getTableDetailsUsingNewConnection(tableRef); - return new Loader(connectionPool, request, tableDetails, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)); + return new PageIterator( + connectionPool, + request, + tableDetails, + Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)), + request.startRowInclusive()); } - private static class Loader implements CellTsPairLoader { - private final CandidateCellForSweepingRequest request; - private final TableDetails tableDetails; - private final int sqlRowLimit; - private final SqlConnectionSupplier connectionPool; + private static class PageIterator implements Iterator> { + final SqlConnectionSupplier connectionPool; + final CandidateCellForSweepingRequest request; + final TableDetails tableDetails; + final int sqlRowLimit; - Loader(SqlConnectionSupplier connectionPool, - CandidateCellForSweepingRequest request, - TableDetails tableDetails, - int sqlRowLimit) { + CellTsPairToken token; + long cellTsPairsAlreadyExaminedInCurrentRow = 0L; + + PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, + TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) { this.connectionPool = connectionPool; this.request = request; this.tableDetails = tableDetails; this.sqlRowLimit = sqlRowLimit; + this.token = CellTsPairToken.startRow(startRowInclusive); + } + + @Override + public boolean hasNext() { + return !token.reachedEnd(); } + // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. + // As a downside, our iterator can return an empty page at the end. + // However, we can just filter out empty pages later. @Override - public Page loadNextPage(StartingPosition startInclusive, - long cellsAlreadyExaminedInStartingRow) { - boolean singleRow = shouldScanSingleRow(cellsAlreadyExaminedInStartingRow); - List cellTsPairs = loadPage(startInclusive, singleRow); - return new Page(cellTsPairs, singleRow, cellTsPairs.size() < sqlRowLimit); + public List next() { + Preconditions.checkState(hasNext()); + boolean singleRow = shouldScanSingleRow(); + List cellTsPairs = loadPage(singleRow); + updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs); + token = computeNextStartPosition(cellTsPairs, singleRow); + return cellTsPairs; } - private boolean shouldScanSingleRow(long cellsAlreadyExaminedInCurrentRow) { + private boolean shouldScanSingleRow() { // The idea is that we don't want to throw away more than N database rows in order to get N results. // This only matters for tables with wide rows. We could tweak this. - return cellsAlreadyExaminedInCurrentRow > sqlRowLimit; + return cellTsPairsAlreadyExaminedInCurrentRow > sqlRowLimit; } - private List loadPage(StartingPosition startInclusive, boolean singleRow) { - FullQuery query = getQuery(startInclusive, singleRow); + private List loadPage(boolean singleRow) { + FullQuery query = getQuery(singleRow); try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); AgnosticLightResultSet resultSet = executeQuery(conns.get(), query)) { List ret = new ArrayList<>(); @@ -143,14 +165,7 @@ private List loadPage(StartingPosition startInclusive, boolean s } } - private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) { - return conn.selectLightResultSetUnregisteredQueryWithFetchSize( - query.getQuery(), - sqlRowLimit, - query.getArgs()); - } - - private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { + private FullQuery getQuery(boolean singleRow) { String pkIndex = PrimaryKeyConstraintNames.get(tableDetails.shortName); FullQuery.Builder queryBuilder = FullQuery.builder() .append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING */ ") @@ -166,7 +181,7 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { .append(" FROM ").append(tableDetails.shortName).append(" t") .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); - appendRangePredicates(startingPos, singleRow, queryBuilder); + appendRangePredicates(singleRow, queryBuilder); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(") WHERE rownum <= ").append(sqlRowLimit) @@ -174,18 +189,23 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) { .build(); } - private void appendRangePredicates(StartingPosition startingPos, boolean singleRow, FullQuery.Builder builder) { + private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) { if (singleRow) { - builder.append(" AND row_name = ?", startingPos.rowName); - if (startingPos.colName.length > 0) { - builder.append(" AND col_name >= ?", startingPos.colName); - if (startingPos.timestamp != null) { - builder.append(" AND (col_name > ? OR ts >= ?)", startingPos.colName, startingPos.timestamp); + builder.append(" AND row_name = ?", token.startRowInclusive()); + if (token.startColInclusive().length > 0) { + builder.append(" AND col_name >= ?", token.startColInclusive()); + if (token.startTsInclusive() != null) { + builder.append(" AND (col_name > ? OR ts >= ?)", + token.startColInclusive(), + token.startTsInclusive()); } } } else { RangePredicateHelper.create(false, DBType.ORACLE, builder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive( + token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); } } @@ -199,6 +219,46 @@ private void appendEmptyValueFlagSelector(FullQuery.Builder builder) { } builder.append(" THEN 1 ELSE 0 END AS empty_val"); } + + private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) { + return conn.selectLightResultSetUnregisteredQueryWithFetchSize( + query.getQuery(), + sqlRowLimit, + query.getArgs()); + } + + private void updateCountOfExaminedCellTsPairsInCurrentRow(List results) { + byte[] currentRow = token.startRowInclusive(); + for (CellTsPairInfo cellTs : results) { + if (!Arrays.equals(currentRow, cellTs.rowName)) { + currentRow = cellTs.rowName; + cellTsPairsAlreadyExaminedInCurrentRow = 0L; + } + cellTsPairsAlreadyExaminedInCurrentRow += 1; + } + } + + private CellTsPairToken computeNextStartPosition(List results, boolean scannedSingleRow) { + if (results.size() < sqlRowLimit) { + if (scannedSingleRow) { + // If we scanned a single row and reached the end, we just restart + // from the lexicographically next row + byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive()); + if (nextRow == null) { + return CellTsPairToken.end(); + } else { + cellTsPairsAlreadyExaminedInCurrentRow = 0L; + return CellTsPairToken.startRow(nextRow); + } + } else { + return CellTsPairToken.end(); + } + } else { + CellTsPairInfo lastResult = Iterables.getLast(results); + return CellTsPairToken.continueRow(lastResult); + } + } + } private static class TableDetails { @@ -209,7 +269,6 @@ private static class TableDetails { this.shortName = shortName; this.hasOverflow = hasOverflow; } - } private TableDetails getTableDetailsUsingNewConnection(TableReference tableRef) { diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java deleted file mode 100644 index 0918e8b0700..00000000000 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIterator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres; - -import java.util.Iterator; -import java.util.List; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Lists; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; - -public class CandidatePageJoiningIterator extends AbstractIterator> { - private final Iterator> sourceIterator; - private final int minPageSize; - - public CandidatePageJoiningIterator(Iterator> sourceIterator, int minPageSize) { - this.sourceIterator = sourceIterator; - this.minPageSize = minPageSize; - } - - @Override - protected List computeNext() { - if (!sourceIterator.hasNext()) { - return endOfData(); - } else { - List page = sourceIterator.next(); - int pageSize = getSizeOfPage(page); - if (pageSize >= minPageSize) { - return page; - } else { - List ret = Lists.newArrayList(); - int retSize = pageSize; - ret.addAll(page); - // The order is important: calling hasNext() is expensive, - // so we should check first if we reached the desired page size - while (retSize < minPageSize && sourceIterator.hasNext()) { - List nextPage = sourceIterator.next(); - retSize += getSizeOfPage(nextPage); - ret.addAll(nextPage); - } - return ret.isEmpty() ? endOfData() : ret; - } - } - } - - private static int getSizeOfPage(List page) { - // If a cell doesn't have any timestamps (i.e., is not a candidate), we don't want count - // it as zero because it still takes memory. Hence max(1, ...). - return page.stream().mapToInt(c -> Math.max(1, c.sortedTimestamps().length)).sum(); - } -} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java similarity index 60% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java index d469a2b3a30..83fb4454870 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresCellTsPageLoader.java @@ -18,8 +18,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionSupplier; @@ -27,94 +30,106 @@ import com.palantir.atlasdb.keyvalue.dbkvs.impl.FullQuery; import com.palantir.atlasdb.keyvalue.dbkvs.impl.SqlConnectionSupplier; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory; +import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken; import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers; import com.palantir.nexus.db.DBType; import com.palantir.nexus.db.sql.AgnosticLightResultRow; import com.palantir.nexus.db.sql.AgnosticLightResultSet; -public class PostgresCellTsPageLoaderFactory implements CellTsPairLoaderFactory { +public class PostgresCellTsPageLoader implements CellTsPairLoader { private final PostgresPrefixedTableNames prefixedTableNames; private final SqlConnectionSupplier connectionPool; private static final int DEFAULT_BATCH_SIZE = 1000; - public PostgresCellTsPageLoaderFactory(PostgresPrefixedTableNames prefixedTableNames, + public PostgresCellTsPageLoader(PostgresPrefixedTableNames prefixedTableNames, SqlConnectionSupplier connectionPool) { this.prefixedTableNames = prefixedTableNames; this.connectionPool = connectionPool; } @Override - public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) { - return new Loader(connectionPool, + public Iterator> createPageIterator(TableReference tableRef, + CandidateCellForSweepingRequest request) { + return new PageIterator( + connectionPool, request, - request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE), + Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)), DbKvs.internalTableName(tableRef), - prefixedTableNames.get(tableRef)); + prefixedTableNames.get(tableRef), + request.startRowInclusive()); } - private static class Loader implements CellTsPairLoader { + private static class PageIterator implements Iterator> { final SqlConnectionSupplier connectionPool; final CandidateCellForSweepingRequest request; final int sqlRowLimit; final String tableName; final String prefixedTableName; - Loader(SqlConnectionSupplier connectionPool, - CandidateCellForSweepingRequest request, - int sqlRowLimit, - String tableName, - String prefixedTableName) { + CellTsPairToken token; + + PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request, int sqlRowLimit, + String tableName, String prefixedTableName, byte[] startRowInclusive) { this.connectionPool = connectionPool; this.request = request; this.sqlRowLimit = sqlRowLimit; this.tableName = tableName; this.prefixedTableName = prefixedTableName; + this.token = CellTsPairToken.startRow(startRowInclusive); } @Override - public Page loadNextPage(StartingPosition startInclusive, - long cellsAlreadyExaminedInStartingRow) { - try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); - AgnosticLightResultSet resultSet = selectNextPage(conns, startInclusive)) { - List cellTsPairs = readResultSet(resultSet); - return new Page(cellTsPairs, false, cellTsPairs.size() < sqlRowLimit); - } + public boolean hasNext() { + return !token.reachedEnd(); } - private AgnosticLightResultSet selectNextPage(ConnectionSupplier conns, StartingPosition startingPosition) { - FullQuery query = getQuery(startingPosition); - return conns.get().selectLightResultSetUnregisteredQuery(query.getQuery(), query.getArgs()); + // We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page. + // As a downside, our iterator can return an empty page at the end. + // However, we can just filter out empty pages later. + @Override + public List next() { + Preconditions.checkState(hasNext()); + List cellTsPairs = loadNextPage(); + token = computeNextStartPosition(cellTsPairs); + return cellTsPairs; } - private List readResultSet(AgnosticLightResultSet resultSet) { - List ret = new ArrayList<>(); - for (AgnosticLightResultRow row : resultSet) { - byte[] rowName = row.getBytes("row_name"); - byte[] colName = row.getBytes("col_name"); - if (request.shouldCheckIfLatestValueIsEmpty()) { - long[] sortedTimestamps = castAndSortTimestamps((Object[]) row.getArray("timestamps")); - boolean isLatestValEmpty = row.getBoolean("latest_val_empty"); - for (int i = 0; i < sortedTimestamps.length - 1; ++i) { - ret.add(new CandidatePagingState.CellTsPairInfo(rowName, colName, sortedTimestamps[i], false)); + private List loadNextPage() { + try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); + AgnosticLightResultSet resultSet = selectNextPage(conns)) { + List ret = new ArrayList<>(); + for (AgnosticLightResultRow row : resultSet) { + byte[] rowName = row.getBytes("row_name"); + byte[] colName = row.getBytes("col_name"); + if (request.shouldCheckIfLatestValueIsEmpty()) { + long[] sortedTimestamps = castAndSortTimestamps((Object[]) row.getArray("timestamps")); + boolean isLatestValEmpty = row.getBoolean("latest_val_empty"); + for (int i = 0; i < sortedTimestamps.length - 1; ++i) { + ret.add(new CellTsPairInfo(rowName, colName, sortedTimestamps[i], false)); + } + // For the maximum timestamp, we know whether its value is empty or not, + // so we handle it separately + ret.add(new CellTsPairInfo( + rowName, colName, sortedTimestamps[sortedTimestamps.length - 1], isLatestValEmpty)); + } else { + long ts = row.getLong("ts"); + ret.add(new CellTsPairInfo(rowName, colName, ts, false)); } - // For the maximum timestamp, we know whether its value is empty or not, so we handle it separately - ret.add(new CandidatePagingState.CellTsPairInfo( - rowName, colName, sortedTimestamps[sortedTimestamps.length - 1], isLatestValEmpty)); - } else { - long ts = row.getLong("ts"); - ret.add(new CandidatePagingState.CellTsPairInfo(rowName, colName, ts, false)); } + return ret; } - return ret; } - private FullQuery getQuery(StartingPosition startingPos) { + private AgnosticLightResultSet selectNextPage(ConnectionSupplier conns) { + FullQuery fullQuery = getFullQuery(); + return conns.get().selectLightResultSetUnregisteredQuery(fullQuery.getQuery(), fullQuery.getArgs()); + } + + private FullQuery getFullQuery() { if (request.shouldCheckIfLatestValueIsEmpty()) { FullQuery.Builder queryBuilder = FullQuery.builder() .append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING_THOROUGH(").append(tableName).append(") */") @@ -129,7 +144,9 @@ private FullQuery getQuery(StartingPosition startingPos) { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive(token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) @@ -151,13 +168,24 @@ private FullQuery getQuery(StartingPosition startingPos) { .append(" WHERE ts < ? ", request.sweepTimestamp()); SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder); RangePredicateHelper.create(false, DBType.POSTGRESQL, queryBuilder) - .startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp); + .startCellTsInclusive(token.startRowInclusive(), + token.startColInclusive(), + token.startTsInclusive()); return queryBuilder .append(" ORDER BY row_name, col_name, ts") .append(" LIMIT ").append(sqlRowLimit) .build(); } } + + private CellTsPairToken computeNextStartPosition(List results) { + if (results.size() < sqlRowLimit) { + return CellTsPairToken.end(); + } else { + CellTsPairInfo lastResult = Iterables.getLast(results); + return CellTsPairToken.continueRow(lastResult); + } + } } // Postgres doesn't guarantee the order of results of ARRAY_AGG, so we sort the timestamps ourselves. diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java new file mode 100644 index 00000000000..35fa17a9171 --- /dev/null +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIterator.java @@ -0,0 +1,120 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import com.google.common.base.Preconditions; +import com.palantir.atlasdb.encoding.PtBytes; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; + +import gnu.trove.list.TLongList; +import gnu.trove.list.array.TLongArrayList; + +public final class CandidateGroupingIterator implements Iterator> { + private final Iterator> cellTsIterator; + + private byte[] currentRowName = PtBytes.EMPTY_BYTE_ARRAY; + private byte[] currentColName = PtBytes.EMPTY_BYTE_ARRAY; + private final TLongList currentCellTimestamps = new TLongArrayList(); + private boolean currentIsLatestValueEmpty = false; + private long cellTsPairsExamined = 0L; + + private CandidateGroupingIterator(Iterator> cellTsIterator) { + this.cellTsIterator = cellTsIterator; + } + + /** The 'cellTsIterator' is expected to return (cell, ts) pairs in strict lexicographically increasing order. + **/ + public static Iterator> create(Iterator> cellTsIterator) { + return new CandidateGroupingIterator(cellTsIterator); + } + + @Override + public boolean hasNext() { + return cellTsIterator.hasNext(); + } + + @Override + public List next() { + Preconditions.checkState(hasNext()); + List cellTsBatch = cellTsIterator.next(); + List candidates = new ArrayList<>(); + for (CellTsPairInfo cellTs : cellTsBatch) { + checkCurrentCellAndUpdateIfNecessary(cellTs).ifPresent(candidates::add); + if (currentCellTimestamps.size() > 0) { + // We expect the timestamps in ascending order. This check costs us a few CPU cycles + // but it's worth it for paranoia reasons - mistaking a cell with data for an empty one + // can cause data corruption. + Preconditions.checkArgument(cellTs.ts > currentCellTimestamps.get(currentCellTimestamps.size() - 1), + "Timestamps for each cell must be fed in strictly increasing order"); + } + updateStateAfterSingleCellTsPairProcessed(cellTs); + } + if (!cellTsIterator.hasNext()) { + getCurrentCandidate().ifPresent(candidates::add); + } + return candidates; + } + + private void updateStateAfterSingleCellTsPairProcessed(CellTsPairInfo cellTs) { + currentIsLatestValueEmpty = cellTs.hasEmptyValue; + currentCellTimestamps.add(cellTs.ts); + cellTsPairsExamined += 1; + } + + private Optional checkCurrentCellAndUpdateIfNecessary(CellTsPairInfo cellTs) { + if (isCurrentCell(cellTs)) { + return Optional.empty(); + } else { + Optional candidate = getCurrentCandidate(); + updateStateForNewCell(cellTs); + return candidate; + } + } + + private boolean isCurrentCell(CellTsPairInfo cellTs) { + return Arrays.equals(currentRowName, cellTs.rowName) && Arrays.equals(currentColName, cellTs.colName); + } + + private Optional getCurrentCandidate() { + if (currentCellTimestamps.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(ImmutableCandidateCellForSweeping.builder() + .cell(Cell.create(currentRowName, currentColName)) + .sortedTimestamps(currentCellTimestamps.toArray()) + .isLatestValueEmpty(currentIsLatestValueEmpty) + .numCellsTsPairsExamined(cellTsPairsExamined) + .build()); + } + } + + private void updateStateForNewCell(CellTsPairInfo cell) { + currentCellTimestamps.clear(); + currentRowName = cell.rowName; + currentColName = cell.colName; + currentIsLatestValueEmpty = false; + } + +} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java deleted file mode 100644 index 724cf818a91..00000000000 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingState.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -import javax.annotation.Nullable; - -import com.google.common.base.Preconditions; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; - -import gnu.trove.list.TLongList; -import gnu.trove.list.array.TLongArrayList; - -public final class CandidatePagingState { - private byte[] currentRowName; - private byte[] currentColName = PtBytes.EMPTY_BYTE_ARRAY; - private Long maxSeenTimestampForCurrentCell = null; - private final TLongList currentCellTimestamps = new TLongArrayList(); - private boolean currentIsLatestValueEmpty = false; - private long cellTsPairsExamined = 0L; - private long cellTsPairsExaminedInCurrentRow = 0L; - private boolean reachedEnd = false; - - private CandidatePagingState(byte[] currentRowName) { - this.currentRowName = currentRowName; - } - - public static CandidatePagingState create(byte[] startRowInclusive) { - return new CandidatePagingState(startRowInclusive); - } - - /** - * Semantically different from Cassandra's CellWithTimestamp, because: - * (a) colName can be an empty byte array (indicating starting from the beginning of the row) - * (b) timestamp is Nullable. - */ - public static class StartingPosition { - public final byte[] rowName; - public final byte[] colName; - @Nullable public final Long timestamp; - - public StartingPosition(byte[] rowName, byte[] colName, Long timestamp) { - this.rowName = rowName; - this.colName = colName; - this.timestamp = timestamp; - } - } - - public static class CellTsPairInfo { - public final byte[] rowName; - public final byte[] colName; - public final long ts; - public final boolean isEmptyValue; - - public CellTsPairInfo(byte[] rowName, byte[] colName, long ts, boolean isEmptyValue) { - this.rowName = rowName; - this.colName = colName; - this.ts = ts; - this.isEmptyValue = isEmptyValue; - } - } - - /** The caller is expected to feed (cell, ts) pairs in strict lexicographically increasing order, - * even across several batches. - **/ - public List processBatch(List cellTsPairs, boolean reachedEndOfResults) { - List candidates = new ArrayList<>(); - for (CellTsPairInfo cellTs : cellTsPairs) { - checkCurrentCellAndUpdateIfNecessary(cellTs).ifPresent(candidates::add); - if (maxSeenTimestampForCurrentCell != null) { - // We expect the timestamps in ascending order. This check costs us a few CPU cycles - // but it's worth it for paranoia reasons - mistaking a cell with data for an empty one - // can cause data corruption. - Preconditions.checkArgument(cellTs.ts > maxSeenTimestampForCurrentCell, - "Timestamps for each cell must be fed in strictly increasing order"); - } - updateStateAfterSingleCellProcessed(cellTs); - } - if (reachedEndOfResults) { - getCurrentCandidate().ifPresent(candidates::add); - reachedEnd = true; - } - return candidates; - } - - private Optional checkCurrentCellAndUpdateIfNecessary(CellTsPairInfo cellTs) { - boolean sameRow = Arrays.equals(currentRowName, cellTs.rowName); - boolean sameCol = Arrays.equals(currentColName, cellTs.colName); - if (!sameRow) { - cellTsPairsExaminedInCurrentRow = 0L; - } - - if (sameRow && sameCol) { - return Optional.empty(); - } else { - Optional candidate = getCurrentCandidate(); - updateStateForNewCell(cellTs); - return candidate; - } - } - - private Optional getCurrentCandidate() { - if (currentCellTimestamps.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(currentRowName, currentColName)) - .sortedTimestamps(currentCellTimestamps.toArray()) - .isLatestValueEmpty(currentIsLatestValueEmpty) - .numCellsTsPairsExamined(cellTsPairsExamined) - .build()); - } - } - - private void updateStateForNewCell(CellTsPairInfo cell) { - currentCellTimestamps.clear(); - maxSeenTimestampForCurrentCell = null; - currentRowName = cell.rowName; - currentColName = cell.colName; - currentIsLatestValueEmpty = false; - } - - private void updateStateAfterSingleCellProcessed(CellTsPairInfo cellTs) { - maxSeenTimestampForCurrentCell = cellTs.ts; - currentIsLatestValueEmpty = cellTs.isEmptyValue; - currentCellTimestamps.add(cellTs.ts); - cellTsPairsExamined += 1; - cellTsPairsExaminedInCurrentRow += 1; - } - - public Optional getNextStartingPosition() { - if (reachedEnd) { - return Optional.empty(); - } else { - return Optional.of(new StartingPosition(currentRowName, currentColName, getNextStartTimestamp())); - } - } - - @Nullable - private Long getNextStartTimestamp() { - if (maxSeenTimestampForCurrentCell == null) { - return null; - } else { - // This can never happen because we request 'WHERE ts < ?', where '?' is some 'long' value. - // So if the timestamp is strictly less than another 'long', it can not be Long.MAX_VALUE. - // But we check anyway for general paranoia reasons. - Preconditions.checkState(maxSeenTimestampForCurrentCell != Long.MAX_VALUE, - "Timestamps must be strictly less than Long.MAX_VALUE"); - return maxSeenTimestampForCurrentCell + 1; - } - } - - public long getCellTsPairsExaminedInCurrentRow() { - return cellTsPairsExaminedInCurrentRow; - } - - public void restartFromNextRow() { - @Nullable byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, currentRowName); - if (nextRow == null) { - reachedEnd = true; - } else { - updateStateForNewRow(nextRow); - } - } - - private void updateStateForNewRow(byte[] nextRow) { - currentRowName = nextRow; - currentColName = PtBytes.EMPTY_BYTE_ARRAY; - maxSeenTimestampForCurrentCell = null; - currentCellTimestamps.clear(); - currentIsLatestValueEmpty = false; - cellTsPairsExaminedInCurrentRow = 0L; - reachedEnd = false; - } - -} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java similarity index 63% rename from atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java rename to atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java index bb3fd522558..77ddd6f6fb7 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoaderFactory.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairInfo.java @@ -16,11 +16,16 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; -import com.palantir.atlasdb.keyvalue.api.TableReference; - -public interface CellTsPairLoaderFactory { - - CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request); +public class CellTsPairInfo { + public final byte[] rowName; + public final byte[] colName; + public final long ts; + public final boolean hasEmptyValue; + public CellTsPairInfo(byte[] rowName, byte[] colName, long ts, boolean hasEmptyValue) { + this.rowName = rowName; + this.colName = colName; + this.ts = ts; + this.hasEmptyValue = hasEmptyValue; + } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java index a66de224d1f..0cf02ed6216 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairLoader.java @@ -16,25 +16,14 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; +import java.util.Iterator; import java.util.List; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; +import com.palantir.atlasdb.keyvalue.api.TableReference; public interface CellTsPairLoader { - class Page { - public final List entries; - public final boolean scannedSingleRow; - public final boolean reachedEnd; - - public Page(List entries, boolean scannedSingleRow, boolean reachedEnd) { - this.entries = entries; - this.scannedSingleRow = scannedSingleRow; - this.reachedEnd = reachedEnd; - } - } - - Page loadNextPage(StartingPosition startInclusive, long cellsAlreadyExaminedInStartingRow); + Iterator> createPageIterator(TableReference tableRef, CandidateCellForSweepingRequest request); } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java new file mode 100644 index 00000000000..7668f86f295 --- /dev/null +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CellTsPairToken.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import javax.annotation.Nullable; + +import org.immutables.value.Value; + +import com.google.common.base.Preconditions; +import com.palantir.atlasdb.encoding.PtBytes; + +@Value.Immutable +public abstract class CellTsPairToken { + public abstract byte[] startRowInclusive(); + + @Value.Default + public byte[] startColInclusive() { + return PtBytes.EMPTY_BYTE_ARRAY; + } + + @Nullable + @Value.Default + public Long startTsInclusive() { + return null; + } + + @Value.Default + public boolean reachedEnd() { + return false; + } + + public static CellTsPairToken startRow(byte[] startRowInclusive) { + return ImmutableCellTsPairToken.builder() + .startRowInclusive(startRowInclusive) + .build(); + } + + public static CellTsPairToken continueRow(CellTsPairInfo lastResult) { + Preconditions.checkState(lastResult.ts != Long.MAX_VALUE, "Illegal timestamp MAX_VALUE"); + + return ImmutableCellTsPairToken.builder() + .startRowInclusive(lastResult.rowName) + .startColInclusive(lastResult.colName) + .startTsInclusive(lastResult.ts + 1) + .build(); + } + + public static CellTsPairToken end() { + return ImmutableCellTsPairToken.builder() + .startRowInclusive(PtBytes.EMPTY_BYTE_ARRAY) + .reachedEnd(true) + .build(); + } +} diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java index baae915db89..2923c58f392 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/DbKvsGetCandidateCellsForSweeping.java @@ -18,61 +18,26 @@ import java.util.Iterator; import java.util.List; -import java.util.Optional; -import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; public class DbKvsGetCandidateCellsForSweeping { - private final CellTsPairLoaderFactory cellTsPairLoaderFactory; + private final CellTsPairLoader cellTsPairLoader; - public DbKvsGetCandidateCellsForSweeping(CellTsPairLoaderFactory cellTsPairLoaderFactory) { - this.cellTsPairLoaderFactory = cellTsPairLoaderFactory; + public DbKvsGetCandidateCellsForSweeping(CellTsPairLoader cellTsPairLoader) { + this.cellTsPairLoader = cellTsPairLoader; } public Iterator> getCandidateCellsForSweeping( TableReference tableRef, CandidateCellForSweepingRequest request) { - CellTsPairLoader loader = cellTsPairLoaderFactory.createCellTsLoader(tableRef, request); - CandidatePagingState state = CandidatePagingState.create(request.startRowInclusive()); - return Iterators.filter(new PageIterator(loader, state), page -> !page.isEmpty()); + Iterator> cellTsIter = cellTsPairLoader.createPageIterator(tableRef, request); + Iterator> rawIter = CandidateGroupingIterator.create(cellTsIter); + return Iterators.filter(rawIter, page -> !page.isEmpty()); } - private class PageIterator extends AbstractIterator> { - private final CellTsPairLoader pageLoader; - private final CandidatePagingState state; - - PageIterator(CellTsPairLoader pageLoader, CandidatePagingState state) { - this.pageLoader = pageLoader; - this.state = state; - } - - @Override - protected List computeNext() { - Optional startingPosition = state.getNextStartingPosition(); - if (startingPosition.isPresent()) { - return loadNextPage(startingPosition.get()); - } else { - return endOfData(); - } - } - - private List loadNextPage(StartingPosition startingPosition) { - CellTsPairLoader.Page page = pageLoader.loadNextPage( - startingPosition, state.getCellTsPairsExaminedInCurrentRow()); - List candidates = state.processBatch( - page.entries, page.reachedEnd); - if (page.reachedEnd && page.scannedSingleRow) { - // If we reached the end of results in the single-row mode, it doesn't mean that we reached - // the end of the table. We need to restart from the next row in the normal mode. - state.restartFromNextRow(); - } - return candidates; - } - } } diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java deleted file mode 100644 index 977f1a58db1..00000000000 --- a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/CandidatePageJoiningIteratorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.junit.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Ints; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; - -public class CandidatePageJoiningIteratorTest { - - @Test - public void emptySequenceStaysEmpty() { - // nothing -> nothing - assertTrue(transform(2, ImmutableList.of()).isEmpty()); - } - - @Test - public void oneEmptyListBecomesEmptySequence() { - // [] -> nothing - assertTrue(transform(2, ImmutableList.of(ImmutableList.of())).isEmpty()); - } - - @Test - public void manyEmptyListsBecomeEmptySequence() { - // [], [], [] -> nothing - assertTrue( - transform(2, ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())).isEmpty()); - } - - @Test - public void pagesThatAreBigEnoughArePreservedAsIs() { - // min page size 2: [2], [2] -> [2], [2] - assertEquals( - ImmutableList.of(ImmutableList.of(candidate(100, 2)), ImmutableList.of(candidate(110, 2))), - transform(2, - ImmutableList.of(ImmutableList.of(candidate(100, 2)), ImmutableList.of(candidate(110, 2))))); - } - - @Test - public void smallPagesAreJoined() { - // min page size 2: [1], [], [2], [1] -> [1, 2], [1] - assertEquals( - ImmutableList.of( - ImmutableList.of(candidate(100, 1), candidate(110, 2)), - ImmutableList.of(candidate(120, 1))), - transform(2, ImmutableList.of( - ImmutableList.of(candidate(100, 1)), - ImmutableList.of(), - ImmutableList.of(candidate(110, 2)), - ImmutableList.of(candidate(120, 1))))); - } - - @Test - public void nonCandidateCellsCountAsOne() { - // min page size 2: [0], [0], [0], -> [0, 0], [0] - assertEquals( - ImmutableList.of( - ImmutableList.of(candidate(100, 0), candidate(110, 0)), - ImmutableList.of(candidate(120, 0))), - transform(2, ImmutableList.of( - ImmutableList.of(candidate(100, 0)), - ImmutableList.of(candidate(110, 0)), - ImmutableList.of(candidate(120, 0))))); - } - - private static List> transform(int minPageSize, - List> inputPages) { - return ImmutableList.copyOf(new CandidatePageJoiningIterator(inputPages.iterator(), minPageSize)); - } - - private static CandidateCellForSweeping candidate(int rowName, int numTs) { - return ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(Ints.toByteArray(rowName), Ints.toByteArray(1))) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(123) - .sortedTimestamps(new long[numTs]) - .build(); - } - -} diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java new file mode 100644 index 00000000000..8ae67dbf219 --- /dev/null +++ b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidateGroupingIteratorTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; + +public class CandidateGroupingIteratorTest { + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void emptyInput() { + assertThat(group(ImmutableList.of()), empty()); + } + + @Test + public void singleCellInOnePage() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "x", 20L)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, false, 10L, 20L))))); + } + + @Test + public void singleCellSpanningTwoPages() { + assertThat( + group(ImmutableList.of( + ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "x", 20L)), + ImmutableList.of(cellTs("a", "x", 30L)))), + equalTo(ImmutableList.of( + ImmutableList.of(), + ImmutableList.of(candidate("a", "x", 3L, false, 10L, 20L, 30L))))); + } + + @Test + public void severalCellsInSinglePage() { + assertThat( + group(ImmutableList.of( + ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "y", 10L), cellTs("b", "y", 10L)))), + equalTo(ImmutableList.of( + ImmutableList.of( + candidate("a", "x", 1L, false, 10L), + candidate("a", "y", 2L, false, 10L), + candidate("b", "y", 3L, false, 10L))))); + } + + @Test + public void latestValueEmpty() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L, false), cellTs("a", "x", 20L, true)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, true, 10L, 20L))))); + } + + @Test + public void nonLatestValueEmpty() { + assertThat( + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 10L, true), cellTs("a", "x", 20L, false)))), + equalTo(ImmutableList.of(ImmutableList.of(candidate("a", "x", 2L, false, 10L, 20L))))); + } + + @Test + public void throwIfTimestampsAreOutOfOrder() { + thrown.expectMessage("Timestamps for each cell must be fed in strictly increasing order"); + group(ImmutableList.of(ImmutableList.of(cellTs("a", "x", 20L), cellTs("a", "x", 10L)))); + } + + private static CandidateCellForSweeping candidate(String rowName, + String colName, + long numCellTsPairsExamined, + boolean latestValEmpty, + long... ts) { + Arrays.sort(ts); + return ImmutableCandidateCellForSweeping.builder() + .cell(Cell.create(bytes(rowName), bytes(colName))) + .isLatestValueEmpty(latestValEmpty) + .numCellsTsPairsExamined(numCellTsPairsExamined) + .sortedTimestamps(ts) + .build(); + } + + private List> group(List> cellTsPairBatches) { + return ImmutableList.copyOf(CandidateGroupingIterator.create(cellTsPairBatches.iterator())); + } + + private static CellTsPairInfo cellTs(String rowName, String colName, long ts) { + return cellTs(rowName, colName, ts, false); + } + + private static CellTsPairInfo cellTs(String rowName, String colName, long ts, boolean emptyVal) { + return new CellTsPairInfo(bytes(rowName), bytes(colName), ts, emptyVal); + } + + private static byte[] bytes(String string) { + return string.getBytes(StandardCharsets.UTF_8); + } + +} diff --git a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java b/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java deleted file mode 100644 index e4f2b4cc73a..00000000000 --- a/atlasdb-dbkvs/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/sweep/CandidatePagingStateTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright 2017 Palantir Technologies, Inc. All rights reserved. - * - * Licensed under the BSD-3 License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Optional; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.Cell; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.RangeRequests; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition; - -public class CandidatePagingStateTest { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void initialStartingPositionFromBeginningOfTable() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void initialStartingPositionFromSpecificRow() { - CandidatePagingState state = CandidatePagingState.create(bytes("foo")); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(bytes("foo"), pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void startingPositionForSecondBatch() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), false); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - assertArrayEquals(bytes("a"), pos.get().rowName); - assertArrayEquals(bytes("b"), pos.get().colName); - assertEquals((Long) 21L, pos.get().timestamp); - } - - @Test - public void noNextStartingPositionAfterReachedEnd() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 10L)), true); - Optional pos = state.getNextStartingPosition(); - assertFalse(pos.isPresent()); - } - - @Test - public void reachedEndAfterScanningSingleCell() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - - // We got 2 (cell, ts) pairs with the same cell and reached the end, so we expect a candidate. - List candidates = state.processBatch( - ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), true); - assertThat(candidates, contains(candidate("a", "b", false, 2L, 10L, 20L))); - } - - @Test - public void singleCellSpanningTwoBatches() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - - // We got 2 (cell, ts) pairs, both belonging to the same cell, and have not reached the end yet. - // This means that we can't form a candidate yet - there could be more timestamps for that cell - // in the next batch - List firstBatch = state.processBatch( - ImmutableList.of(cellTs("a", "b", 10L), cellTs("a", "b", 20L)), false); - assertThat(firstBatch, empty()); - - List secondBatch = state.processBatch(ImmutableList.of(cellTs("a", "b", 30L)), true); - assertThat(secondBatch, contains(candidate("a", "b", false, 3L, 10L, 20L, 30L))); - } - - @Test - public void severalCellsInOneBatch() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L), cellTs("a", "y", 10L), cellTs("b", "y", 10L)), true); - assertThat(batch, contains( - candidate("a", "x", false, 1L, 10L), - candidate("a", "y", false, 2L, 10L), - candidate("b", "y", false, 3L, 10L))); - } - - @Test - public void testLatestValueEmpty() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L, false), cellTs("a", "x", 20L, true)), true); - // Since the latest timestamp has the "isEmptyValue" flag set to true, we expect the candidate to - // indicate that the latest value is empty - assertThat(batch, contains(candidate("a", "x", true, 2L, 10L, 20L))); - } - - @Test - public void testNonLatestValueEmpty() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - List batch = state.processBatch( - ImmutableList.of(cellTs("a", "x", 10L, true), cellTs("a", "x", 20L, false)), true); - // Since the latest timestamp has the "isEmptyValue" flag set to false, we expect the candidate to - // indicate that the latest value is not empty - assertThat(batch, contains(candidate("a", "x", false, 2L, 10L, 20L))); - } - - @Test - public void restartFromNextRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "x", 10L)), true); - state.restartFromNextRow(); - Optional pos = state.getNextStartingPosition(); - assertTrue(pos.isPresent()); - // [ 'a', 0 ] is the lexicographically next key after [ 'a' ] - assertArrayEquals(new byte[] { 'a', 0 }, pos.get().rowName); - assertArrayEquals(PtBytes.EMPTY_BYTE_ARRAY, pos.get().colName); - assertNull(pos.get().timestamp); - } - - @Test - public void restartFromNextRowWhenThereIsNoNextRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - byte[] rowName = RangeRequests.getLastRowName(); - CellTsPairInfo cellTs = new CellTsPairInfo(rowName, bytes("a"), 10L, false); - state.processBatch(ImmutableList.of(cellTs), false); - state.restartFromNextRow(); - // There is no next row if the current row is lexicographically last - assertFalse(state.getNextStartingPosition().isPresent()); - } - - @Test - public void cellTsPairsExaminedInCurrentRow() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of( - cellTs("a", "x", 10L), cellTs("b", "y", 10L), cellTs("b", "y", 20L), cellTs("b", "z", 30L)), - false); - // Three (cell, ts) pairs in row "b" so far: (b, y, 10), (b, y, 20), (b, z, 30) - assertEquals(3, state.getCellTsPairsExaminedInCurrentRow()); - - state.processBatch(ImmutableList.of(cellTs("b", "z", 40L)), false); - // Added (b, z, 40), so should be 4 total now - assertEquals(4, state.getCellTsPairsExaminedInCurrentRow()); - - state.processBatch(ImmutableList.of(cellTs("c", "z", 40L)), false); - // Started a new row "c", and only examined one entry - assertEquals(1, state.getCellTsPairsExaminedInCurrentRow()); - - state.restartFromNextRow(); - // Nothing examined yet if starting from a fresh row - assertEquals(0, state.getCellTsPairsExaminedInCurrentRow()); - } - - @Test - public void throwIfGotRepeatingTimestamps() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", 20L)), false); - thrown.expectMessage("Timestamps for each cell must be fed in strictly increasing order"); - state.processBatch(ImmutableList.of(cellTs("a", "b", 20L)), false); - } - - @Test - public void throwIfTimestampIsMaxLong() { - CandidatePagingState state = CandidatePagingState.create(PtBytes.EMPTY_BYTE_ARRAY); - state.processBatch(ImmutableList.of(cellTs("a", "b", Long.MAX_VALUE)), false); - thrown.expectMessage("Timestamps must be strictly less than Long.MAX_VALUE"); - state.getNextStartingPosition(); - } - - private static CellTsPairInfo cellTs(String row, String col, long ts) { - return cellTs(row, col, ts, false); - } - - private static CellTsPairInfo cellTs(String row, String col, long ts, boolean emptyVal) { - return new CellTsPairInfo(bytes(row), bytes(col), ts, emptyVal); - } - - private static CandidateCellForSweeping candidate(String row, - String col, - boolean emptyLatestVal, - long numCellTsPairsExamined, - long... sortedTimestamps) { - return ImmutableCandidateCellForSweeping.builder() - .cell(Cell.create(bytes(row), bytes(col))) - .isLatestValueEmpty(emptyLatestVal) - .numCellsTsPairsExamined(numCellTsPairsExamined) - .sortedTimestamps(sortedTimestamps) - .build(); - } - - private static byte[] bytes(String string) { - return string.getBytes(StandardCharsets.UTF_8); - } - -}