Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Revert "The big sweep rewrite, part 4: Cassandra impl (#2231)"
Browse files Browse the repository at this point in the history
This reverts commit 76e5636.
  • Loading branch information
gsheasby committed Oct 19, 2017
1 parent 9d02b7f commit d44aa27
Show file tree
Hide file tree
Showing 31 changed files with 102 additions and 1,518 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,65 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import java.util.Arrays;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfigManager;
import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig;
import com.palantir.atlasdb.containers.CassandraContainer;
import com.palantir.atlasdb.containers.Containers;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.SweepResults;
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence;
import com.palantir.atlasdb.sweep.AbstractSweepTaskRunnerTest;

@RunWith(Parameterized.class)
public class CassandraKeyValueServiceSweepTaskRunnerIntegrationTest extends AbstractSweepTaskRunnerTest {
@ClassRule
public static final Containers CONTAINERS = new Containers(
CassandraKeyValueServiceSweepTaskRunnerIntegrationTest.class)
.with(new CassandraContainer());

@Parameterized.Parameter
public boolean useColumnBatchSize;

@Parameterized.Parameters(name = "Use column batch size parameter = {0}")
public static Iterable<?> parameters() {
return Arrays.asList(true, false);
}


@Override
protected KeyValueService getKeyValueService() {
CassandraKeyValueServiceConfig config = useColumnBatchSize
? ImmutableCassandraKeyValueServiceConfig.copyOf(CassandraContainer.KVS_CONFIG)
.withTimestampsGetterBatchSize(10)
: CassandraContainer.KVS_CONFIG;

return CassandraKeyValueServiceImpl.create(
CassandraKeyValueServiceConfigManager.createSimpleManager(
CassandraContainer.KVS_CONFIG), CassandraContainer.LEADER_CONFIG);
CassandraKeyValueServiceConfigManager.createSimpleManager(config), CassandraContainer.LEADER_CONFIG);
}

@Test
public void should_not_oom_when_there_are_many_large_values_to_sweep() {
Assume.assumeTrue("should_not_oom test will always fail if column batch size is not set!", useColumnBatchSize);

createTable(TableMetadataPersistence.SweepStrategy.CONSERVATIVE);

long numInsertions = 100;
insertMultipleValues(numInsertions);

long sweepTimestamp = numInsertions + 1;
SweepResults results = completeSweep(sweepTimestamp);
Assert.assertEquals(numInsertions - 1, results.getStaleValuesDeleted());
}

@Test
Expand All @@ -53,4 +89,14 @@ public void should_return_values_for_multiple_columns_when_sweeping() {
Assert.assertEquals(28, results.getStaleValuesDeleted());
}

private void insertMultipleValues(long numInsertions) {
for (int ts = 1; ts <= numInsertions; ts++) {
System.out.println("putting with ts = " + ts);
putIntoDefaultColumn("row", makeLongRandomString(), ts);
}
}

private String makeLongRandomString() {
return RandomStringUtils.random(1_000_000);
}
}

This file was deleted.

2 changes: 0 additions & 2 deletions atlasdb-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ dependencies {
processor group: 'org.immutables', name: 'value'
processor 'com.google.auto.service:auto-service:1.0-rc2'
processor project(":atlasdb-processors")

explicitShadow 'com.palantir.patches.sourceforge:trove3:' + libVersions.trove
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.keyvalue.cassandra;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.impl.GetCandidateCellsForSweepingShim;

public interface CassandraKeyValueService extends KeyValueService {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static String encodeAsHex(byte[] array) {
return "0x" + PtBytes.encodeHexString(array);
}

public static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) {
static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) {
assert colName.length <= 1 << 16 : "Cannot use column names larger than 64KiB, was " + colName.length;

ByteBuffer buffer = ByteBuffer
Expand Down Expand Up @@ -218,7 +218,7 @@ static Pair<byte[], Long> decompose(ByteBuffer inputComposite) {
* Convenience method to get the name buffer for the specified column and
* decompose it into the name and timestamp.
*/
public static Pair<byte[], Long> decomposeName(Column column) {
static Pair<byte[], Long> decomposeName(Column column) {
ByteBuffer nameBuffer;
if (column.isSetName()) {
nameBuffer = column.bufferForName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Supplier;
import com.palantir.atlasdb.keyvalue.api.ColumnSelection;
import com.palantir.atlasdb.keyvalue.api.RangeRequest;
import com.palantir.atlasdb.keyvalue.api.RangeRequests;
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.cassandra.ResultsExtractor;
import com.palantir.util.paging.AbstractPagingIterable;
Expand All @@ -42,17 +43,17 @@ public class CassandraRangePagingIterable<T>

private final int batchHint;
private final ColumnSelection selection;
private final RowRangeLoader rowRangeLoader;
private final RowGetter rowGetter;
private final SlicePredicate slicePredicate;

public CassandraRangePagingIterable(
RowRangeLoader rowRangeLoader,
RowGetter rowGetter,
SlicePredicate slicePredicate,
ColumnGetter columnGetter,
RangeRequest rangeRequest,
Supplier<ResultsExtractor<T>> resultsExtractor,
long timestamp) {
this.rowRangeLoader = rowRangeLoader;
this.rowGetter = rowGetter;
this.slicePredicate = slicePredicate;
this.columnGetter = columnGetter;
this.rangeRequest = rangeRequest;
Expand Down Expand Up @@ -89,8 +90,8 @@ private TokenBackedBasicResultsPage<RowResult<T>, byte[]> getSinglePage(byte[] s
}

private List<KeySlice> getRows(byte[] startKey) throws Exception {
KeyRange keyRange = KeyRanges.createKeyRange(startKey, rangeRequest.getEndExclusive(), batchHint);
return rowRangeLoader.getRows(keyRange, slicePredicate);
KeyRange keyRange = getKeyRange(startKey, rangeRequest.getEndExclusive());
return rowGetter.getRows(keyRange);
}

private Map<ByteBuffer, List<ColumnOrSuperColumn>> getColumns(List<KeySlice> firstPage) {
Expand All @@ -114,4 +115,15 @@ private TokenBackedBasicResultsPage<RowResult<T>, byte[]> pageWithNoMoreResultsA
return SimpleTokenBackedResultsPage.create(rangeRequest.getEndExclusive(), page.getResults(), false);
}

private KeyRange getKeyRange(byte[] startKey, byte[] endExclusive) {
KeyRange keyRange = new KeyRange(batchHint);
keyRange.setStart_key(startKey);
if (endExclusive.length == 0) {
keyRange.setEnd_key(endExclusive);
} else {
// We need the previous name because this is inclusive, not exclusive
keyRange.setEnd_key(RangeRequests.previousLexicographicName(endExclusive));
}
return keyRange;
}
}

This file was deleted.

Loading

0 comments on commit d44aa27

Please sign in to comment.