Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

DbKvs sweep refactors to address last PR comments #2497

Merged
merged 13 commits into from
Oct 18, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.BatchingTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ImmediateSingleBatchTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle.OracleOverflowValueLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbkvsVersionException;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresCellTsPageLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.PostgresPrefixedTableNames;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRange;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.DbKvsGetRanges;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.DbKvsGetCandidateCellsForSweeping;
import com.palantir.atlasdb.keyvalue.dbkvs.util.DbKvsPartitioners;
import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService;
Expand Down Expand Up @@ -182,7 +182,7 @@ private static DbKvs createPostgres(ExecutorService executor,
PostgresPrefixedTableNames prefixedTableNames = new PostgresPrefixedTableNames(config);
DbTableFactory tableFactory = new PostgresDbTableFactory(config, prefixedTableNames);
TableMetadataCache tableMetadataCache = new TableMetadataCache(tableFactory);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new PostgresCellTsPageLoaderFactory(
CellTsPairLoader cellTsPairLoader = new PostgresCellTsPageLoader(
prefixedTableNames, connections);
return new DbKvs(
executor,
Expand All @@ -192,7 +192,7 @@ private static DbKvs createPostgres(ExecutorService executor,
new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()),
(conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres
new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache),
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoaderFactory));
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader));
}

