Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Use TimestampService Timestamps For Cassandra multiPut() and delete() (
Browse files Browse the repository at this point in the history
…#3224)

* fresh timestamp supplier adapter

* don't overwrite cass ts for multiPut, sweep sentinel, putWithTimestamps, and metadataTableDropper

* dont' override timestamp for tombstones

* only create column once when overwriting

* cleanup

* attempt to require timelock without a real compile dep

* CMTPs

* Updated plumbing

* Make some tests use the nonlegacy timestamp provider

* fix benchmarks

* changelog

* Default timestamp adapter seems super super fishy

* PR comments, Part 1

* Edge case in FTSA

* CR comments, Part 2

* CR comments part 3

* baseline

* compile break

* Add dev break to release notes

* FreshTimestampSupplierAdapterTest

* Timestamp Providers test

* range tombstones

* Add test for CKVSs

* Mention range tombstones in release notes

* CR comment cleanup part 4

* dbkvs

* rename to ...ForTesting

* legacy API, not legacyModeForTests API

* remove datatypeconverter

* Fix perf issue with deletion calling freshtimestamp a lot

* rename tests and change tolerance to 1

* Add API documentation for CMTP

* CVP should use primitive

* Delete now-bad comments
  • Loading branch information
jeremyk-91 authored Jun 11, 2018
1 parent 9137542 commit 8e208e0
Show file tree
Hide file tree
Showing 45 changed files with 944 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.spi;

import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import org.slf4j.Logger;
Expand All @@ -30,8 +31,13 @@
import com.palantir.timestamp.TimestampStoreInvalidator;

public interface AtlasDbFactory {
Logger log = LoggerFactory.getLogger(AtlasDbFactory.class);

long NO_OP_FAST_FORWARD_TIMESTAMP = Long.MIN_VALUE + 1; // Note: Long.MIN_VALUE itself is not allowed.
boolean DEFAULT_INITIALIZE_ASYNC = false;
LongSupplier THROWING_FRESH_TIMESTAMP_SOURCE = () -> {
throw new UnsupportedOperationException("Not expecting to use fresh timestamps");
};

String getType();

Expand All @@ -41,6 +47,7 @@ default KeyValueService createRawKeyValueService(
Optional::empty,
leaderConfig,
Optional.empty(),
THROWING_FRESH_TIMESTAMP_SOURCE,
DEFAULT_INITIALIZE_ASYNC,
FakeQosClient.INSTANCE);
}
Expand All @@ -53,6 +60,8 @@ default KeyValueService createRawKeyValueService(
* @param leaderConfig If the implementation supports it, the optional leader configuration.
* @param namespace If the implementation supports it, this is the namespace to use when the namespace in config is
* absent. If both are present, they must match.
* @param freshTimestampSource If present, a source of fresh timestamps, which may be relevant for some KVS
* operations.
* @param initializeAsync If the implementations supports it, and initializeAsync is true, the KVS will initialize
* asynchronously when synchronous initialization fails.
* @param qosClient the client for checking limits from the Quality-of-Service service.
Expand All @@ -63,6 +72,7 @@ KeyValueService createRawKeyValueService(
Supplier<Optional<KeyValueServiceRuntimeConfig>> runtimeConfig,
Optional<LeaderConfig> leaderConfig,
Optional<String> namespace,
LongSupplier freshTimestampSource,
boolean initializeAsync,
QosClient qosClient);

Expand All @@ -77,7 +87,6 @@ TimestampService createTimestampService(

default TimestampStoreInvalidator createTimestampStoreInvalidator(KeyValueService rawKvs) {
return () -> {
Logger log = LoggerFactory.getLogger(AtlasDbFactory.class);
log.warn("AtlasDB doesn't yet support automated migration for KVS type {}.", getType());
return NO_OP_FAST_FORWARD_TIMESTAMP;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ protected KeyValueService getKeyValueService() {
? ImmutableCassandraKeyValueServiceConfig.copyOf(CassandraContainer.KVS_CONFIG)
.withTimestampsGetterBatchSize(10)
: CassandraContainer.KVS_CONFIG;

// Need to ensure that C* timestamps for sentinels and deletes occur after timestamps where values were put
// (which is true in practice assuming timestamp service is working properly)
return CassandraKeyValueServiceImpl.create(
config,
CassandraContainer.LEADER_CONFIG);
CassandraContainer.LEADER_CONFIG,
CassandraTestTools.getMutationProviderWithStartingTimestamp(1_000_000));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cassandra.CassandraMutationTimestampProviders;
import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig;
import com.palantir.atlasdb.containers.CassandraContainer;
import com.palantir.atlasdb.containers.Containers;
Expand All @@ -57,6 +58,7 @@ public class CassandraClientPoolIntegrationTest {
private CassandraKeyValueService kv = CassandraKeyValueServiceImpl.create(
CassandraContainer.KVS_CONFIG,
CassandraContainer.LEADER_CONFIG,
CassandraMutationTimestampProviders.legacyModeForTestsOnly(),
clientPool);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ public class CassandraConnectionIntegrationTest {

@Test
public void testAuthProvided() {
CassandraKeyValueServiceImpl.create(
CassandraKeyValueServiceImpl.createForTesting(
CassandraContainer.KVS_CONFIG,
CassandraContainer.LEADER_CONFIG).close();
}

@Test
public void testAuthMissing() {
CassandraKeyValueServiceImpl.create(
CassandraKeyValueServiceImpl.createForTesting(
NO_CREDS_CKVS_CONFIG,
CassandraContainer.LEADER_CONFIG).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;
import com.palantir.atlasdb.cassandra.CassandraMutationTimestampProviders;
import com.palantir.atlasdb.containers.CassandraContainer;
import com.palantir.atlasdb.containers.Containers;
import com.palantir.atlasdb.encoding.PtBytes;
Expand All @@ -41,6 +42,7 @@ protected KeyValueService createKeyValueService() {
return CassandraKeyValueServiceImpl.create(
CassandraContainer.KVS_CONFIG,
CassandraContainer.LEADER_CONFIG,
CassandraMutationTimestampProviders.legacyModeForTestsOnly(),
Mockito.mock(Logger.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@
import static org.mockito.Mockito.verify;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.thrift.TException;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand All @@ -49,11 +54,14 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
Expand Down Expand Up @@ -87,6 +95,8 @@ public class CassandraKeyValueServiceIntegrationTest extends AbstractKeyValueSer
private static final int FOUR_DAYS_IN_SECONDS = 4 * 24 * 60 * 60;
private static final int ONE_HOUR_IN_SECONDS = 60 * 60;

private static final long STARTING_ATLAS_TIMESTAMP = 10_000_000;

private byte[] tableMetadata = new TableDefinition() {
{
rowName();
Expand All @@ -102,16 +112,19 @@ public class CassandraKeyValueServiceIntegrationTest extends AbstractKeyValueSer
cachePriority(TableMetadataPersistence.CachePriority.COLD);
}
}.toTableMetadata().persistToBytes();
public static final Cell CELL = Cell.create(PtBytes.toBytes("row"), PtBytes.toBytes("column"));

@Override
protected KeyValueService getKeyValueService() {
return createKvs(getConfigWithGcGraceSeconds(FOUR_DAYS_IN_SECONDS), logger);
}

private CassandraKeyValueService createKvs(CassandraKeyValueServiceConfig config, Logger testLogger) {
// Mutation provider is needed, because deletes/sentinels are to be written after writes
return CassandraKeyValueServiceImpl.create(
config,
CassandraContainer.LEADER_CONFIG,
CassandraTestTools.getMutationProviderWithStartingTimestamp(STARTING_ATLAS_TIMESTAMP),
testLogger);
}

Expand Down Expand Up @@ -324,6 +337,104 @@ public void testCleanCassLocksStateCli() throws Exception {
}
}

@Test
public void sweepSentinelsAreWrittenAtFreshTimestamp() throws Exception {
TableReference tableReference =
TableReference.createFromFullyQualifiedName("test." + RandomStringUtils.randomAlphanumeric(16));
keyValueService.createTable(tableReference, AtlasDbConstants.GENERIC_TABLE_METADATA);

keyValueService.addGarbageCollectionSentinelValues(tableReference, ImmutableList.of(CELL));

putDummyValueAtCellAndTimestamp(
tableReference,
CELL,
Value.INVALID_VALUE_TIMESTAMP,
STARTING_ATLAS_TIMESTAMP - 1);

Map<Cell, Value> results = keyValueService.get(tableReference, ImmutableMap.of(CELL, 1L));
byte[] contents = results.get(CELL).getContents();
assertThat(Arrays.equals(contents, PtBytes.EMPTY_BYTE_ARRAY), is(true));
}

@Test
public void deletionTakesPlaceAtFreshTimestamp() throws Exception {
TableReference tableReference =
TableReference.createFromFullyQualifiedName("test." + RandomStringUtils.randomAlphanumeric(16));
keyValueService.createTable(tableReference, AtlasDbConstants.GENERIC_TABLE_METADATA);
byte[] data = PtBytes.toBytes("data");
byte[] moreData = PtBytes.toBytes("data2");

keyValueService.putWithTimestamps(tableReference, ImmutableListMultimap.of(CELL, Value.create(data, 8L)));
keyValueService.putWithTimestamps(tableReference, ImmutableListMultimap.of(CELL, Value.create(moreData, 88L)));
keyValueService.delete(tableReference, ImmutableListMultimap.of(CELL, 8L));

putDummyValueAtCellAndTimestamp(tableReference, CELL, 8L, STARTING_ATLAS_TIMESTAMP - 1);
Map<Cell, Value> results = keyValueService.get(tableReference, ImmutableMap.of(CELL, 8L + 1));
assertThat(results.containsKey(CELL), is(false));
}

@Test
public void rangeTombstonesWrittenAtFreshTimestamp() throws Exception {
TableReference tableReference =
TableReference.createFromFullyQualifiedName("test." + RandomStringUtils.randomAlphanumeric(16));
keyValueService.createTable(tableReference, AtlasDbConstants.GENERIC_TABLE_METADATA);

keyValueService.deleteAllTimestamps(
tableReference,
ImmutableMap.of(CELL, 1_234_567L),
true);

putDummyValueAtCellAndTimestamp(tableReference, CELL, 1337L, STARTING_ATLAS_TIMESTAMP - 1);
Map<Cell, Value> resultExpectedCoveredByRangeTombstone =
keyValueService.get(tableReference, ImmutableMap.of(CELL, 1337L + 1));
assertThat(resultExpectedCoveredByRangeTombstone.containsKey(CELL), is(false));
}

@Test
public void cassandraTimestampsAreNotUsedAsAtlasTimestampsForRangeTombstone() throws Exception {
TableReference tableReference =
TableReference.createFromFullyQualifiedName("test." + RandomStringUtils.randomAlphanumeric(16));
keyValueService.createTable(tableReference, AtlasDbConstants.GENERIC_TABLE_METADATA);

keyValueService.deleteAllTimestamps(
tableReference,
ImmutableMap.of(CELL, 1_234_567L),
true);

// A value written outside of the range tombstone should not be covered by the range tombstone, even if
// the Cassandra timestamp of the value is much lower than that of the range tombstone.
// This test is likely to fail if the implementation confuses Cassandra timestamps for Atlas timestamps.
putDummyValueAtCellAndTimestamp(tableReference, CELL, 1_333_337L, STARTING_ATLAS_TIMESTAMP - 1);
Map<Cell, Value> resultsOutsideRangeTombstone =
keyValueService.get(tableReference, ImmutableMap.of(CELL, Long.MAX_VALUE));
assertThat(resultsOutsideRangeTombstone.containsKey(CELL), is(true));
}

private void putDummyValueAtCellAndTimestamp(
TableReference tableReference, Cell cell, long atlasTimestamp, long cassandraTimestamp)
throws TException {
CassandraKeyValueServiceImpl ckvs = (CassandraKeyValueServiceImpl) keyValueService;
ckvs.getClientPool().runWithRetry(input -> {
CqlQuery cqlQuery = new CqlQuery(String.format("INSERT INTO \"%s\".\"%s\" (key, column1, column2, value)"
+ " VALUES (%s, %s, %s, %s) USING TIMESTAMP %s;",
CassandraContainer.KVS_CONFIG.getKeyspaceOrThrow(),
tableReference.getQualifiedName().replaceAll("\\.", "__"),
convertBytesToHexString(cell.getRowName()),
convertBytesToHexString(cell.getColumnName()),
~atlasTimestamp,
convertBytesToHexString(PtBytes.toBytes("testtesttest")),
cassandraTimestamp));
return input.execute_cql3_query(
cqlQuery,
Compression.NONE,
ConsistencyLevel.QUORUM);
});
}

private String convertBytesToHexString(byte[] bytes) {
return "0x" + BaseEncoding.base16().lowerCase().encode(bytes);
}

private void createExtraLocksTable(SchemaMutationLockTables lockTables,
CassandraKeyValueServiceImpl kvs) throws TException {
TableReference originalTable = Iterables.getOnlyElement(lockTables.getAllLockTables());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class CassandraKeyValueServiceSerializableTransactionIntegrationTest

@Override
protected KeyValueService getKeyValueService() {
return CassandraKeyValueServiceImpl.create(
return CassandraKeyValueServiceImpl.createForTesting(
CassandraContainer.KVS_CONFIG,
CassandraContainer.LEADER_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ protected KeyValueService getKeyValueService() {
.withTimestampsGetterBatchSize(10)
: CassandraContainer.KVS_CONFIG;

return CassandraKeyValueServiceImpl.create(config, CassandraContainer.LEADER_CONFIG);
// Timestamp of 1,000,000 is done to ensure that tombstones are written at a Cassandra timestamp that is
// greater than the Atlas timestamp for any values written during the test.
return CassandraKeyValueServiceImpl.create(
config,
CassandraContainer.LEADER_CONFIG,
CassandraTestTools.getMutationProviderWithStartingTimestamp(1_000_000));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public void setUp() {
ImmutableCassandraKeyValueServiceConfig quickTimeoutConfig = ImmutableCassandraKeyValueServiceConfig
.copyOf(CassandraContainer.KVS_CONFIG)
.withSchemaMutationTimeoutMillis(500);
kvs = CassandraKeyValueServiceImpl.create(
kvs = CassandraKeyValueServiceImpl.createForTesting(
quickTimeoutConfig,
CassandraContainer.LEADER_CONFIG);

ImmutableCassandraKeyValueServiceConfig slowTimeoutConfig = ImmutableCassandraKeyValueServiceConfig
.copyOf(CassandraContainer.KVS_CONFIG)
.withSchemaMutationTimeoutMillis(6 * 1000);
slowTimeoutKvs = CassandraKeyValueServiceImpl.create(
slowTimeoutKvs = CassandraKeyValueServiceImpl.createForTesting(
slowTimeoutConfig,
CassandraContainer.LEADER_CONFIG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,51 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TestRule;

import com.palantir.atlasdb.cassandra.CassandraMutationTimestampProviders;
import com.palantir.atlasdb.containers.CassandraContainer;
import com.palantir.atlasdb.containers.Containers;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.transaction.impl.AbstractTransactionTest;
import com.palantir.exception.NotInitializedException;
import com.palantir.flake.FlakeRetryingRule;
import com.palantir.flake.ShouldRetry;
import com.palantir.timestamp.TimestampManagementService;

@ShouldRetry // The first test can fail with a TException: No host tried was able to create the keyspace requested.
public class CassandraKeyValueServiceTransactionIntegrationTest extends AbstractTransactionTest {
@ClassRule
public static final Containers CONTAINERS = new Containers(CassandraKeyValueServiceTransactionIntegrationTest.class)
.with(new CassandraContainer());

// This constant exists so that fresh timestamps are always greater than the write timestamps of values used in the
// test.
private static final long ONE_BILLION = 1_000_000_000;

@Rule
public final TestRule flakeRetryingRule = new FlakeRetryingRule();

@Before
public void advanceTimestamp() {
((TimestampManagementService) timestampService).fastForwardTimestamp(ONE_BILLION);
}

@Override
protected KeyValueService getKeyValueService() {
return CassandraKeyValueServiceImpl.create(
CassandraContainer.KVS_CONFIG,
CassandraContainer.LEADER_CONFIG);
CassandraContainer.LEADER_CONFIG,
CassandraMutationTimestampProviders.singleLongSupplierBacked(
() -> {
if (timestampService == null) {
throw new NotInitializedException("timestamp service");
}
return timestampService.getFreshTimestamp();
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void setup() {
protected KeyValueService getKeyValueService() {
CassandraKeyValueServiceConfig config = CassandraContainer.KVS_CONFIG;

return CassandraKeyValueServiceImpl.create(config, CassandraContainer.LEADER_CONFIG);
return CassandraKeyValueServiceImpl.createForTesting(config, CassandraContainer.LEADER_CONFIG);
}

@Override
Expand Down
Loading

0 comments on commit 8e208e0

Please sign in to comment.