Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

The big sweep rewrite, part 4: Cassandra impl #2231

Merged
merged 16 commits into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -38,6 +39,7 @@
import com.palantir.atlasdb.keyvalue.cassandra.paging.CassandraRawCellValue;
import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPager;
import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy;
import com.palantir.atlasdb.keyvalue.cassandra.paging.ImmutableCassandraRawCellValue;
import com.palantir.atlasdb.keyvalue.cassandra.paging.SingleRowColumnPager;
import com.palantir.atlasdb.keyvalue.impl.TestDataBuilder;
import com.palantir.atlasdb.keyvalue.impl.TracingPrefsConfig;
Expand All @@ -54,14 +56,16 @@ public class CellPagerIntegrationTest {
private CellPagerBatchSizingStrategy pageSizingStrategy = Mockito.mock(CellPagerBatchSizingStrategy.class);
private CellPager cellPager = null;

@BeforeClass
public static void setUpKvs() {
kvs = CassandraKeyValueService.create(
CassandraKeyValueServiceConfigManager.createSimpleManager(CassandraContainer.KVS_CONFIG),
CassandraContainer.LEADER_CONFIG,
Mockito.mock(Logger.class));
}

@Before
public void setUp() {
if (kvs == null) {
kvs = CassandraKeyValueService.create(
CassandraKeyValueServiceConfigManager.createSimpleManager(CassandraContainer.KVS_CONFIG),
CassandraContainer.LEADER_CONFIG,
Mockito.mock(Logger.class));
}
kvs.createTable(TEST_TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA);
kvs.truncateTable(TEST_TABLE);
TracingQueryRunner queryRunner = new TracingQueryRunner(
Expand Down Expand Up @@ -127,7 +131,10 @@ private static CassandraRawCellValue val(int row, int col, long ts, String value
column.setName(CassandraKeyValueServices.makeCompositeBuffer(TestDataBuilder.row(col), ts));
column.setValue(TestDataBuilder.value(value));
column.setTimestamp(ts);
return new CassandraRawCellValue(TestDataBuilder.row(row), column);
return ImmutableCassandraRawCellValue.builder()
.rowKey(TestDataBuilder.row(row))
.column(column)
.build();
}

private TestDataBuilder testDataBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,14 @@

package com.palantir.atlasdb.keyvalue.cassandra.paging;

import java.util.Arrays;

import org.apache.cassandra.thrift.Column;
import org.immutables.value.Value;

public class CassandraRawCellValue {
private final byte[] rowKey;
private final Column column;

public CassandraRawCellValue(byte[] rowKey, Column column) {
this.rowKey = rowKey;
this.column = column;
}

public byte[] getRowKey() {
return rowKey;
}

public Column getColumn() {
return column;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}

CassandraRawCellValue that = (CassandraRawCellValue) other;
@Value.Immutable
public interface CassandraRawCellValue {

if (!Arrays.equals(rowKey, that.rowKey)) {
return false;
}
return column != null ? column.equals(that.column) : that.column == null;
}
byte[] getRowKey();

@Override
public int hashCode() {
int result = Arrays.hashCode(rowKey);
result = 31 * result + (column != null ? column.hashCode() : 0);
return result;
}
Column getColumn();

@Override
public String toString() {
return "CassandraRawCellValue{" + "rowKey=" + Arrays.toString(rowKey) + ", column=" + column + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
Expand All @@ -40,6 +41,7 @@
import com.palantir.atlasdb.keyvalue.cassandra.CassandraClientPool;
import com.palantir.atlasdb.keyvalue.cassandra.TracingQueryRunner;
import com.palantir.atlasdb.keyvalue.cassandra.paging.CellPagerBatchSizingStrategy.PageSizes;
import com.palantir.common.annotation.Output;

/*
* A class for iterating uniformly through raw cells, i.e. (rowName, columnName, timestamp) triplets, in a given table
Expand Down Expand Up @@ -136,7 +138,7 @@ private class PageIterator extends AbstractIterator<List<CassandraRawCellValue>>

private byte[] nextRow;

private final Queue<Iterator<List<CassandraRawCellValue>>> queuedTasks = new ArrayDeque<>();
private final Queue<Iterator<List<CassandraRawCellValue>>> cellsToReturn = new ArrayDeque<>();
private final StatsAccumulator stats = new StatsAccumulator();
private PageSizes pageSizes = null;

Expand All @@ -152,16 +154,20 @@ private class PageIterator extends AbstractIterator<List<CassandraRawCellValue>>
@Override
protected List<CassandraRawCellValue> computeNext() {
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The options in this loop could use a bit of explanation, especially referring to the steps above - e.g. fetchNextRange means we'd be up to the start of step 8 above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (queuedTasks.isEmpty()) {
if (cellsToReturn.isEmpty()) {
// We have no pre-loaded rows from the previous "rectangle", so we need to fetch more if we can
if (nextRow == null) {
return endOfData();
} else {
// Get the next "rectangle" of data (step 8 in the example above) and try again
fetchNextRange();
}
} else if (queuedTasks.peek().hasNext()) {
return queuedTasks.peek().next();
} else if (cellsToReturn.peek().hasNext()) {
// Scan through the remainder of the row (steps 3, 6 and 10 in the example above)
return cellsToReturn.peek().next();
} else {
queuedTasks.poll();
// We exhausted an iterator - pop it from the queue and try again
cellsToReturn.poll();
}
}
}
Expand All @@ -174,41 +180,16 @@ private void fetchNextRange() {
if (slices.isEmpty()) {
nextRow = null;
} else {
splitFetchedRowsIntoTasks(slices);
cellsToReturn.addAll(splitFetchedRowsIntoTasks(
slices,
pageSizes.columnPerRowLimit,
(rowKey, lastSeenColumn) -> singleRowPager.createColumnIterator(
tableRef, rowKey, cellBatchHint, lastSeenColumn, consistencyLevel),
stats));
computeNextStartRow(slices);
}
}

private void splitFetchedRowsIntoTasks(List<KeySlice> slices) {
// Split the returned slices into single partially fetched rows and contiguous runs of fully fetched rows
List<KeySlice> loadedRows = new ArrayList<>();
for (KeySlice slice : slices) {
loadedRows.add(slice);
if (isFullyFetched(slice)) {
stats.add(slice.getColumnsSize());
} else {
queuedTasks.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator());
loadedRows.clear();
// If the row was only partially fetched, we enqueue an iterator to page through
// the remainder of that single row.
Column lastSeenColumn = Iterables.getLast(slice.getColumns()).column;
Iterator<List<ColumnOrSuperColumn>> rawColumnIter = singleRowPager.createColumnIterator(
tableRef, slice.getKey(), cellBatchHint, lastSeenColumn, consistencyLevel);
Iterator<List<ColumnOrSuperColumn>> statsUpdatingIter = new StatsUpdatingIterator(
rawColumnIter, stats, slice.getColumnsSize());
queuedTasks.add(Iterators.transform(
statsUpdatingIter, cols -> columnsToCells(cols, slice.getKey())));
}
}
if (!loadedRows.isEmpty()) {
queuedTasks.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator());
}
}

private boolean isFullyFetched(KeySlice slice) {
return slice.getColumnsSize() < pageSizes.columnPerRowLimit;
}

private void computeNextStartRow(List<KeySlice> slices) {
if (slices.size() < pageSizes.rowLimit) {
nextRow = null;
Expand All @@ -220,6 +201,48 @@ private void computeNextStartRow(List<KeySlice> slices) {
}
}

@VisibleForTesting
interface RowRemainderIteratorFactory {
Iterator<List<ColumnOrSuperColumn>> createIteratorForRemainderOfRow(byte[] rowKey, Column lastSeenColumn);
}

@VisibleForTesting
static List<Iterator<List<CassandraRawCellValue>>> splitFetchedRowsIntoTasks(
List<KeySlice> slices,
int columnPerRowLimit,
RowRemainderIteratorFactory rowRemainderIteratorFactory,
@Output StatsAccumulator statsToAccumulate) {
// Split the returned slices into single partially fetched rows and contiguous runs of fully fetched rows
List<Iterator<List<CassandraRawCellValue>>> ret = new ArrayList<>();
List<KeySlice> loadedRows = new ArrayList<>();
for (KeySlice slice : slices) {
loadedRows.add(slice);
if (isFullyFetched(slice, columnPerRowLimit)) {
statsToAccumulate.add(slice.getColumnsSize());
} else {
ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator());
loadedRows.clear();
// If the row was only partially fetched, we enqueue an iterator to page through
// the remainder of that single row.
Column lastSeenColumn = Iterables.getLast(slice.getColumns()).column;
Iterator<List<ColumnOrSuperColumn>> rawColumnIter = rowRemainderIteratorFactory
.createIteratorForRemainderOfRow(slice.getKey(), lastSeenColumn);
Iterator<List<ColumnOrSuperColumn>> statsUpdatingIter = new StatsUpdatingIterator(
rawColumnIter, statsToAccumulate, slice.getColumnsSize());
ret.add(Iterators.transform(
statsUpdatingIter, cols -> columnsToCells(cols, slice.getKey())));
}
}
if (!loadedRows.isEmpty()) {
ret.add(ImmutableList.of(keySlicesToCells(loadedRows)).iterator());
}
return ret;
}

private static boolean isFullyFetched(KeySlice slice, int columnPerRowLimit) {
return slice.getColumnsSize() < columnPerRowLimit;
}

private static class StatsUpdatingIterator extends AbstractIterator<List<ColumnOrSuperColumn>> {
private final Iterator<List<ColumnOrSuperColumn>> delegate;
private final StatsAccumulator stats;
Expand Down Expand Up @@ -248,14 +271,20 @@ private static List<CassandraRawCellValue> keySlicesToCells(List<KeySlice> slice
List<CassandraRawCellValue> ret = new ArrayList<>();
for (KeySlice slice : slices) {
for (ColumnOrSuperColumn col : slice.getColumns()) {
ret.add(new CassandraRawCellValue(slice.getKey(), col.getColumn()));
ret.add(ImmutableCassandraRawCellValue.builder()
.rowKey(slice.getKey())
.column(col.getColumn())
.build());
}
}
return ret;
}

private static List<CassandraRawCellValue> columnsToCells(List<ColumnOrSuperColumn> columns, byte[] rowKey) {
return Lists.transform(columns, col -> new CassandraRawCellValue(rowKey, col.getColumn()));
return Lists.transform(columns, col -> ImmutableCassandraRawCellValue.builder()
.rowKey(rowKey)
.column(col.getColumn())
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class CellPagerBatchSizingStrategy {
public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) {
if (stats.count() == 0) {
// No data yet: go with a square
return negotiateExactPageSize(Math.round(Math.sqrt(cellBatchHint)), cellBatchHint);
return getExactPageSize(Math.round(Math.sqrt(cellBatchHint)), cellBatchHint);
} else {
return computePageSizesAssumingNormalDistribution(
cellBatchHint, stats.mean(), stats.populationStandardDeviation());
Expand All @@ -47,12 +47,12 @@ public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) {
* Here, the height of the rectangle "h" is KeyRange.count and the width "w" is SliceRange.count.
* We want the area of our rectangle to be on the order of "cellBatchHint" supplied by the user, i.e.
*
* w * h ~ cellBatchHint (*)
* w * h ~ cellBatchHint (1)
*
* The question is how to choose "w" and "h" appropriately. If a row contains "w" or more columns (like rows 1 and
* 4 in the picture above), we need to make at least one extra round trip to retrieve that row individually
* using the "get_slice" call. Thus, setting "w" too low will result in extra round trips to retrieve the individual
* rows, while setting it too high would make "h" low (since "w" and "h" are inversely proportional as in (*)),
* rows, while setting it too high would make "h" low (since "w" and "h" are inversely proportional as in (1)),
* resulting in more round trips to make progress "vertically".
*
* It makes sense to try to minimize the number of round trips to Cassandra. Let's assume that rows will be short
Expand All @@ -65,16 +65,16 @@ public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) {
*
* (N / h) + (# of rows with >= w columns)
*
* Let F(k) be the cumulative distribution function of the number of columns per row. Also, remember from (*)
* Let F(k) be the cumulative distribution function of the number of columns per row. Also, remember from (1)
* that "h" is on the order of "cellBatchHint / w". Hence the above expression is approximately equal to
*
* N * w / cellBatchHint + N * (1 - F(w - 1))
*
* Minimizing this is equivalent to minimizing
*
* w / cellBatchHint - F(w - 1) (**)
* w / cellBatchHint - F(w - 1) (2)
*
* So if we had an adequate model of F, we could estimate its parameters and minimize (**) with respect to "w".
* So if we had an adequate model of F, we could estimate its parameters and minimize (2) with respect to "w".
* The problem is that the nature of F can depend on the format of a particular table. For example, an append-only
* table with five named columns where all five columns are always written at once for every row, will have
* a degenerate distribution localized at 5; a single-column table with mutations might exhibit a Poisson-like
Expand All @@ -86,28 +86,28 @@ public PageSizes computePageSizes(int cellBatchHint, StatsAccumulator stats) {
*
* Pragmatically speaking, we want a formula like
*
* w = < estimated mean # columns per row > + 1 + g(sigma, cellBatchHint) (***)
* w = < estimated mean # columns per row > + 1 + g(sigma, cellBatchHint) (3)
*
* where g(sigma, cellBatchHint) is some term that grows with variance sigma^2. The term is necessary to efficiently
* handle both the "append-only fixed-column table" (zero variance) and the "partially dirty table" (high variance)
* cases.
*
* If we minimize (**) in the case of F being the normal distribution, we get (after doing all the calculus):
* If we minimize (2) in the case of F being the normal distribution, we get (after doing all the calculus):
*
* w = mu + 1 + sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (****)
* w = mu + 1 + sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (4)
*
* where "mu" is the mean and "sigma" is the standard deviation. This actually looks like (***) for sufficiently
* where "mu" is the mean and "sigma" is the standard deviation. This actually looks like (3) for sufficiently
* small values of sigma, with
*
* g(sigma, cellBatchHint) = sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (*****)
* g(sigma, cellBatchHint) = sigma * sqrt(-2 * ln(sqrt(2*pi*sigma^2) / cellBatchHint)) (5)
*
* Of course, a real table can't have normally distributed row widths (simply because the
* row width is a discrete quantity), but:
*
* - the normal distribution approximates the "clean narrow table" (low variance) case quite well,
* and is a half-decent approximation to the Poisson distribution (the "single-column dirty" case);
* - it's easy to estimate the parameters for it (mu and sigma), as well as to do the calculus exercise
* to get (****);
* to get (4);
* - the numbers it produces actually seem reasonable.
*
* To illustrate the last point, here is a table of g(sigma, cellBatchHint) for some values of sigma. (The physical
Expand Down Expand Up @@ -137,19 +137,20 @@ private static PageSizes computePageSizesAssumingNormalDistribution(
// just use the minimum columnPerRowLimit to at least minimize the number of calls
// to get_range_slice.
double logArg = Math.min(1.0, SQRT_2_PI * sigma / cellBatchHint);
// "extraColumnsToFetch" is the value of "g" defined in (*****)
// "extraColumnsToFetch" is the value of "g" defined in (5)
// Mathematically, the limit of x*ln(x) is 0 as x->0, but numerically we need to compute the logarithm
// separately (unless we use a Taylor approximation to x*ln(x) instead, which we don't want),
// so we handle the case of small "s" separately to avoid the logarithm blow up at 0.
double extraColumnsToFetch = logArg < 1e-12 ? 0.0 : sigma * Math.sqrt(-2.0 * Math.log(logArg));
return negotiateExactPageSize(columnsPerRowMean + 1.0 + extraColumnsToFetch, cellBatchHint);
return getExactPageSize(columnsPerRowMean + 1.0 + extraColumnsToFetch, cellBatchHint);
}

private static PageSizes negotiateExactPageSize(double desiredColumnPerRowLimit, int cellBatchHint) {
int columnPerRowLimit1 = Math.max(2, Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit)));
int rowLimit = Math.max(1, cellBatchHint / columnPerRowLimit1);
int columnPerRowLimit2 = Math.max(2, cellBatchHint / rowLimit);
return new PageSizes(rowLimit, columnPerRowLimit2);
private static PageSizes getExactPageSize(double desiredColumnPerRowLimit, int cellBatchHint) {
int approximateColumnPerRowLimit = Math.max(2,
Math.min(cellBatchHint, (int) Math.round(desiredColumnPerRowLimit)));
int rowLimit = Math.max(1, cellBatchHint / approximateColumnPerRowLimit);
int finalColumnPerRowLimit = Math.max(2, cellBatchHint / rowLimit);
return new PageSizes(rowLimit, finalColumnPerRowLimit);
}

public static class PageSizes {
Expand Down
Loading