Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Simplify getCandidateCellsForSweeping() #2439

Merged
merged 2 commits into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,6 @@ public interface CandidateCellForSweepingRequest {

OptionalInt batchSizeHint();

/**
* This can be used in the future when we implement the 'transaction table sweeping' feature.
* This should be set to the timestamp T such that all transactions with start timestamps less than T that
* appear in the given table are known to be committed. The number T can come from the previous run of sweep
* for the table.
*
* This enables in-database pre-filtering of cells that should be considered for sweeping.
* For example, if a cell has exactly one timestamp and this timestamp is known to belong to a committed
* transaction, then the cell doesn't need to be swept, and therefore we can avoid sending it over the network
* from the DB to the sweeper process.
*/
long minUncommittedStartTimestamp();

/**
* Only start timestamps that are strictly below this number will be considered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,30 +450,10 @@ ClosableIterator<RowResult<Set<Long>>> getRangeOfTimestamps(

/**
* For a given range of rows, returns all candidate cells for sweeping (and their timestamps).
* Here is the precise definition of a candidate cell:
* <blockquote>
* Let {@code Ts} be {@code request.sweepTimestamp()}<br>
* Let {@code Tu} be {@code request.minUncommittedStartTimestamp()}<br>
* Let {@code V} be {@code request.shouldCheckIfLatestValueIsEmpty()}<br>
* Let {@code Ti} be set of timestamps in {@code request.timestampsToIgnore()}<br>
* <p>
* Consider a cell {@code C}. Let {@code Tc} be the set of all timestamps for {@code C} that are strictly
* less than {@code Ts}. Let {@code T} be {@code Tc \ Ti} (i.e. the cell timestamps minus the ignored
* timestamps).
* <p>
* Then {@code C} is a candidate for sweeping if and only if at least one of
* the following conditions is true:
* <ol>
* <li> The set {@code T} has more than one element
* <li> The set {@code T} contains an element that is greater than or equal to {@code Tu}
* (that is, there is a timestamp that can possibly come from an uncommitted or aborted transaction)
* <li> The set {@code T} contains {@link Value#INVALID_VALUE_TIMESTAMP}
* (that is, there is a sentinel we can possibly clean up)
* <li> {@code V} is true and the cell value corresponding to the maximum element of {@code T} is empty
* (that is, the latest sweepable value is a 'soft-delete' tombstone)
* </ol>
*
* </blockquote>
* <p>
* A candidate cell is a cell that has at least one timestamp that is strictly less than
* {@code request.sweepTimestamp()} and is not in the set specified by {@code request.timestampsToIgnore()}.
* <p>
* This method will scan the semi-open range of rows from the start row specified in the {@code request}
* to the end of the table. If the given start row name is an empty byte array, the whole table will be
* scanned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraKeyValueServices;
import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue;
import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager;
Expand All @@ -46,8 +45,6 @@
*/
public class CassandraGetCandidateCellsForSweepingImpl {

private static final long[] EMPTY_LONG_ARRAY = new long[0];

private final CellPager cellPager;

public CassandraGetCandidateCellsForSweepingImpl(CellPager cellPager) {
Expand Down Expand Up @@ -133,8 +130,8 @@ private Optional<CandidateCellForSweeping> processColumn(CassandraRawCellValue c
}

private CandidateCellForSweeping createCandidate() {
boolean isCandidate = isCandidate();
long[] sortedTimestamps = isCandidate ? sortTimestamps() : EMPTY_LONG_ARRAY;
currentTimestamps.reverse();
long[] sortedTimestamps = currentTimestamps.toArray();
currentTimestamps.clear();
return ImmutableCandidateCellForSweeping.builder()
.cell(currentCell)
Expand All @@ -143,21 +140,6 @@ private CandidateCellForSweeping createCandidate() {
.numCellsTsPairsExamined(numCellTsPairsExamined)
.build();
}

/**
 * Reverses the order of the timestamps collected for the current cell and returns them as an array.
 * NOTE(review): the name implies the reversed order is the sorted one, i.e. that timestamps were
 * accumulated in descending order - confirm against the code that fills {@code currentTimestamps}.
 *
 * @return the collected timestamps, in reversed order
 */
private long[] sortTimestamps() {
    // reverse() mutates the list itself; the array is taken only afterwards.
    currentTimestamps.reverse();
    long[] reversedTimestamps = currentTimestamps.toArray();
    return reversedTimestamps;
}

/**
 * Decides whether the cell currently being accumulated is a candidate for sweeping.
 *
 * @return {@code true} if more than one timestamp was collected, if {@code currentLatestValEmpty} is set,
 *         or if exactly one timestamp was collected and that timestamp is potentially sweepable
 */
private boolean isCandidate() {
    // Several versions of the same cell: always a candidate.
    if (currentTimestamps.size() > 1) {
        return true;
    }
    // The latest value was flagged as empty.
    if (currentLatestValEmpty) {
        return true;
    }
    // Otherwise only a lone timestamp can qualify; the size check guards the get(0) call.
    return currentTimestamps.size() == 1
            && timestampIsPotentiallySweepable(currentTimestamps.get(0));
}

/**
 * Checks whether a single timestamp could still require sweeping.
 *
 * @param ts the timestamp to examine
 * @return {@code true} if {@code ts} equals {@link Value#INVALID_VALUE_TIMESTAMP} or is greater than or
 *         equal to {@code request.minUncommittedStartTimestamp()}
 */
private boolean timestampIsPotentiallySweepable(long ts) {
    if (ts == Value.INVALID_VALUE_TIMESTAMP) {
        return true;
    }
    return ts >= request.minUncommittedStartTimestamp();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ public ClosableIterator<List<CandidateCellForSweeping>> getCandidateCellsForSwee
Cell cell = Cell.create(rr.getRowName(), colName);
boolean latestValEmpty = isLatestValueEmpty(cell, peekingValues);
numExamined.add(timestampArr.length);
boolean candidate = isCandidate(timestampArr, latestValEmpty, request);
candidateBatch.add(ImmutableCandidateCellForSweeping.builder()
.cell(cell)
.sortedTimestamps(candidate ? timestampArr : EMPTY_LONG_ARRAY)
.sortedTimestamps(timestampArr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to check if we should still be returning EMPTY_LONG_ARRAY if there is just one timestamp - I see that case excluded at SweepableCellFilter.getCellToSweep, so looks ok to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if there is one timestamp, you still might need to sweep it (if that timestamp is uncommitted)

.isLatestValueEmpty(latestValEmpty)
.numCellsTsPairsExamined(numExamined.longValue())
.build());
Expand All @@ -109,18 +108,6 @@ private static Closer createCloserAndRelease(ReleasableCloseable<?>... closeable
return closer;
}

/**
 * Decides whether a cell with the given timestamps is a candidate for sweeping.
 *
 * @param timestamps the timestamps found for the cell
 * @param lastValEmpty whether the latest value of the cell is empty
 * @param request the sweeping request supplying the emptiness-check flag and the timestamp threshold
 * @return {@code true} if there is more than one timestamp, if the request asks to check for an empty
 *         latest value and {@code lastValEmpty} is set, or if the only timestamp is potentially sweepable
 */
private static boolean isCandidate(long[] timestamps,
        boolean lastValEmpty,
        CandidateCellForSweepingRequest request) {
    if (timestamps.length > 1) {
        return true;
    }
    if (request.shouldCheckIfLatestValueIsEmpty() && lastValEmpty) {
        return true;
    }
    // The length check guards the timestamps[0] access when the array is empty.
    return timestamps.length == 1
            && timestampIsPotentiallySweepable(timestamps[0], request);
}

/**
 * Checks whether a single timestamp could still require sweeping.
 *
 * @param ts the timestamp to examine
 * @param request the sweeping request supplying {@code minUncommittedStartTimestamp()}
 * @return {@code true} if {@code ts} equals {@link Value#INVALID_VALUE_TIMESTAMP} or is greater than or
 *         equal to {@code request.minUncommittedStartTimestamp()}
 */
private static boolean timestampIsPotentiallySweepable(long ts, CandidateCellForSweepingRequest request) {
    if (ts == Value.INVALID_VALUE_TIMESTAMP) {
        return true;
    }
    return ts >= request.minUncommittedStartTimestamp();
}

private ClosableIterator<RowResult<Value>> getValues(TableReference tableRef,
RangeRequest range,
long sweepTs,
Expand Down Expand Up @@ -175,5 +162,4 @@ public void close() {
}
}

private static final long[] EMPTY_LONG_ARRAY = new long[0];
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ private static Callable<Boolean> canCreateKeyValueService() {
kvs = ConnectionManagerAwareDbKvs.create(getKvsConfig());
return kvs.getConnectionManager().getConnection().isValid(5);
} catch (Exception ex) {
if (ex.getMessage().contains("The connection attempt failed.")) {
if (ex.getMessage().contains("The connection attempt failed.")
|| ex.getMessage().contains("the database system is starting up")) {
return false;
} else {
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,113 +16,18 @@

package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres;

import static org.junit.Assert.assertEquals;

import java.util.List;

import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweeping;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.dbkvs.DbKeyValueServiceConfig;
import com.palantir.atlasdb.keyvalue.dbkvs.DbkvsPostgresTestSuite;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.SqlConnectionSupplier;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping;
import com.palantir.atlasdb.keyvalue.impl.AbstractGetCandidateCellsForSweepingTest;
import com.palantir.common.base.ClosableIterator;

public class DbKvsPostgresGetCandidateCellsForSweepingTest extends AbstractGetCandidateCellsForSweepingTest {

private static DbKeyValueServiceConfig config;
private static SqlConnectionSupplier connectionSupplier;

/**
 * Creates the Postgres-backed key-value service under test and records, in static fields, the config
 * and SQL connection supplier it was built with.
 *
 * @return the newly created key-value service
 */
@Override
protected KeyValueService createKeyValueService() {
    config = DbkvsPostgresTestSuite.getKvsConfig();
    ConnectionManagerAwareDbKvs postgresKvs = ConnectionManagerAwareDbKvs.create(config);
    // Keep the connection supplier of this instance available in the static field.
    connectionSupplier = postgresKvs.getSqlConnectionSupplier();
    return postgresKvs;
}

/**
 * Stores five versions of a single cell and reads them back with the SQL row limit overridden to 2.
 * Expects exactly one candidate that carries all five timestamps.
 */
@Test
public void singleCellSpanningSeveralPages() {
    new TestDataBuilder()
            .put(10, 1, 1000)
            .put(10, 1, 1001)
            .put(10, 1, 1002)
            .put(10, 1, 1003)
            .put(10, 1, 1004)
            .store();

    CandidateCellForSweepingRequest request =
            conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, Long.MIN_VALUE);
    List<CandidateCellForSweeping> cells = getWithOverriddenLimit(request, 2);

    CandidateCellForSweeping expectedCandidate = ImmutableCandidateCellForSweeping.builder()
            .cell(cell(10, 1))
            .isLatestValueEmpty(false)
            .numCellsTsPairsExamined(5)
            .sortedTimestamps(1000L, 1001L, 1002L, 1003L, 1004L)
            .build();
    assertEquals(ImmutableList.of(expectedCandidate), cells);
}

@Test
public void returnFirstAndLastCellOfThePage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test still seems valuable, why did we get rid of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tested implementation-specific behavior that doesn't exist anymore

new TestDataBuilder()
.put(10, 1, 1000)
.put(10, 2, 400)
// The cell (20, 1) is not a candidate because the minimumUncommittedTimestamp is 750, which is greater
// than 500. However, we still need to return this cell since it's at the page boundary.
.put(20, 1, 500)
// <---- page boundary here ---->
// Again, this cell is not a candidate, but we need to return it
// since it's the first SQL row in the page.
.put(30, 1, 600)
.store();
List<CandidateCellForSweeping> cells = getWithOverriddenLimit(
conservativeRequest(PtBytes.EMPTY_BYTE_ARRAY, 2000L, 750L), 3);
assertEquals(
ImmutableList.of(
ImmutableCandidateCellForSweeping.builder()
.cell(cell(10, 1))
.isLatestValueEmpty(false)
.numCellsTsPairsExamined(1)
.sortedTimestamps(1000L)
.build(),
ImmutableCandidateCellForSweeping.builder()
.cell(cell(20, 1))
.isLatestValueEmpty(false)
.numCellsTsPairsExamined(3)
// No timestamps because the cell is not a real candidate
.sortedTimestamps()
.build(),
ImmutableCandidateCellForSweeping.builder()
.cell(cell(30, 1))
.isLatestValueEmpty(false)
.numCellsTsPairsExamined(4)
// No timestamps because the cell is not a real candidate
.sortedTimestamps()
.build()),
cells);
}

/**
 * Runs the candidate-cell query against {@code TEST_TABLE} using an implementation created with the
 * given SQL row limit override, and flattens all returned batches into one list.
 *
 * @param request the sweeping request to execute
 * @param sqlRowLimitOverride the row limit passed to {@code createImpl}
 * @return every candidate cell from every batch, in iteration order
 */
private List<CandidateCellForSweeping> getWithOverriddenLimit(
        CandidateCellForSweepingRequest request,
        int sqlRowLimitOverride) {
    DbKvsGetCandidateCellsForSweeping impl = createImpl(sqlRowLimitOverride);
    // try-with-resources closes the batch iterator once the results have been copied out.
    try (ClosableIterator<List<CandidateCellForSweeping>> batches =
            impl.getCandidateCellsForSweeping(TEST_TABLE, request, null)) {
        return ImmutableList.copyOf(
                Iterators.concat(
                        Iterators.transform(batches, List::iterator)));
    }
}

private DbKvsGetCandidateCellsForSweeping createImpl(int sqlRowLimitOverride) {
return new PostgresGetCandidateCellsForSweeping(
new PostgresPrefixedTableNames(config.ddl()),
connectionSupplier,
x -> sqlRowLimitOverride);
DbKeyValueServiceConfig config = DbkvsPostgresTestSuite.getKvsConfig();
return ConnectionManagerAwareDbKvs.create(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private static DbKvs createPostgres(ExecutorService executor,
new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()),
(conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres
new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache),
PostgresGetCandidateCellsForSweeping.create(prefixedTableNames, connections));
new PostgresGetCandidateCellsForSweeping(prefixedTableNames, connections));
}

private static DbKvs createOracle(ExecutorService executor,
Expand Down
Loading