Skip to content

Commit

Permalink
Adds Cassandra support for Autocomplete tags
Browse files Browse the repository at this point in the history
  • Loading branch information
zeagord authored and Adrian Cole committed Dec 18, 2018
1 parent 0efe65a commit 3ea8062
Show file tree
Hide file tree
Showing 39 changed files with 855 additions and 500 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.cassandra;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -45,9 +46,12 @@ class ZipkinCassandraStorageAutoConfiguration {
StorageComponent storage(
ZipkinCassandraStorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
CassandraStorage.Builder builder =
properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled);
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
return tracingSessionFactory == null
? builder.build()
: builder.sessionFactory(tracingSessionFactory).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,18 @@ public void strictTraceId_canSetToFalse() {

assertThat(context.getBean(CassandraStorage.class).strictTraceId).isFalse();
}

@Test
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra",
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerCassandra(context);
context.refresh();

assertThat(context.getBean(CassandraStorage.class).autocompleteKeys)
.containsOnly("environment");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.autoconfigure.storage.cassandra3;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -46,9 +47,12 @@ class ZipkinCassandra3StorageAutoConfiguration {
StorageComponent storage(
ZipkinCassandra3StorageProperties properties,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
CassandraStorage.Builder builder =
properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled);
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys) {
CassandraStorage.Builder builder = properties.toBuilder()
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys);
return tracingSessionFactory == null
? builder.build()
: builder.sessionFactory(tracingSessionFactory).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,18 @@ public void searchEnabled_canSetToFalse() {

assertThat(context.getBean(CassandraStorage.class).searchEnabled()).isFalse();
}

@Test
public void autocompleteKeys_list() {
context = new AnnotationConfigApplicationContext();
TestPropertyValues.of(
"zipkin.storage.type:cassandra3",
"zipkin.storage.autocomplete-keys:environment")
.applyTo(context);
Access.registerCassandra3(context);
context.refresh();

assertThat(context.getBean(CassandraStorage.class).autocompleteKeys())
.containsOnly("environment");
}
}
6 changes: 4 additions & 2 deletions zipkin-storage/cassandra-v1/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ That said, all integration tests run on pull request via Travis.
## Tuning
This component is tuned to help reduce the size of indexes needed to perform query operations. The most important aspects are described below. See [CassandraStorage](src/main/java/zipkin/storage/cassandra/CassandraStorage.java) for details.

### Service and span name indexing
Redundant requests to store service or span names are ignored for an hour to reduce load.
### Autocomplete indexing
Redundant requests to store service names, span names, and autocomplete
values are ignored for an hour to reduce load. This is implemented by
[DeduplicatingCall](../cassandra/src/main/java/zipkin2/storage/cassandra/internal/call/DeduplicatingCall.java).

### Trace indexing
Indexing of traces are optimized by default. This reduces writes to Cassandra at the cost of memory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.storage.cassandra.v1;

import java.util.List;
import zipkin2.Call;
import zipkin2.storage.AutocompleteTags;

