Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use span name table instead of materialized view #1374

Merged
merged 1 commit into from
Nov 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,29 @@
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.storage.cassandra3.Schema.AnnotationUDT;
import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT;
import zipkin.storage.guava.GuavaSpanConsumer;

import static com.google.common.util.concurrent.Futures.transform;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.checkNotNull;
import static zipkin.storage.cassandra3.CassandraUtil.bindWithName;
import static zipkin.storage.cassandra3.CassandraUtil.durationIndexBucket;

final class CassandraSpanConsumer implements GuavaSpanConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class);
private static final Function<Object, Void> TO_VOID = Functions.<Void>constant(null);
private static final long WRITTEN_NAMES_TTL
= Long.getLong("zipkin.store.cassandra3.internal.writtenNamesTtl", 60 * 60 * 1000);

private final Session session;
private final PreparedStatement insertSpan;
private final PreparedStatement insertTraceServiceSpanName;
private final PreparedStatement insertServiceSpanName;
private final Schema.Metadata metadata;
private final DeduplicatingExecutor deduplicatingExecutor;

CassandraSpanConsumer(Session session) {
this.session = session;
Expand All @@ -70,7 +76,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("binary_annotations", QueryBuilder.bindMarker("binary_annotations"))
.value("all_annotations", QueryBuilder.bindMarker("all_annotations")));

insertServiceSpanName = session.prepare(
insertTraceServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN)
.value("service_name", QueryBuilder.bindMarker("service_name"))
Expand All @@ -79,6 +85,14 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("duration", QueryBuilder.bindMarker("duration")));

insertServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_SERVICE_SPANS)
.value("service_name", QueryBuilder.bindMarker("service_name"))
.value("span_name", QueryBuilder.bindMarker("span_name")));

deduplicatingExecutor = new DeduplicatingExecutor(session, WRITTEN_NAMES_TTL);
}

/**
Expand All @@ -98,12 +112,13 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
for (String serviceName : span.serviceNames()) {
// QueryRequest.min/maxDuration
if (timestamp != null) {
// Contract for Repository.storeServiceSpanName is to store the span twice, once with
// Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with
// the span name and another with empty string.
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration,
futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration,
traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeServiceSpanName(serviceName, span.name));
}
}
}
Expand Down Expand Up @@ -162,32 +177,106 @@ ListenableFuture<?> storeSpan(Span span, BigInteger traceId, Long timestamp) {
}
}

ListenableFuture<?> storeServiceSpanName(
ListenableFuture<?> storeTraceServiceSpanName(
String serviceName,
String spanName,
long timestamp_micro,
Long duration,
BigInteger traceId) {

int bucket = durationIndexBucket(timestamp_micro);
UUID ts = new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits());
try {
BoundStatement bound =
bindWithName(insertServiceSpanName, "insert-service-span-name")
bindWithName(insertTraceServiceSpanName, "insert-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName)
.setInt("bucket", bucket)
.setUUID("ts", new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits()))
.setUUID("ts", ts)
.setVarint("trace_id", traceId);

if (null != duration) {
bound = bound.setLong("duration", duration);
}

return session.executeAsync(bound);
return deduplicatingExecutor.maybeExecuteAsync(bound,
new TraceServiceSpanNameKey(serviceName, spanName, bucket, ts));

} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

/**
 * Writes a (service name, span name) row using the {@code insertServiceSpanName} statement.
 *
 * <p>The write goes through the deduplicating executor, keyed on the pair of service name and
 * span name, so an equal pair may be served from a previously started execution instead of
 * issuing a new statement.
 *
 * @param serviceName value bound to the {@code service_name} column
 * @param spanName value bound to the {@code span_name} column
 * @return future of the (possibly shared) write; any {@link RuntimeException} raised while
 * binding or submitting is returned as an immediately failed future rather than thrown
 */