private static DbKvs createOracle(ExecutorService executor,
Expand All @@ -204,7 +204,7 @@ private static DbKvs createOracle(ExecutorService executor,
OverflowValueLoader overflowValueLoader = new OracleOverflowValueLoader(oracleDdlConfig, tableNameGetter);
DbKvsGetRange getRange = new OracleGetRange(
connections, overflowValueLoader, tableNameGetter, valueStyleCache, oracleDdlConfig);
CellTsPairLoaderFactory cellTsPairLoaderFactory = new OracleCellTsPageLoaderFactory(
CellTsPairLoader cellTsPairLoaderFactory = new OracleCellTsPageLoader(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename

connections, tableNameGetter, valueStyleCache, oracleDdlConfig);
return new DbKvs(
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
package com.palantir.atlasdb.keyvalue.dbkvs.impl.oracle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.RangeRequests;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleDdlConfig;
import com.palantir.atlasdb.keyvalue.dbkvs.OracleTableNameGetter;
Expand All @@ -29,10 +37,8 @@
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyle;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.TableValueStyleCache;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ranges.RangePredicateHelper;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CandidatePagingState.StartingPosition;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairInfo;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoader;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.CellTsPairLoaderFactory;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.sweep.SweepQueryHelpers;
import com.palantir.atlasdb.keyvalue.impl.TableMappingNotFoundException;
import com.palantir.nexus.db.DBType;
Expand Down Expand Up @@ -73,62 +79,83 @@
// predicate and '(col_name > ? OR ts >= ?)' as the filter predicate. Once we exhaust all results
// in the row, we switch back to the "normal" mode, starting from the beginning of the lexicographically
// next row.
public class OracleCellTsPageLoaderFactory implements CellTsPairLoaderFactory {
public class OracleCellTsPageLoader implements CellTsPairLoader {
private final SqlConnectionSupplier connectionPool;
private final OracleTableNameGetter tableNameGetter;
private final TableValueStyleCache valueStyleCache;
private final OracleDdlConfig config;

private static final int DEFAULT_BATCH_SIZE = 1000;

public OracleCellTsPageLoaderFactory(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
public OracleCellTsPageLoader(SqlConnectionSupplier connectionPool,
OracleTableNameGetter tableNameGetter,
TableValueStyleCache valueStyleCache,
OracleDdlConfig config) {
this.connectionPool = connectionPool;
this.tableNameGetter = tableNameGetter;
this.valueStyleCache = valueStyleCache;
this.config = config;
}

@Override
public CellTsPairLoader createCellTsLoader(TableReference tableRef, CandidateCellForSweepingRequest request) {
public Iterator<List<CellTsPairInfo>> createPageIterator(TableReference tableRef,
CandidateCellForSweepingRequest request) {
TableDetails tableDetails = getTableDetailsUsingNewConnection(tableRef);
return new Loader(connectionPool, request, tableDetails, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE));
return new PageIterator(
connectionPool,
request,
tableDetails,
Math.max(1, request.batchSizeHint().orElse(DEFAULT_BATCH_SIZE)),
request.startRowInclusive());
}

private static class Loader implements CellTsPairLoader {
private final CandidateCellForSweepingRequest request;
private final TableDetails tableDetails;
private final int sqlRowLimit;
private final SqlConnectionSupplier connectionPool;
private static class PageIterator implements Iterator<List<CellTsPairInfo>> {
final SqlConnectionSupplier connectionPool;
final CandidateCellForSweepingRequest request;
final TableDetails tableDetails;
final int sqlRowLimit;

byte[] startRowInclusive;
byte[] startColInclusive = PtBytes.EMPTY_BYTE_ARRAY;
@Nullable Long startTsInclusive = null;
long cellTsPairsAlreadyExaminedInCurrentRow = 0L;
boolean reachedEnd = false;

Loader(SqlConnectionSupplier connectionPool,
CandidateCellForSweepingRequest request,
TableDetails tableDetails,
int sqlRowLimit) {
PageIterator(SqlConnectionSupplier connectionPool, CandidateCellForSweepingRequest request,
TableDetails tableDetails, int sqlRowLimit, byte[] startRowInclusive) {
this.connectionPool = connectionPool;
this.request = request;
this.tableDetails = tableDetails;
this.sqlRowLimit = sqlRowLimit;
this.startRowInclusive = startRowInclusive;
}

@Override
public boolean hasNext() {
return !reachedEnd;
}

// We don't use AbstractIterator to make sure hasNext() is fast and doesn't actually load the next page.
// As a downside, our iterator can return an empty page at the end.
// However, we can just filter out empty pages later.
@Override
public Page loadNextPage(StartingPosition startInclusive,
long cellsAlreadyExaminedInStartingRow) {
boolean singleRow = shouldScanSingleRow(cellsAlreadyExaminedInStartingRow);
List<CellTsPairInfo> cellTsPairs = loadPage(startInclusive, singleRow);
return new Page(cellTsPairs, singleRow, cellTsPairs.size() < sqlRowLimit);
public List<CellTsPairInfo> next() {
Preconditions.checkArgument(hasNext());
boolean singleRow = shouldScanSingleRow();
List<CellTsPairInfo> cellTsPairs = loadPage(singleRow);
updateCountOfExaminedCellTsPairsInCurrentRow(cellTsPairs);
computeNextStartPosition(cellTsPairs, singleRow);
return cellTsPairs;
}

private boolean shouldScanSingleRow(long cellsAlreadyExaminedInCurrentRow) {
private boolean shouldScanSingleRow() {
// The idea is that we don't want to throw away more than N database rows in order to get N results.
// This only matters for tables with wide rows. We could tweak this.
return cellsAlreadyExaminedInCurrentRow > sqlRowLimit;
return cellTsPairsAlreadyExaminedInCurrentRow > sqlRowLimit;
}

private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean singleRow) {
FullQuery query = getQuery(startInclusive, singleRow);
private List<CellTsPairInfo> loadPage(boolean singleRow) {
FullQuery query = getQuery(singleRow);
try (ConnectionSupplier conns = new ConnectionSupplier(connectionPool);
AgnosticLightResultSet resultSet = executeQuery(conns.get(), query)) {
List<CellTsPairInfo> ret = new ArrayList<>();
Expand All @@ -143,14 +170,7 @@ private List<CellTsPairInfo> loadPage(StartingPosition startInclusive, boolean s
}
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
private FullQuery getQuery(boolean singleRow) {
String pkIndex = PrimaryKeyConstraintNames.get(tableDetails.shortName);
FullQuery.Builder queryBuilder = FullQuery.builder()
.append("/* GET_CANDIDATE_CELLS_FOR_SWEEPING */ ")
Expand All @@ -166,26 +186,26 @@ private FullQuery getQuery(StartingPosition startingPos, boolean singleRow) {
.append(" FROM ").append(tableDetails.shortName).append(" t")
.append(" WHERE ts < ? ", request.sweepTimestamp());
SweepQueryHelpers.appendIgnoredTimestampPredicate(request, queryBuilder);
appendRangePredicates(startingPos, singleRow, queryBuilder);
appendRangePredicates(singleRow, queryBuilder);
return queryBuilder
.append(" ORDER BY row_name, col_name, ts")
.append(") WHERE rownum <= ").append(sqlRowLimit)
.append(" ORDER BY row_name, col_name, ts")
.build();
}

private void appendRangePredicates(StartingPosition startingPos, boolean singleRow, FullQuery.Builder builder) {
private void appendRangePredicates(boolean singleRow, FullQuery.Builder builder) {
if (singleRow) {
builder.append(" AND row_name = ?", startingPos.rowName);
if (startingPos.colName.length > 0) {
builder.append(" AND col_name >= ?", startingPos.colName);
if (startingPos.timestamp != null) {
builder.append(" AND (col_name > ? OR ts >= ?)", startingPos.colName, startingPos.timestamp);
builder.append(" AND row_name = ?", startRowInclusive);
if (startColInclusive.length > 0) {
builder.append(" AND col_name >= ?", startColInclusive);
if (startTsInclusive != null) {
builder.append(" AND (col_name > ? OR ts >= ?)", startColInclusive, startTsInclusive);
}
}
} else {
RangePredicateHelper.create(false, DBType.ORACLE, builder)
.startCellTsInclusive(startingPos.rowName, startingPos.colName, startingPos.timestamp);
.startCellTsInclusive(startRowInclusive, startColInclusive, startTsInclusive);
}
}

Expand All @@ -199,6 +219,51 @@ private void appendEmptyValueFlagSelector(FullQuery.Builder builder) {
}
builder.append(" THEN 1 ELSE 0 END AS empty_val");
}

private AgnosticLightResultSet executeQuery(SqlConnection conn, FullQuery query) {
return conn.selectLightResultSetUnregisteredQueryWithFetchSize(
query.getQuery(),
sqlRowLimit,
query.getArgs());
}

private void updateCountOfExaminedCellTsPairsInCurrentRow(List<CellTsPairInfo> results) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're iterating through the results twice - once to bundle each one into a CellTsPairInfo, and again to figure out the count of cellTsPairsAlreadyExaminedInCurrentRow.

Is it worth cutting this iteration time in half by updating cellTsPairsAlreadyExaminedInCurrentRow within loadPage()?

byte[] currentRow = startRowInclusive;
for (CellTsPairInfo cellTs : results) {
if (!Arrays.equals(currentRow, cellTs.rowName)) {
currentRow = cellTs.rowName;
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
}
cellTsPairsAlreadyExaminedInCurrentRow += 1;
}
}

private void computeNextStartPosition(List<CellTsPairInfo> results, boolean scannedSingleRow) {
if (results.size() < sqlRowLimit) {
if (scannedSingleRow) {
// If we scanned a single row and reached the end, we just restart
// from the lexicographically next row
byte[] nextRow = RangeRequests.getNextStartRowUnlessTerminal(false, startRowInclusive);
if (nextRow == null) {
reachedEnd = true;
} else {
startRowInclusive = nextRow;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields go together very tightly. Consider extracting a Token class to cover startRowInclusive, startColInclusive, startTsInclusive, reachedEnd, and cellTsPairsAlreadyExaminedInCurrentRow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not cells..currentRow as only Oracle cares about this, and the Token could be used across Postgres and Oracle (and maybe Cassandra?)

startColInclusive = PtBytes.EMPTY_BYTE_ARRAY;
startTsInclusive = null;
cellTsPairsAlreadyExaminedInCurrentRow = 0L;
}
} else {
reachedEnd = true;
}
} else {
CellTsPairInfo lastResult = Iterables.getLast(results);
Preconditions.checkState(lastResult.ts != Long.MAX_VALUE);
startRowInclusive = lastResult.rowName;
startColInclusive = lastResult.colName;
startTsInclusive = lastResult.ts + 1;
}
}

}

private static class TableDetails {
Expand All @@ -209,7 +274,6 @@ private static class TableDetails {
this.shortName = shortName;
this.hasOverflow = hasOverflow;
}

}

private TableDetails getTableDetailsUsingNewConnection(TableReference tableRef) {
Expand Down

This file was deleted.

Loading