final class CassandraAutocompleteTags implements AutocompleteTags {
final boolean enabled;
final Call<List<String>> keysCall;
final SelectAutocompleteValues.Factory valuesCallFactory;

CassandraAutocompleteTags(CassandraStorage storage) {
enabled = storage.searchEnabled && !storage.autocompleteKeys.isEmpty();
keysCall = Call.create(storage.autocompleteKeys);
valuesCallFactory = enabled ? new SelectAutocompleteValues.Factory(storage.session()) : null;
}

@Override public Call<List<String>> getKeys() {
if (!enabled) return Call.emptyList();
return keysCall.clone();
}

@Override public Call<List<String>> getValues(String key) {
if (key == null) throw new NullPointerException("key == null");
if (key.isEmpty()) throw new IllegalArgumentException("key was empty");
if (!enabled) return Call.emptyList();
return valuesCallFactory.create(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import zipkin2.Annotation;
import zipkin2.Call;
Expand All @@ -33,13 +34,14 @@

final class CassandraSpanConsumer implements SpanConsumer {
static final int WRITTEN_NAMES_TTL =
Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);

Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000);
final InsertTrace.Factory insertTrace;
final InsertServiceName.Factory insertServiceName;
final InsertSpanName.Factory insertSpanName;
final Schema.Metadata metadata;
final CompositeIndexer indexer;
final InsertAutocompleteValue.Factory insertAutocompleteValue;
final Set<String> autocompleteKeys;

CassandraSpanConsumer(CassandraStorage storage, CacheBuilderSpec indexCacheSpec) {
Session session = storage.session.get();
Expand All @@ -49,7 +51,9 @@ final class CassandraSpanConsumer implements SpanConsumer {
insertTrace = new InsertTrace.Factory(session, metadata, spanTtl);
insertServiceName = new InsertServiceName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
insertSpanName = new InsertSpanName.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
insertAutocompleteValue = new InsertAutocompleteValue.Factory(session, indexTtl, WRITTEN_NAMES_TTL);
indexer = new CompositeIndexer(session, indexCacheSpec, storage.bucketCount, indexTtl);
autocompleteKeys = new LinkedHashSet<>(storage.autocompleteKeys);
}

/**
Expand All @@ -66,6 +70,7 @@ public Call<Void> accept(List<Span> rawSpans) {
Set<InsertTrace.Input> insertTraces = new LinkedHashSet<>();
Set<String> insertServiceNames = new LinkedHashSet<>();
Set<InsertSpanName.Input> insertSpanNames = new LinkedHashSet<>();
Set<Map.Entry<String, String>> autocompleteTags = new LinkedHashSet<>();

for (Span v2 : rawSpans) {
V1Span span = converter.convert(v2);
Expand All @@ -80,7 +85,9 @@ public Call<Void> accept(List<Span> rawSpans) {
if (span.name() == null) continue;
insertSpanNames.add(insertSpanName.newInput(serviceName, span.name()));
}

for (Map.Entry<String, String> entry : v2.tags().entrySet()) {
if (autocompleteKeys.contains(entry.getKey())) autocompleteTags.add(entry);
}
if (ts_micro == 0L) continue; // search is only valid with a timestamp, don't index w/o it!
spansToIndex.add(span);
}
Expand All @@ -95,7 +102,9 @@ public Call<Void> accept(List<Span> rawSpans) {
for (InsertSpanName.Input insert : insertSpanNames) {
calls.add(insertSpanName.create(insert));
}

for (Map.Entry<String, String> entry : autocompleteTags) {
calls.add(insertAutocompleteValue.create(entry));
}
indexer.index(spansToIndex.build(), calls);
if (calls.size() == 1) return calls.get(0).map(r -> null);
return new StoreSpansCall(calls);
Expand All @@ -104,8 +113,9 @@ public Call<Void> accept(List<Span> rawSpans) {
/** Clears any caches */
@VisibleForTesting
void clear() {
insertServiceName.cache.clear();
insertSpanName.cache.clear();
insertServiceName.clear();
insertSpanName.clear();
insertAutocompleteValue.clear();
indexer.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.cache.CacheBuilderSpec;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import zipkin2.CheckResult;
import zipkin2.internal.Nullable;
import zipkin2.storage.AutocompleteTags;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
Expand Down Expand Up @@ -57,6 +61,7 @@ public static final class Builder extends StorageComponent.Builder {
int indexCacheMax = 100000;
int indexCacheTtl = 60;
int indexFetchMultiplier = 3;
List<String> autocompleteKeys = new ArrayList<>();

/**
* Used to avoid hot spots when writing indexes used to query by service name or annotation.
Expand Down Expand Up @@ -86,6 +91,12 @@ public Builder searchEnabled(boolean searchEnabled) {
return this;
}

@Override public Builder autocompleteKeys(List<String> keys) {
if (keys == null) throw new NullPointerException("keys == null");
this.autocompleteKeys = keys;
return this;
}

/** Override to control how sessions are created. */
public Builder sessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = checkNotNull(sessionFactory, "sessionFactory");
Expand Down Expand Up @@ -255,12 +266,14 @@ public CassandraStorage build() {
final int indexFetchMultiplier;
final boolean strictTraceId, searchEnabled;
final LazySession session;
final List<String> autocompleteKeys;

/** close is typically called from a different thread */
volatile boolean closeCalled;

volatile CassandraSpanConsumer spanConsumer;
volatile CassandraSpanStore spanStore;
volatile CassandraAutocompleteTags tagStore;

CassandraStorage(Builder b) {
this.contactPoints = b.contactPoints;
Expand All @@ -286,6 +299,7 @@ public CassandraStorage build() {
this.indexCacheSpec = null;
}
this.indexFetchMultiplier = b.indexFetchMultiplier;
this.autocompleteKeys = b.autocompleteKeys;
}

/** Lazy initializes or returns the session in use by this storage component. */
Expand All @@ -306,6 +320,17 @@ public SpanStore spanStore() {
return spanStore;
}

@Override public AutocompleteTags autocompleteTags() {
if (tagStore == null) {
synchronized (this) {
if (tagStore == null) {
tagStore = new CassandraAutocompleteTags(this);
}
}
}
return tagStore;
}

/** {@inheritDoc} Memoized in order to avoid re-preparing statements */
@Override
public SpanConsumer spanConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

/**
* Inserts index rows into Cassandra according to {@link IndexSupport} of a table. This skips
* entries that don't improve results based on {@link QueryRequest#endTs} and {@link
* QueryRequest#lookback}. For example, it doesn't insert rows that only vary on timestamp and exist
* between timestamps of existing rows.
* entries that don't improve results based on {@link QueryRequest#endTs()} and {@link
* QueryRequest#lookback()}. For example, it doesn't insert rows that only vary on timestamp and
* exist between timestamps of existing rows.
*/
final class Indexer {
static final Logger LOG = LoggerFactory.getLogger(Indexer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin2.storage.cassandra.v1;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.Map;
import zipkin2.Call;
import zipkin2.storage.cassandra.internal.call.DeduplicatingCall;

import static zipkin2.storage.cassandra.v1.Tables.TABLE_AUTOCOMPLETE_TAGS;

final class InsertAutocompleteValue extends DeduplicatingCall<Map.Entry<String, String>> {

static class Factory
extends DeduplicatingCall.Factory<Map.Entry<String, String>, InsertAutocompleteValue> {
final Session session;
final PreparedStatement preparedStatement;

/**
* @param indexTtl how long cassandra will persist the rows
* @param redundantCallTtl how long in milliseconds to obviate redundant calls
*/
Factory(Session session, int indexTtl, int redundantCallTtl) {
super(redundantCallTtl);
this.session = session;
Insert insertQuery = QueryBuilder.insertInto(TABLE_AUTOCOMPLETE_TAGS)
.value("key", QueryBuilder.bindMarker("key"))
.value("value", QueryBuilder.bindMarker("value"));
if (indexTtl > 0) insertQuery.using(QueryBuilder.ttl(indexTtl));
this.preparedStatement = session.prepare(insertQuery);
}

@Override protected InsertAutocompleteValue newCall(Map.Entry<String, String> input) {
return new InsertAutocompleteValue(this, input);
}
}

final Factory factory;
final Map.Entry<String, String> input;

InsertAutocompleteValue(Factory factory, Map.Entry<String, String> input) {
super(factory, input);
this.factory = factory;
this.input = input;
}

@Override protected ResultSetFuture newFuture() {
return factory.session.executeAsync(factory.preparedStatement.bind()
.setString("key", input.getKey())
.setString("value", input.getValue()));
}

@Override public String toString() {
return "InsertAutocompleteValue(" + input + ")";
}

@Override public Call<ResultSet> clone() {
return new InsertAutocompleteValue(factory, input);
}
}
Loading

0 comments on commit 3ea8062

Please sign in to comment.