ListenableFuture<?> storeServiceSpanName(String serviceName, String spanName) {
  try {
    BoundStatement insert = bindWithName(insertServiceSpanName, "insert-service-span-name");
    insert = insert.setString("service_name", serviceName);
    insert = insert.setString("span_name", spanName);

    // Equal (service, span) pairs share one cache entry in the executor.
    Object dedupKey = Pair.create(serviceName, spanName);
    return deduplicatingExecutor.maybeExecuteAsync(insert, dedupKey);
  } catch (RuntimeException failure) {
    return Futures.immediateFailedFuture(failure);
  }
}

static BigInteger traceId(Span span) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs a rebase on master, as this shifted

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically this is in CassandraUtil now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's still some drift here, but I'll take care of it post merge

return span.traceIdHigh != 0
? BigInteger.valueOf(span.traceIdHigh).shiftLeft(64).or(BigInteger.valueOf(span.traceId))
: BigInteger.valueOf(span.traceId);
}

final class TraceServiceSpanNameKey {

final String serviceName;
final String spanName;
final int bucket;
final UUID ts;

TraceServiceSpanNameKey(String serviceName, String spanName, int bucket, UUID ts) {
this.serviceName = serviceName;
this.spanName = spanName;
this.bucket = bucket;
this.ts = ts;
}

@Override
public String toString() {
return "(" + serviceName + ", " + spanName + ", " + bucket + ", " + ts + " )";
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (o instanceof TraceServiceSpanNameKey) {
TraceServiceSpanNameKey that = (TraceServiceSpanNameKey) o;
return this.serviceName.equals(that.serviceName) &&
this.spanName.equals(that.spanName) &&
this.bucket == that.bucket &&
this.ts.equals(that.ts);
}
return false;
}

@Override
public int hashCode() {
int h = 1;
h *= 1000003;
h ^= this.serviceName.hashCode();
h *= 1000003;
h ^= this.spanName.hashCode();
h *= 1000003;
h ^= this.bucket;
h *= 1000003;
h ^= this.ts.hashCode();
return h;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
Expand Down Expand Up @@ -67,6 +67,7 @@
import static zipkin.internal.Util.getDays;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACES;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN;
import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS;

final class CassandraSpanStore implements GuavaSpanStore {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
Expand Down Expand Up @@ -116,11 +117,11 @@ public int compare(List<Span> left, List<Span> right) {
selectServiceNames = session.prepare(
QueryBuilder.select("service_name")
.distinct()
.from(Schema.VIEW_TRACE_BY_SERVICE));
.from(TABLE_SERVICE_SPANS));

selectSpanNames = session.prepare(
QueryBuilder.select("span_name")
.from(Schema.VIEW_TRACE_BY_SERVICE)
.from(TABLE_SERVICE_SPANS)
.where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
.limit(QueryBuilder.bindMarker("limit_")));

Expand Down Expand Up @@ -223,8 +224,9 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
}
return transform(traceIds, new AsyncFunction<Collection<BigInteger>, List<List<Span>>>() {
@Override public ListenableFuture<List<List<Span>>> apply(Collection<BigInteger> traceIds) {
traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE);
ImmutableSet<BigInteger> set = ImmutableSet.copyOf(traceIds);
set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit));
return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE);
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.cassandra3;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static zipkin.internal.Util.checkNotNull;

/**
* This reduces load on cassandra by preventing semantically equivalent requests from being invoked,
* subject to a local TTL.
*
* <p>Ex. If you want to test that you don't repeatedly send bad data, you could send a 400 back.
*
* <pre>{@code
* ttl = 60 * 1000; // 1 minute
* deduper = new DeduplicatingExecutor(session, ttl);
*
* // the result of the first execution against "foo" is returned to other callers
* // until it expires a minute later.
* deduper.maybeExecuteAsync(bound, "foo");
* deduper.maybeExecuteAsync(bound, "foo");
* }</pre>
*/
class DeduplicatingExecutor { // not final for testing

  private final Session session;
  /** Maps a deduplication key to the future of the statement executed for that key. */
  private final LoadingCache<BoundStatementKey, ListenableFuture<Void>> cache;

