From 37687b1f1872ef01ed10f1b677eae73d8184bc22 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sun, 10 Jul 2016 09:21:12 +0800 Subject: [PATCH] Over-fetches cassandra trace indexes to improve UX and fixes Cassandra index (#1177) * Over-fetches cassandra trace indexes to improve UX Even when optimized, cassandra indexes will have more rows than distinct (trace_id, timestamp) needed to satisfy query requests. This side-effect in most cases is that users get less than `QueryRequest.limit` results back. Lacking the ability to do any deduplication server-side, the only opportunity left is to address this client-side. This over-fetches by a multiplier `CASSANDRA_INDEX_FETCH_MULTIPLIER`, which defaults to 3. For example, if a user requests 10 traces, 30 rows are requested from indexes, but only 10 distinct trace ids are queried for span data. To disable this feature, set `CASSANDRA_INDEX_FETCH_MULTIPLIER=1` Fixes #1142 * Fixes Cassandra indexes that lost traces in the same millisecond (#1153) A schema bug resulted in Cassandra not indexing more than bucket count (10) trace ids per millisecond+search input. This manifested as less traces retrieved by UI search or Api query than expected. For example, if you had 1000 traces that happened on the same service in the same millisecond, only 10 would return. The indexes affected are `service_span_name_index`, `service_name_index` and `annotations_index` and this was a schema-only change. Those with existing zipkin installations should recreate these indexes to solve the problem. Fixes #1142 --- .../ZipkinCassandraStorageProperties.java | 13 +++++- zipkin-server/README.md | 1 + .../src/main/resources/zipkin-server.yml | 2 + zipkin-storage/cassandra/README.md | 4 ++ .../storage/cassandra/CassandraSpanStore.java | 35 +++++++++------ .../storage/cassandra/CassandraStorage.java | 22 ++++++++- .../main/resources/cassandra-schema-cql3.txt | 28 ++++++------ .../cassandra/CassandraSpanStoreTest.java | 45 +++++++++++++++---- ...sandraWithOriginalSchemaSpanStoreTest.java | 11 +++++ 9 files changed, 123 insertions(+), 38 deletions(-) diff --git a/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin/autoconfigure/storage/cassandra/ZipkinCassandraStorageProperties.java b/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin/autoconfigure/storage/cassandra/ZipkinCassandraStorageProperties.java index 5bd43ce2fc7..14fda6f06fb 100644 --- a/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin/autoconfigure/storage/cassandra/ZipkinCassandraStorageProperties.java +++ b/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin/autoconfigure/storage/cassandra/ZipkinCassandraStorageProperties.java @@ -32,6 +32,8 @@ public class ZipkinCassandraStorageProperties { private int indexCacheMax = 100000; /** See {@link CassandraStorage.Builder#indexCacheTtl(int)} */ private int indexCacheTtl = 60; + /** See {@link CassandraStorage.Builder#indexFetchMultiplier(int)} */ + private int indexFetchMultiplier = 3; public String getKeyspace() { return keyspace; @@ -137,6 +139,14 @@ public void setIndexCacheTtl(int indexCacheTtl) { this.indexCacheTtl = indexCacheTtl; } + public int getIndexFetchMultiplier() { + return indexFetchMultiplier; + } + + public void setIndexFetchMultiplier(int indexFetchMultiplier) { + this.indexFetchMultiplier = indexFetchMultiplier; + } + public CassandraStorage.Builder toBuilder() { return CassandraStorage.builder() .keyspace(keyspace) @@ -149,6 +159,7 @@ public CassandraStorage.Builder toBuilder() { .spanTtl(spanTtl) .indexTtl(indexTtl) .indexCacheMax(indexCacheMax) - .indexCacheTtl(indexCacheTtl); + .indexCacheTtl(indexCacheTtl) + .indexFetchMultiplier(indexFetchMultiplier); } } diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 811ea737829..9cd38ed2173 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -113,6 +113,7 @@ The following are tuning parameters which may not concern all users: * `CASSANDRA_MAX_CONNECTIONS`: Max pooled connections per datacenter-local host. Defaults to 8 * `CASSANDRA_INDEX_CACHE_MAX`: Maximum trace index metadata entries to cache. Zero disables caching. Defaults to 100000. * `CASSANDRA_INDEX_CACHE_TTL`: How many seconds to cache index metadata about a trace. Defaults to 60. + * `CASSANDRA_INDEX_FETCH_MULTIPLIER`: How many more index rows to fetch than the user-supplied query limit. Defaults to 3. Example usage: diff --git a/zipkin-server/src/main/resources/zipkin-server.yml b/zipkin-server/src/main/resources/zipkin-server.yml index 36fe309a90d..818f5ac456f 100644 --- a/zipkin-server/src/main/resources/zipkin-server.yml +++ b/zipkin-server/src/main/resources/zipkin-server.yml @@ -49,6 +49,8 @@ zipkin: index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000} # how long to cache index metadata about a trace. 1 minute in seconds index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60} + # how many more index rows to fetch than the user-supplied query limit + index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3} elasticsearch: cluster: ${ES_CLUSTER:elasticsearch} hosts: ${ES_HOSTS:localhost:9300} diff --git a/zipkin-storage/cassandra/README.md b/zipkin-storage/cassandra/README.md index 4b397a754ee..26704b0051c 100644 --- a/zipkin-storage/cassandra/README.md +++ b/zipkin-storage/cassandra/README.md @@ -20,6 +20,10 @@ Redundant requests to store service or span names are ignored for an hour to red Indexing of traces are optimized by default. This reduces writes to Cassandra at the cost of memory needed to cache state. This cache is tunable based on your typical trace duration and span count. + +User-supplied query limits are over-fetched according to a configured index fetch multiplier in +attempts to mitigate redundant data returned from index queries. + See [CassandraStorage](src/main/java/zipkin/storage/cassandra/CassandraStorage.java) for details. ## Testing this component diff --git a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java index 12f4fe4e9d0..598bed9eeef 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraSpanStore.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.base.Function; import com.google.common.collect.ContiguousSet; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; @@ -78,6 +79,7 @@ public int compare(List left, List right) { private final int durationTtl; private final int maxTraceCols; + private final int indexFetchMultiplier; private final Session session; private final TimestampCodec timestampCodec; private final Set buckets; @@ -92,9 +94,11 @@ public int compare(List left, List right) { private final PreparedStatement selectTraceIdsBySpanDuration; private final Function> traceIdToTimestamp; - CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols) { + CassandraSpanStore(Session session, int bucketCount, int indexTtl, int maxTraceCols, + int indexFetchMultiplier) { this.session = session; this.maxTraceCols = maxTraceCols; + this.indexFetchMultiplier = indexFetchMultiplier; ProtocolVersion protocolVersion = session.getCluster() .getConfiguration().getProtocolOptions().getProtocolVersion(); this.timestampCodec = new TimestampCodec(protocolVersion); @@ -212,15 +216,17 @@ public int compare(List left, List right) { */ @Override public ListenableFuture>> getTraces(final QueryRequest request) { + // Over fetch on indexes as they don't return distinct (trace id, timestamp) rows. + final int traceIndexFetchSize = request.limit * indexFetchMultiplier; ListenableFuture> traceIdToTimestamp; if (request.minDuration != null || request.maxDuration != null) { - traceIdToTimestamp = getTraceIdsByDuration(request); + traceIdToTimestamp = getTraceIdsByDuration(request, traceIndexFetchSize); } else if (request.spanName != null) { traceIdToTimestamp = getTraceIdsBySpanName(request.serviceName, request.spanName, - request.endTs * 1000, request.lookback * 1000, request.limit); + request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize); } else if (request.serviceName != null) { traceIdToTimestamp = getTraceIdsByServiceNames(Collections.singletonList(request.serviceName), - request.endTs * 1000, request.lookback * 1000, request.limit); + request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize); } else { checkArgument(selectTraceIdsByServiceNames != null, "getTraces without serviceName requires Cassandra 2.2 or later"); @@ -228,7 +234,7 @@ public ListenableFuture>> getTraces(final QueryRequest request) new AsyncFunction, Map>() { @Override public ListenableFuture> apply(List serviceNames) { return getTraceIdsByServiceNames(serviceNames, - request.endTs * 1000, request.lookback * 1000, request.limit); + request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize); } }); } @@ -242,18 +248,19 @@ public ListenableFuture>> getTraces(final QueryRequest request) traceIds = Futures.transform(traceIdToTimestamp, CassandraUtil.keyset()); } else { // While a valid port of the scala cassandra span store (from zipkin 1.35), there is a fault. - // each annotation key is an intersection, which means we are likely to return < limit. + // each annotation key is an intersection, meaning we likely return < traceIndexFetchSize. List>> futureKeySetsToIntersect = new ArrayList<>(); futureKeySetsToIntersect.add(traceIdToTimestamp); for (String annotationKey : annotationKeys) { futureKeySetsToIntersect.add(getTraceIdsByAnnotation(annotationKey, - request.endTs * 1000, request.lookback * 1000, request.limit)); + request.endTs * 1000, request.lookback * 1000, traceIndexFetchSize)); } // We achieve the AND goal, by intersecting each of the key sets. traceIds = Futures.transform(allAsList(futureKeySetsToIntersect), CassandraUtil.intersectKeySets()); } return transform(traceIds, new AsyncFunction, List>>() { @Override public ListenableFuture>> apply(Set traceIds) { + traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet(); return transform(getSpansByTraceIds(traceIds, maxTraceCols), AdjustTraces.INSTANCE); } @@ -504,7 +511,8 @@ ListenableFuture> getTraceIdsByAnnotation(String annotationKey, } /** Returns a map of trace id to timestamp (in microseconds) */ - ListenableFuture> getTraceIdsByDuration(QueryRequest request) { + ListenableFuture> getTraceIdsByDuration(QueryRequest request, + int indexFetchSize) { checkArgument(request.serviceName != null, "serviceName required on duration query"); long oldestData = (System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(durationTtl)) * 1000; @@ -520,7 +528,7 @@ ListenableFuture> getTraceIdsByDuration(QueryRequest request) { List>> futures = new ArrayList<>(); for (int i = startBucket; i <= endBucket; i++) { // range closed - futures.add(oneBucketDurationQuery(request, i, startTs, endTs)); + futures.add(oneBucketDurationQuery(request, i, startTs, endTs, indexFetchSize)); } return transform(allAsList(futures), @@ -540,12 +548,11 @@ ListenableFuture> getTraceIdsByDuration(QueryRequest request) { } ListenableFuture> oneBucketDurationQuery(QueryRequest request, int bucket, - final long startTs, final long endTs) { + final long startTs, final long endTs, int indexFetchSize) { String serviceName = request.serviceName; String spanName = spanName(request.spanName); long minDuration = request.minDuration; long maxDuration = request.maxDuration != null ? request.maxDuration : Long.MAX_VALUE; - int limit = request.limit; BoundStatement bound = CassandraUtil.bindWithName(selectTraceIdsBySpanDuration, "select-trace-ids-by-span-duration") .setInt("time_bucket", bucket) @@ -554,10 +561,10 @@ ListenableFuture> oneBucketDurationQuery(QueryRequest request, .setLong("min_duration", minDuration) .setLong("max_duration", maxDuration); - // optimistically setting fetch size to 'limit' here. Since we are likely to filter some results - // because their timestamps are out of range, we may need to fetch again. + // optimistically setting fetch size to 'indexFetchSize' here. Since we are likely to filter + // some results because their timestamps are out of range, we may need to fetch again. // TODO figure out better strategy - bound.setFetchSize(limit); + bound.setFetchSize(indexFetchSize); return transform(session.executeAsync(bound), new Function>() { @Override public List apply(ResultSet rs) { diff --git a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraStorage.java b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraStorage.java index 8d44a2e4777..96b07cdbc72 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraStorage.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin/storage/cassandra/CassandraStorage.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import zipkin.internal.Nullable; +import zipkin.storage.QueryRequest; import zipkin.storage.guava.LazyGuavaStorageComponent; import static java.lang.String.format; @@ -60,6 +61,7 @@ public static final class Builder { int maxTraceCols = 100000; int indexCacheMax = 100000; int indexCacheTtl = 60; + int indexFetchMultiplier = 3; /** * Used to avoid hot spots when writing indexes used to query by service name or annotation. @@ -195,6 +197,21 @@ public Builder indexCacheTtl(int indexCacheTtl) { return this; } + /** + * How many more index rows to fetch than the user-supplied query limit. Defaults to 3. + * + *

Backend requests will request {@link QueryRequest#limit} times this factor rows from + * Cassandra indexes in attempts to return {@link QueryRequest#limit} traces. + * + *

Indexing in cassandra will usually have more rows than trace identifiers due to factors + * including table design and collection implementation. As there's no way to DISTINCT out + * duplicates server-side, this over-fetches client-side when {@code indexFetchMultiplier} > 1. + */ + public Builder indexFetchMultiplier(int indexFetchMultiplier) { + this.indexFetchMultiplier = indexFetchMultiplier; + return this; + } + public CassandraStorage build() { return new CassandraStorage(this); } @@ -217,6 +234,7 @@ public CassandraStorage build() { final boolean ensureSchema; final String keyspace; final CacheBuilderSpec indexCacheSpec; + final int indexFetchMultiplier; final LazySession session; CassandraStorage(Builder builder) { @@ -236,6 +254,7 @@ public CassandraStorage build() { ? null : CacheBuilderSpec.parse("maximumSize=" + builder.indexCacheMax + ",expireAfterWrite=" + builder.indexCacheTtl + "s"); + this.indexFetchMultiplier = builder.indexFetchMultiplier; } /** Lazy initializes or returns the session in use by this storage component. */ @@ -244,7 +263,8 @@ public Session session() { } @Override protected CassandraSpanStore computeGuavaSpanStore() { - return new CassandraSpanStore(session.get(), bucketCount, indexTtl, maxTraceCols); + return new CassandraSpanStore(session.get(), bucketCount, indexTtl, maxTraceCols, + indexFetchMultiplier); } @Override protected CassandraSpanConsumer computeGuavaSpanConsumer() { diff --git a/zipkin-storage/cassandra/src/main/resources/cassandra-schema-cql3.txt b/zipkin-storage/cassandra/src/main/resources/cassandra-schema-cql3.txt index 48687e8fb6c..655bb9e4944 100644 --- a/zipkin-storage/cassandra/src/main/resources/cassandra-schema-cql3.txt +++ b/zipkin-storage/cassandra/src/main/resources/cassandra-schema-cql3.txt @@ -1,21 +1,21 @@ CREATE KEYSPACE IF NOT EXISTS zipkin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE IF NOT EXISTS zipkin.service_span_name_index ( - service_span_name text, - ts timestamp, - trace_id bigint, - PRIMARY KEY (service_span_name, ts) + service_span_name text, // Endpoint.serviceName + "." + Span.name + ts timestamp, // start timestamp of the span, truncated to millisecond precision + trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely) + PRIMARY KEY (service_span_name, ts, trace_id) ) WITH CLUSTERING ORDER BY (ts DESC) AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'} AND default_time_to_live = 259200; CREATE TABLE IF NOT EXISTS zipkin.service_name_index ( - service_name text, - bucket int, - ts timestamp, - trace_id bigint, - PRIMARY KEY ((service_name, bucket), ts) + service_name text, // Endpoint.serviceName + bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9 + ts timestamp, // start timestamp of the span, truncated to millisecond precision + trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely) + PRIMARY KEY ((service_name, bucket), ts, trace_id) ) WITH CLUSTERING ORDER BY (ts DESC) AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'} @@ -31,11 +31,11 @@ CREATE TABLE IF NOT EXISTS zipkin.span_names ( AND default_time_to_live = 259200; CREATE TABLE IF NOT EXISTS zipkin.annotations_index ( - annotation blob, - bucket int, - ts timestamp, - trace_id bigint, - PRIMARY KEY ((annotation, bucket), ts) + annotation blob, // Annotation.value or BinaryAnnotation.key + bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9 + ts timestamp, // start timestamp of the span, truncated to millisecond precision + trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely) + PRIMARY KEY ((annotation, bucket), ts, trace_id) ) WITH CLUSTERING ORDER BY (ts DESC) AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'} diff --git a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanStoreTest.java b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanStoreTest.java index 6def6bd60fe..f08c4dc6a2f 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanStoreTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraSpanStoreTest.java @@ -13,14 +13,19 @@ */ package zipkin.storage.cassandra; +import java.util.ArrayList; +import java.util.List; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import zipkin.Annotation; import zipkin.Span; import zipkin.TestObjects; import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.storage.QueryRequest; import zipkin.storage.SpanStoreTest; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; public class CassandraSpanStoreTest extends SpanStoreTest { @@ -60,14 +65,38 @@ public void rawTraceStoredWithoutAdjustments() { .containsExactly(rawSpan); } - /** - * The PRIMARY KEY of {@link Tables#SERVICE_NAME_INDEX} doesn't consider trace_id, so will only - * see bucket count traces to a service per millisecond. - */ - @Override public void getTraces_manyTraces() { - thrown.expect(AssertionError.class); - thrown.expectMessage("Expected size:<1000> but was:<10>"); + @Test + public void overFetchesToCompensateForDuplicateIndexData() { + int traceCount = 100; + + List spans = new ArrayList<>(); + for (int i = 0; i < traceCount; i++) { + final long delta = i * 1000; // all timestamps happen a millisecond later + for (Span s : TestObjects.TRACE) { + spans.add(TestObjects.TRACE.get(0).toBuilder() + .traceId(s.traceId + i * 10) + .id(s.id + i * 10) + .timestamp(s.timestamp + delta) + .annotations(s.annotations.stream() + .map(a -> Annotation.create(a.timestamp + delta, a.value, a.endpoint)) + .collect(toList())) + .build()); + } + } + + accept(spans.toArray(new Span[0])); + + // Index ends up containing more rows than services * trace count, and cannot be de-duped + // in a server-side query. + assertThat(rowCount(Tables.SERVICE_NAME_INDEX)) + .isGreaterThan(traceCount * store().getServiceNames().size()); + + // Implementation over-fetches on the index to allow the user to receive unsurprising results. + assertThat(store().getTraces(QueryRequest.builder().limit(traceCount).build())) + .hasSize(traceCount); + } - super.getTraces_manyTraces(); + long rowCount(String table) { + return storage.session().execute("SELECT COUNT(*) from " + table).one().getLong(0); } } diff --git a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraWithOriginalSchemaSpanStoreTest.java b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraWithOriginalSchemaSpanStoreTest.java index 3e8f8fe1557..3cf7b0ff7b7 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraWithOriginalSchemaSpanStoreTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin/storage/cassandra/CassandraWithOriginalSchemaSpanStoreTest.java @@ -18,4 +18,15 @@ public class CassandraWithOriginalSchemaSpanStoreTest extends CassandraSpanStore public CassandraWithOriginalSchemaSpanStoreTest() { super(CassandraWithOriginalSchemaTestGraph.INSTANCE.storage.get()); } + + /** + * The PRIMARY KEY of {@link Tables#SERVICE_NAME_INDEX} doesn't consider trace_id, so will only + * see bucket count traces to a service per millisecond. + */ + @Override public void getTraces_manyTraces() { + thrown.expect(AssertionError.class); + thrown.expectMessage("Expected size:<1000> but was:<10>"); + + super.getTraces_manyTraces(); + } }