Skip to content

Commit

Permalink
Adds Cassandra support for Autocomplete tags
Browse files Browse the repository at this point in the history
  • Loading branch information
zeagord authored and Adrian Cole committed Dec 17, 2018
1 parent 37d26cc commit fcceb20
Show file tree
Hide file tree
Showing 29 changed files with 792 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.cassandra;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -45,9 +46,12 @@ class ZipkinCassandraStorageAutoConfiguration {
StorageComponent storage(
ZipkinCassandraStorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
CassandraStorage.Builder builder =
properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled);
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocompleteKeys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
return tracingSessionFactory == null
? builder.build()
: builder.sessionFactory(tracingSessionFactory).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,17 @@ public void strictTraceId_canSetToFalse() {

assertThat(context.getBean(CassandraStorage.class).strictTraceId).isFalse();
}

@Test
public void tags_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra",
"zipkin.storage.autocompleteKeys:environment")
.applyTo(context);
Access.registerCassandra(context);
context.refresh();
assertThat(context.getBean(CassandraStorage.class).autocompleteKeys)
.containsOnly("environment");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.cassandra3;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -46,9 +47,12 @@ class ZipkinCassandra3StorageAutoConfiguration {
StorageComponent storage(
ZipkinCassandra3StorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
CassandraStorage.Builder builder =
properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled);
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocompleteKeys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
return tracingSessionFactory == null
? builder.build()
: builder.sessionFactory(tracingSessionFactory).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,18 @@ public void searchEnabled_canSetToFalse() {

assertThat(context.getBean(CassandraStorage.class).searchEnabled()).isFalse();
}

@Test
public void tags_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra3",
"zipkin.storage.autocompleteKeys:environment",
"zipkin.storage.search-enabled:true")
.applyTo(context);
Access.registerCassandra3(context);
context.refresh();
assertThat(context.getBean(CassandraStorage.class).autocompleteKeys())
.containsOnly("environment");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.storage.cassandra.v1;

import java.util.List;
import zipkin2.Call;
import zipkin2.storage.AutocompleteTags;