  /**
   * @param session which conditionally executes bound statements
   * @param ttl how long the results of statements are remembered, in milliseconds.
   */
  DeduplicatingExecutor(Session session, long ttl) {
    this.session = session;
    this.cache = CacheBuilder.newBuilder()
        .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
        // Route time through the overridable nanoTime() so tests can control expiry.
        .ticker(new Ticker() {
          @Override public long read() {
            return nanoTime();
          }
        })
        // TODO: maximum size or weight (until then entries are only evicted by the TTL)
        .build(new CacheLoader<BoundStatementKey, ListenableFuture<Void>>() {
          @Override public ListenableFuture<Void> load(final BoundStatementKey key) {
            ListenableFuture<?> cassandraFuture = executeAsync(key.statement);

            // Drop the cassandra future so that we don't hold references to cassandra state for
            // long periods of time.
            final SettableFuture<Void> disconnectedFuture = SettableFuture.create();
            Futures.addCallback(cassandraFuture, new FutureCallback<Object>() {

              @Override public void onSuccess(Object result) {
                disconnectedFuture.set(null);
              }

              @Override public void onFailure(Throwable t) {
                // Forget failed executions so that a later call with the same key retries.
                cache.invalidate(key);
                disconnectedFuture.setException(t);
              }
            });
            return disconnectedFuture;
          }
        });
  }

  /**
   * Upon success, the statement's result will be remembered and returned for all subsequent
   * executions with the same key, subject to a local TTL.
   *
   * <p>The results of failed statements are forgotten based on the supplied key.
   *
   * @param statement what to conditionally execute
   * @param key determines equivalence of the bound statement
   * @return future of work initiated by this or a previous request
   */
  ListenableFuture<Void> maybeExecuteAsync(BoundStatement statement, Object key) {
    BoundStatementKey cacheKey = new BoundStatementKey(statement, key);
    try {
      // Reuse the key built above for both lookup and invalidation.
      ListenableFuture<Void> result = cache.get(cacheKey);
      // A future could be constructed directly (i.e. immediate future), get the value to
      // see if it was exceptional. If so, the catch block will invalidate that key.
      if (result.isDone()) {
        result.get();
      }
      return result;
    } catch (UncheckedExecutionException | ExecutionException e) {
      cache.invalidate(cacheKey);
      // immediateFailedFuture rejects null, so fall back to the wrapper if no cause is attached.
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      return Futures.immediateFailedFuture(cause);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Keep the interruption as the cause so the failure can be diagnosed.
      throw new AssertionError(e);
    }
  }

  // visible for testing, since nanoTime is weird and can return negative
  long nanoTime() {
    return System.nanoTime();
  }

  /** Executes the statement against the session; overridable so tests can stub execution. */
  @VisibleForTesting ListenableFuture<?> executeAsync(BoundStatement statement) {
    return session.executeAsync(statement);
  }

  /** Discards every remembered result. */
  @VisibleForTesting void clear() {
    cache.invalidateAll();
  }

  /** Used to hold a reference to the last statement executed, but without using it in hashCode */
  static final class BoundStatementKey {
    final BoundStatement statement;
    final Object key;

    BoundStatementKey(BoundStatement statement, Object key) {
      this.statement = checkNotNull(statement, "statement");
      this.key = checkNotNull(key, "key");
    }

    @Override
    public String toString() {
      return "(" + key + ", " + statement + ")";
    }

    /** Equality is decided by {@code key} alone; the statement is intentionally ignored. */
    @Override
    public boolean equals(Object o) {
      if (o == this) return true;
      if (o instanceof BoundStatementKey) {
        return this.key.equals(((BoundStatementKey) o).key);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return key.hashCode();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ final class Schema {

static final String TABLE_TRACES = "traces";
static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span";
static final String TABLE_SERVICE_SPANS = "span_name_by_service";
static final String TABLE_DEPENDENCIES = "dependencies";
static final String VIEW_TRACE_BY_SERVICE = "trace_by_service";

static final String DEFAULT_KEYSPACE = "zipkin3";
private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql";
Expand Down
Loading