Optimizes out cassandra index inserts that don't improve query results (#1172)

For example, a trace that fans out into N spans against the same service
will end up with a fixed number of Cassandra index inserts, rather than a
number that grows with N.

This removes extra requests when a collector receives a large amount of
span data for a single trace. That could be the case when one or more of
the following are true:

* instrumentation bundles all local spans in a trace into one message
* traces are routed consistently to a single storage component
* there's only one collector

With default settings, the overhead of this feature should be tens of
megabytes of heap. Disable it by setting `CassandraStorage.Builder.indexCacheMax(0)`.
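
To make the effect concrete, here is a minimal sketch of the idea, not the code in this commit. It keeps a bounded, time-limited set of keys and reports whether an index insert is still needed. The class name `IndexInsertDeduplicator`, the `(serviceName, traceId)` key and the `LinkedHashMap` backing are assumptions for illustration only; the commit itself passes a Guava `CacheBuilderSpec` to `CompositeIndexer`, whose internals are not shown on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of insert suppression; not the commit's CompositeIndexer. */
final class IndexInsertDeduplicator {
  private final int maxEntries;   // 0 disables caching, mirroring indexCacheMax(0)
  private final long ttlMillis;   // how long a key suppresses repeat inserts
  private final Map<String, Long> writtenAt; // key -> time the insert was issued

  IndexInsertDeduplicator(final int maxEntries, long ttlMillis) {
    this.maxEntries = maxEntries;
    this.ttlMillis = ttlMillis;
    // Insertion-ordered map that drops its oldest entry once the bound is exceeded.
    this.writtenAt = new LinkedHashMap<String, Long>() {
      @Override protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
        return size() > maxEntries;
      }
    };
  }

  /** Returns true when the caller should issue the index insert. */
  synchronized boolean shouldInsert(String serviceName, long traceId, long nowMillis) {
    if (maxEntries == 0) return true; // caching disabled: every insert goes through
    String key = serviceName + "|" + traceId; // assumed key shape
    Long previous = writtenAt.get(key);
    if (previous != null && nowMillis - previous < ttlMillis) return false; // still fresh
    writtenAt.remove(key);          // re-insert so the entry moves to the newest position
    writtenAt.put(key, nowMillis);
    return true;
  }
}
```

Under these assumptions, 50 calls for the same service and trace within the TTL return `true` once, so the number of inserts no longer depends on the span count. A deduplicator built with a maximum of 0 returns `true` every time.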
adriancole authored Jul 9, 2016
1 parent d45ec7a commit 4dc8ba9
Showing 18 changed files with 807 additions and 163 deletions.
@@ -28,6 +28,10 @@ public class ZipkinCassandraStorageProperties {
private String password;
private int spanTtl = (int) TimeUnit.DAYS.toSeconds(7);
private int indexTtl = (int) TimeUnit.DAYS.toSeconds(3);
/** See {@link CassandraStorage.Builder#indexCacheMax(int)} */
private int indexCacheMax = 100000;
/** See {@link CassandraStorage.Builder#indexCacheTtl(int)} */
private int indexCacheTtl = 60;

public String getKeyspace() {
return keyspace;
@@ -117,6 +121,22 @@ public void setIndexTtl(int indexTtl) {
this.indexTtl = indexTtl;
}

public int getIndexCacheMax() {
return indexCacheMax;
}

public void setIndexCacheMax(int indexCacheMax) {
this.indexCacheMax = indexCacheMax;
}

public int getIndexCacheTtl() {
return indexCacheTtl;
}

public void setIndexCacheTtl(int indexCacheTtl) {
this.indexCacheTtl = indexCacheTtl;
}

public CassandraStorage.Builder toBuilder() {
return CassandraStorage.builder()
.keyspace(keyspace)
@@ -127,6 +147,8 @@ public CassandraStorage.Builder toBuilder() {
.username(username)
.password(password)
.spanTtl(spanTtl)
.indexTtl(indexTtl);
.indexTtl(indexTtl)
.indexCacheMax(indexCacheMax)
.indexCacheTtl(indexCacheTtl);
}
}
7 changes: 6 additions & 1 deletion zipkin-server/README.md
@@ -105,10 +105,15 @@ supports version 2.2+ and applies when `STORAGE_TYPE` is set to `cassandra`:
* `CASSANDRA_KEYSPACE`: The keyspace to use. Defaults to "zipkin".
* `CASSANDRA_CONTACT_POINTS`: Comma separated list of hosts / ip addresses part of Cassandra cluster. Defaults to localhost
* `CASSANDRA_LOCAL_DC`: Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin.
* `CASSANDRA_MAX_CONNECTIONS`: Max pooled connections per datacenter-local host. Defaults to 8
* `CASSANDRA_ENSURE_SCHEMA`: Ensuring cassandra has the latest schema. If enabled tries to execute scripts in the classpath prefixed with `cassandra-schema-cql3`. Defaults to true
* `CASSANDRA_USERNAME` and `CASSANDRA_PASSWORD`: Cassandra authentication. Will throw an exception on startup if authentication fails. No default

The following are tuning parameters which may not concern all users:

* `CASSANDRA_MAX_CONNECTIONS`: Max pooled connections per datacenter-local host. Defaults to 8
* `CASSANDRA_INDEX_CACHE_MAX`: Maximum trace index metadata entries to cache. Zero disables caching. Defaults to 100000.
* `CASSANDRA_INDEX_CACHE_TTL`: How many seconds to cache index metadata about a trace. Defaults to 60.
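
The two cache settings above follow a simple fallback rule: use the environment value when it is set, otherwise the documented default. A minimal sketch of that rule, assuming a hypothetical `IndexCacheSettings` holder that is not part of Zipkin (the server itself resolves these values through the `${NAME:default}` placeholders in `zipkin-server.yml`):

```java
import java.util.Map;

/** Hypothetical holder for the two index cache settings; not a Zipkin class. */
final class IndexCacheSettings {
  final int max;        // CASSANDRA_INDEX_CACHE_MAX; zero disables caching
  final int ttlSeconds; // CASSANDRA_INDEX_CACHE_TTL

  IndexCacheSettings(int max, int ttlSeconds) {
    this.max = max;
    this.ttlSeconds = ttlSeconds;
  }

  /** Reads both settings from the given environment, applying the README defaults. */
  static IndexCacheSettings fromEnv(Map<String, String> env) {
    return new IndexCacheSettings(
        intOrDefault(env, "CASSANDRA_INDEX_CACHE_MAX", 100000),
        intOrDefault(env, "CASSANDRA_INDEX_CACHE_TTL", 60));
  }

  private static int intOrDefault(Map<String, String> env, String name, int defaultValue) {
    String value = env.get(name);
    if (value == null || value.trim().isEmpty()) return defaultValue;
    return Integer.parseInt(value.trim());
  }

  boolean cachingEnabled() {
    return max > 0;
  }
}
```

Calling `IndexCacheSettings.fromEnv(System.getenv())` with neither variable set would yield 100000 entries and a 60 second TTL; setting `CASSANDRA_INDEX_CACHE_MAX=0` makes `cachingEnabled()` return false.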

Example usage:

```bash
4 changes: 4 additions & 0 deletions zipkin-server/src/main/resources/zipkin-server.yml
@@ -45,6 +45,10 @@ zipkin:
span-ttl: ${CASSANDRA_SPAN_TTL:604800}
# 3 days in seconds
index-ttl: ${CASSANDRA_INDEX_TTL:259200}
# the maximum trace index metadata entries to cache
index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000}
# how long to cache index metadata about a trace. 1 minute in seconds
index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60}
elasticsearch:
cluster: ${ES_CLUSTER:elasticsearch}
hosts: ${ES_HOSTS:localhost:9300}
8 changes: 5 additions & 3 deletions zipkin-storage/cassandra/README.md
@@ -5,21 +5,23 @@ This CQL-based Cassandra 2.2+ storage component includes a `GuavaSpanStore` and

The implementation uses the [Datastax Java Driver 3.x](https://github.com/datastax/java-driver).

The CQL schema is the same as [zipkin-scala](https://github.com/openzipkin/zipkin/tree/master/zipkin-cassandra).

`zipkin.storage.cassandra.CassandraStorage.Builder` includes defaults that will
operate against a local Cassandra installation.

## Logging
Queries are logged to the category "com.datastax.driver.core.QueryLogger" when debug or trace is
enabled via SLF4J. Trace level includes bound values.

See [Logging Query Latencies](http://docs.datastax.com/en/developer/java-driver/2.1/supplemental/manual/logging/#logging-query-latencies) for more details.
See [Logging Query Latencies](http://docs.datastax.com/en/developer/java-driver/3.0/supplemental/manual/logging/#logging-query-latencies) for more details.

## Performance notes

Redundant requests to store service or span names are ignored for an hour to reduce load.

Indexing of traces are optimized by default. This reduces writes to Cassandra at the cost of memory
needed to cache state. This cache is tunable based on your typical trace duration and span count.
See [CassandraStorage](src/main/java/zipkin/storage/cassandra/CassandraStorage.java) for details.

## Testing this component
This module conditionally runs integration tests against a local Cassandra instance.

7 changes: 7 additions & 0 deletions zipkin-storage/cassandra/pom.xml
@@ -67,6 +67,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-guava</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
@@ -22,23 +22,23 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.cache.CacheBuilderSpec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
import zipkin.storage.guava.GuavaSpanConsumer;

import static com.google.common.util.concurrent.Futures.transform;
import static zipkin.storage.cassandra.CassandraUtil.annotationKeys;
import static zipkin.storage.cassandra.CassandraUtil.bindWithName;
import static zipkin.storage.cassandra.CassandraUtil.durationIndexBucket;

@@ -48,32 +48,28 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
= Long.getLong("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);

private static final Function<Object, Void> TO_VOID = Functions.<Void>constant(null);
private static final Random RAND = new Random();

private final Session session;
private final TimestampCodec timestampCodec;
private final int bucketCount;
@Deprecated
private final int spanTtl;
@Deprecated
private final int indexTtl;
private final Integer indexTtl;
private final PreparedStatement insertSpan;
private final PreparedStatement insertServiceName;
private final PreparedStatement insertSpanName;
private final PreparedStatement insertTraceIdByServiceName;
private final PreparedStatement insertTraceIdBySpanName;
private final PreparedStatement insertTraceIdByAnnotation;
private final PreparedStatement insertTraceIdBySpanDuration;
private final Schema.Metadata metadata;
private final DeduplicatingExecutor deduplicatingExecutor;
private final CompositeIndexer indexer;

CassandraSpanConsumer(Session session, int bucketCount, int spanTtl, int indexTtl) {
CassandraSpanConsumer(Session session, int bucketCount, int spanTtl, int indexTtl,
@Nullable CacheBuilderSpec indexCacheSpec) {
this.session = session;
this.timestampCodec = new TimestampCodec(session);
this.bucketCount = bucketCount;
this.spanTtl = spanTtl;
this.indexTtl = indexTtl;
this.metadata = Schema.readMetadata(session);
this.indexTtl = metadata.hasDefaultTtl ? null : indexTtl;
insertSpan = session.prepare(
maybeUseTtl(QueryBuilder
.insertInto("traces")
@@ -94,29 +90,6 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("bucket", 0) // bucket is deprecated on this index
.value("span_name", QueryBuilder.bindMarker("span_name"))));

insertTraceIdByServiceName = session.prepare(
maybeUseTtl(QueryBuilder
.insertInto(Tables.SERVICE_NAME_INDEX)
.value("service_name", QueryBuilder.bindMarker("service_name"))
.value("bucket", QueryBuilder.bindMarker("bucket"))
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))));

insertTraceIdBySpanName = session.prepare(
maybeUseTtl(QueryBuilder
.insertInto(Tables.SERVICE_SPAN_NAME_INDEX)
.value("service_span_name", QueryBuilder.bindMarker("service_span_name"))
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))));

insertTraceIdByAnnotation = session.prepare(
maybeUseTtl(QueryBuilder
.insertInto(Tables.ANNOTATIONS_INDEX)
.value("annotation", QueryBuilder.bindMarker("annotation"))
.value("bucket", QueryBuilder.bindMarker("bucket"))
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))));

insertTraceIdBySpanDuration = session.prepare(
maybeUseTtl(QueryBuilder
.insertInto(Tables.SPAN_DURATION_INDEX)
@@ -128,10 +101,11 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("trace_id", QueryBuilder.bindMarker("trace_id"))));

deduplicatingExecutor = new DeduplicatingExecutor(session, WRITTEN_NAMES_TTL);
indexer = new CompositeIndexer(session, indexCacheSpec, bucketCount, this.indexTtl);
}

private RegularStatement maybeUseTtl(Insert value) {
return metadata.hasDefaultTtl
return indexTtl == null
? value
: value.using(QueryBuilder.ttl(QueryBuilder.bindMarker("ttl_")));
}
@@ -142,10 +116,13 @@ private RegularStatement maybeUseTtl(Insert value) {
*/
@Override
public ListenableFuture<Void> accept(List<Span> rawSpans) {
List<ListenableFuture<?>> futures = new LinkedList<>();
ImmutableSet.Builder<ListenableFuture<?>> futures = ImmutableSet.builder();

ImmutableList.Builder<Span> spans = ImmutableList.builder();
for (Span rawSpan : rawSpans) {
// indexing occurs by timestamp, so derive one if not present.
Span span = ApplyTimestampAndDuration.apply(rawSpan);
spans.add(span);

futures.add(storeSpan(
span.traceId,
@@ -165,37 +142,21 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
futures.add(storeSpanName(serviceName, span.name));
}

if (span.timestamp != null) {
// QueryRequest.serviceName
futures.add(storeTraceIdByServiceName(serviceName, span.timestamp, span.traceId));

// QueryRequest.spanName
if (!span.name.isEmpty()) {
futures.add(storeTraceIdBySpanName(
serviceName, span.name, span.timestamp, span.traceId));
}

// QueryRequest.min/maxDuration
if (span.duration != null) {
// Contract for Repository.storeTraceIdByDuration is to store the span twice, once with
// the span name and another with empty string.
// QueryRequest.min/maxDuration
if (span.timestamp != null && span.duration != null) {
// Contract for Repository.storeTraceIdByDuration is to store the span twice, once with
// the span name and another with empty string.
futures.add(storeTraceIdByDuration(
serviceName, span.name, span.timestamp, span.duration, span.traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeTraceIdByDuration(
serviceName, span.name, span.timestamp, span.duration, span.traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeTraceIdByDuration(
serviceName, "", span.timestamp, span.duration, span.traceId));
}
serviceName, "", span.timestamp, span.duration, span.traceId));
}
}
}
// QueryRequest.annotations/binaryAnnotations
if (span.timestamp != null) {
for (String annotation : annotationKeys(span)) {
futures.add(storeTraceIdByAnnotation(annotation, span.timestamp, span.traceId));
}
}
}
return transform(Futures.allAsList(futures), TO_VOID);
futures.addAll(indexer.index(spans.build()));
return transform(Futures.allAsList(futures.build()), TO_VOID);
}

/**
@@ -225,68 +186,18 @@ ListenableFuture<?> storeSpan(long traceId, long timestamp, String key, ByteBuff
ListenableFuture<?> storeServiceName(final String serviceName) {
BoundStatement bound = bindWithName(insertServiceName, "insert-service-name")
.setString("service_name", serviceName);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);
if (indexTtl != null) bound.setInt("ttl_", indexTtl);
return deduplicatingExecutor.maybeExecuteAsync(bound, serviceName);
}

ListenableFuture<?> storeSpanName(String serviceName, String spanName) {
BoundStatement bound = bindWithName(insertSpanName, "insert-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);
if (indexTtl != null) bound.setInt("ttl_", indexTtl);
return deduplicatingExecutor.maybeExecuteAsync(bound, Pair.create(serviceName, spanName));
}

ListenableFuture<?> storeTraceIdByServiceName(String serviceName, long timestamp, long traceId) {
int bucket = RAND.nextInt(bucketCount);
try {
BoundStatement bound =
bindWithName(insertTraceIdByServiceName, "insert-trace-id-by-service-name")
.setInt("bucket", bucket)
.setString("service_name", serviceName)
.setBytesUnsafe("ts", timestampCodec.serialize(timestamp))
.setLong("trace_id", traceId);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);

return session.executeAsync(bound);
} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

ListenableFuture<?> storeTraceIdBySpanName(String serviceName, String spanName, long timestamp,
long traceId) {
String serviceSpanName = serviceName + "." + spanName;

try {
BoundStatement bound = bindWithName(insertTraceIdBySpanName, "insert-trace-id-by-span-name")
.setString("service_span_name", serviceSpanName)
.setBytesUnsafe("ts", timestampCodec.serialize(timestamp))
.setLong("trace_id", traceId);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);

return session.executeAsync(bound);
} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

ListenableFuture<?> storeTraceIdByAnnotation(String annotationKey, long timestamp, long traceId) {
int bucket = RAND.nextInt(bucketCount);
try {
BoundStatement bound = bindWithName(insertTraceIdByAnnotation, "insert-trace-id-by-annotation")
.setInt("bucket", bucket)
.setBytes("annotation", CassandraUtil.toByteBuffer(annotationKey))
.setBytesUnsafe("ts", timestampCodec.serialize(timestamp))
.setLong("trace_id", traceId);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);

return session.executeAsync(bound);
} catch (CharacterCodingException | RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

ListenableFuture<?> storeTraceIdByDuration(String serviceName, String spanName,
long timestamp, long duration, long traceId) {
int bucket = durationIndexBucket(timestamp);
@@ -299,7 +210,7 @@ ListenableFuture<?> storeTraceIdByDuration(String serviceName, String spanName,
.setBytesUnsafe("ts", timestampCodec.serialize(timestamp))
.setLong("duration", duration)
.setLong("trace_id", traceId);
if (!metadata.hasDefaultTtl) bound.setInt("ttl_", indexTtl);
if (indexTtl != null) bound.setInt("ttl_", indexTtl);

return session.executeAsync(bound);
} catch (RuntimeException ex) {
@@ -309,6 +220,7 @@ ListenableFuture<?> storeTraceIdByDuration(String serviceName, String spanName,

/** Clears any caches */
@VisibleForTesting void clear() {
indexer.clear();
deduplicatingExecutor.clear();
}
}