Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

compare cassandra revert to 0.60.0 #2522

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -120,13 +122,20 @@ public static String toString(final byte[] arr, int off, int len) {
return new String(arr, off, len, StandardCharsets.UTF_8);
}

public static String encodeHexString(byte[] name) {
public static String encodeHexString(@Nullable byte[] name) {
if (name == null) {
return "";
}
return BaseEncoding.base16().lowerCase().encode(name);
}

public static byte[] decodeHexString(@Nullable String hexString) {
if (hexString == null) {
return PtBytes.EMPTY_BYTE_ARRAY;
}
return BaseEncoding.base16().lowerCase().decode(hexString.toLowerCase());
}

public static final Function<byte[], String> BYTES_TO_HEX_STRING = PtBytes::encodeHexString;

public static void addIfNotEmpty(MoreObjects.ToStringHelper helper, String name, byte[] bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,6 @@ public interface CandidateCellForSweepingRequest {

OptionalInt batchSizeHint();

/**
* This can be used in the future when we implement the 'transaction table sweeping' feature.
* This should be set to the timestamp T such that all transactions with start timestamps less than T that
* appear in the given table are known to be committed. The number T can come from the previous run of sweep
* for the table.
*
* This enables in-database pre-filtering of cells that should be considered for sweeping.
* For example, if a cell has exactly one timestamp and this timestamp is known to belong to a committed
* transaction, then the cell doesn't need to be swept, and therefore we can avoid sending it over the network
* from the DB to the sweeper process.
*/
long minUncommittedStartTimestamp();

/**
* Only start timestamps that are strictly below this number will be considered.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,30 +450,10 @@ ClosableIterator<RowResult<Set<Long>>> getRangeOfTimestamps(

/**
* For a given range of rows, returns all candidate cells for sweeping (and their timestamps).
* Here is the precise definition of a candidate cell:
* <blockquote>
* Let {@code Ts} be {@code request.sweepTimestamp()}<br>
* Let {@code Tu} be {@code request.minUncommittedTimestamp()}<br>
* Let {@code V} be {@code request.shouldCheckIfLatestValueIsEmpty()}<br>
* Let {@code Ti} be set of timestamps in {@code request.timestampsToIgnore()}<br>
* <p>
* Consider a cell {@code C}. Let {@code Tc} be the set of all timestamps for {@code C} that are strictly
* less than {@code Ts}. Let {@code T} be {@code Tc \ Ti} (i.e. the cell timestamps minus the ignored
* timestamps).
* <p>
* Then {@code C} is a candidate for sweeping if and only if at least one of
* the following conditions is true:
* <ol>
* <li> The set {@code T} has more than one element
* <li> The set {@code T} contains an element that is greater than or equal to {@code Tu}
* (that is, there is a timestamp that can possibly come from an uncommitted or aborted transaction)
* <li> The set {@code T} contains {@link Value#INVALID_VALUE_TIMESTAMP}
* (that is, there is a sentinel we can possibly clean up)
* <li> {@code V} is true and the cell value corresponding to the maximum element of {@code T} is empty
* (that is, the latest sweepable value is a 'soft-delete' tombstone)
* </ol>
*
* </blockquote>
* <p>
* A candidate cell is a cell that has at least one timestamp that is less than request.sweepTimestamp() and is
* not in the set specified by request.timestampsToIgnore().
* <p>
* This method will scan the semi-open range of rows from the start row specified in the {@code request}
* to the end of the table. If the given start row name is an empty byte array, the whole table will be
* scanned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,34 @@ public Optional<byte[]> getNextStartRow() {

public abstract long getSweptTimestamp();

/**
* Returns a new {@link SweepResults} representing cumulative results from this instance and {@code other}. Assumes
* that {@code other} represents results from subsequent iteration of sweep (i.e., it happened after the run that
* produced this instance).
*/
public SweepResults accumulateWith(SweepResults other) {
return SweepResults.builder()
.cellTsPairsExamined(getCellTsPairsExamined() + other.getCellTsPairsExamined())
.staleValuesDeleted(getStaleValuesDeleted() + other.getStaleValuesDeleted())
.sweptTimestamp(other.getSweptTimestamp())
.nextStartRow(other.getNextStartRow())
.build();
}

public static ImmutableSweepResults.Builder builder() {
return ImmutableSweepResults.builder();
}

public static SweepResults createEmptySweepResult() {
return createEmptySweepResult(Optional.empty());
}

public static SweepResults createEmptySweepResult(Optional<byte[]> startRow) {
return builder()
.cellTsPairsExamined(0)
.staleValuesDeleted(0)
.sweptTimestamp(0)
.nextStartRow(startRow)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ <T, E extends Exception> T runTaskThrowOnConflict(TransactionTask<T, E> task)
throws E, TransactionFailedRetriableException;

/**
* This will open and run a read only transaction. Read transactions are just like normal
* transactions, but will throw if any write operations are called.
* This will open and run a read-only transaction. Read-only transactions are similar to other
* transactions, but will throw if any write operations are called. Furthermore, they often
* make fewer network calls than their read/write counterparts so should be used where possible.
*
* @param task task to run
*
Expand Down
2 changes: 2 additions & 0 deletions atlasdb-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ dependencies {
processor group: 'org.immutables', name: 'value'
processor 'com.google.auto.service:auto-service:1.0-rc2'
processor project(":atlasdb-processors")

explicitShadow 'com.palantir.patches.sourceforge:trove3:' + libVersions.trove
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static String encodeAsHex(byte[] array) {
return "0x" + PtBytes.encodeHexString(array);
}

static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) {
public static ByteBuffer makeCompositeBuffer(byte[] colName, long positiveTimestamp) {
assert colName.length <= 1 << 16 : "Cannot use column names larger than 64KiB, was " + colName.length;

ByteBuffer buffer = ByteBuffer
Expand Down Expand Up @@ -218,7 +218,7 @@ static Pair<byte[], Long> decompose(ByteBuffer inputComposite) {
* Convenience method to get the name buffer for the specified column and
* decompose it into the name and timestamp.
*/
static Pair<byte[], Long> decomposeName(Column column) {
public static Pair<byte[], Long> decomposeName(Column column) {
ByteBuffer nameBuffer;
if (column.isSetName()) {
nameBuffer = column.bufferForName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ private SlicePredicate getSlicePredicate() {
predicate.setSlice_range(slice);
return predicate;
}
}
}
8 changes: 8 additions & 0 deletions atlasdb-cassandra/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@
"com.palantir.atlasdb:timestamp-impl": {
"project": true
},
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"requested": "3.0.3-p5"
},
"com.palantir.remoting-api:ssl-config": {
"locked": "1.1.0",
"transitive": [
Expand Down Expand Up @@ -672,6 +676,10 @@
"com.palantir.atlasdb:timestamp-impl": {
"project": true
},
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"requested": "3.0.3-p5"
},
"com.palantir.remoting-api:ssl-config": {
"locked": "1.1.0",
"transitive": [
Expand Down
1 change: 1 addition & 0 deletions atlasdb-cli-distribution/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra",
"com.palantir.atlasdb:atlasdb-impl-shared",
"com.palantir.atlasdb:lock-impl"
]
Expand Down
2 changes: 2 additions & 0 deletions atlasdb-cli/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra",
"com.palantir.atlasdb:atlasdb-impl-shared",
"com.palantir.atlasdb:lock-impl"
]
Expand Down Expand Up @@ -1324,6 +1325,7 @@
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra",
"com.palantir.atlasdb:atlasdb-impl-shared",
"com.palantir.atlasdb:lock-impl"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ private AtlasDbConstants() {
public static final long DEFAULT_SWEEP_PAUSE_MILLIS = 5 * 1000;
public static final long DEFAULT_SWEEP_PERSISTENT_LOCK_WAIT_MILLIS = 30_000L;
public static final int DEFAULT_SWEEP_DELETE_BATCH_HINT = 1_000;
// TODO(gsheasby): Bump up this default once getRangeOfTimestamps has been replaced.
public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT = 1;
public static final int DEFAULT_SWEEP_CANDIDATE_BATCH_HINT = 1024;
public static final int DEFAULT_SWEEP_READ_LIMIT = 1_000;

public static final int DEFAULT_STREAM_IN_MEMORY_THRESHOLD = 4 * 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ public ClosableIterator<List<CandidateCellForSweeping>> getCandidateCellsForSwee
Cell cell = Cell.create(rr.getRowName(), colName);
boolean latestValEmpty = isLatestValueEmpty(cell, peekingValues);
numExamined.add(timestampArr.length);
boolean candidate = isCandidate(timestampArr, latestValEmpty, request);
candidateBatch.add(ImmutableCandidateCellForSweeping.builder()
.cell(cell)
.sortedTimestamps(candidate ? timestampArr : EMPTY_LONG_ARRAY)
.sortedTimestamps(timestampArr)
.isLatestValueEmpty(latestValEmpty)
.numCellsTsPairsExamined(numExamined.longValue())
.build());
Expand All @@ -109,18 +108,6 @@ private static Closer createCloserAndRelease(ReleasableCloseable<?>... closeable
return closer;
}

private static boolean isCandidate(long[] timestamps,
boolean lastValEmpty,
CandidateCellForSweepingRequest request) {
return timestamps.length > 1
|| (request.shouldCheckIfLatestValueIsEmpty() && lastValEmpty)
|| (timestamps.length == 1 && timestampIsPotentiallySweepable(timestamps[0], request));
}

private static boolean timestampIsPotentiallySweepable(long ts, CandidateCellForSweepingRequest request) {
return ts == Value.INVALID_VALUE_TIMESTAMP || ts >= request.minUncommittedStartTimestamp();
}

private ClosableIterator<RowResult<Value>> getValues(TableReference tableRef,
RangeRequest range,
long sweepTs,
Expand Down Expand Up @@ -175,5 +162,4 @@ public void close() {
}
}

private static final long[] EMPTY_LONG_ARRAY = new long[0];
}
1 change: 1 addition & 0 deletions atlasdb-console-distribution/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra",
"com.palantir.atlasdb:atlasdb-impl-shared",
"com.palantir.atlasdb:lock-impl"
]
Expand Down
12 changes: 12 additions & 0 deletions atlasdb-container-test-utils/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@
"com.palantir.docker.proxy:docker-proxy-rule": {
"locked": "0.3.0"
},
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra"
]
},
"com.palantir.remoting-api:ssl-config": {
"locked": "1.1.0",
"transitive": [
Expand Down Expand Up @@ -865,6 +871,12 @@
"com.palantir.docker.proxy:docker-proxy-rule": {
"locked": "0.3.0"
},
"com.palantir.patches.sourceforge:trove3": {
"locked": "3.0.3-p5",
"transitive": [
"com.palantir.atlasdb:atlasdb-cassandra"
]
},
"com.palantir.remoting-api:ssl-config": {
"locked": "1.1.0",
"transitive": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres.DbKvsPostgresGetCandidateCellsForSweepingTest;
import com.palantir.docker.compose.DockerComposeRule;
import com.palantir.docker.compose.configuration.ShutdownStrategy;
import com.palantir.docker.compose.connection.Container;
Expand Down Expand Up @@ -96,7 +97,8 @@ private static Callable<Boolean> canCreateKeyValueService() {
kvs = ConnectionManagerAwareDbKvs.create(getKvsConfig());
return kvs.getConnectionManager().getConnection().isValid(5);
} catch (Exception ex) {
if (ex.getMessage().contains("The connection attempt failed.")) {
if (ex.getMessage().contains("The connection attempt failed.")
|| ex.getMessage().contains("the database system is starting up")) {
return false;
} else {
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.dbkvs;
package com.palantir.atlasdb.keyvalue.dbkvs.impl.postgres;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.dbkvs.DbKeyValueServiceConfig;
import com.palantir.atlasdb.keyvalue.dbkvs.DbkvsPostgresTestSuite;
import com.palantir.atlasdb.keyvalue.dbkvs.impl.ConnectionManagerAwareDbKvs;
import com.palantir.atlasdb.keyvalue.impl.AbstractGetCandidateCellsForSweepingTest;

public class DbKvsPostgresGetCandidateCellsForSweepingTest extends AbstractGetCandidateCellsForSweepingTest {

@Override
protected KeyValueService createKeyValueService() {
return ConnectionManagerAwareDbKvs.create(DbkvsPostgresTestSuite.getKvsConfig());
DbKeyValueServiceConfig config = DbkvsPostgresTestSuite.getKvsConfig();
return ConnectionManagerAwareDbKvs.create(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@
import com.palantir.nexus.db.sql.SqlConnectionHelper;

// This class should be removed and replaced by DbKvs when InDbTimestampStore depends directly on DbKvs
public class ConnectionManagerAwareDbKvs extends ForwardingKeyValueService {
public final class ConnectionManagerAwareDbKvs extends ForwardingKeyValueService {
private final DbKvs kvs;
private final ConnectionManager connManager;
private final SqlConnectionSupplier sqlConnectionSupplier;

public static ConnectionManagerAwareDbKvs create(DbKeyValueServiceConfig config) {
HikariCPConnectionManager connManager = new HikariCPConnectionManager(config.connection());
ReentrantManagedConnectionSupplier connSupplier = new ReentrantManagedConnectionSupplier(connManager);
SqlConnectionSupplier sqlConnSupplier = getSimpleTimedSqlConnectionSupplier(connSupplier);

return new ConnectionManagerAwareDbKvs(DbKvs.create(config, sqlConnSupplier), connManager);
return new ConnectionManagerAwareDbKvs(DbKvs.create(config, sqlConnSupplier), connManager, sqlConnSupplier);
}

private static SqlConnectionSupplier getSimpleTimedSqlConnectionSupplier(
Expand Down Expand Up @@ -91,9 +91,13 @@ public void close() {
};
}

public ConnectionManagerAwareDbKvs(DbKvs dbKvs, ConnectionManager connManager) {
this.kvs = dbKvs;
private ConnectionManagerAwareDbKvs(
DbKvs kvs,
ConnectionManager connManager,
SqlConnectionSupplier sqlConnectionSupplier) {
this.kvs = kvs;
this.connManager = connManager;
this.sqlConnectionSupplier = sqlConnectionSupplier;
}

@VisibleForTesting
Expand All @@ -106,6 +110,10 @@ public ConnectionManager getConnectionManager() {
return connManager;
}

public SqlConnectionSupplier getSqlConnectionSupplier() {
return sqlConnectionSupplier;
}

public String getTablePrefix() {
return kvs.getTablePrefix();
}
Expand Down
Loading