This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Refactor and Instrument CassandraClient api #2665

Merged: 8 commits, Nov 13, 2017
Changes from 2 commits
@@ -24,8 +24,6 @@
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
@@ -39,30 +37,34 @@
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;

import com.palantir.atlasdb.keyvalue.api.TableReference;

@SuppressWarnings({"all"}) // thrift variable names.
public interface CassandraClient {
Cassandra.Client rawClient();

Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent,
Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(TableReference tableRef,
List<ByteBuffer> keys,
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range,
List<KeySlice> get_range_slices(TableReference tableRef, SlicePredicate predicate, KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key,
byte[] column, ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException,
org.apache.thrift.TException;

CASResult cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates,
CASResult cas(TableReference tableReference, ByteBuffer key, List<Column> expected, List<Column> updates,
ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
org.apache.thrift.TException;
}
(next file)
@@ -46,6 +46,7 @@
import com.google.common.collect.Maps;
import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.common.exception.AtlasDbDependencyException;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
@@ -88,7 +89,7 @@ public CassandraClient create() throws Exception {
}

private CassandraClient instrumentClient(Client client) {
return new CassandraClientImpl(client);
return AtlasDbMetrics.instrument(CassandraClient.class, new CassandraClientImpl(client));
}

private static Cassandra.Client getClient(InetSocketAddress addr,
(next file)
@@ -40,6 +40,9 @@
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;

import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.impl.AbstractKeyValueService;

@SuppressWarnings({"all"}) // thrift variable names.
public class CassandraClientImpl implements CassandraClient {
private Cassandra.Client client;
@@ -54,17 +57,22 @@ public Cassandra.Client rawClient() {
}

@Override
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent,
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(
TableReference tableRef, List<ByteBuffer> keys,
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return client.multiget_slice(keys, column_parent, predicate, consistency_level);
ColumnParent colFam = new ColumnParent(AbstractKeyValueService.internalTableName(tableRef));
return client.multiget_slice(keys, colFam, predicate, consistency_level);
}

@Override
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range,
public List<KeySlice> get_range_slices(TableReference tableRef,
SlicePredicate predicate,
KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return client.get_range_slices(column_parent, predicate, range, consistency_level);
ColumnParent colFam = new ColumnParent(AbstractKeyValueService.internalTableName(tableRef));
[Review comment, Contributor] nit: refactor this line into a method
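A minimal sketch of the extraction the reviewer is suggesting; the helper name getColumnParent is illustrative and not part of this PR:

    // Hypothetical private helper in CassandraClientImpl (name is illustrative).
    private ColumnParent getColumnParent(TableReference tableRef) {
        return new ColumnParent(AbstractKeyValueService.internalTableName(tableRef));
    }

    // The call sites in multiget_slice and get_range_slices would then read:
    //     ColumnParent colFam = getColumnParent(tableRef);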

return client.get_range_slices(colFam, predicate, range, consistency_level);
}

@Override
@@ -75,16 +83,26 @@ public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutation_m
}

@Override
public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
public ColumnOrSuperColumn get(TableReference tableReference,
ByteBuffer key,
byte[] column,
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException {
return client.get(key, column_path, consistency_level);
ColumnPath columnPath = new ColumnPath(tableReference.getQualifiedName());
columnPath.setColumn(column);
return client.get(key, columnPath, consistency_level);
}

@Override
public CASResult cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates,
ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level)
public CASResult cas(TableReference tableReference,
ByteBuffer key,
List<Column> expected,
List<Column> updates,
ConsistencyLevel serial_consistency_level,
ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
return client.cas(key, column_family, expected, updates, serial_consistency_level, commit_consistency_level);
return client.cas(key, tableReference.getQualifiedName(), expected, updates, serial_consistency_level,
commit_consistency_level);
}

@Override
(next file)
@@ -486,15 +486,6 @@ void runOneTimeStartupChecks() {
}
}

//todo dedupe this into a name-demangling class that everyone can access
protected static String internalTableName(TableReference tableRef) {
String tableName = tableRef.getQualifiedName();
if (tableName.startsWith("_")) {
return tableName;
}
return tableName.replaceFirst("\\.", "__");
}

[Review comment, Contributor (PR author)] Drive-by deletion, as this didn't seem to be used anywhere, and this exact method is available on AbstractKVS.internalTableName
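For context, a self-contained sketch of the name mangling that the shared AbstractKeyValueService.internalTableName helper performs, mirroring the copy deleted above; the wrapper class and String-based signature are illustrative only:

    public final class InternalTableNameSketch {
        // Mirrors the deleted method: names starting with '_' pass through unchanged,
        // otherwise the first '.' separating namespace from table becomes "__".
        static String internalTableName(String qualifiedName) {
            if (qualifiedName.startsWith("_")) {
                return qualifiedName;
            }
            return qualifiedName.replaceFirst("\\.", "__");
        }

        public static void main(String[] args) {
            System.out.println(internalTableName("ns.my_table")); // ns__my_table
            System.out.println(internalTableName("_locks"));      // _locks
        }
    }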

private InetSocketAddress getAddressForHostThrowUnchecked(String host) {
try {
return getAddressForHost(host);
(next file)
@@ -448,12 +448,10 @@ public Map<Cell, Value> apply(CassandraClient client) throws Exception {

List<ByteBuffer> rowNames = wrap(batch);

ColumnParent colFam = new ColumnParent(internalTableName(tableRef));
Map<ByteBuffer, List<ColumnOrSuperColumn>> results = multigetInternal(
client,
tableRef,
rowNames,
colFam,
pred,
readConsistency);
Map<Cell, Value> ret = Maps.newHashMapWithExpectedSize(batch.size());
@@ -648,7 +646,7 @@ public Void apply(CassandraClient client) throws Exception {
}

Map<ByteBuffer, List<ColumnOrSuperColumn>> results =
multigetInternal(client, tableRef, rowNames, colFam, predicate, consistency);
multigetInternal(client, tableRef, rowNames, predicate, consistency);
visitor.visit(results);
return null;
}
@@ -792,9 +790,8 @@ public RowColumnRangeExtractor.RowColumnRangeResult apply(CassandraClient client
Limit limit = Limit.of(batchColumnRangeSelection.getBatchHint());
SlicePredicate pred = SlicePredicates.create(range, limit);

ColumnParent colFam = new ColumnParent(internalTableName(tableRef));
Map<ByteBuffer, List<ColumnOrSuperColumn>> results =
multigetInternal(client, tableRef, wrap(rows), colFam, pred, readConsistency);
multigetInternal(client, tableRef, wrap(rows), pred, readConsistency);

RowColumnRangeExtractor extractor = new RowColumnRangeExtractor();
extractor.extractResults(rows, results, startTs);
@@ -849,9 +846,8 @@ public TokenBackedBasicResultsPage<Entry<Cell, Value>, byte[]> apply(CassandraCl

ByteBuffer rowByteBuffer = ByteBuffer.wrap(row);

ColumnParent colFam = new ColumnParent(internalTableName(tableRef));
Map<ByteBuffer, List<ColumnOrSuperColumn>> results = multigetInternal(client, tableRef,
ImmutableList.of(rowByteBuffer), colFam, pred, readConsistency);
ImmutableList.of(rowByteBuffer), pred, readConsistency);

if (results.isEmpty()) {
return SimpleTokenBackedResultsPage.create(startCol, ImmutableList.of(), false);
@@ -1200,11 +1196,11 @@ private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetInternal(
CassandraClient client,
TableReference tableRef,
List<ByteBuffer> rowNames,
ColumnParent colFam,
SlicePredicate pred,
ConsistencyLevel consistency) throws TException {
try {
return queryRunner.run(client, tableRef, () -> client.multiget_slice(rowNames, colFam, pred, consistency));
return queryRunner.run(client, tableRef, () -> client.multiget_slice(tableRef, rowNames, pred,
consistency));
} catch (UnavailableException e) {
throw new InsufficientConsistencyException(
"This get operation requires " + consistency + " Cassandra nodes to be up and available.", e);
@@ -2165,8 +2161,8 @@ private CASResult executeCheckAndSet(CassandraClient client, CheckAndSetRequest

Column newColumn = makeColumn(colName, request.newValue(), timestamp);
return queryRunner.run(client, table, () -> client.cas(
table,
rowName,
internalTableName(table),
oldColumns,
ImmutableList.of(newColumn),
ConsistencyLevel.SERIAL,
(next file)
@@ -130,11 +130,10 @@ public synchronized long getUpperLimit() {
@Override
public Long apply(CassandraClient client) {
ByteBuffer rowName = getRowName();
ColumnPath columnPath = new ColumnPath(AtlasDbConstants.TIMESTAMP_TABLE.getQualifiedName());
columnPath.setColumn(getColumnName());
ColumnOrSuperColumn result;
try {
result = client.get(rowName, columnPath, ConsistencyLevel.LOCAL_QUORUM);
result = client.get(AtlasDbConstants.TIMESTAMP_TABLE, rowName, getColumnName(),
ConsistencyLevel.LOCAL_QUORUM);
} catch (NotFoundException e) {
result = null;
} catch (Exception e) {
@@ -190,8 +189,8 @@ private void cas(CassandraClient client, Long oldVal, long newVal) {
try {
DebugLogger.logger.info("[CAS] Trying to set upper limit from {} to {}.", oldVal, newVal);
result = client.cas(
AtlasDbConstants.TIMESTAMP_TABLE,
getRowName(),
AtlasDbConstants.TIMESTAMP_TABLE.getQualifiedName(),
oldVal == null ? ImmutableList.of() : ImmutableList.of(makeColumn(oldVal)),
ImmutableList.of(makeColumn(newVal)),
ConsistencyLevel.SERIAL,
(next file)
@@ -94,8 +94,8 @@ private CASResult writeDdlLockWithCas(CassandraClient client, Column ourUpdate,
try {
return queryRunner.run(client, lockTable,
() -> client.cas(
lockTable,
SchemaMutationLock.getGlobalDdlLockRowName(),
lockTable.getQualifiedName(),
expected,
ImmutableList.of(ourUpdate),
ConsistencyLevel.SERIAL,
(next file)
@@ -344,13 +344,12 @@ private boolean trySchemaMutationUnlockOnce(long perOperationNodeId) {

private Optional<Column> queryExistingLockColumn(CassandraClient client) throws TException {
TableReference lockTableRef = lockTable.getOnlyTable();
ColumnPath columnPath = new ColumnPath(lockTableRef.getQualifiedName());
columnPath.setColumn(getGlobalDdlLockColumnName());
Column existingColumn = null;
ConsistencyLevel localQuorum = ConsistencyLevel.LOCAL_QUORUM;
try {
ColumnOrSuperColumn result = queryRunner.run(client, lockTableRef,
() -> client.get(getGlobalDdlLockRowName(), columnPath, localQuorum));
() -> client.get(lockTableRef, getGlobalDdlLockRowName(), getGlobalDdlLockColumnName(),
localQuorum));
existingColumn = result.getColumn();
} catch (UnavailableException e) {
throw new InsufficientConsistencyException(
@@ -372,8 +371,8 @@ private CASResult writeDdlLockWithCas(
TableReference lockTableRef = lockTable.getOnlyTable();
return queryRunner.run(client, lockTableRef,
() -> client.cas(
lockTableRef,
getGlobalDdlLockRowName(),
lockTableRef.getQualifiedName(),
expectedLockValue,
ImmutableList.of(newLockValue),
ConsistencyLevel.SERIAL,
(next file)
@@ -52,7 +52,6 @@ public RowGetter(
}

public List<KeySlice> getRows(KeyRange keyRange, SlicePredicate slicePredicate) throws Exception {
ColumnParent colFam = new ColumnParent(CassandraKeyValueServiceImpl.internalTableName(tableRef));
InetSocketAddress host = clientPool.getRandomHostForKey(keyRange.getStart_key());
return clientPool.runWithRetryOnHost(
host,
@@ -61,7 +60,7 @@ public List<KeySlice> apply(CassandraClient client) throws Exception {
public List<KeySlice> apply(CassandraClient client) throws Exception {
try {
return queryRunner.run(client, tableRef,
() -> client.get_range_slices(colFam, slicePredicate, keyRange, consistency));
() -> client.get_range_slices(tableRef, slicePredicate, keyRange, consistency));
} catch (UnavailableException e) {
throw new InsufficientConsistencyException("get_range_slices requires " + consistency
+ " Cassandra nodes to be up and available.", e);
@@ -72,7 +71,7 @@ public List<KeySlice> apply(CassandraClient client) throws Exception {

@Override
public String toString() {
return "get_range_slices(" + colFam + ")";
return "get_range_slices(" + tableRef + ")";
}
});
}