Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Add traces for kvs-get #5466

Merged
merged 6 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import com.palantir.common.streams.KeyedStream;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.tracing.CloseableTracer;
import com.palantir.tracing.Tracers;
import com.palantir.tritium.metrics.MetricRegistries;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
Expand Down Expand Up @@ -777,7 +778,9 @@ public Map<Cell, Value> get(TableReference tableRef, Map<Cell, Long> timestampBy
ImmutableMap.Builder<Cell, Value> builder = ImmutableMap.builder();
for (long ts : cellsByTs.keySet()) {
StartTsResultsCollector collector = new StartTsResultsCollector(metricsManager, ts);
cellLoader.loadWithTs("get", tableRef, cellsByTs.get(ts), ts, false, collector, readConsistency);
try (CloseableTracer tracer = CloseableTracer.startSpan("loadWithTs")) {
cellLoader.loadWithTs("get", tableRef, cellsByTs.get(ts), ts, false, collector, readConsistency);
}
builder.putAll(collector.getCollectedResults());
}
return builder.build();
Expand All @@ -789,7 +792,10 @@ public Map<Cell, Value> get(TableReference tableRef, Map<Cell, Long> timestampBy
private Map<Cell, Value> get(
String kvsMethodName, TableReference tableRef, Set<Cell> cells, long maxTimestampExclusive) {
StartTsResultsCollector collector = new StartTsResultsCollector(metricsManager, maxTimestampExclusive);
cellLoader.loadWithTs(kvsMethodName, tableRef, cells, maxTimestampExclusive, false, collector, readConsistency);
try (CloseableTracer tracer = CloseableTracer.startSpan("loadWithTs")) {
cellLoader.loadWithTs(
kvsMethodName, tableRef, cells, maxTimestampExclusive, false, collector, readConsistency);
}
return collector.getCollectedResults();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -29,6 +30,7 @@
import com.palantir.atlasdb.util.AnnotationType;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.logsafe.SafeArg;
import com.palantir.tracing.CloseableTracer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,8 +94,20 @@ void loadWithTs(
boolean loadAllTs,
CassandraKeyValueServices.ThreadSafeResultVisitor visitor,
ConsistencyLevel consistency) {
Map<InetSocketAddress, List<Cell>> hostsAndCells =
HostPartitioner.partitionByHost(clientPool, cells, Cell::getRowName);
Map<InetSocketAddress, List<Cell>> hostsAndCells;
try (CloseableTracer tracer = CloseableTracer.startSpan(
"partitionByHost",
ImmutableMap.of(
"cells",
String.valueOf(cells.size()),
"tableRef",
LoggingArgs.safeInternalTableNameOrPlaceholder(tableRef.toString()),
"timestampClause",
loadAllTs ? "for all timestamps " : "",
"startTs",
String.valueOf(startTs)))) {
hostsAndCells = HostPartitioner.partitionByHost(clientPool, cells, Cell::getRowName);
}
int totalPartitions = hostsAndCells.keySet().size();

if (log.isTraceEnabled()) {
Expand All @@ -118,15 +132,17 @@ void loadWithTs(
SafeArg.of("ipPort", hostAndCells.getKey()));
}

tasks.addAll(getLoadWithTsTasksForSingleHost(
kvsMethodName,
hostAndCells.getKey(),
tableRef,
hostAndCells.getValue(),
startTs,
loadAllTs,
visitor,
consistency));
try (CloseableTracer tracer = CloseableTracer.startSpan("getLoadWithTsTasksForSingleHost")) {
tasks.addAll(getLoadWithTsTasksForSingleHost(
kvsMethodName,
hostAndCells.getKey(),
tableRef,
hostAndCells.getValue(),
startTs,
loadAllTs,
visitor,
consistency));
}
}

taskRunner.runAllTasksCancelOnFailure(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.palantir.common.base.Throwables;
import com.palantir.tracing.DetachedSpan;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class TaskRunner {
private ExecutorService executor;
private final ListeningExecutorService listeningExecutor;

TaskRunner(ExecutorService executor) {
this.executor = executor;
this.listeningExecutor = MoreExecutors.listeningDecorator(executor);
}

/*
Expand All @@ -44,22 +50,43 @@ <V> List<V> runAllTasksCancelOnFailure(List<Callable<V>> tasks) {
}
}

List<Future<V>> futures = new ArrayList<>(tasks.size());
List<ListenableFuture<V>> futures = new ArrayList<>(tasks.size());
for (Callable<V> task : tasks) {
futures.add(executor.submit(task));
DetachedSpan detachedSpan = DetachedSpan.start("task");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not actionable now: Yep, this is a reasonable first step. Later on we probably want a bit more information about what the tasks are doing (e.g. maybe change the method's signature, or also allow users to provide a List that carries metadata about each task), but this would give us some useful signal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

ListenableFuture<V> future = listeningExecutor.submit(task);
futures.add(attachDetachedSpanCompletion(detachedSpan, future, listeningExecutor));
}
try {
List<V> results = new ArrayList<>(tasks.size());
for (Future<V> future : futures) {
for (ListenableFuture<V> future : futures) {
results.add(future.get());
}
return results;
} catch (Exception e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
} finally {
for (Future<V> future : futures) {
for (ListenableFuture<V> future : futures) {
future.cancel(true);
}
}
}

/**
 * Ties the completion of a {@link DetachedSpan} to the completion of a future.
 *
 * <p>A callback is registered on {@code future} that calls {@link DetachedSpan#complete()} both
 * when the future succeeds and when it fails, so the span is closed regardless of the outcome.
 * (Per Guava's {@code FutureCallback} contract, the failure path also covers cancellation.)
 *
 * @param detachedSpan the span to complete once {@code future} is done
 * @param future the future whose completion ends the span
 * @param tracingExecutorService the executor on which the span-completing callback is run
 * @return the same {@code future} instance that was passed in, to allow call chaining
 */
private static <V> ListenableFuture<V> attachDetachedSpanCompletion(
        DetachedSpan detachedSpan, ListenableFuture<V> future, Executor tracingExecutorService) {
    // The result and the failure cause are deliberately ignored: either way the span just ends.
    FutureCallback<V> spanCloser = new FutureCallback<V>() {
        @Override
        public void onSuccess(V ignoredResult) {
            detachedSpan.complete();
        }

        @Override
        public void onFailure(Throwable ignoredFailure) {
            detachedSpan.complete();
        }
    };
    Futures.addCallback(future, spanCloser, tracingExecutorService);
    return future;
}
}
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5466.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: |-
Add traces inside kvs-get to get more granular performance information.
links:
- https://github.com/palantir/atlasdb/pull/5466