Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

DbKvs sweep refactors to address last PR comments #2497

Merged
merged 13 commits into from
Oct 18, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.BatchingTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ImmediateSingleBatchTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleOverflowValueLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbkvsVersionException;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresPrefixedTableNames;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRanges;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping;
import com.palantir.atlasdb.keyvalue.dbkvs.util.DbKvsPartitioners;
import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService;
Expand Down Expand Up @@ -182,7 +182,7 @@ private static DbKvs createPostgres(ExecutorService executor,
PostgresPrefixedTableNames prefixedTableNames = new PostgresPrefixedTableNames(config);
DbTableFactory tableFactory = new PostgresDbTableFactory(config, prefixedTableNames);
TableMetadataCache tableMetadataCache = new TableMetadataCache(tableFactory);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new PostgresCellTsPageLoaderFactory(
CellTsPairLoader cellTsPairLoader = new PostgresCellTsPageLoader(
prefixedTableNames, connections);
return new DbKvs(
executor,
Expand All @@ -192,7 +192,7 @@ private static DbKvs createPostgres(ExecutorService executor,
new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()),
(conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres
new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache),
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory));
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader));
}

private static DbKvs createOracle(ExecutorService executor,
Expand All @@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor,
OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter);
DbKvsGetRange getRange = new OracleGetRange(
connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new OracleCellTsPageLoaderFactory(
CellTsPairLoader cellTsPageLoader = new OracleCellTsPageLoader(
connections, tableNameGetter, valueStyleCache, oracleDdlConfig);
return new DbKvs(
executor,
Expand All @@ -214,7 +214,7 @@ private static DbKvs createOracle(ExecutorService executor,
new ImmediateSingleBatchTaskRunner(),
overflowValueLoader,
getRange,
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory));
new DbKvsGetCandidateCellsForSweeping(cellTsPageLoader));
}

private DbKvs(ExecutorService executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
package com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.RangeRequests;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleDdlConfig;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleTableNameGetter;
Expand All @@ -29,10 +34,9 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyle;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyleCache;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairToken;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers;
import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException;
import com.palantir.nexus.db.DBType;
Expand Down Expand Up @@ -73,62 +77,80 @@
// predicate and '(col_name > ? OR ts >= ?)' as the filter predicate. Once we exhaust all results
// in the row, we switch back to the "normal" mode, starting from the beginning of the lexicographically
// next row.
public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory {
public class OracleCellTsPageLoader implements CellTsPairLoader {
private final SqlConnectionSupplier connectionPool;
private final OracleTableNameGetter tableNameGetter;
private final TableValueStyleCache valueStyleCache;
private final OracleDdlConfig config;

private static final int DEFAULT_BATCH_SIZE = 1000;

public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
public OracleCellTsPageLoader(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
this.connectionPool = connectionPool;
this.tableNameGetter = tableNameGetter;
this.valueStyleCache = valueStyleCache;
this.config = config;
}

@Override
public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) {
public Iterator<List<CellTsPairInfo>> createPageIterator(TableReference tableRef,
CandidateCellForSweepingRequest request) {
TableDetails tableDetails = getTableDetailsUsingNewConnection(tableRef);
return new Loader(connectionPool, request, tableDetails, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE));
return new PageIterator(
connectionPool,
request,
tableDetails,
Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)),
request.startRowInclusive());
}

