Skip to content

Commit

Permalink
Use span name table instead of materialized view
Browse files Browse the repository at this point in the history
- This switches span name and service name lookups to use a table instead of a materialized view.  This change will fix #1360
- Change limit logic to limit on a set of trace IDs instead of limiting on the provided collection first. In pracitce this didn't make a noticiable difference in the results but it seems like the intended logic.
  • Loading branch information
llinder committed Nov 2, 2016
1 parent ce7d386 commit 274c0be
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {

private final Session session;
private final PreparedStatement insertSpan;
private final PreparedStatement insertTraceServiceSpanName;
private final PreparedStatement insertServiceSpanName;
private final Schema.Metadata metadata;

Expand All @@ -70,7 +71,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("binary_annotations", QueryBuilder.bindMarker("binary_annotations"))
.value("all_annotations", QueryBuilder.bindMarker("all_annotations")));

insertServiceSpanName = session.prepare(
insertTraceServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN)
.value("service_name", QueryBuilder.bindMarker("service_name"))
Expand All @@ -79,6 +80,12 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("duration", QueryBuilder.bindMarker("duration")));

insertServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_SERVICE_SPANS)
.value("service_name", QueryBuilder.bindMarker("service_name"))
.value("span_name", QueryBuilder.bindMarker("span_name")));
}

/**
Expand All @@ -98,12 +105,13 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
for (String serviceName : span.serviceNames()) {
// QueryRequest.min/maxDuration
if (timestamp != null) {
// Contract for Repository.storeServiceSpanName is to store the span twice, once with
// Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with
// the span name and another with empty string.
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration,
futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration,
traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeServiceSpanName(serviceName, span.name));
}
}
}
Expand Down Expand Up @@ -162,7 +170,7 @@ ListenableFuture<?> storeSpan(Span span, BigInteger traceId, Long timestamp) {
}
}

ListenableFuture<?> storeServiceSpanName(
ListenableFuture<?> storeTraceServiceSpanName(
String serviceName,
String spanName,
long timestamp_micro,
Expand All @@ -172,7 +180,7 @@ ListenableFuture<?> storeServiceSpanName(
int bucket = durationIndexBucket(timestamp_micro);
try {
BoundStatement bound =
bindWithName(insertServiceSpanName, "insert-service-span-name")
bindWithName(insertTraceServiceSpanName, "insert-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName)
.setInt("bucket", bucket)
Expand All @@ -190,4 +198,25 @@ ListenableFuture<?> storeServiceSpanName(
return Futures.immediateFailedFuture(ex);
}
}

ListenableFuture<?> storeServiceSpanName(
String serviceName,
String spanName
) {
try {
BoundStatement bound = bindWithName(insertServiceSpanName, "insert-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName);
return session.executeAsync(bound);
} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

static BigInteger traceId(Span span) {
return span.traceIdHigh != 0
? BigInteger.valueOf(span.traceIdHigh).shiftLeft(64).or(BigInteger.valueOf(span.traceId))
: BigInteger.valueOf(span.traceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
Expand Down Expand Up @@ -67,6 +67,7 @@
import static zipkin.internal.Util.getDays;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACES;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN;
import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS;

final class CassandraSpanStore implements GuavaSpanStore {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
Expand Down Expand Up @@ -116,11 +117,11 @@ public int compare(List<Span> left, List<Span> right) {
selectServiceNames = session.prepare(
QueryBuilder.select("service_name")
.distinct()
.from(Schema.VIEW_TRACE_BY_SERVICE));
.from(TABLE_SERVICE_SPANS));

selectSpanNames = session.prepare(
QueryBuilder.select("span_name")
.from(Schema.VIEW_TRACE_BY_SERVICE)
.from(TABLE_SERVICE_SPANS)
.where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
.limit(QueryBuilder.bindMarker("limit_")));

Expand Down Expand Up @@ -223,8 +224,9 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
}
return transform(traceIds, new AsyncFunction<Collection<BigInteger>, List<List<Span>>>() {
@Override public ListenableFuture<List<List<Span>>> apply(Collection<BigInteger> traceIds) {
traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE);
ImmutableSet<BigInteger> set = ImmutableSet.copyOf(traceIds);
set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit));
return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE);
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ final class Schema {

static final String TABLE_TRACES = "traces";
static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span";
static final String TABLE_SERVICE_SPANS = "span_name_by_service";
static final String TABLE_DEPENDENCIES = "dependencies";
static final String VIEW_TRACE_BY_SERVICE = "trace_by_service";

static final String DEFAULT_KEYSPACE = "zipkin3";
private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@ CREATE TABLE zipkin3.trace_by_service_span (
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
AND default_time_to_live = 259200;
CREATE MATERIALIZED VIEW IF NOT EXISTS zipkin3.trace_by_service
AS SELECT service_name, span_name, bucket, ts, trace_id FROM trace_by_service_span
WHERE service_name is not null AND span_name is not null AND bucket is not null AND ts is not null
PRIMARY KEY (service_name, bucket, span_name, ts)
WITH CLUSTERING ORDER BY (bucket DESC, ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200;
CREATE TABLE zipkin3.span_name_by_service (
service_name text,
span_name text,
PRIMARY KEY (service_name, span_name)
)
WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200;
CREATE CUSTOM INDEX ON zipkin3.traces (all_annotations) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {
Expand Down

0 comments on commit 274c0be

Please sign in to comment.