Over-fetches cassandra trace indexes to improve UX and fixes Cassandra index (#1177)

* Over-fetches cassandra trace indexes to improve UX

Even when optimized, cassandra indexes will have more rows than the distinct
(trace_id, timestamp) pairs needed to satisfy query requests. The side effect
in most cases is that users get fewer than `QueryRequest.limit` results
back. Lacking the ability to deduplicate server-side, the only
opportunity left is to address this client-side.

This over-fetches by a multiplier `CASSANDRA_INDEX_FETCH_MULTIPLIER`,
which defaults to 3. For example, if a user requests 10 traces, 30 rows
are requested from indexes, but only 10 distinct trace ids are queried
for span data.

To disable this feature, set `CASSANDRA_INDEX_FETCH_MULTIPLIER=1`.

Fixes #1142
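
The over-fetch described above can be illustrated roughly as follows. This is a simplified sketch, not the code in this commit: `OverFetchSketch`, `IndexRow`, `sampleRows` and `distinctTraceIds` are hypothetical names, and an in-memory list stands in for a Cassandra index.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; names here do not come from the commit. */
final class OverFetchSketch {

  /** Stand-in for an index row; several rows may carry the same trace id. */
  static final class IndexRow {
    final long traceId;
    final long timestamp;

    IndexRow(long traceId, long timestamp) {
      this.traceId = traceId;
      this.timestamp = timestamp;
    }
  }

  /** Builds traceCount trace ids, each repeated rowsPerTrace times, to mimic redundant rows. */
  static List<IndexRow> sampleRows(int traceCount, int rowsPerTrace) {
    List<IndexRow> rows = new ArrayList<>();
    for (long traceId = 1; traceId <= traceCount; traceId++) {
      for (int i = 0; i < rowsPerTrace; i++) {
        rows.add(new IndexRow(traceId, 1000L * traceId));
      }
    }
    return rows;
  }

  /** Reads up to limit * multiplier rows, then keeps at most limit distinct trace ids. */
  static Set<Long> distinctTraceIds(List<IndexRow> index, int limit, int multiplier) {
    int fetchSize = Math.min(limit * multiplier, index.size());
    Set<Long> traceIds = new LinkedHashSet<>();
    for (IndexRow row : index.subList(0, fetchSize)) {
      if (traceIds.size() == limit) break;
      traceIds.add(row.traceId);
    }
    return traceIds;
  }

  public static void main(String[] args) {
    List<IndexRow> index = sampleRows(20, 3);
    System.out.println(distinctTraceIds(index, 10, 1).size()); // prints 4
    System.out.println(distinctTraceIds(index, 10, 3).size()); // prints 10
  }
}
```

In this toy data every trace id appears three times, so a multiplier of 1 yields only 4 distinct ids for a limit of 10, while a multiplier of 3 yields all 10.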

* Fixes Cassandra indexes that lost traces in the same millisecond (#1153)

A schema bug resulted in Cassandra not indexing more than bucket count
(10) trace ids per millisecond+search input. This manifested as fewer
traces retrieved by UI search or API query than expected. For example,
if you had 1000 traces that happened on the same service in the same
millisecond, only 10 would return.

The affected indexes are `service_span_name_index`, `service_name_index`
and `annotations_index`. This was a schema-only change, so those with
existing zipkin installations should recreate these indexes to solve the
problem.

Fixes #1142
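
To see why the old primary key lost traces, here is a small simulation under stated assumptions: a `HashMap` keyed by the primary key columns stands in for a table (a write with an existing primary key overwrites the earlier row), and the bucket is chosen by a simple modulo, which may differ from how zipkin actually picks buckets. `IndexKeySketch` and `rowsStored` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; a map keyed by primary key stands in for a Cassandra table. */
final class IndexKeySketch {

  /** Returns how many rows survive when traceCount traces share one service and millisecond. */
  static int rowsStored(int traceCount, int bucketCount, boolean traceIdInKey) {
    Map<List<Object>, Long> table = new HashMap<>();
    long ts = 1468108800000L; // one arbitrary millisecond shared by every trace
    for (long traceId = 0; traceId < traceCount; traceId++) {
      int bucket = (int) (traceId % bucketCount); // assumption: real bucket selection may differ
      List<Object> primaryKey = traceIdInKey
          ? Arrays.<Object>asList("service", bucket, ts, traceId)
          : Arrays.<Object>asList("service", bucket, ts);
      table.put(primaryKey, traceId); // same primary key: the later write replaces the earlier one
    }
    return table.size();
  }

  public static void main(String[] args) {
    System.out.println(rowsStored(1000, 10, false)); // prints 10
    System.out.println(rowsStored(1000, 10, true)); // prints 1000
  }
}
```

With `trace_id` left out of the key, only one row per bucket survives, matching the "only 10 would return" example; adding it as a clustering column keeps all 1000.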
adriancole authored Jul 10, 2016
1 parent 4dc8ba9 commit 37687b1
Showing 9 changed files with 123 additions and 38 deletions.
@@ -32,6 +32,8 @@ public class ZipkinCassandraStorageProperties {
private int indexCacheMax = 100000;
/** See {@link CassandraStorage.Builder#indexCacheTtl(int)} */
private int indexCacheTtl = 60;
+ /** See {@link CassandraStorage.Builder#indexFetchMultiplier(int)} */
+ private int indexFetchMultiplier = 3;

public String getKeyspace() {
return keyspace;
@@ -137,6 +139,14 @@ public void setIndexCacheTtl(int indexCacheTtl) {
this.indexCacheTtl = indexCacheTtl;
}

+ public int getIndexFetchMultiplier() {
+ return indexFetchMultiplier;
+ }
+
+ public void setIndexFetchMultiplier(int indexFetchMultiplier) {
+ this.indexFetchMultiplier = indexFetchMultiplier;
+ }
+
public CassandraStorage.Builder toBuilder() {
return CassandraStorage.builder()
.keyspace(keyspace)
@@ -149,6 +159,7 @@ public CassandraStorage.Builder toBuilder() {
.spanTtl(spanTtl)
.indexTtl(indexTtl)
.indexCacheMax(indexCacheMax)
- .indexCacheTtl(indexCacheTtl);
+ .indexCacheTtl(indexCacheTtl)
+ .indexFetchMultiplier(indexFetchMultiplier);
}
}
1 change: 1 addition & 0 deletions zipkin-server/README.md
@@ -113,6 +113,7 @@ The following are tuning parameters which may not concern all users:
* `CASSANDRA_MAX_CONNECTIONS`: Max pooled connections per datacenter-local host. Defaults to 8
* `CASSANDRA_INDEX_CACHE_MAX`: Maximum trace index metadata entries to cache. Zero disables caching. Defaults to 100000.
* `CASSANDRA_INDEX_CACHE_TTL`: How many seconds to cache index metadata about a trace. Defaults to 60.
+ * `CASSANDRA_INDEX_FETCH_MULTIPLIER`: How many more index rows to fetch than the user-supplied query limit. Defaults to 3.

Example usage:

2 changes: 2 additions & 0 deletions zipkin-server/src/main/resources/zipkin-server.yml
@@ -49,6 +49,8 @@ zipkin:
index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000}
# how long to cache index metadata about a trace. 1 minute in seconds
index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60}
+ # how many more index rows to fetch than the user-supplied query limit
+ index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3}
elasticsearch:
cluster: ${ES_CLUSTER:elasticsearch}
hosts: ${ES_HOSTS:localhost:9300}
4 changes: 4 additions & 0 deletions zipkin-storage/cassandra/README.md
@@ -20,6 +20,10 @@ Redundant requests to store service or span names are ignored for an hour to red

Indexing of traces are optimized by default. This reduces writes to Cassandra at the cost of memory
needed to cache state. This cache is tunable based on your typical trace duration and span count.
+
+ User-supplied query limits are over-fetched according to a configured index fetch multiplier in
+ attempts to mitigate redundant data returned from index queries.
+
See [CassandraStorage](src/main/java/zipkin/storage/cassandra/CassandraStorage.java) for details.

## Testing this component
@@ -22,6 +22,7 @@
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
+ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
@@ -78,6 +79,7 @@ public int compare(List<Span> left, List<Span> right) {

private final int durationTtl;
private final int maxTraceCols;
+ private final int indexFetchMultiplier;
private final Session session;
private final TimestampCodec timestampCodec;
private final Set<Integer> buckets;
@@ -92,9 +94,11 @@ public int compare(List<Span> left, List<Span> right) {
private final PreparedStatement selectTraceIdsBySpanDuration;
private final Function<ResultSet, Map<Long, Long>> traceIdToTimestamp;

- CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols) {
+ CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols,
+ int indexFetchMultiplier) {
this.session = session;
this.maxTraceCols = maxTraceCols;
+ this.indexFetchMultiplier = indexFetchMultiplier;
ProtocolVersion protocolVersion = session.getCluster()
.getConfiguration().getProtocolOptions().getProtocolVersion();
this.timestampCodec = new TimestampCodec(protocolVersion);
@@ -212,23 +216,25 @@ public int compare(List<Span> left, List<Span> right) {
*/
@Override
public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request) {
+ // Over fetch on indexes as they don't return distinct (trace id, timestamp) rows.
+ final int traceIndexFetchSize = request.limit * indexFetchMultiplier;
ListenableFuture<Map<Long, Long>> traceIdToTimestamp;
if (request.minDuration != null || request.maxDuration != null) {
- traceIdToTimestamp = getTraceIdsByDuration(request);
+ traceIdToTimestamp = getTraceIdsByDuration(request, traceIndexFetchSize);
} else if (request.spanName != null) {
traceIdToTimestamp = getTraceIdsBySpanName(request.serviceName, request.spanName,
- request.endTs * 1000, request.lookback * 1000, request.limit);
+ request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize);
} else if (request.serviceName != null) {
traceIdToTimestamp = getTraceIdsByServiceNames(Collections.singletonList(request.serviceName),
- request.endTs * 1000, request.lookback * 1000, request.limit);
+ request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize);
} else {
checkArgument(selectTraceIdsByServiceNames != null,
"getTraces without serviceName requires Cassandra 2.2 or later");
traceIdToTimestamp = transform(getServiceNames(),
new AsyncFunction<List<String>, Map<Long, Long>>() {
@Override public ListenableFuture<Map<Long, Long>> apply(List<String> serviceNames) {
return getTraceIdsByServiceNames(serviceNames,
- request.endTs * 1000, request.lookback * 1000, request.limit);
+ request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize);
}
});
}
@@ -242,18 +248,19 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
traceIds = Futures.transform(traceIdToTimestamp, CassandraUtil.keyset());
} else {
// While a valid port of the scala cassandra span store (from zipkin 1.35), there is a fault.
- // each annotation key is an intersection, which means we are likely to return < limit.
+ // each annotation key is an intersection, meaning we likely return < traceIndexFetchSize.
List<ListenableFuture<Map<Long, Long>>> futureKeySetsToIntersect = new ArrayList<>();
futureKeySetsToIntersect.add(traceIdToTimestamp);
for (String annotationKey : annotationKeys) {
futureKeySetsToIntersect.add(getTraceIdsByAnnotation(annotationKey,
- request.endTs * 1000, request.lookback * 1000, request.limit));
+ request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize));
}
// We achieve the AND goal, by intersecting each of the key sets.
traceIds = Futures.transform(allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets());
}
return transform(traceIds, new AsyncFunction<Set<Long>, List<List<Span>>>() {
@Override public ListenableFuture<List<List<Span>>> apply(Set<Long> traceIds) {
+ traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
return transform(getSpansByTraceIds(traceIds, maxTraceCols), AdjustTraces.INSTANCE);
}

@@ -504,7 +511,8 @@ ListenableFuture<Map<Long, Long>> getTraceIdsByAnnotation(String annotationKey,
}

/** Returns a map of trace id to timestamp (in microseconds) */
- ListenableFuture<Map<Long, Long>> getTraceIdsByDuration(QueryRequest request) {
+ ListenableFuture<Map<Long, Long>> getTraceIdsByDuration(QueryRequest request,
+ int indexFetchSize) {
checkArgument(request.serviceName != null, "serviceName required on duration query");
long oldestData = (System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(durationTtl)) * 1000;

@@ -520,7 +528,7 @@ ListenableFuture<Map<Long, Long>> getTraceIdsByDuration(QueryRequest request) {

List<ListenableFuture<List<DurationRow>>> futures = new ArrayList<>();
for (int i = startBucket; i <= endBucket; i++) { // range closed
- futures.add(oneBucketDurationQuery(request, i, startTs, endTs));
+ futures.add(oneBucketDurationQuery(request, i, startTs, endTs, indexFetchSize));
}

return transform(allAsList(futures),
@@ -540,12 +548,11 @@ ListenableFuture<Map<Long, Long>> getTraceIdsByDuration(QueryRequest request) {
}

ListenableFuture<List<DurationRow>> oneBucketDurationQuery(QueryRequest request, int bucket,
- final long startTs, final long endTs) {
+ final long startTs, final long endTs, int indexFetchSize) {
String serviceName = request.serviceName;
String spanName = spanName(request.spanName);
long minDuration = request.minDuration;
long maxDuration = request.maxDuration != null ? request.maxDuration : Long.MAX_VALUE;
- int limit = request.limit;
BoundStatement bound =
CassandraUtil.bindWithName(selectTraceIdsBySpanDuration, "select-trace-ids-by-span-duration")
.setInt("time_bucket", bucket)
@@ -554,10 +561,10 @@ ListenableFuture<List<DurationRow>> oneBucketDurationQuery(QueryRequest request,
.setLong("min_duration", minDuration)
.setLong("max_duration", maxDuration);

- // optimistically setting fetch size to 'limit' here. Since we are likely to filter some results
- // because their timestamps are out of range, we may need to fetch again.
+ // optimistically setting fetch size to 'indexFetchSize' here. Since we are likely to filter
+ // some results because their timestamps are out of range, we may need to fetch again.
// TODO figure out better strategy
- bound.setFetchSize(limit);
+ bound.setFetchSize(indexFetchSize);

return transform(session.executeAsync(bound), new Function<ResultSet, List<DurationRow>>() {
@Override public List<DurationRow> apply(ResultSet rs) {
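
The annotation intersection followed by the final cap at `request.limit` in `getTraces` can be sketched with plain JDK collections. This is a hedged illustration rather than the commit's code: the commit uses Guava futures, `CassandraUtil.intersectKeySets()` and `FluentIterable`, whereas `IntersectThenLimitSketch` and `intersectThenLimit` below are hypothetical names operating on already-resolved sets.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; synchronous sets replace the futures used in the commit. */
final class IntersectThenLimitSketch {

  /** Intersects every key set (the AND of all criteria), then keeps at most limit trace ids. */
  static Set<Long> intersectThenLimit(List<Set<Long>> keySets, int limit) {
    Set<Long> result = new LinkedHashSet<>();
    if (keySets.isEmpty()) return result;

    // Start from the first set and retain only ids present in every other set.
    Set<Long> intersection = new LinkedHashSet<>(keySets.get(0));
    for (Set<Long> next : keySets.subList(1, keySets.size())) {
      intersection.retainAll(next);
    }

    // Cap at the user-supplied limit, since each set was over-fetched.
    for (Long traceId : intersection) {
      if (result.size() == limit) break;
      result.add(traceId);
    }
    return result;
  }

  public static void main(String[] args) {
    Set<Long> byService = new LinkedHashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L));
    Set<Long> byAnnotation = new LinkedHashSet<>(Arrays.asList(2L, 4L, 6L, 8L));
    System.out.println(intersectThenLimit(Arrays.asList(byService, byAnnotation), 2)); // prints [2, 4]
  }
}
```

Because intersection can only shrink the candidate set, over-fetching each input raises the odds that at least `limit` ids survive before the cap is applied.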
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import zipkin.internal.Nullable;
+ import zipkin.storage.QueryRequest;
import zipkin.storage.guava.LazyGuavaStorageComponent;

import static java.lang.String.format;
@@ -60,6 +61,7 @@ public static final class Builder {
int maxTraceCols = 100000;
int indexCacheMax = 100000;
int indexCacheTtl = 60;
+ int indexFetchMultiplier = 3;

/**
* Used to avoid hot spots when writing indexes used to query by service name or annotation.
@@ -195,6 +197,21 @@ public Builder indexCacheTtl(int indexCacheTtl) {
return this;
}

+ /**
+ * How many more index rows to fetch than the user-supplied query limit. Defaults to 3.
+ *
+ * <p>Backend requests will request {@link QueryRequest#limit} times this factor rows from
+ * Cassandra indexes in attempts to return {@link QueryRequest#limit} traces.
+ *
+ * <p>Indexing in cassandra will usually have more rows than trace identifiers due to factors
+ * including table design and collection implementation. As there's no way to DISTINCT out
+ * duplicates server-side, this over-fetches client-side when {@code indexFetchMultiplier} > 1.
+ */
+ public Builder indexFetchMultiplier(int indexFetchMultiplier) {
+ this.indexFetchMultiplier = indexFetchMultiplier;
+ return this;
+ }
+
public CassandraStorage build() {
return new CassandraStorage(this);
}
@@ -217,6 +234,7 @@ public CassandraStorage build() {
final boolean ensureSchema;
final String keyspace;
final CacheBuilderSpec indexCacheSpec;
+ final int indexFetchMultiplier;
final LazySession session;

CassandraStorage(Builder builder) {
@@ -236,6 +254,7 @@ public CassandraStorage build() {
? null
: CacheBuilderSpec.parse("maximumSize=" + builder.indexCacheMax
+ ",expireAfterWrite=" + builder.indexCacheTtl + "s");
+ this.indexFetchMultiplier = builder.indexFetchMultiplier;
}

/** Lazy initializes or returns the session in use by this storage component. */
@@ -244,7 +263,8 @@ public Session session() {
}

@Override protected CassandraSpanStore computeGuavaSpanStore() {
- return new CassandraSpanStore(session.get(), bucketCount, indexTtl, maxTraceCols);
+ return new CassandraSpanStore(session.get(), bucketCount, indexTtl, maxTraceCols,
+ indexFetchMultiplier);
}

@Override protected CassandraSpanConsumer computeGuavaSpanConsumer() {
@@ -1,21 +1,21 @@
CREATE KEYSPACE IF NOT EXISTS zipkin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE IF NOT EXISTS zipkin.service_span_name_index (
- service_span_name text,
- ts timestamp,
- trace_id bigint,
- PRIMARY KEY (service_span_name, ts)
+ service_span_name text, // Endpoint.serviceName + "." + Span.name
+ ts timestamp, // start timestamp of the span, truncated to millisecond precision
+ trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
+ PRIMARY KEY (service_span_name, ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
AND default_time_to_live = 259200;

CREATE TABLE IF NOT EXISTS zipkin.service_name_index (
- service_name text,
- bucket int,
- ts timestamp,
- trace_id bigint,
- PRIMARY KEY ((service_name, bucket), ts)
+ service_name text, // Endpoint.serviceName
+ bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9
+ ts timestamp, // start timestamp of the span, truncated to millisecond precision
+ trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
+ PRIMARY KEY ((service_name, bucket), ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
@@ -31,11 +31,11 @@ CREATE TABLE IF NOT EXISTS zipkin.span_names (
AND default_time_to_live = 259200;

CREATE TABLE IF NOT EXISTS zipkin.annotations_index (
- annotation blob,
- bucket int,
- ts timestamp,
- trace_id bigint,
- PRIMARY KEY ((annotation, bucket), ts)
+ annotation blob, // Annotation.value or BinaryAnnotation.key
+ bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9
+ ts timestamp, // start timestamp of the span, truncated to millisecond precision
+ trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
+ PRIMARY KEY ((annotation, bucket), ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
@@ -13,14 +13,19 @@
*/
package zipkin.storage.cassandra;

+ import java.util.ArrayList;
+ import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+ import zipkin.Annotation;
import zipkin.Span;
import zipkin.TestObjects;
import zipkin.internal.ApplyTimestampAndDuration;
+ import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStoreTest;

+ import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

public class CassandraSpanStoreTest extends SpanStoreTest {
@@ -60,14 +65,38 @@ public void rawTraceStoredWithoutAdjustments() {
.containsExactly(rawSpan);
}

- /**
- * The PRIMARY KEY of {@link Tables#SERVICE_NAME_INDEX} doesn't consider trace_id, so will only
- * see bucket count traces to a service per millisecond.
- */
- @Override public void getTraces_manyTraces() {
- thrown.expect(AssertionError.class);
- thrown.expectMessage("Expected size:<1000> but was:<10>");
+ @Test
+ public void overFetchesToCompensateForDuplicateIndexData() {
+ int traceCount = 100;
+
+ List<Span> spans = new ArrayList<>();
+ for (int i = 0; i < traceCount; i++) {
+ final long delta = i * 1000; // all timestamps happen a millisecond later
+ for (Span s : TestObjects.TRACE) {
+ spans.add(TestObjects.TRACE.get(0).toBuilder()
+ .traceId(s.traceId + i * 10)
+ .id(s.id + i * 10)
+ .timestamp(s.timestamp + delta)
+ .annotations(s.annotations.stream()
+ .map(a -> Annotation.create(a.timestamp + delta, a.value, a.endpoint))
+ .collect(toList()))
+ .build());
+ }
+ }
+
+ accept(spans.toArray(new Span[0]));
+
+ // Index ends up containing more rows than services * trace count, and cannot be de-duped
+ // in a server-side query.
+ assertThat(rowCount(Tables.SERVICE_NAME_INDEX))
+ .isGreaterThan(traceCount * store().getServiceNames().size());
+
+ // Implementation over-fetches on the index to allow the user to receive unsurprising results.
+ assertThat(store().getTraces(QueryRequest.builder().limit(traceCount).build()))
+ .hasSize(traceCount);
+ }

- super.getTraces_manyTraces();
+ long rowCount(String table) {
+ return storage.session().execute("SELECT COUNT(*) from " + table).one().getLong(0);
}
}
@@ -18,4 +18,15 @@ public class CassandraWithOriginalSchemaSpanStoreTest extends CassandraSpanStore
public CassandraWithOriginalSchemaSpanStoreTest() {
super(CassandraWithOriginalSchemaTestGraph.INSTANCE.storage.get());
}
+
+ /**
+ * The PRIMARY KEY of {@link Tables#SERVICE_NAME_INDEX} doesn't consider trace_id, so will only
+ * see bucket count traces to a service per millisecond.
+ */
+ @Override public void getTraces_manyTraces() {
+ thrown.expect(AssertionError.class);
+ thrown.expectMessage("Expected size:<1000> but was:<10>");
+
+ super.getTraces_manyTraces();
+ }
}
