DbKvs sweep refactors to address last PR comments (#2497)
* DbKvs sweep refactors to address last PR comments

As dxiao pointed out, CandidatePagingState does too many things. The
solution is to make CellTsPairLoader return an iterator, move all the
paging logic inside that iterator, and remove CellTsPairLoaderFactory.
CandidatePagingState is then replaced with CandidateGroupingIterator,
which has no paging logic and only takes care of grouping (cell, ts)
pairs by cell.
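
For illustration only, here is a minimal, self-contained sketch of that grouping idea
using toy types; the real CandidateGroupingIterator streams CellTsPairInfo pages coming
out of the loader's iterator rather than building a map.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Toy illustration of "group (cell, ts) pairs by cell"; names and types here are
    // invented for the example and are not the AtlasDB classes.
    final class GroupByCellExample {
        record CellTs(String cell, long ts) {}

        // Assumes the input is already sorted by cell, as pages produced by the loader are.
        static Map<String, List<Long>> groupByCell(List<CellTs> sortedPairs) {
            Map<String, List<Long>> grouped = new LinkedHashMap<>();
            for (CellTs pair : sortedPairs) {
                grouped.computeIfAbsent(pair.cell(), cell -> new ArrayList<>()).add(pair.ts());
            }
            return grouped;
        }

        public static void main(String[] args) {
            List<CellTs> pairs = List.of(
                    new CellTs("row1/col1", 10), new CellTs("row1/col1", 25), new CellTs("row1/col2", 7));
            System.out.println(groupByCell(pairs)); // {row1/col1=[10, 25], row1/col2=[7]}
        }
    }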

Also remove CandidatePageJoiningIterator because it's not used anymore.
We could have done this earlier but I forgot.

Plan for the future:
- Since we gave up on the in-database filtering idea, we can replace
  KVS.getCandidateCellsForSweeping() with a simpler call like
  getCellTsPairs() that does what CellTsPairLoader does now (a
  hypothetical sketch follows this list).
- Implement the new call for the remaining KVSs. We need at least
  the InMemoryKVS, and we should decide the fate of JdbcKVS and CqlKVS.
- Remove all remaining usages of getRangeOfTimestamps(). I think that's
  basically deleteRange() on Cassandra.
- Remove KVS.getRangeOfTimestamps()!
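
A purely hypothetical sketch of the first item above: the interface name, method name,
and signature are guesses, and nothing in this PR defines them.

    import java.util.Iterator;
    import java.util.List;

    import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
    import com.palantir.atlasdb.keyvalue.api.TableReference;
    import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo;

    // Hypothetical future KVS-level call (not part of this PR): stream pages of
    // (cell, ts) pairs directly, the way CellTsPairLoader does for DbKvs today.
    public interface CellTsPairStreamingKeyValueService {
        Iterator<List<CellTsPairInfo>> getCellTsPairs(
                TableReference tableRef, CandidateCellForSweepingRequest request);
    }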

[no release notes]

* Renames

* Create OracleCellTsPageLoader.PageIterator.Token class

* Create PostgresCellTsPageLoader.PageIterator.Token

* Fix Precondition

having-next-ness is a state, not an argument

* One Token To Rule Them All

Well... two, actually. This one is a CellTsPairToken. We already had a
Token class in DbKvs :-/

* Make CellTsPairToken final; replace modification with factory methods

* Have computeNextStartPosition return a token

* Remove the TODO

I don't think we can do it nicely

* empty commit

* Nits

* Move duplicated precondition to CellTsPairToken and add error message
* Renames

* Make CellTsPairToken immutable (see the sketch below)

* Sort timestamps array, just in case
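
Several of the bullets above reshape CellTsPairToken into an immutable value class built
only through factory methods. The following is a sketch of that shape inferred from the
call sites in the diff below; the continueRow arithmetic and the colName/ts field names
on CellTsPairInfo are assumptions.

    package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep;

    // Sketch only: the real class also carries the shared precondition and error message
    // mentioned above, and its details may differ.
    public final class CellTsPairToken {
        private final byte[] startRowInclusive;
        private final byte[] startColInclusive;
        private final Long startTsInclusive; // null means "no timestamp bound yet"
        private final boolean reachedEnd;

        private CellTsPairToken(byte[] row, byte[] col, Long ts, boolean reachedEnd) {
            this.startRowInclusive = row;
            this.startColInclusive = col;
            this.startTsInclusive = ts;
            this.reachedEnd = reachedEnd;
        }

        public static CellTsPairToken startRow(byte[] startRowInclusive) {
            return new CellTsPairToken(startRowInclusive, new byte[0], null, false);
        }

        public static CellTsPairToken continueRow(CellTsPairInfo lastResult) {
            // Resume just past the last returned (cell, ts) pair.
            return new CellTsPairToken(lastResult.rowName, lastResult.colName, lastResult.ts + 1, false);
        }

        public static CellTsPairToken end() {
            return new CellTsPairToken(new byte[0], new byte[0], null, true);
        }

        public boolean reachedEnd() {
            return reachedEnd;
        }

        public byte[] startRowInclusive() {
            return startRowInclusive;
        }

        public byte[] startColInclusive() {
            return startColInclusive;
        }

        public Long startTsInclusive() {
            return startTsInclusive;
        }
    }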
gbonik authored and gsheasby committed Oct 18, 2017
1 parent 13a2b81 commit 53b8cbf
Showing 13 changed files with 519 additions and 757 deletions.
@@ -95,16 +95,16 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.BatchingTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ImmediateSingleBatchTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleOverflowValueLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbkvsVersionException;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresPrefixedTableNames;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRanges;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping;
import com.palantir.atlasdb.keyvalue.dbkvs.util.DbKvsPartitioners;
import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService;
@@ -182,7 +182,7 @@ private static DbKvs createPostgres(ExecutorService executor,
PostgresPrefixedTableNames prefixedTableNames = new PostgresPrefixedTableNames(config);
DbTableFactory tableFactory = new PostgresDbTableFactory(config, prefixedTableNames);
TableMetadataCache tableMetadataCache = new TableMetadataCache(tableFactory);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new PostgresCellTsPageLoaderFactory(
CellTsPairLoader cellTsPairLoader = new PostgresCellTsPageLoader(
prefixedTableNames, connections);
return new DbKvs(
executor,
@@ -192,7 +192,7 @@ private static DbKvs createPostgres(ExecutorService executor,
new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()),
(conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres
new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache),
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory));
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader));
}

private static DbKvs createOracle(ExecutorService executor,
@@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor,
OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter);
DbKvsGetRange getRange = new OracleGetRange(
connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new OracleCellTsPageLoaderFactory(
CellTsPairLoader cellTsPageLoader = new OracleCellTsPageLoader(
connections, tableNameGetter, valueStyleCache, oracleDdlConfig);
return new DbKvs(
executor,
@@ -214,7 +214,7 @@ private static DbKvs createOracle(ExecutorService executor,
new ImmediateSingleBatchTaskRunner(),
overflowValueLoader,
getRange,
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory));
new DbKvsGetCandidateCellsForSweeping(cellTsPageLoader));
}

private DbKvs(ExecutorService executor,
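
The CellTsPairLoader interface that DbKvs now receives is not itself shown in this diff.
From the Oracle and Postgres implementations that follow, it is implied to look roughly
like the sketch below; treat it as an inference, not the actual source. Returning an
iterator of pages keeps all paging state inside the loader, which is what lets the
grouping side stay free of paging logic.

    package com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep;

    import java.util.Iterator;
    import java.util.List;

    import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
    import com.palantir.atlasdb.keyvalue.api.TableReference;

    // Inferred shape: each call returns a fresh iterator that owns all paging state.
    public interface CellTsPairLoader {
        Iterator<List<CellTsPairInfo>> createPageIterator(
                TableReference tableRef, CandidateCellForSweepingRequest request);
    }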
@@ -17,9 +17,14 @@
package com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.RangeRequests;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleDdlConfig;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleTableNameGetter;
@@ -29,10 +34,9 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyle;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyleCache;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers;
import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException;
import com.palantir.nexus.db.DBType;
@@ -73,62 +77,80 @@
// predicate and '(col_name > ? OR ts >= ?)' as the filter predicate. Once we exhaust all results
// in the row, we switch back to the "normal" mode, starting from the beginning of the lexicographically
// next row.
public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory {
public class OracleCellTsPageLoader implements CellTsPairLoader {
private final SqlConnectionSupplier connectionPool;
private final OracleTableNameGetter tableNameGetter;
private final TableValueStyleCache valueStyleCache;
private final OracleDdlConfig config;

private static final int DEFAULT_BATCH_SIZE = 1000;

public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
public OracleCellTsPageLoader(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
this.connectionPool = connectionPool;
this.tableNameGetter = tableNameGetter;
this.valueStyleCache = valueStyleCache;
this.config = config;
}

@Override
public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) {
public Iterator<List<CellTsPairInfo>> createPageIterator(TableReference tableRef,
CandidateCellForSweepingRequest request) {
TableDetails tableDetails = getTableDetailsUsingNewConnection(tableRef);
return new Loader(connectionPool, request, tableDetails, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE));
return new PageIterator(
connectionPool,
request,
tableDetails,
Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)),
request.startRowInclusive());
}

private static class Loader implements CellTsPairLoader {
private final CandidateCellForSweepingRequest request;
private final TableDetails tableDetails;
private final int sqlRowLimit;
private final SqlConnectionSupplier connectionPool;
private static class PageIterator implements Iterator<List<CellTsPairInfo>> {
final SqlConnectionSupplier connectionPool;
final CandidateCellForSweepingRequest request;
final TableDetails tableDetails;
final int sqlRowLimit;

Loader(SqlConnectionSupplier connectionPool,
CandidateCellForSweepingRequest request,
TableDetails tableDetails,
int sqlRowLimit) {
CellTsPairToken token;
long cellTsPairsAlreadyExaminedInCurrentRow = 0L;

PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request,
TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) {
this.connectionPool = connectionPool;
this.request = request;
this.tableDetails = tableDetails;
this.sqlRowLimit = sqlRowLimit;
this.token = CellTsPairToken.startRow(startRowInclusive);
}

@Override
public boolean hasNext() {
return !token.reachedEnd();
}

// We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page.
// As a downside, our iterator can return an empty page at the end.
// However, we can just filter out empty pages later.
@Override
public Page loadNextPage(StartingPosition startInclusive,
long cellsAlreadyExaminedInStartingRow) {
boolean singleRow = shouldScanSingleRow(cellsAlreadyExaminedInStartingRow);
List<CellTsPairInfo> cellTsPairs = loadPage(startInclusive, singleRow);
return new Page(cellTsPairs, singleRow, cellTsPairs.size() < sqlRowLimit);
public List<CellTsPairInfo> next() {
Preconditions.checkState(hasNext());
boolean singleRow = shouldScanSingleRow();
List<CellTsPairInfo> cellTsPairs = loadPage(singleRow);
updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs);
token = computeNextStartPosition(cellTsPairs, singleRow);
return cellTsPairs;
}

private boolean shouldScanSingleRow(long cellsAlreadyExaminedInCurrentRow) {
private boolean shouldScanSingleRow() {
// The idea is that we don't want to throw away more than N database rows in order to get N results.
// This only matters for tables with wide rows. We could tweak this.
return cellsAlreadyExaminedInCurrentRow > sqlRowLimit;
return cellTsPairsAlreadyExaminedInCurrentRow > sqlRowLimit;
}

private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean singleRow) {
FullQuery query = getQuery(startInclusive, singleRow);
private List<CellTsPairInfo> loadPage(boolean singleRow) {
FullQuery query = getQuery(singleRow);
try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool);
AgnosticLightResultSet resultSet = executeQuery(conns.get(), query)) {
List<CellTsPairInfo> ret = new ArrayList<>();
@@ -143,14 +165,7 @@ private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean s
}
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
private FullQuery getQuery(boolean singleRow) {
String pkIndex = PrimaryKeyConstraintNames.get(tableDetails.shortName);
FullQuery.Builder queryBuilder = FullQuery.builder()
.append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING */ ")
@@ -166,26 +181,31 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
.append(" FROM ").append(tableDetails.shortName).append(" t")
.append(" WHERE ts < ? ", request.sweepTimestamp());
SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder);
appendRangePredicates(startingPos, singleRow, queryBuilder);
appendRangePredicates(singleRow, queryBuilder);
return queryBuilder
.append(" ORDER BY row_name, col_name, ts")
.append(") WHERE rownum <= ").append(sqlRowLimit)
.append(" ORDER BY row_name, col_name, ts")
.build();
}

private void appendRangePredicates(StartingPosition startingPos, boolean singleRow, FullQuery.Builder builder) {
private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) {
if (singleRow) {
builder.append(" AND row_name = ?", startingPos.rowName);
if (startingPos.colName.length > 0) {
builder.append(" AND col_name >= ?", startingPos.colName);
if (startingPos.timestamp != null) {
builder.append(" AND (col_name > ? OR ts >= ?)", startingPos.colName, startingPos.timestamp);
builder.append(" AND row_name = ?", token.startRowInclusive());
if (token.startColInclusive().length > 0) {
builder.append(" AND col_name >= ?", token.startColInclusive());
if (token.startTsInclusive() != null) {
builder.append(" AND (col_name > ? OR ts >= ?)",
token.startColInclusive(),
token.startTsInclusive());
}
}
} else {
RangePredicateHelper.create(false, DBType.ORACLE, builder)
.startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp);
.startCellTsInclusive(
token.startRowInclusive(),
token.startColInclusive(),
token.startTsInclusive());
}
}

@@ -199,6 +219,46 @@ private void appendEmptyValueFlagSelector(FullQuery.Builder builder) {
}
builder.append(" THEN 1 ELSE 0 END AS empty_val");
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private void updateCountOfExaminedCellTsPairsInCurrentRow(List<CellTsPairInfo> results) {
byte[] currentRow = token.startRowInclusive();
for (CellTsPairInfo cellTs : results) {
if (!Arrays.equals(currentRow, cellTs.rowName)) {
currentRow = cellTs.rowName;
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
}
cellTsPairsAlreadyExaminedInCurrentRow += 1;
}
}

private CellTsPairToken computeNextStartPosition(List<CellTsPairInfo> results, boolean scannedSingleRow) {
if (results.size() < sqlRowLimit) {
if (scannedSingleRow) {
// If we scanned a single row and reached the end, we just restart
// from the lexicographically next row
byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive());
if (nextRow == null) {
return CellTsPairToken.end();
} else {
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
return CellTsPairToken.startRow(nextRow);
}
} else {
return CellTsPairToken.end();
}
} else {
CellTsPairInfo lastResult = Iterables.getLast(results);
return CellTsPairToken.continueRow(lastResult);
}
}

}

private static class TableDetails {
@@ -209,7 +269,6 @@ private static class TableDetails {
this.shortName = shortName;
this.hasOverflow = hasOverflow;
}

}

private TableDetails getTableDetailsUsingNewConnection(TableReference tableRef) {

This file was deleted.
