diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java index 17b8e55457a..afe794dc5d6 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanConsumer.java @@ -49,6 +49,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer { private final Session session; private final PreparedStatement insertSpan; + private final PreparedStatement insertTraceServiceSpanName; private final PreparedStatement insertServiceSpanName; private final Schema.Metadata metadata; @@ -70,7 +71,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer { .value("binary_annotations", QueryBuilder.bindMarker("binary_annotations")) .value("all_annotations", QueryBuilder.bindMarker("all_annotations"))); - insertServiceSpanName = session.prepare( + insertTraceServiceSpanName = session.prepare( QueryBuilder .insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN) .value("service_name", QueryBuilder.bindMarker("service_name")) @@ -79,6 +80,12 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer { .value("ts", QueryBuilder.bindMarker("ts")) .value("trace_id", QueryBuilder.bindMarker("trace_id")) .value("duration", QueryBuilder.bindMarker("duration"))); + + insertServiceSpanName = session.prepare( + QueryBuilder + .insertInto(Schema.TABLE_SERVICE_SPANS) + .value("service_name", QueryBuilder.bindMarker("service_name")) + .value("span_name", QueryBuilder.bindMarker("span_name"))); } /** @@ -98,12 +105,13 @@ public ListenableFuture accept(List rawSpans) { for (String serviceName : span.serviceNames()) { // QueryRequest.min/maxDuration if (timestamp != null) { - // Contract for Repository.storeServiceSpanName is to store the span twice, once with + // Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with // the span name and another with empty string. - futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration, + futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration, traceId)); if (!span.name.isEmpty()) { // If span.name == "", this would be redundant - futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId)); + futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId)); + futures.add(storeServiceSpanName(serviceName, span.name)); } } } @@ -162,7 +170,7 @@ ListenableFuture storeSpan(Span span, BigInteger traceId, Long timestamp) { } } - ListenableFuture storeServiceSpanName( + ListenableFuture storeTraceServiceSpanName( String serviceName, String spanName, long timestamp_micro, @@ -172,7 +180,7 @@ ListenableFuture storeServiceSpanName( int bucket = durationIndexBucket(timestamp_micro); try { BoundStatement bound = - bindWithName(insertServiceSpanName, "insert-service-span-name") + bindWithName(insertTraceServiceSpanName, "insert-service-span-name") .setString("service_name", serviceName) .setString("span_name", spanName) .setInt("bucket", bucket) @@ -191,6 +199,20 @@ ListenableFuture storeServiceSpanName( } } + ListenableFuture storeServiceSpanName( + String serviceName, + String spanName + ) { + try { + BoundStatement bound = bindWithName(insertServiceSpanName, "insert-service-span-name") + .setString("service_name", serviceName) + .setString("span_name", spanName); + return session.executeAsync(bound); + } catch (RuntimeException ex) { + return Futures.immediateFailedFuture(ex); + } + } + static BigInteger traceId(Span span) { return span.traceIdHigh != 0 ? BigInteger.valueOf(span.traceIdHigh).shiftLeft(64).or(BigInteger.valueOf(span.traceId)) diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java index b0ebf42b3b7..3e9c6c85673 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java @@ -23,9 +23,9 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.common.base.Function; import com.google.common.collect.ContiguousSet; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.collect.Range; import com.google.common.util.concurrent.AsyncFunction; @@ -67,6 +67,7 @@ import static zipkin.internal.Util.getDays; import static zipkin.storage.cassandra3.Schema.TABLE_TRACES; import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN; +import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS; final class CassandraSpanStore implements GuavaSpanStore { private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class); @@ -116,11 +117,11 @@ public int compare(List left, List right) { selectServiceNames = session.prepare( QueryBuilder.select("service_name") .distinct() - .from(Schema.VIEW_TRACE_BY_SERVICE)); + .from(TABLE_SERVICE_SPANS)); selectSpanNames = session.prepare( QueryBuilder.select("span_name") - .from(Schema.VIEW_TRACE_BY_SERVICE) + .from(TABLE_SERVICE_SPANS) .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name"))) .limit(QueryBuilder.bindMarker("limit_"))); @@ -223,8 +224,9 @@ public ListenableFuture>> getTraces(final QueryRequest request) } return transform(traceIds, new AsyncFunction, List>>() { @Override public ListenableFuture>> apply(Collection traceIds) { - traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet(); - return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE); + ImmutableSet set = ImmutableSet.copyOf(traceIds); + set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit)); + return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE); } @Override public String toString() { diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java index 512330b4933..35974932079 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/Schema.java @@ -42,8 +42,8 @@ final class Schema { static final String TABLE_TRACES = "traces"; static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span"; + static final String TABLE_SERVICE_SPANS = "span_name_by_service"; static final String TABLE_DEPENDENCIES = "dependencies"; - static final String VIEW_TRACE_BY_SERVICE = "trace_by_service"; static final String DEFAULT_KEYSPACE = "zipkin3"; private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql"; diff --git a/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql b/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql index 3370e249016..c24ef1ed948 100644 --- a/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql +++ b/zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql @@ -51,14 +51,13 @@ CREATE TABLE zipkin3.trace_by_service_span ( AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'} AND default_time_to_live = 259200; - -CREATE MATERIALIZED VIEW IF NOT EXISTS zipkin3.trace_by_service - AS SELECT service_name, span_name, bucket, ts, trace_id FROM trace_by_service_span - WHERE service_name is not null AND span_name is not null AND bucket is not null AND ts is not null - PRIMARY KEY (service_name, bucket, span_name, ts) - WITH CLUSTERING ORDER BY (bucket DESC, ts DESC) - AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} - AND default_time_to_live = 259200; +CREATE TABLE zipkin3.span_name_by_service ( + service_name text, + span_name text, + PRIMARY KEY (service_name, span_name) +) + WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} + AND default_time_to_live = 259200; CREATE CUSTOM INDEX ON zipkin3.traces (all_annotations) USING 'org.apache.cassandra.index.sasi.SASIIndex' WITH OPTIONS = {