final class CassandraAutocompleteTags implements AutocompleteTags {
final boolean enabled;
final Call<List<String>> keysCall;
final SelectAutocompleteValues.Factory valuesCallFactory;

CassandraAutocompleteTags(CassandraStorage storage) {
enabled = storage.searchEnabled && !storage.autocompleteKeys.isEmpty();
keysCall = Call.create(storage.autocompleteKeys);
valuesCallFactory = enabled ? new SelectAutocompleteValues.Factory(storage.session()) : null;
}

@Override public Call<List<String>> getKeys() {
if (!enabled) return Call.emptyList();
return keysCall.clone();
}

@Override public Call<List<String>> getValues(String key) {
if (key == null) throw new NullPointerException("key == null");
if (key.isEmpty()) throw new IllegalArgumentException("key was empty");
if (!enabled) return Call.emptyList();
return valuesCallFactory.create(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import zipkin2.Annotation;
import zipkin2.Call;
Expand All @@ -33,13 +34,16 @@

final class CassandraSpanConsumer implements SpanConsumer {
static final int WRITTEN_NAMES_TTL =
Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);

Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);
static final long WRITTEN_TAGS_TTL =
Long.getLong("zipkin2.storage.cassandra.internal.writeTagsTtl", 60 * 60 * 1000);
final InsertTrace.Factory insertTrace;
final InsertServiceName.Factory insertServiceName;
final InsertSpanName.Factory insertSpanName;
final Schema.Metadata metadata;
final CompositeIndexer indexer;
final InsertAutocompleteValue.Factory insertTags;
final Set<String> autocompleteKeys;

CassandraSpanConsumer(CassandraStorage storage, CacheBuilderSpec indexCacheSpec) {
Session session = storage.session.get();
Expand All @@ -50,6 +54,8 @@ final class CassandraSpanConsumer implements SpanConsumer {
insertServiceName = new InsertServiceName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
insertSpanName = new InsertSpanName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
indexer = new CompositeIndexer(session, indexCacheSpec, storage.bucketCount, indexTtl);
insertTags = new InsertAutocompleteValue.Factory(session, WRITTEN_TAGS_TTL);
autocompleteKeys = new LinkedHashSet<>(storage.autocompleteKeys);
}

/**
Expand All @@ -66,6 +72,7 @@ public Call<Void> accept(List<Span> rawSpans) {
Set<InsertTrace.Input> insertTraces = new LinkedHashSet<>();
Set<String> insertServiceNames = new LinkedHashSet<>();
Set<InsertSpanName.Input> insertSpanNames = new LinkedHashSet<>();
Set<Map.Entry<String, String>> autocompleteTags = new LinkedHashSet<>();

for (Span v2 : rawSpans) {
V1Span span = converter.convert(v2);
Expand All @@ -80,7 +87,9 @@ public Call<Void> accept(List<Span> rawSpans) {
if (span.name() == null) continue;
insertSpanNames.add(insertSpanName.newInput(serviceName, span.name()));
}

for (Map.Entry<String, String> entry : v2.tags().entrySet()) {
if (autocompleteKeys.contains(entry.getKey())) autocompleteTags.add(entry);
}
if (ts_micro == 0L) continue; // search is only valid with a timestamp, don't index w/o it!
spansToIndex.add(span);
}
Expand All @@ -95,7 +104,9 @@ public Call<Void> accept(List<Span> rawSpans) {
for (InsertSpanName.Input insert : insertSpanNames) {
calls.add(insertSpanName.create(insert));
}

for (Map.Entry<String, String> entry : autocompleteTags) {
calls.add(insertTags.create(entry));
}
indexer.index(spansToIndex.build(), calls);
if (calls.size() == 1) return calls.get(0).map(r -> null);
return new StoreSpansCall(calls);
Expand All @@ -106,6 +117,7 @@ public Call<Void> accept(List<Span> rawSpans) {
void clear() {
insertServiceName.cache.clear();
insertSpanName.cache.clear();
insertTags.cache.clear();
indexer.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class CassandraSpanStore implements SpanStore {
private final SelectDependencies.Factory dependencies;
private final Call<List<String>> serviceNames;
private final SelectSpanNames.Factory spanNames;

private final SelectTraceIdTimestampFromServiceName.Factory selectTraceIdsByServiceName;
private final SelectTraceIdTimestampFromServiceNames.Factory selectTraceIdsByServiceNames;
private final SelectTraceIdTimestampFromServiceSpanName.Factory selectTraceIdsBySpanName;
Expand All @@ -69,7 +70,6 @@ public final class CassandraSpanStore implements SpanStore {
dependencies = new SelectDependencies.Factory(session);
spanNames = new SelectSpanNames.Factory(session);
serviceNames = new SelectServiceNames.Factory(session).create();

selectTraceIdsByServiceName =
new SelectTraceIdTimestampFromServiceName.Factory(session, timestampCodec, buckets);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.cache.CacheBuilderSpec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import zipkin2.CheckResult;
import zipkin2.internal.Nullable;
import zipkin2.storage.AutocompleteTags;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
Expand Down Expand Up @@ -57,6 +61,7 @@ public static final class Builder extends StorageComponent.Builder {
int indexCacheMax = 100000;
int indexCacheTtl = 60;
int indexFetchMultiplier = 3;
List<String> autocompleteKeys = new ArrayList<>();

/**
* Used to avoid hot spots when writing indexes used to query by service name or annotation.
Expand Down Expand Up @@ -86,6 +91,12 @@ public Builder searchEnabled(boolean searchEnabled) {
return this;
}

@Override public Builder autocompleteKeys(List<String> keys) {
if (keys == null) throw new NullPointerException("keys == null");
this.autocompleteKeys = keys;
return this;
}

/** Override to control how sessions are created. */
public Builder sessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = checkNotNull(sessionFactory, "sessionFactory");
Expand Down Expand Up @@ -255,12 +266,14 @@ public CassandraStorage build() {
final int indexFetchMultiplier;
final boolean strictTraceId, searchEnabled;
final LazySession session;
final List<String> autocompleteKeys;

/** close is typically called from a different thread */
volatile boolean closeCalled;

volatile CassandraSpanConsumer spanConsumer;
volatile CassandraSpanStore spanStore;
volatile CassandraAutocompleteTags tagStore;

CassandraStorage(Builder b) {
this.contactPoints = b.contactPoints;
Expand All @@ -286,6 +299,7 @@ public CassandraStorage build() {
this.indexCacheSpec = null;
}
this.indexFetchMultiplier = b.indexFetchMultiplier;
this.autocompleteKeys = b.autocompleteKeys;
}

/** Lazy initializes or returns the session in use by this storage component. */
Expand All @@ -306,6 +320,17 @@ public SpanStore spanStore() {
return spanStore;
}

@Override public AutocompleteTags autocompleteTags() {
if (tagStore == null) {
synchronized (this) {
if (tagStore == null) {
tagStore = new CassandraAutocompleteTags(this);
}
}
}
return tagStore;
}

/** {@inheritDoc} Memoized in order to avoid re-preparing statements */
@Override
public SpanConsumer spanConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public IndexCall clone() {
}

void index(List<V1Span> spans, List<Call<ResultSet>> calls) {
// First parse each span into partition keys used to support query requests
// First parse each span into partition getKeys used to support query requests
Builder<PartitionKeyToTraceId, Long> parsed = ImmutableSetMultimap.builder();
for (V1Span span : spans) {
Long timestamp = guessTimestamp(span);
Expand Down
Loading

0 comments on commit fcceb20

Please sign in to comment.