Skip to content

Commit

Permalink
Adds ability to disable indexing for in-memory and cassandra v2 storage
Browse files Browse the repository at this point in the history
This starts towards "firehose mode" by making it possible to disable
indexing, notably on cassandra where SASI amplifies writes into the
cluster.

When enabled, only retrieval by ID is allowed. That said, someone can
feel free to run the dependency linker if they like.. If 100% sampling
is on, the dependency links will relate to actual service calls made.

See #1869
  • Loading branch information
Adrian Cole committed Jan 12, 2018
1 parent 3331eeb commit 85fcb2d
Show file tree
Hide file tree
Showing 22 changed files with 614 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public final Builder indexReplicas(int indexReplicas) {
return this;
}

@Override public StorageComponent.Builder indexingEnabled(boolean indexingEnabled) {
delegate.indexingEnabled(indexingEnabled);
return this;
}

@Override public final ElasticsearchHttpStorage build() {
return new ElasticsearchHttpStorage(delegate.build(), legacyReadsEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static Builder newBuilder(OkHttpClient client) {
.hosts(Collections.singletonList("http://localhost:9200"))
.maxRequests(64)
.strictTraceId(true)
.indexingEnabled(true)
.index("zipkin")
.dateSeparator('-')
.indexShards(5)
Expand Down Expand Up @@ -177,6 +178,8 @@ public final Builder dateSeparator(char dateSeparator) {

@Override public abstract Builder strictTraceId(boolean strictTraceId);

@Override public abstract Builder indexingEnabled(boolean indexingEnabled);

@Override public abstract ElasticsearchStorage build();

abstract IndexNameFormatter.Builder indexNameFormatterBuilder();
Expand All @@ -199,6 +202,8 @@ public final Builder dateSeparator(char dateSeparator) {

public abstract boolean strictTraceId();

abstract boolean indexingEnabled();

abstract int indexShards();

abstract int indexReplicas();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import zipkin2.Annotation;
import zipkin2.Call;
import zipkin2.Span;
import zipkin2.internal.Nullable;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.cassandra.internal.call.AggregateCall;

Expand All @@ -36,21 +37,27 @@ class CassandraSpanConsumer implements SpanConsumer { // not final for testing
= Long.getLong("zipkin2.storage.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);

private final Session session;
private final boolean strictTraceId;
private final boolean strictTraceId, indexingEnabled;
private final InsertSpan.Factory insertSpan;
private final InsertTraceByServiceSpan.Factory insertTraceByServiceSpan;
private final InsertServiceSpan.Factory insertServiceSpanName;
@Nullable final InsertTraceByServiceSpan.Factory insertTraceByServiceSpan;
@Nullable private final InsertServiceSpan.Factory insertServiceSpanName;

CassandraSpanConsumer(CassandraStorage storage) {
session = storage.session();
strictTraceId = storage.strictTraceId();
indexingEnabled = storage.indexingEnabled();

// warns when schema problems exist
Schema.readMetadata(session);

insertSpan = new InsertSpan.Factory(session, strictTraceId);
insertTraceByServiceSpan = new InsertTraceByServiceSpan.Factory(session, strictTraceId);
insertServiceSpanName = new InsertServiceSpan.Factory(session, WRITTEN_NAMES_TTL);
insertSpan = new InsertSpan.Factory(session, strictTraceId, indexingEnabled);
if (indexingEnabled) {
insertTraceByServiceSpan = new InsertTraceByServiceSpan.Factory(session, strictTraceId);
insertServiceSpanName = new InsertServiceSpan.Factory(session, WRITTEN_NAMES_TTL);
} else {
insertTraceByServiceSpan = null;
insertServiceSpanName = null;
}
}

/**
Expand All @@ -77,6 +84,8 @@ public Call<Void> accept(List<Span> input) {

spans.add(insertSpan.newInput(s, ts_uuid));

if (!indexingEnabled) continue;

// Empty values allow for api queries with blank service or span name
String service = s.localServiceName() != null ? s.localServiceName() : "";
String span =
Expand Down Expand Up @@ -106,11 +115,13 @@ public Call<Void> accept(List<Span> input) {
for (InsertSpan.Input span : spans) {
calls.add(insertSpan.create(span));
}
for (InsertServiceSpan.Input serviceSpan : serviceSpans) {
calls.add(insertServiceSpanName.create(serviceSpan));
}
for (InsertTraceByServiceSpan.Input serviceSpan : traceByServiceSpans) {
calls.add(insertTraceByServiceSpan.create(serviceSpan));
if (indexingEnabled) {
for (InsertServiceSpan.Input serviceSpan : serviceSpans) {
calls.add(insertServiceSpanName.create(serviceSpan));
}
for (InsertTraceByServiceSpan.Input serviceSpan : traceByServiceSpans) {
calls.add(insertTraceByServiceSpan.create(serviceSpan));
}
}
return new StoreSpansCall(calls);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -36,7 +37,7 @@
class CassandraSpanStore implements SpanStore { // not final for testing
private final int maxTraceCols;
private final int indexFetchMultiplier;
private final boolean strictTraceId;
private final boolean strictTraceId, indexingEnabled;
private final SelectFromSpan.Factory spans;
private final SelectDependencies.Factory dependencies;
private final SelectSpanNames.Factory spanNames;
Expand All @@ -50,6 +51,7 @@ class CassandraSpanStore implements SpanStore { // not final for testing
maxTraceCols = storage.maxTraceCols();
indexFetchMultiplier = storage.indexFetchMultiplier();
strictTraceId = storage.strictTraceId();
indexingEnabled = storage.indexingEnabled();
KeyspaceMetadata md = Schema.getKeyspaceMetadata(session);
indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive();

Expand All @@ -74,6 +76,8 @@ class CassandraSpanStore implements SpanStore { // not final for testing
*/
@Override
public Call<List<List<Span>>> getTraces(QueryRequest request) {
if (!indexingEnabled) return Call.emptyList();

return strictTraceId ? doGetTraces(request) :
doGetTraces(request).map(new FilterTraces(request));
}
Expand All @@ -85,7 +89,8 @@ Call<List<List<Span>>> doGetTraces(QueryRequest request) {
final int traceIndexFetchSize = request.limit() * indexFetchMultiplier;
List<Call<Map<String, Long>>> callsToIntersect = new ArrayList<>();

List<String> annotationKeys = CassandraUtil.annotationKeys(request);
List<String> annotationKeys =
indexingEnabled ? CassandraUtil.annotationKeys(request) : Collections.emptyList();
for (String annotationKey : annotationKeys) {
callsToIntersect.add(spanTable.newCall(
request.serviceName(),
Expand Down Expand Up @@ -174,10 +179,12 @@ Call<Set<Entry<String, Long>>> newBucketedTraceIdCall(QueryRequest request,
}

@Override public Call<List<String>> getServiceNames() {
if (!indexingEnabled) return Call.emptyList();
return serviceNames.clone();
}

@Override public Call<List<String>> getSpanNames(String serviceName) {
if (!indexingEnabled) return Call.emptyList();
return spanNames.create(serviceName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public interface SessionFactory {
public static Builder newBuilder() {
return new AutoValue_CassandraStorage.Builder()
.strictTraceId(true)
.indexingEnabled(true)
.keyspace(Schema.DEFAULT_KEYSPACE)
.contactPoints("localhost")
// Zipkin collectors can create out a lot of async requests in bursts
Expand All @@ -67,6 +68,9 @@ public static abstract class Builder extends StorageComponent.Builder {
/** {@inheritDoc} */
@Override public abstract Builder strictTraceId(boolean strictTraceId);

/** {@inheritDoc} */
@Override public abstract Builder indexingEnabled(boolean indexingEnabled);

/** Override to control how sessions are created. */
public abstract Builder sessionFactory(SessionFactory sessionFactory);

Expand Down Expand Up @@ -146,6 +150,7 @@ public final Builder maxConnections(int maxConnections) {
abstract String keyspace();
abstract int indexFetchMultiplier();
abstract boolean strictTraceId();
abstract boolean indexingEnabled();
abstract SessionFactory sessionFactory();

/** session and close are typically called from different threads */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.auto.value.AutoValue;
import java.util.Collections;
Expand Down Expand Up @@ -71,11 +72,11 @@ final class InsertSpan extends ResultSetFutureCall {
static class Factory {
final Session session;
final PreparedStatement preparedStatement;
final boolean strictTraceId;
final boolean strictTraceId, indexingEnabled;

Factory(Session session, boolean strictTraceId) {
Factory(Session session, boolean strictTraceId, boolean indexingEnabled) {
this.session = session;
this.preparedStatement = session.prepare(QueryBuilder.insertInto(TABLE_SPAN)
Insert insertQuery = QueryBuilder.insertInto(TABLE_SPAN)
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("trace_id_high", QueryBuilder.bindMarker("trace_id_high"))
.value("ts_uuid", QueryBuilder.bindMarker("ts_uuid"))
Expand All @@ -86,14 +87,20 @@ static class Factory {
.value("ts", QueryBuilder.bindMarker("ts"))
.value("duration", QueryBuilder.bindMarker("duration"))
.value("l_ep", QueryBuilder.bindMarker("l_ep"))
.value("l_service", QueryBuilder.bindMarker("l_service"))
.value("r_ep", QueryBuilder.bindMarker("r_ep"))
.value("annotations", QueryBuilder.bindMarker("annotations"))
.value("tags", QueryBuilder.bindMarker("tags"))
.value("shared", QueryBuilder.bindMarker("shared"))
.value("debug", QueryBuilder.bindMarker("debug"))
.value("annotation_query", QueryBuilder.bindMarker("annotation_query")));
.value("debug", QueryBuilder.bindMarker("debug"));

if (indexingEnabled) {
insertQuery.value("l_service", QueryBuilder.bindMarker("l_service"));
insertQuery.value("annotation_query", QueryBuilder.bindMarker("annotation_query"));
}

this.preparedStatement = session.prepare(insertQuery);
this.strictTraceId = strictTraceId;
this.indexingEnabled = indexingEnabled;
}

Input newInput(zipkin2.Span span, UUID ts_uuid) {
Expand All @@ -106,7 +113,7 @@ Input newInput(zipkin2.Span span, UUID ts_uuid) {
} else {
annotations = Collections.emptyList();
}
String annotation_query = CassandraUtil.annotationQuery(span);
String annotation_query = indexingEnabled ? CassandraUtil.annotationQuery(span): null;
return new AutoValue_InsertSpan_Input(
ts_uuid,
traceIdHigh ? span.traceId().substring(0, 16) : null,
Expand Down Expand Up @@ -178,15 +185,18 @@ Call<ResultSet> create(Input span) {
if (null != input.ts()) bound.setLong("ts", input.ts());
if (null != input.duration()) bound.setLong("duration", input.duration());
if (null != input.l_ep()) bound.set("l_ep", input.l_ep(), EndpointUDT.class);
if (null != input.l_ep()) bound.setString("l_service", input.l_ep().getService());
if (null != input.r_ep()) bound.set("r_ep", input.r_ep(), EndpointUDT.class);
if (!input.annotations().isEmpty()) bound.setList("annotations", input.annotations());
if (!input.tags().isEmpty()) bound.setMap("tags", input.tags());
if (null != input.annotation_query()) {
bound.setString("annotation_query", input.annotation_query());
}
if (input.shared()) bound.setBool("shared", true);
if (input.debug()) bound.setBool("debug", true);

if (factory.indexingEnabled) {
if (null != input.l_ep()) bound.setString("l_service", input.l_ep().getService());
if (null != input.annotation_query()) {
bound.setString("annotation_query", input.annotation_query());
}
}
return factory.session.executeAsync(bound);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.rules.TestName;
import zipkin.DependencyLink;
import zipkin2.storage.SpanConsumer;

Expand Down Expand Up @@ -74,6 +75,11 @@ public static void blockWhileInFlight(CassandraStorage storage) {
}
}

public static String keyspace(TestName testName) {
String result = testName.getMethodName().toLowerCase();
return result.length() <= 48 ? result : result.substring(result.length() - 48);
}

public static void dropKeyspace(Session session, String keyspace) {
session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
assertThat(session.getCluster().getMetadata().getKeyspace(keyspace)).isNull();
Expand Down
Loading

0 comments on commit 85fcb2d

Please sign in to comment.