Skip to content

Commit

Permalink
Use span name table instead of materialized view
Browse files Browse the repository at this point in the history
- This switches span name and service name lookups to use a table instead of a materialized view.  This change will fix #1360
- Change limit logic to limit on a set of trace IDs instead of limiting on the provided collection first. In pracitce this didn't make a noticiable difference in the results but it seems like the intended logic.
  • Loading branch information
llinder committed Nov 3, 2016
1 parent 4aa179d commit e57d95b
Show file tree
Hide file tree
Showing 6 changed files with 495 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,29 @@
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.storage.cassandra3.Schema.AnnotationUDT;
import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT;
import zipkin.storage.guava.GuavaSpanConsumer;

import static com.google.common.util.concurrent.Futures.transform;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.checkNotNull;
import static zipkin.storage.cassandra3.CassandraUtil.bindWithName;
import static zipkin.storage.cassandra3.CassandraUtil.durationIndexBucket;

final class CassandraSpanConsumer implements GuavaSpanConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class);
private static final Function<Object, Void> TO_VOID = Functions.<Void>constant(null);
private static final long WRITTEN_NAMES_TTL
= Long.getLong("zipkin.store.cassandra3.internal.writtenNamesTtl", 60 * 60 * 1000);

private final Session session;
private final PreparedStatement insertSpan;
private final PreparedStatement insertTraceServiceSpanName;
private final PreparedStatement insertServiceSpanName;
private final Schema.Metadata metadata;
private final DeduplicatingExecutor deduplicatingExecutor;

CassandraSpanConsumer(Session session) {
this.session = session;
Expand All @@ -70,7 +76,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("binary_annotations", QueryBuilder.bindMarker("binary_annotations"))
.value("all_annotations", QueryBuilder.bindMarker("all_annotations")));

insertServiceSpanName = session.prepare(
insertTraceServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN)
.value("service_name", QueryBuilder.bindMarker("service_name"))
Expand All @@ -79,6 +85,14 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("duration", QueryBuilder.bindMarker("duration")));

insertServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_SERVICE_SPANS)
.value("service_name", QueryBuilder.bindMarker("service_name"))
.value("span_name", QueryBuilder.bindMarker("span_name")));

deduplicatingExecutor = new DeduplicatingExecutor(session, WRITTEN_NAMES_TTL);
}

