Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Propagate top-level KVS method names to CassandraClient #2669

Merged
merged 5 commits into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,42 @@
public interface CassandraClient {
Cassandra.Client rawClient();

Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(TableReference tableRef,
Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(String kvsMethodName,
TableReference tableRef,
List<ByteBuffer> keys,
SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

List<KeySlice> get_range_slices(TableReference tableRef, SlicePredicate predicate, KeyRange range,
List<KeySlice> get_range_slices(String kvsMethodName,
TableReference tableRef,
SlicePredicate predicate,
KeyRange range,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
void batch_mutate(String kvsMethodName,
Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key,
byte[] column, ConsistencyLevel consistency_level)
ColumnOrSuperColumn get(TableReference tableReference,
ByteBuffer key,
byte[] column,
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException,
org.apache.thrift.TException;

CASResult cas(TableReference tableReference, ByteBuffer key, List<Column> expected, List<Column> updates,
ConsistencyLevel serial_consistency_level, ConsistencyLevel commit_consistency_level)
CASResult cas(TableReference tableReference,
ByteBuffer key,
List<Column> expected,
List<Column> updates,
ConsistencyLevel serial_consistency_level,
ConsistencyLevel commit_consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException;

CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel consistency)
CqlResult execute_cql3_query(ByteBuffer query,
Compression compression,
ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException,
org.apache.thrift.TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public CassandraClient create() throws Exception {
}

private CassandraClient instrumentClient(Client client) {
// TODO(ssouza): use the kvsMethodName to tag the timers.
return AtlasDbMetrics.instrument(CassandraClient.class, new CassandraClientImpl(client));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ public Cassandra.Client rawClient() {

@Override
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(
TableReference tableRef, List<ByteBuffer> keys,
SlicePredicate predicate, ConsistencyLevel consistency_level)
String kvsMethodName,
TableReference tableRef,
List<ByteBuffer> keys,
SlicePredicate predicate,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
ColumnParent colFam = getColumnParent(tableRef);
return client.multiget_slice(keys, colFam, predicate, consistency_level);
}

@Override
public List<KeySlice> get_range_slices(TableReference tableRef,
public List<KeySlice> get_range_slices(String kvsMethodName,
TableReference tableRef,
SlicePredicate predicate,
KeyRange range,
ConsistencyLevel consistency_level)
Expand All @@ -76,7 +80,8 @@ public List<KeySlice> get_range_slices(TableReference tableRef,
}

@Override
public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
public void batch_mutate(String kvsMethodName,
Map<ByteBuffer, Map<String, List<Mutation>>> mutation_map,
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException {
client.batch_mutate(mutation_map, consistency_level);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,14 @@ private CassandraExpiringKeyValueService(

@Override
public void put(TableReference tableRef, Map<Cell, byte[]> values, long timestamp, long time, TimeUnit unit) {
try {
putInternal(
tableRef,
KeyValueServices.toConstantTimestampValues(values.entrySet(), timestamp),
CassandraKeyValueServices.convertTtl(time, unit));
} catch (Exception e) {
throw Throwables.throwUncheckedException(e);
}
expiringPut("put", tableRef, values, timestamp, time, unit);
}

@Override
public void putWithTimestamps(TableReference tableRef, Multimap<Cell, Value> values, long time, TimeUnit unit) {
try {
putInternal(tableRef, values.entries(), CassandraKeyValueServices.convertTtl(time, unit));
putInternal("putWithTimestamps", tableRef, values.entries(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also call expiringPut?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but it actually have signatures (note that expiringPut adds the timestamp to the cells, where here we already have it).

CassandraKeyValueServices.convertTtl(time, unit));
} catch (Exception e) {
throw Throwables.throwUncheckedException(e);
}
Expand All @@ -112,7 +106,7 @@ public void multiPut(
for (final List<Entry<Cell, byte[]>> p : partitions) {
callables.add(() -> {
Thread.currentThread().setName("Atlas expiry multiPut of " + p.size() + " cells into " + table);
put(table, Maps2.fromEntries(p), timestamp, time, unit);
expiringPut("multiPut", table, Maps2.fromEntries(p), timestamp, time, unit);
return null;
});
}
Expand All @@ -135,4 +129,15 @@ public void multiPut(
}
}

private void expiringPut(String kvsMethodName, TableReference tableRef, Map<Cell, byte[]> values, long timestamp,
long time, TimeUnit unit) {
try {
putInternal(kvsMethodName,
tableRef,
KeyValueServices.toConstantTimestampValues(values.entrySet(), timestamp),
CassandraKeyValueServices.convertTtl(time, unit));
} catch (Exception e) {
throw Throwables.throwUncheckedException(e);
}
}
}
Loading