private static class Loader implements CellTsPairLoader {
private final CandidateCellForSweepingRequest request;
private final TableDetails tableDetails;
private final int sqlRowLimit;
private final SqlConnectionSupplier connectionPool;
private static class PageIterator implements Iterator<List<CellTsPairInfo>> {
final SqlConnectionSupplier connectionPool;
final CandidateCellForSweepingRequest request;
final TableDetails tableDetails;
final int sqlRowLimit;

Loader(SqlConnectionSupplier connectionPool,
CandidateCellForSweepingRequest request,
TableDetails tableDetails,
int sqlRowLimit) {
CellTsPairToken token;
long cellTsPairsAlreadyExaminedInCurrentRow = 0L;

PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request,
TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) {
this.connectionPool = connectionPool;
this.request = request;
this.tableDetails = tableDetails;
this.sqlRowLimit = sqlRowLimit;
this.token = CellTsPairToken.startRow(startRowInclusive);
}

@Override
public boolean hasNext() {
return !token.reachedEnd();
}

// We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page.
// As a downside, our iterator can return an empty page at the end.
// However, we can just filter out empty pages later.
@Override
public Page loadNextPage(StartingPosition startInclusive,
long cellsAlreadyExaminedInStartingRow) {
boolean singleRow = shouldScanSingleRow(cellsAlreadyExaminedInStartingRow);
List<CellTsPairInfo> cellTsPairs = loadPage(startInclusive, singleRow);
return new Page(cellTsPairs, singleRow, cellTsPairs.size() < sqlRowLimit);
public List<CellTsPairInfo> next() {
Preconditions.checkState(hasNext());
boolean singleRow = shouldScanSingleRow();
List<CellTsPairInfo> cellTsPairs = loadPage(singleRow);
updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs);
token = computeNextStartPosition(cellTsPairs, singleRow);
return cellTsPairs;
}

private boolean shouldScanSingleRow(long cellsAlreadyExaminedInCurrentRow) {
private boolean shouldScanSingleRow() {
// The idea is that we don't want to throw away more than N database rows in order to get N results.
// This only matters for tables with wide rows. We could tweak this.
return cellsAlreadyExaminedInCurrentRow > sqlRowLimit;
return cellTsPairsAlreadyExaminedInCurrentRow > sqlRowLimit;
}

private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean singleRow) {
FullQuery query = getQuery(startInclusive, singleRow);
private List<CellTsPairInfo> loadPage(boolean singleRow) {
FullQuery query = getQuery(singleRow);
try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool);
AgnosticLightResultSet resultSet = executeQuery(conns.get(), query)) {
List<CellTsPairInfo> ret = new ArrayList<>();
Expand All @@ -143,14 +165,7 @@ private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean s
}
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
private FullQuery getQuery(boolean singleRow) {
String pkIndex = PrimaryKeyConstraintNames.get(tableDetails.shortName);
FullQuery.Builder queryBuilder = FullQuery.builder()
.append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING */ ")
Expand All @@ -166,26 +181,31 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
.append(" FROM ").append(tableDetails.shortName).append(" t")
.append(" WHERE ts < ? ", request.sweepTimestamp());
SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder);
appendRangePredicates(startingPos, singleRow, queryBuilder);
appendRangePredicates(singleRow, queryBuilder);
return queryBuilder
.append(" ORDER BY row_name, col_name, ts")
.append(") WHERE rownum <= ").append(sqlRowLimit)
.append(" ORDER BY row_name, col_name, ts")
.build();
}

private void appendRangePredicates(StartingPosition startingPos, boolean singleRow, FullQuery.Builder builder) {
private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) {
if (singleRow) {
builder.append(" AND row_name = ?", startingPos.rowName);
if (startingPos.colName.length > 0) {
builder.append(" AND col_name >= ?", startingPos.colName);
if (startingPos.timestamp != null) {
builder.append(" AND (col_name > ? OR ts >= ?)", startingPos.colName, startingPos.timestamp);
builder.append(" AND row_name = ?", token.startRowInclusive());
if (token.startColInclusive().length > 0) {
builder.append(" AND col_name >= ?", token.startColInclusive());
if (token.startTsInclusive() != null) {
builder.append(" AND (col_name > ? OR ts >= ?)",
token.startColInclusive(),
token.startTsInclusive());
}
}
} else {
RangePredicateHelper.create(false, DBType.ORACLE, builder)
.startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp);
.startCellTsInclusive(
token.startRowInclusive(),
token.startColInclusive(),
token.startTsInclusive());
}
}

Expand All @@ -199,6 +219,46 @@ private void appendEmptyValueFlagSelector(FullQuery.Builder builder) {
}
builder.append(" THEN 1 ELSE 0 END AS empty_val");
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private void updateCountOfExaminedCellTsPairsInCurrentRow(List<CellTsPairInfo> results) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're iterating through the results twice - once to bundle each one into a CellTsPairInfo, and again to figure out the count of cellTsPairsAlreadyExaminedInCurrentRow.

Is it worth cutting this iteration time in half by updating cellTsPairsAlreadyExaminedInCurrentRow within loadPage()?

byte[] currentRow = token.startRowInclusive();
for (CellTsPairInfo cellTs : results) {
if (!Arrays.equals(currentRow, cellTs.rowName)) {
currentRow = cellTs.rowName;
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
}
cellTsPairsAlreadyExaminedInCurrentRow += 1;
}
}

private CellTsPairToken computeNextStartPosition(List<CellTsPairInfo> results, boolean scannedSingleRow) {
if (results.size() < sqlRowLimit) {
if (scannedSingleRow) {
// If we scanned a single row and reached the end, we just restart
// from the lexicographically next row
byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, token.startRowInclusive());
if (nextRow == null) {
return CellTsPairToken.end();
} else {
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
return CellTsPairToken.startRow(nextRow);
}
} else {
return CellTsPairToken.end();
}
} else {
CellTsPairInfo lastResult = Iterables.getLast(results);
return CellTsPairToken.continueRow(lastResult);
}
}

}

private static class TableDetails {
Expand All @@ -209,7 +269,6 @@ private static class TableDetails {
this.shortName = shortName;
this.hasOverflow = hasOverflow;
}

}

private TableDetails getTableDetailsUsingNewConnection(TableReference tableRef) {
Expand Down

This file was deleted.

Loading