/**
Expand All @@ -98,12 +112,13 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
for (String serviceName : span.serviceNames()) {
// QueryRequest.min/maxDuration
if (timestamp != null) {
// Contract for Repository.storeServiceSpanName is to store the span twice, once with
// Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with
// the span name and another with empty string.
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration,
futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration,
traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeServiceSpanName(serviceName, span.name));
}
}
}
Expand Down Expand Up @@ -162,32 +177,106 @@ ListenableFuture<?> storeSpan(Span span, BigInteger traceId, Long timestamp) {
}
}

ListenableFuture<?> storeServiceSpanName(
ListenableFuture<?> storeTraceServiceSpanName(
String serviceName,
String spanName,
long timestamp_micro,
Long duration,
BigInteger traceId) {

int bucket = durationIndexBucket(timestamp_micro);
UUID ts = new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits());
try {
BoundStatement bound =
bindWithName(insertServiceSpanName, "insert-service-span-name")
bindWithName(insertTraceServiceSpanName, "insert-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName)
.setInt("bucket", bucket)
.setUUID("ts", new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits()))
.setUUID("ts", ts)
.setVarint("trace_id", traceId);

if (null != duration) {
bound = bound.setLong("duration", duration);
}

return session.executeAsync(bound);
return deduplicatingExecutor.maybeExecuteAsync(bound,
new TraceServiceSpanNameKey(serviceName, spanName, bucket, ts));

} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

ListenableFuture<?> storeServiceSpanName(
String serviceName,
String spanName
) {
try {
BoundStatement bound = bindWithName(insertServiceSpanName, "insert-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName);

return deduplicatingExecutor.maybeExecuteAsync(bound, Pair.create(serviceName, spanName));
} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

static BigInteger traceId(Span span) {
return span.traceIdHigh != 0
? BigInteger.valueOf(span.traceIdHigh).shiftLeft(64).or(BigInteger.valueOf(span.traceId))
: BigInteger.valueOf(span.traceId);
}

final class TraceServiceSpanNameKey {

final String serviceName;
final String spanName;
final int bucket;
final UUID ts;

TraceServiceSpanNameKey(String serviceName, String spanName, int bucket, UUID ts) {
this.serviceName = serviceName;
this.spanName = spanName;
this.bucket = bucket;
this.ts = ts;
}

@Override
public String toString() {
return "(" + serviceName + ", " + spanName + ", " + bucket + ", " + ts + " )";
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (o instanceof TraceServiceSpanNameKey) {
TraceServiceSpanNameKey that = (TraceServiceSpanNameKey) o;
return this.serviceName.equals(that.serviceName) &&
this.spanName.equals(that.spanName) &&
this.bucket == that.bucket &&
this.ts.equals(that.ts);
}
return false;
}

@Override
public int hashCode() {
int h = 1;
h *= 1000003;
h ^= this.serviceName.hashCode();
h *= 1000003;
h ^= this.spanName.hashCode();
h *= 1000003;
h ^= this.bucket;
h *= 1000003;
h ^= this.ts.hashCode();
return h;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
Expand Down Expand Up @@ -67,6 +67,7 @@
import static zipkin.internal.Util.getDays;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACES;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN;
import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS;

final class CassandraSpanStore implements GuavaSpanStore {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
Expand Down Expand Up @@ -116,11 +117,11 @@ public int compare(List<Span> left, List<Span> right) {
selectServiceNames = session.prepare(
QueryBuilder.select("service_name")
.distinct()
.from(Schema.VIEW_TRACE_BY_SERVICE));
.from(TABLE_SERVICE_SPANS));

selectSpanNames = session.prepare(
QueryBuilder.select("span_name")
.from(Schema.VIEW_TRACE_BY_SERVICE)
.from(TABLE_SERVICE_SPANS)
.where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
.limit(QueryBuilder.bindMarker("limit_")));

Expand Down Expand Up @@ -223,8 +224,9 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
}
return transform(traceIds, new AsyncFunction<Collection<BigInteger>, List<List<Span>>>() {
@Override public ListenableFuture<List<List<Span>>> apply(Collection<BigInteger> traceIds) {
traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE);
ImmutableSet<BigInteger> set = ImmutableSet.copyOf(traceIds);
set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit));
return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE);
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.cassandra3;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static zipkin.internal.Util.checkNotNull;

/**
* This reduces load on cassandra by preventing semantically equivalent requests from being invoked,
* subject to a local TTL.
*
* <p>Ex. If you want to test that you don't repeatedly send bad data, you could send a 400 back.
*
* <pre>{@code
* ttl = 60 * 1000; // 1 minute
* deduper = new DeduplicatingExecutor(session, ttl);
*
* // the result of the first execution against "foo" is returned to other callers
* // until it expires a minute later.
* deduper.maybeExecute(bound, "foo");
* deduper.maybeExecute(bound, "foo");
* }</pre>
*/
class DeduplicatingExecutor { // not final for testing

private final Session session;
private final LoadingCache<BoundStatementKey, ListenableFuture<Void>> cache;

/**
* @param session which conditionally executes bound statements
* @param ttl how long the results of statements are remembered, in milliseconds.
*/
DeduplicatingExecutor(Session session, long ttl) {
this.session = session;
this.cache = CacheBuilder.newBuilder()
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.ticker(new Ticker() {
@Override public long read() {
return nanoTime();
}
})
// TODO: maximum size or weight
.build(new CacheLoader<BoundStatementKey, ListenableFuture<Void>>() {
@Override public ListenableFuture<Void> load(final BoundStatementKey key) {
ListenableFuture<?> cassandraFuture = executeAsync(key.statement);

// Drop the cassandra future so that we don't hold references to cassandra state for
// long periods of time.
final SettableFuture<Void> disconnectedFuture = SettableFuture.create();
Futures.addCallback(cassandraFuture, new FutureCallback<Object>() {

@Override public void onSuccess(Object result) {
disconnectedFuture.set(null);
}

@Override public void onFailure(Throwable t) {
cache.invalidate(key);
disconnectedFuture.setException(t);
}
});
return disconnectedFuture;
}
});
}

/**
* Upon success, the statement's result will be remembered and returned for all subsequent
* executions with the same key, subject to a local TTL.
*
* <p>The results of failed statements are forgotten based on the supplied key.
*
* @param statement what to conditionally execute
* @param key determines equivalence of the bound statement
* @return future of work initiated by this or a previous request
*/
ListenableFuture<Void> maybeExecuteAsync(BoundStatement statement, Object key) {
BoundStatementKey cacheKey = new BoundStatementKey(statement, key);
try {
ListenableFuture<Void> result = cache.get(new BoundStatementKey(statement, key));
// A future could be constructed directly (i.e. immediate future), get the value to
// see if it was exceptional. If so, the catch block will invalidate that key.
if (result.isDone()) result.get();
return result;
} catch (UncheckedExecutionException | ExecutionException e) {
cache.invalidate(cacheKey);
return Futures.immediateFailedFuture(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError();
}
}

// visible for testing, since nanoTime is weird and can return negative
long nanoTime() {
return System.nanoTime();
}

@VisibleForTesting ListenableFuture<?> executeAsync(BoundStatement statement) {
return session.executeAsync(statement);
}

@VisibleForTesting void clear() {
cache.invalidateAll();
}

/** Used to hold a reference to the last statement executed, but without using it in hashCode */
static final class BoundStatementKey {
final BoundStatement statement;
final Object key;

BoundStatementKey(BoundStatement statement, Object key) {
this.statement = checkNotNull(statement, "statement");
this.key = checkNotNull(key, "key");
}

@Override
public String toString() {
return "(" + key + ", " + statement + ")";
}

@Override
public boolean equals(Object o) {
if (o == this) return true;
if (o instanceof BoundStatementKey) {
return this.key.equals(((BoundStatementKey) o).key);
}
return false;
}

@Override
public int hashCode() {
return key.hashCode();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ final class Schema {

static final String TABLE_TRACES = "traces";
static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span";
static final String TABLE_SERVICE_SPANS = "span_name_by_service";
static final String TABLE_DEPENDENCIES = "dependencies";
static final String VIEW_TRACE_BY_SERVICE = "trace_by_service";

static final String DEFAULT_KEYSPACE = "zipkin3";
private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql";
Expand Down
Loading

0 comments on commit e57d95b

Please sign in to comment.