From 042c8a07b9569a8f5843dc25c023c281966dfda2 Mon Sep 17 00:00:00 2001 From: Greg Bonik Date: Wed, 4 Oct 2017 13:13:26 -0700 Subject: [PATCH 1/2] Simplify getCandidateCellsForSweeping() Initially I hoped to take advantage of in-database filtering if we get to implement the "transaction table sweeping" feature. However, seems like that wasn't a great decision on my part - unclear when (and if) we get that feature done, and also how much of improvement we would actually get. The extra logic makes the code significantly more complex, so I think we need to back off and give up the idea. This doesn't invalidate the sweep rewrite project. The new impls still bring significant performance improvements. --- .../api/CandidateCellForSweepingRequest.java | 13 -- .../atlasdb/keyvalue/api/KeyValueService.java | 28 +-- ...andraGetCandidateCellsForSweepingImpl.java | 22 +- .../GetCandidateCellsForSweepingShim.java | 16 +- .../dbkvs/DbkvsPostgresTestSuite.java | 3 +- ...tgresGetCandidateCellsForSweepingTest.java | 99 +-------- .../atlasdb/keyvalue/dbkvs/impl/DbKvs.java | 2 +- .../PostgresGetCandidateCellsForSweeping.java | 200 +++++------------- .../atlasdb/sweep/SweepTaskRunner.java | 2 - ...etCandidateCellsForSweepingBenchmarks.java | 1 - ...tractGetCandidateCellsForSweepingTest.java | 110 +++++----- 11 files changed, 122 insertions(+), 374 deletions(-) diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CandidateCellForSweepingRequest.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CandidateCellForSweepingRequest.java index 70a5a68e7e3..bca3365acc7 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CandidateCellForSweepingRequest.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/CandidateCellForSweepingRequest.java @@ -25,19 +25,6 @@ public interface CandidateCellForSweepingRequest { OptionalInt batchSizeHint(); - /** - * This can be used in the future when we implement the 'transaction table sweeping' feature. - * This should be set to the timestamp T such that all transactions with start timestamps less than T that - * appear in the given table are known to be committed. The number T can come from the previous run of sweep - * for the table. - * - * This enables in-database pre-filtering of cells that should be considered for sweeping. - * For example, if a cell has exactly one timestamp and this timestamp is known to belong to a committed - * transaction, then the cell doesn't need to be swept, and therefore we can avoid sending it over the network - * from the DB to the sweeper process. - */ - long minUncommittedStartTimestamp(); - /** * Only start timestamps that are strictly below this number will be considered. */ diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java index 414e9949819..dd3e50f3958 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/KeyValueService.java @@ -450,30 +450,10 @@ ClosableIterator>> getRangeOfTimestamps( /** * For a given range of rows, returns all candidate cells for sweeping (and their timestamps). - * Here is the precise definition of a candidate cell: - *
- * Let {@code Ts} be {@code request.sweepTimestamp()}
- * Let {@code Tu} be {@code request.minUncommittedTimestamp()}
- * Let {@code V} be {@code request.shouldCheckIfLatestValueIsEmpty()}
- * Let {@code Ti} be set of timestamps in {@code request.timestampsToIgnore()}
- *

- * Consider a cell {@code C}. Let {@code Tc} be the set of all timestamps for {@code C} that are strictly - * less than {@code Ts}. Let {@code T} be {@code Tc \ Ti} (i.e. the cell timestamps minus the ignored - * timestamps). - *

- * Then {@code C} is a candidate for sweeping if and only if at least one of - * the following conditions is true: - *

    - *
  1. The set {@code T} has more than one element - *
  2. The set {@code T} contains an element that is greater than or equal to {@code Tu} - * (that is, there is a timestamp that can possibly come from an uncommitted or aborted transaction) - *
  3. The set {@code T} contains {@link Value#INVALID_VALUE_TIMESTAMP} - * (that is, there is a sentinel we can possibly clean up) - *
  4. {@code V} is true and the cell value corresponding to the maximum element of {@code T} is empty - * (that is, the latest sweepable value is a 'soft-delete' tombstone) - *
- * - *
+ *

+ * A candidate cell is a cell that has at least one timestamp that is less than request.sweepTimestamp() and is + * not in the set specified by request.timestampsToIgnore(). + *

* This method will scan the semi-open range of rows from the start row specified in the {@code request} * to the end of the table. If the given start row name is an empty byte array, the whole table will be * scanned. diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java index 32a5629e141..f282334d195 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/sweep/CassandraGetCandidateCellsForSweepingImpl.java @@ -32,7 +32,6 @@ import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices; import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue; import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager; @@ -46,8 +45,6 @@ */ public class CassandraGetCandidateCellsForSweepingImpl { - private static final long[] EMPTY_LONG_ARRAY = new long[0]; - private final CellPager cellPager; public CassandraGetCandidateCellsForSweepingImpl(CellPager cellPager) { @@ -133,8 +130,8 @@ private Optional processColumn(CassandraRawCellValue c } private CandidateCellForSweeping createCandidate() { - boolean isCandidate = isCandidate(); - long[] sortedTimestamps = isCandidate ? sortTimestamps() : EMPTY_LONG_ARRAY; + currentTimestamps.reverse(); + long[] sortedTimestamps = currentTimestamps.toArray(); currentTimestamps.clear(); return ImmutableCandidateCellForSweeping.builder() .cell(currentCell) @@ -143,21 +140,6 @@ private CandidateCellForSweeping createCandidate() { .numCellsTsPairsExamined(numCellTsPairsExamined) .build(); } - - private long[] sortTimestamps() { - currentTimestamps.reverse(); - return currentTimestamps.toArray(); - } - - private boolean isCandidate() { - return currentTimestamps.size() > 1 - || currentLatestValEmpty - || (currentTimestamps.size() == 1 && timestampIsPotentiallySweepable(currentTimestamps.get(0))); - } - - private boolean timestampIsPotentiallySweepable(long ts) { - return ts == Value.INVALID_VALUE_TIMESTAMP || ts >= request.minUncommittedStartTimestamp(); - } } } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java index ae3b05f8c06..681d4e382a0 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/GetCandidateCellsForSweepingShim.java @@ -82,10 +82,9 @@ public ClosableIterator> getCandidateCellsForSwee Cell cell = Cell.create(rr.getRowName(), colName); boolean latestValEmpty = isLatestValueEmpty(cell, peekingValues); numExamined.add(timestampArr.length); - boolean candidate = isCandidate(timestampArr, latestValEmpty, request); candidateBatch.add(ImmutableCandidateCellForSweeping.builder() .cell(cell) - .sortedTimestamps(candidate ? timestampArr : EMPTY_LONG_ARRAY) + .sortedTimestamps(timestampArr) .isLatestValueEmpty(latestValEmpty) .numCellsTsPairsExamined(numExamined.longValue()) .build()); @@ -109,18 +108,6 @@ private static Closer createCloserAndRelease(ReleasableCloseable... closeable return closer; } - private static boolean isCandidate(long[] timestamps, - boolean lastValEmpty, - CandidateCellForSweepingRequest request) { - return timestamps.length > 1 - || (request.shouldCheckIfLatestValueIsEmpty() && lastValEmpty) - || (timestamps.length == 1 && timestampIsPotentiallySweepable(timestamps[0], request)); - } - - private static boolean timestampIsPotentiallySweepable(long ts, CandidateCellForSweepingRequest request) { - return ts == Value.INVALID_VALUE_TIMESTAMP || ts >= request.minUncommittedStartTimestamp(); - } - private ClosableIterator> getValues(TableReference tableRef, RangeRequest range, long sweepTs, @@ -175,5 +162,4 @@ public void close() { } } - private static final long[] EMPTY_LONG_ARRAY = new long[0]; } diff --git a/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/DbkvsPostgresTestSuite.java b/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/DbkvsPostgresTestSuite.java index 8698ec71731..170ed5c417c 100644 --- a/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/DbkvsPostgresTestSuite.java +++ b/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/DbkvsPostgresTestSuite.java @@ -97,7 +97,8 @@ private static Callable canCreateKeyValueService() { kvs = ConnectionManagerAwareDbKvs.create(getKvsConfig()); return kvs.getConnectionManager().getConnection().isValid(5); } catch (Exception ex) { - if (ex.getMessage().contains("The connection attempt failed.")) { + if (ex.getMessage().contains("The connection attempt failed.") + || ex.getMessage().contains("the database system is starting up")) { return false; } else { throw ex; diff --git a/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/DbKvsPostgresGetCandidateCellsForSweepingTest.java b/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/DbKvsPostgresGetCandidateCellsForSweepingTest.java index 7d0661aa56c..2c47d1120e4 100644 --- a/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/DbKvsPostgresGetCandidateCellsForSweepingTest.java +++ b/atlasdb-dbkvs-tests/src/test/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/DbKvsPostgresGetCandidateCellsForSweepingTest.java @@ -16,113 +16,18 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres; -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.junit.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; -import com.palantir.atlasdb.encoding.PtBytes; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; -import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; -import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.dbkvs.DbKeyValueServiceConfig; import com.palantir.atlasdb.keyvalue.dbkvs.DbkvsPostgresTestSuite; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.SqlConnectionSupplier; -import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping; import com.palantir.atlasdb.keyvalue.impl.AbstractGetCandidateCellsForSweepingTest; -import com.palantir.common.base.ClosableIterator; public class DbKvsPostgresGetCandidateCellsForSweepingTest extends AbstractGetCandidateCellsForSweepingTest { - private static DbKeyValueServiceConfig config; - private static SqlConnectionSupplier connectionSupplier; - @Override protected KeyValueService createKeyValueService() { - config = DbkvsPostgresTestSuite.getKvsConfig(); - ConnectionManagerAwareDbKvs kvs = ConnectionManagerAwareDbKvs.create(config); - connectionSupplier = kvs.getSqlConnectionSupplier(); - return kvs; - } - - @Test - public void singleCellSpanningSeveralPages() { - new TestDataBuilder() - .put(10, 1, 1000) - .put(10, 1, 1001) - .put(10, 1, 1002) - .put(10, 1, 1003) - .put(10, 1, 1004) - .store(); - List cells = getWithOverriddenLimit( - conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, Long.MIN_VALUE), 2); - assertEquals(ImmutableList.of(ImmutableCandidateCellForSweeping.builder() - .cell(cell(10, 1)) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(5) - .sortedTimestamps(1000L, 1001L, 1002L, 1003L, 1004L) - .build()), cells); - } - - @Test - public void returnFirstAndLastCellOfThePage() { - new TestDataBuilder() - .put(10, 1, 1000) - .put(10, 2, 400) - // The cell (20, 1) is not a candidate because the minimumUncommittedTimestamp is 750, which is greater - // than 500. However, we still need to return this cell since it's at the page boundary. - .put(20, 1, 500) - // <---- page boundary here ----> - // Again, this cell is not a candidate, but we need to return it - // since it's the first SQL row in the page. - .put(30, 1, 600) - .store(); - List cells = getWithOverriddenLimit( - conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, 750L), 3); - assertEquals( - ImmutableList.of( - ImmutableCandidateCellForSweeping.builder() - .cell(cell(10, 1)) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(1) - .sortedTimestamps(1000L) - .build(), - ImmutableCandidateCellForSweeping.builder() - .cell(cell(20, 1)) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(3) - // No timestamps because the cell is not a real candidate - .sortedTimestamps() - .build(), - ImmutableCandidateCellForSweeping.builder() - .cell(cell(30, 1)) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(4) - // No timestamps because the cell is not a real candidate - .sortedTimestamps() - .build()), - cells); - } - - private List getWithOverriddenLimit( - CandidateCellForSweepingRequest request, - int sqlRowLimitOverride) { - try (ClosableIterator> iter = createImpl(sqlRowLimitOverride) - .getCandidateCellsForSweeping(TEST_TABLE, request, null)) { - return ImmutableList.copyOf(Iterators.concat(Iterators.transform(iter, List::iterator))); - } - } - - private DbKvsGetCandidateCellsForSweeping createImpl(int sqlRowLimitOverride) { - return new PostgresGetCandidateCellsForSweeping( - new PostgresPrefixedTableNames(config.ddl()), - connectionSupplier, - x -> sqlRowLimitOverride); + DbKeyValueServiceConfig config = DbkvsPostgresTestSuite.getKvsConfig(); + return ConnectionManagerAwareDbKvs.create(config); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index 837147acabb..99175bf8754 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -189,7 +189,7 @@ private static DbKvs createPostgres(ExecutorService executor, new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()), (conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache), - PostgresGetCandidateCellsForSweeping.create(prefixedTableNames, connections)); + new PostgresGetCandidateCellsForSweeping(prefixedTableNames, connections)); } private static DbKvs createOracle(ExecutorService executor, diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresGetCandidateCellsForSweeping.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresGetCandidateCellsForSweeping.java index ca3fc4573ef..6ccad8c49e6 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresGetCandidateCellsForSweeping.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/postgres/PostgresGetCandidateCellsForSweeping.java @@ -20,12 +20,9 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.IntToLongFunction; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; -import com.google.common.primitives.Longs; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; @@ -33,7 +30,6 @@ import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionSupplier; import com.palantir.atlasdb.keyvalue.dbkvs.impl.DbKvs; import com.palantir.atlasdb.keyvalue.dbkvs.impl.FullQuery; @@ -48,79 +44,17 @@ import gnu.trove.list.TLongList; import gnu.trove.list.array.TLongArrayList; -// Two considerations that influenced the implementation: -// -// 1. Window functions (OVER) on Postgres seem to be significantly slower than equivalent aggregates (GROUP BY). -// -// 2. Limiting an outer query doesn't seem to work well, e.g.: -// SELECT ... FROM ( -// SELECT ... FROM my_table -// ... -// ORDER BY row_name, col_name, ts -// ... -// ) sub -// WHERE ... -// ORDER BY row_name, col_name, ts -// LIMIT 1000 -// Postgres will often choose a full table scan for the inner SELECT query, even when an index -// range scan is sufficient (because of the limit on the outer query). Postgres also doesn't -// have hints like Oracle, so one can't force it to use a certain plan. A trick with specifying -// a very large limit for the inner query seems to work, hinting the optimizer that only -// the first few rows matter: -// SELECT ... FROM ( -// SELECT ... FROM my_table -// ... -// ORDER BY row_name, col_name, ts -// ... -// LIMIT 100000000000000000000 -- a very large number that is guaranteed to exceed the table size -// ) sub -// WHERE ... -// ORDER BY row_name, col_name, ts -// LIMIT 1000 -// However, this looks very hacky and fragile, so instead we resort to simply limiting the inner query -// (i.e., limiting the number of examined cells rather than the number of returned candidates). -// -// So, our SQL query does the following: -// -// 1. Grab a page of SQL rows, starting at some (row_name, col_name, ts). -// 2. Number the rows, starting with 1. -// 3. Group the rows by cell key, i.e. (row_name, col_name), and compute a few aggregates: -// - Minimum and maximum row number -// - Minimum and maximum timestamp -// - An array of all timestamps (using ARRAY_AGG) -// 4. If the THOROUGH strategy is being used, do a self-join to check if the values are empty. -// 5. Filter out cells that don't satisfy the definition of a candidate, but keep the first -// and last SQL row in each page (except for the last page). -// -// public class PostgresGetCandidateCellsForSweeping implements DbKvsGetCandidateCellsForSweeping { private final PostgresPrefixedTableNames prefixedTableNames; private final SqlConnectionSupplier connectionPool; - private final IntToLongFunction sqlRowLimitProvider; - private final long[] emptyLongArray = new long[] {}; private static final int DEFAULT_BATCH_SIZE = 1000; - public static PostgresGetCandidateCellsForSweeping create( - PostgresPrefixedTableNames prefixedTableNames, - SqlConnectionSupplier connectionPool) { - return new PostgresGetCandidateCellsForSweeping( - prefixedTableNames, - connectionPool, - // Since in our SQL query we limit the number of examined cells rather than the number of - // returned candidates, we set the limit higher than the requested batch size. - batchHint -> 4 * batchHint); - } - - @VisibleForTesting - /* package */ PostgresGetCandidateCellsForSweeping( - PostgresPrefixedTableNames prefixedTableNames, - SqlConnectionSupplier connectionPool, - IntToLongFunction sqlRowLimitProvider) { + public PostgresGetCandidateCellsForSweeping(PostgresPrefixedTableNames prefixedTableNames, + SqlConnectionSupplier connectionPool) { this.prefixedTableNames = prefixedTableNames; this.connectionPool = connectionPool; - this.sqlRowLimitProvider = sqlRowLimitProvider; } @Override @@ -131,7 +65,7 @@ public ClosableIterator> getCandidateCellsForSwee String tableName = DbKvs.internalTableName(tableRef); PageIterator rawIterator = new PageIterator( request, - (int) sqlRowLimitProvider.applyAsLong(request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)), + request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE), tableName, prefixedTableNames.get(tableRef), request.startRowInclusive()); @@ -150,7 +84,7 @@ private class PageIterator extends AbstractIterator computeNext() { - if (endOfResults) { + if (reachedEnd) { return endOfData(); } else { try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool); @@ -187,18 +121,23 @@ protected List computeNext() { currentRowName = rowName; currentColName = colName; } - Object[] timestamps = (Object[]) sqlRow.getArray("timestamps"); - for (Object ts : timestamps) { - currentCellTimestamps.add((Long) ts); - } - cellTsPairsExaminedCurrentBatch = sqlRow.getLong("max_row_number"); if (request.shouldCheckIfLatestValueIsEmpty()) { + // We group timestamps in an array when we check the latest values + Object[] timestamps = (Object[]) sqlRow.getArray("timestamps"); + cellTsPairsExaminedCurrentBatch += timestamps.length; + for (Object ts : timestamps) { + currentCellTimestamps.add((Long) ts); + } currentIsLatestValueEmpty = sqlRow.getBoolean("latest_val_empty"); + } else { + // Otherwise, we return one row per (cell, ts) pair + cellTsPairsExaminedCurrentBatch += 1; + currentCellTimestamps.add(sqlRow.getLong("ts")); } } if (noResults || cellTsPairsExaminedCurrentBatch < sqlRowLimit) { getCurrentCandidate().ifPresent(results::add); - endOfResults = true; + reachedEnd = true; } else { computeNextStartPosition(); } @@ -235,21 +174,9 @@ private Optional getCurrentCandidate() { } private long[] getSortedTimestamps() { - if (isCandidate()) { - long[] sortedTimestamps = currentCellTimestamps.toArray(); - Arrays.sort(sortedTimestamps); - return sortedTimestamps; - } else { - return emptyLongArray; - } - } - - private boolean isCandidate() { - return currentCellTimestamps.size() > 1 - || currentCellTimestamps.get(currentCellTimestamps.size() - 1) - >= request.minUncommittedStartTimestamp() - || currentCellTimestamps.contains(Value.INVALID_VALUE_TIMESTAMP) - || currentIsLatestValueEmpty; + long[] sortedTimestamps = currentCellTimestamps.toArray(); + Arrays.sort(sortedTimestamps); + return sortedTimestamps; } private ClosableIterator selectNextPage(ConnectionSupplier conns) { @@ -263,59 +190,42 @@ private FullQuery getQuery() { RangeBoundPredicates bounds = RangeBoundPredicates.builder(false) .startCellTsInclusive(currentRowName, currentColName, firstCellStartTimestampInclusive) .build(); - boolean ignoreSentinels = areSentinelsIgnored(); - String query = "/* GET_CANDIDATE_CELLS_FOR_SWEEPING(" + tableName + ") */" - + " SELECT cells.row_name, cells.col_name, cells.timestamps, cells.max_rn AS max_row_number" - + (request.shouldCheckIfLatestValueIsEmpty() ? ", length(v.val) = 0 AS latest_val_empty" : "") - + " FROM (" - + " SELECT" - + " row_name," - + " col_name," - + " MIN(rn) AS min_rn," - + " MAX(rn) AS max_rn," - + " MIN(ts) AS min_ts," - + " MAX(ts) AS max_ts," - + (ignoreSentinels - ? "" - : " MAX((ts=" + Value.INVALID_VALUE_TIMESTAMP + ")::int) AS have_sentinel,") - + " ARRAY_AGG(ts) AS timestamps" - + " FROM (" - + " SELECT row_name, col_name, ts, ROW_NUMBER() OVER (ORDER BY row_name, col_name, ts) AS rn" - + " FROM " + prefixedTableName - + " WHERE ts < ? " + bounds.predicates + getIgnoredTimestampPredicate() - + " ORDER BY row_name, col_name, ts" - + " LIMIT " + sqlRowLimit - + " ) sub" - + " GROUP BY row_name, col_name" - + " ORDER BY row_name, col_name" - + " ) cells" - + (request.shouldCheckIfLatestValueIsEmpty() - ? " JOIN " + prefixedTableName + " v" - + " ON cells.row_name = v.row_name" - + " AND cells.col_name = v.col_name" - + " AND cells.max_ts = v.ts" - : "") - + " WHERE" - // See KVS.getCandidateCellsForSweeping() docs for the definition of a candidate cell: - // (1) The set T has more than one element - + " min_ts <> max_ts" - // (2) The set T contains an element that is greater than or equal to Tu - + " OR max_ts >= ?" - // (3) The set T contains Value.INVALID_VALUE_TIMESTAMP - + (ignoreSentinels ? "" : " OR have_sentinel = 1") - // (4) V is true and the cell value corresponding to the maximum element of T is empty - + (request.shouldCheckIfLatestValueIsEmpty() ? " OR length(v.val) = 0" : "") - // Also, always get the first cell, as well as the last one if the limit was reached - + " OR min_rn = 1 OR max_rn = " + sqlRowLimit - + " ORDER BY cells.row_name, cells.col_name"; - return new FullQuery(query) - .withArg(request.sweepTimestamp()) // "WHERE ts < ?" - .withArgs(bounds.args) - .withArg(request.minUncommittedStartTimestamp()); // "OR max_ts >= ?" - } - - private boolean areSentinelsIgnored() { - return Longs.contains(request.timestampsToIgnore(), Value.INVALID_VALUE_TIMESTAMP); + if (request.shouldCheckIfLatestValueIsEmpty()) { + String query = "/* GET_CANDIDATE_CELLS_FOR_SWEEPING_THOROUGH(" + tableName + ") */" + + " SELECT cells.row_name, cells.col_name, cells.timestamps, " + + " length(v.val) = 0 AS latest_val_empty" + + " FROM (" + + " SELECT" + + " row_name, col_name, MAX(ts) AS max_ts, ARRAY_AGG(ts) AS timestamps" + + " FROM (" + + " SELECT row_name, col_name, ts" + + " FROM " + prefixedTableName + + " WHERE ts < ? " + bounds.predicates + getIgnoredTimestampPredicate() + + " ORDER BY row_name, col_name, ts" + + " LIMIT " + sqlRowLimit + + " ) sub" + + " GROUP BY row_name, col_name" + + " ORDER BY row_name, col_name" + + " ) cells" + + " JOIN " + prefixedTableName + " v" + + " ON cells.row_name = v.row_name" + + " AND cells.col_name = v.col_name" + + " AND cells.max_ts = v.ts" + + " ORDER BY cells.row_name, cells.col_name"; + return new FullQuery(query) + .withArg(request.sweepTimestamp()) // "WHERE ts < ?" + .withArgs(bounds.args); + } else { + String query = "/* GET_CANDIDATE_CELLS_FOR_SWEEPING_CONSERVATIVE(" + tableName + ") */" + + " SELECT row_name, col_name, ts" + + " FROM " + prefixedTableName + + " WHERE ts < ? " + bounds.predicates + getIgnoredTimestampPredicate() + + " ORDER BY row_name, col_name, ts" + + " LIMIT " + sqlRowLimit; + return new FullQuery(query) + .withArg(request.sweepTimestamp()) // "WHERE ts < ?" + .withArgs(bounds.args); + } } private String getIgnoredTimestampPredicate() { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java index 8e475d141d8..0a53d52ddc2 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java @@ -156,8 +156,6 @@ private SweepResults doRun(TableReference tableRef, CandidateCellForSweepingRequest request = ImmutableCandidateCellForSweepingRequest.builder() .startRowInclusive(startRow) .batchSizeHint(batchConfig.candidateBatchSize()) - // TODO(sberler): change once we figure out transaction table sweep - .minUncommittedStartTimestamp(Long.MIN_VALUE) .sweepTimestamp(sweepTs) .shouldCheckIfLatestValueIsEmpty(sweeper.shouldSweepLastCommitted()) .timestampsToIgnore(sweeper.getTimestampsToIgnore()) diff --git a/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/benchmarks/KvsGetCandidateCellsForSweepingBenchmarks.java b/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/benchmarks/KvsGetCandidateCellsForSweepingBenchmarks.java index 5131ea11485..317d8d5ebf9 100644 --- a/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/benchmarks/KvsGetCandidateCellsForSweepingBenchmarks.java +++ b/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/benchmarks/KvsGetCandidateCellsForSweepingBenchmarks.java @@ -75,7 +75,6 @@ private int fullTableScan(ConsecutiveNarrowTable table, boolean thorough) { CandidateCellForSweepingRequest request = ImmutableCandidateCellForSweepingRequest.builder() .startRowInclusive(PtBytes.EMPTY_BYTE_ARRAY) .batchSizeHint(1000) - .minUncommittedStartTimestamp(Long.MIN_VALUE) .sweepTimestamp(Long.MAX_VALUE) .shouldCheckIfLatestValueIsEmpty(thorough) .timestampsToIgnore(thorough ? new long[] {} : new long[] { Value.INVALID_VALUE_TIMESTAMP }) diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java index dd96c21f4d8..41d37517932 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java @@ -68,50 +68,45 @@ public static void closeKvs() { } @Test - public void returnCandidateIfPossiblyUncommittedTimestamp() { - new TestDataBuilder().put(1, 1, 10L).store(); - assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 5L))) - .containsExactly(ImmutableCandidateCellForSweeping.builder() - .cell(cell(1, 1)) - .sortedTimestamps(new long[] { 10L }) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(1) - .build()); - } - - @Test - public void doNotReturnCandidateIfOnlyCommittedTimestamp() { - new TestDataBuilder().put(1, 1, 10L).store(); - assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 30L))).isEmpty(); - } - - @Test - public void returnCandidateIfTwoCommittedTimestamps() { - new TestDataBuilder().put(1, 1, 10L).put(1, 1, 20L).store(); - assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 30L))) - .containsExactly(ImmutableCandidateCellForSweeping.builder() - .cell(cell(1, 1)) - .sortedTimestamps(new long[] { 10L, 20L }) - .isLatestValueEmpty(false) - .numCellsTsPairsExamined(2) - .build()); - } - - @Test - public void doNotReturnCandidateWithCommitedEmptyValueIfConservative() { - new TestDataBuilder().putEmpty(1, 1, 10L).store(); - assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 30L))).isEmpty(); + public void singleCellSpanningSeveralPages() { + new TestDataBuilder() + .put(10, 1, 1000) + .put(10, 1, 1001) + .put(10, 1, 1002) + .put(10, 1, 1003) + .put(10, 1, 1004) + .store(); + List cells = getAllCandidates( + conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, 2)); + assertEquals(ImmutableList.of(ImmutableCandidateCellForSweeping.builder() + .cell(cell(10, 1)) + .isLatestValueEmpty(false) + .numCellsTsPairsExamined(5) + .sortedTimestamps(1000L, 1001L, 1002L, 1003L, 1004L) + .build()), cells); } @Test - public void returnCandidateWithCommitedEmptyValueIfThorough() { - new TestDataBuilder().putEmpty(1, 1, 10L).store(); - assertThat(getAllCandidates(thoroughRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L, 30L))) - .containsExactly(ImmutableCandidateCellForSweeping.builder() + public void reportLatestEmptyValue() { + new TestDataBuilder() + .putEmpty(1, 1, 10L) + .put(1, 1, 5L) + .put(2, 2, 9L) + .putEmpty(2, 2, 4L) + .store(); + assertThat(getAllCandidates(thoroughRequest(PtBytes.EMPTY_BYTE_ARRAY, 40L))) + .containsExactly( + ImmutableCandidateCellForSweeping.builder() .cell(cell(1, 1)) - .sortedTimestamps(new long[] { 10L }) + .sortedTimestamps(5L, 10L) .isLatestValueEmpty(true) - .numCellsTsPairsExamined(1) + .numCellsTsPairsExamined(2) + .build(), + ImmutableCandidateCellForSweeping.builder() + .cell(cell(2, 2)) + .sortedTimestamps(4L, 9L) + .isLatestValueEmpty(false) + .numCellsTsPairsExamined(4) .build()); } @@ -125,7 +120,7 @@ public void returnCellsInOrder() { .putEmpty(3, 2, 10L) .putEmpty(3, 3, 10L) .store(); - assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 30L, 5L)) + assertThat(getAllCandidates(conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 30L, 100)) .stream().map(CandidateCellForSweeping::cell).collect(Collectors.toList())) .containsExactly(cell(1, 1), cell(1, 2), cell(2, 2), cell(3, 1), cell(3, 2), cell(3, 3)); } @@ -140,13 +135,22 @@ public void startFromGivenRow() { .putEmpty(3, 1, 10L) .putEmpty(3, 2, 10L) .store(); - assertThat(getAllCandidates(conservativeRequest(cell(2, 2).getRowName(), 30L, 5L)) + assertThat(getAllCandidates(conservativeRequest(cell(2, 2).getRowName(), 30L, 100)) .stream().map(CandidateCellForSweeping::cell).collect(Collectors.toList())) .containsExactly(cell(2, 1), cell(2, 2), cell(3, 1), cell(3, 2)); } @Test - public void largerTableWithSmallBatchSizeReturnsCorrectResults() { + public void largerTableWithSmallBatchSizeReturnsCorrectResultsConservative() { + doTestLargerTable(false); + } + + @Test + public void largerTableWithSmallBatchSizeReturnsCorrectResultsThorough() { + doTestLargerTable(true); + } + + private void doTestLargerTable(boolean checkIfLatestValueIsEmpty) { TestDataBuilder builder = new TestDataBuilder(); List expectedCells = Lists.newArrayList(); for (int rowNum = 1; rowNum <= 50; ++rowNum) { @@ -161,13 +165,12 @@ public void largerTableWithSmallBatchSizeReturnsCorrectResults() { builder.store(); List candidates = getAllCandidates( ImmutableCandidateCellForSweepingRequest.builder() - .startRowInclusive(PtBytes.EMPTY_BYTE_ARRAY) - .sweepTimestamp(40L) - .minUncommittedStartTimestamp(1L) - .shouldCheckIfLatestValueIsEmpty(false) - .timestampsToIgnore(Value.INVALID_VALUE_TIMESTAMP) - .batchSizeHint(1) - .build()); + .startRowInclusive(PtBytes.EMPTY_BYTE_ARRAY) + .sweepTimestamp(40L) + .shouldCheckIfLatestValueIsEmpty(checkIfLatestValueIsEmpty) + .timestampsToIgnore(Value.INVALID_VALUE_TIMESTAMP) + .batchSizeHint(1) + .build()); assertEquals(expectedCells, candidates.stream().map(CandidateCellForSweeping::cell).collect(Collectors.toList())); } @@ -184,23 +187,20 @@ private List getAllCandidates(CandidateCellForSweeping protected static CandidateCellForSweepingRequest conservativeRequest(byte[] startRow, long sweepTs, - long minUncommittedTs) { + int batchSizeHint) { return ImmutableCandidateCellForSweepingRequest.builder() .startRowInclusive(startRow) .sweepTimestamp(sweepTs) - .minUncommittedStartTimestamp(minUncommittedTs) .shouldCheckIfLatestValueIsEmpty(false) .timestampsToIgnore(Value.INVALID_VALUE_TIMESTAMP) + .batchSizeHint(batchSizeHint) .build(); } - protected static CandidateCellForSweepingRequest thoroughRequest(byte[] startRow, - long sweepTs, - long minUncommittedTs) { + protected static CandidateCellForSweepingRequest thoroughRequest(byte[] startRow, long sweepTs) { return ImmutableCandidateCellForSweepingRequest.builder() .startRowInclusive(startRow) .sweepTimestamp(sweepTs) - .minUncommittedStartTimestamp(minUncommittedTs) .shouldCheckIfLatestValueIsEmpty(true) .timestampsToIgnore() .build(); From 77b8c16151a1576b564edd3b4aaf1dda5cfa108e Mon Sep 17 00:00:00 2001 From: Greg Bonik Date: Mon, 9 Oct 2017 12:15:10 -0700 Subject: [PATCH 2/2] Int -> long --- .../impl/AbstractGetCandidateCellsForSweepingTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java index 41d37517932..2e639ceda2e 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractGetCandidateCellsForSweepingTest.java @@ -70,11 +70,11 @@ public static void closeKvs() { @Test public void singleCellSpanningSeveralPages() { new TestDataBuilder() - .put(10, 1, 1000) - .put(10, 1, 1001) - .put(10, 1, 1002) - .put(10, 1, 1003) - .put(10, 1, 1004) + .put(10, 1, 1000L) + .put(10, 1, 1001L) + .put(10, 1, 1002L) + .put(10, 1, 1003L) + .put(10, 1, 1004L) .store(); List cells = getAllCandidates( conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, 2));