From fcceb200f7c1220b41a6cdef00ad2600c41ade76 Mon Sep 17 00:00:00 2001 From: Raja Sundaram Date: Mon, 17 Dec 2018 16:37:58 +0800 Subject: [PATCH] Adds Cassandra support for Autocomplete tags --- ...pkinCassandraStorageAutoConfiguration.java | 10 +- ...CassandraStorageAutoConfigurationTest.java | 13 ++ ...kinCassandra3StorageAutoConfiguration.java | 10 +- ...CassandraStorageAutoConfigurationTest.java | 14 ++ .../v1/CassandraAutocompleteTags.java | 42 ++++++ .../cassandra/v1/CassandraSpanConsumer.java | 20 ++- .../cassandra/v1/CassandraSpanStore.java | 2 +- .../cassandra/v1/CassandraStorage.java | 25 ++++ .../zipkin2/storage/cassandra/v1/Indexer.java | 2 +- .../cassandra/v1/InsertAutocompleteValue.java | 132 ++++++++++++++++++ .../v1/SelectAutocompleteValues.java | 89 ++++++++++++ .../zipkin2/storage/cassandra/v1/Tables.java | 6 +- .../main/resources/cassandra-schema-cql3.txt | 8 ++ .../cassandra/v1/ITCassandraStorage.java | 13 ++ .../storage/cassandra/v1/ITSpanConsumer.java | 34 ++++- .../cassandra/CassandraAutocompleteTags.java | 42 ++++++ .../cassandra/CassandraSpanConsumer.java | 33 +++-- .../storage/cassandra/CassandraSpanStore.java | 1 - .../storage/cassandra/CassandraStorage.java | 21 ++- .../storage/cassandra/CassandraUtil.java | 4 +- .../cassandra/InsertAutocompleteValue.java | 132 ++++++++++++++++++ .../zipkin2/storage/cassandra/Schema.java | 1 + .../cassandra/SelectAutocompleteValues.java | 90 ++++++++++++ .../storage/cassandra/SelectFromSpan.java | 2 +- .../storage/cassandra/SelectSpanNames.java | 10 +- .../main/resources/zipkin2-schema-indexes.cql | 14 ++ .../storage/cassandra/ITCassandraStorage.java | 13 ++ .../storage/cassandra/ITEnsureSchema.java | 1 + .../storage/cassandra/ITSpanConsumer.java | 46 +++++- 29 files changed, 792 insertions(+), 38 deletions(-) create mode 100644 zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraAutocompleteTags.java create mode 100644 
zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/InsertAutocompleteValue.java create mode 100644 zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/SelectAutocompleteValues.java create mode 100644 zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraAutocompleteTags.java create mode 100644 zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/InsertAutocompleteValue.java create mode 100644 zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectAutocompleteValues.java diff --git a/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin2/autoconfigure/storage/cassandra/ZipkinCassandraStorageAutoConfiguration.java b/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin2/autoconfigure/storage/cassandra/ZipkinCassandraStorageAutoConfiguration.java index e91d1fa44c8..abeb998b2b9 100644 --- a/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin2/autoconfigure/storage/cassandra/ZipkinCassandraStorageAutoConfiguration.java +++ b/zipkin-autoconfigure/storage-cassandra/src/main/java/zipkin2/autoconfigure/storage/cassandra/ZipkinCassandraStorageAutoConfiguration.java @@ -13,6 +13,7 @@ */ package zipkin2.autoconfigure.storage.cassandra; +import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -45,9 +46,12 @@ class ZipkinCassandraStorageAutoConfiguration { StorageComponent storage( ZipkinCassandraStorageProperties properties, @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId, - @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) { - CassandraStorage.Builder builder = - properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled); + @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled, + @Value("${zipkin.storage.autocompleteKeys:}") List 
autocompleteKeys) { + CassandraStorage.Builder builder = properties.toBuilder() + .strictTraceId(strictTraceId) + .searchEnabled(searchEnabled) + .autocompleteKeys(autocompleteKeys); return tracingSessionFactory == null ? builder.build() : builder.sessionFactory(tracingSessionFactory).build(); diff --git a/zipkin-autoconfigure/storage-cassandra/src/test/java/zipkin2/storage/cassandra/v1/ZipkinCassandraStorageAutoConfigurationTest.java b/zipkin-autoconfigure/storage-cassandra/src/test/java/zipkin2/storage/cassandra/v1/ZipkinCassandraStorageAutoConfigurationTest.java index e97dcd8556d..3642ce587d3 100644 --- a/zipkin-autoconfigure/storage-cassandra/src/test/java/zipkin2/storage/cassandra/v1/ZipkinCassandraStorageAutoConfigurationTest.java +++ b/zipkin-autoconfigure/storage-cassandra/src/test/java/zipkin2/storage/cassandra/v1/ZipkinCassandraStorageAutoConfigurationTest.java @@ -92,4 +92,17 @@ public void strictTraceId_canSetToFalse() { assertThat(context.getBean(CassandraStorage.class).strictTraceId).isFalse(); } + + @Test + public void tags_list() { + context = new AnnotationConfigApplicationContext(); + TestPropertyValues.of( + "zipkin.storage.type:cassandra", + "zipkin.storage.autocompleteKeys:environment") + .applyTo(context); + Access.registerCassandra(context); + context.refresh(); + assertThat(context.getBean(CassandraStorage.class).autocompleteKeys) + .containsOnly("environment"); + } } diff --git a/zipkin-autoconfigure/storage-cassandra3/src/main/java/zipkin2/autoconfigure/storage/cassandra3/ZipkinCassandra3StorageAutoConfiguration.java b/zipkin-autoconfigure/storage-cassandra3/src/main/java/zipkin2/autoconfigure/storage/cassandra3/ZipkinCassandra3StorageAutoConfiguration.java index ecb0051281b..1276df32226 100644 --- a/zipkin-autoconfigure/storage-cassandra3/src/main/java/zipkin2/autoconfigure/storage/cassandra3/ZipkinCassandra3StorageAutoConfiguration.java +++ 
b/zipkin-autoconfigure/storage-cassandra3/src/main/java/zipkin2/autoconfigure/storage/cassandra3/ZipkinCassandra3StorageAutoConfiguration.java @@ -13,6 +13,7 @@ */ package zipkin2.autoconfigure.storage.cassandra3; +import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -46,9 +47,12 @@ class ZipkinCassandra3StorageAutoConfiguration { StorageComponent storage( ZipkinCassandra3StorageProperties properties, @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId, - @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) { - CassandraStorage.Builder builder = - properties.toBuilder().strictTraceId(strictTraceId).searchEnabled(searchEnabled); + @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled, + @Value("${zipkin.storage.autocompleteKeys:}") List autocompleteKeys) { + CassandraStorage.Builder builder = properties.toBuilder() + .strictTraceId(strictTraceId) + .searchEnabled(searchEnabled) + .autocompleteKeys(autocompleteKeys); return tracingSessionFactory == null ? 
builder.build() : builder.sessionFactory(tracingSessionFactory).build(); diff --git a/zipkin-autoconfigure/storage-cassandra3/src/test/java/zipkin2/storage/cassandra/ZipkinCassandraStorageAutoConfigurationTest.java b/zipkin-autoconfigure/storage-cassandra3/src/test/java/zipkin2/storage/cassandra/ZipkinCassandraStorageAutoConfigurationTest.java index a5df2bc79de..c3bf494fcf2 100644 --- a/zipkin-autoconfigure/storage-cassandra3/src/test/java/zipkin2/storage/cassandra/ZipkinCassandraStorageAutoConfigurationTest.java +++ b/zipkin-autoconfigure/storage-cassandra3/src/test/java/zipkin2/storage/cassandra/ZipkinCassandraStorageAutoConfigurationTest.java @@ -106,4 +106,18 @@ public void searchEnabled_canSetToFalse() { assertThat(context.getBean(CassandraStorage.class).searchEnabled()).isFalse(); } + + @Test + public void tags_list() { + context = new AnnotationConfigApplicationContext(); + TestPropertyValues.of( + "zipkin.storage.type:cassandra3", + "zipkin.storage.autocompleteKeys:environment", + "zipkin.storage.search-enabled:true") + .applyTo(context); + Access.registerCassandra3(context); + context.refresh(); + assertThat(context.getBean(CassandraStorage.class).autocompleteKeys()) + .containsOnly("environment"); + } } diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraAutocompleteTags.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraAutocompleteTags.java new file mode 100644 index 00000000000..1db26cadcdd --- /dev/null +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraAutocompleteTags.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.cassandra.v1; + +import java.util.List; +import zipkin2.Call; +import zipkin2.storage.AutocompleteTags; + +final class CassandraAutocompleteTags implements AutocompleteTags { + final boolean enabled; + final Call> keysCall; + final SelectAutocompleteValues.Factory valuesCallFactory; + + CassandraAutocompleteTags(CassandraStorage storage) { + enabled = storage.searchEnabled && !storage.autocompleteKeys.isEmpty(); + keysCall = Call.create(storage.autocompleteKeys); + valuesCallFactory = enabled ? new SelectAutocompleteValues.Factory(storage.session()) : null; + } + + @Override public Call> getKeys() { + if (!enabled) return Call.emptyList(); + return keysCall.clone(); + } + + @Override public Call> getValues(String key) { + if (key == null) throw new NullPointerException("key == null"); + if (key.isEmpty()) throw new IllegalArgumentException("key was empty"); + if (!enabled) return Call.emptyList(); + return valuesCallFactory.create(key); + } +} diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanConsumer.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanConsumer.java index 54c59fe17fd..ac7d8b27305 100644 --- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanConsumer.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import 
java.util.Set; import zipkin2.Annotation; import zipkin2.Call; @@ -33,13 +34,16 @@ final class CassandraSpanConsumer implements SpanConsumer { static final int WRITTEN_NAMES_TTL = - Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000); - + Integer.getInteger("zipkin.store.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000); + static final long WRITTEN_TAGS_TTL = + Long.getLong("zipkin2.storage.cassandra.internal.writeTagsTtl", 60 * 60 * 1000); final InsertTrace.Factory insertTrace; final InsertServiceName.Factory insertServiceName; final InsertSpanName.Factory insertSpanName; final Schema.Metadata metadata; final CompositeIndexer indexer; + final InsertAutocompleteValue.Factory insertTags; + final Set autocompleteKeys; CassandraSpanConsumer(CassandraStorage storage, CacheBuilderSpec indexCacheSpec) { Session session = storage.session.get(); @@ -50,6 +54,8 @@ final class CassandraSpanConsumer implements SpanConsumer { insertServiceName = new InsertServiceName.Factory(session, indexTtl, WRITTEN_NAMES_TTL); insertSpanName = new InsertSpanName.Factory(session, indexTtl, WRITTEN_NAMES_TTL); indexer = new CompositeIndexer(session, indexCacheSpec, storage.bucketCount, indexTtl); + insertTags = new InsertAutocompleteValue.Factory(session, WRITTEN_TAGS_TTL); + autocompleteKeys = new LinkedHashSet<>(storage.autocompleteKeys); } /** @@ -66,6 +72,7 @@ public Call accept(List rawSpans) { Set insertTraces = new LinkedHashSet<>(); Set insertServiceNames = new LinkedHashSet<>(); Set insertSpanNames = new LinkedHashSet<>(); + Set> autocompleteTags = new LinkedHashSet<>(); for (Span v2 : rawSpans) { V1Span span = converter.convert(v2); @@ -80,7 +87,9 @@ public Call accept(List rawSpans) { if (span.name() == null) continue; insertSpanNames.add(insertSpanName.newInput(serviceName, span.name())); } - + for (Map.Entry entry : v2.tags().entrySet()) { + if (autocompleteKeys.contains(entry.getKey())) autocompleteTags.add(entry); + } if (ts_micro == 0L) 
continue; // search is only valid with a timestamp, don't index w/o it! spansToIndex.add(span); } @@ -95,7 +104,9 @@ public Call accept(List rawSpans) { for (InsertSpanName.Input insert : insertSpanNames) { calls.add(insertSpanName.create(insert)); } - + for (Map.Entry entry : autocompleteTags) { + calls.add(insertTags.create(entry)); + } indexer.index(spansToIndex.build(), calls); if (calls.size() == 1) return calls.get(0).map(r -> null); return new StoreSpansCall(calls); @@ -106,6 +117,7 @@ public Call accept(List rawSpans) { void clear() { insertServiceName.cache.clear(); insertSpanName.cache.clear(); + insertTags.cache.clear(); indexer.clear(); } diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanStore.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanStore.java index 5f4d94dd871..05e0811c784 100644 --- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanStore.java +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraSpanStore.java @@ -49,6 +49,7 @@ public final class CassandraSpanStore implements SpanStore { private final SelectDependencies.Factory dependencies; private final Call> serviceNames; private final SelectSpanNames.Factory spanNames; + private final SelectTraceIdTimestampFromServiceName.Factory selectTraceIdsByServiceName; private final SelectTraceIdTimestampFromServiceNames.Factory selectTraceIdsByServiceNames; private final SelectTraceIdTimestampFromServiceSpanName.Factory selectTraceIdsBySpanName; @@ -69,7 +70,6 @@ public final class CassandraSpanStore implements SpanStore { dependencies = new SelectDependencies.Factory(session); spanNames = new SelectSpanNames.Factory(session); serviceNames = new SelectServiceNames.Factory(session).create(); - selectTraceIdsByServiceName = new SelectTraceIdTimestampFromServiceName.Factory(session, timestampCodec, buckets); diff --git 
a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraStorage.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraStorage.java index afcba30528a..2cca68faef0 100644 --- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraStorage.java +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraStorage.java @@ -16,9 +16,13 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.cache.CacheBuilderSpec; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import zipkin2.CheckResult; import zipkin2.internal.Nullable; +import zipkin2.storage.AutocompleteTags; import zipkin2.storage.QueryRequest; import zipkin2.storage.SpanConsumer; import zipkin2.storage.SpanStore; @@ -57,6 +61,7 @@ public static final class Builder extends StorageComponent.Builder { int indexCacheMax = 100000; int indexCacheTtl = 60; int indexFetchMultiplier = 3; + List autocompleteKeys = new ArrayList<>(); /** * Used to avoid hot spots when writing indexes used to query by service name or annotation. @@ -86,6 +91,12 @@ public Builder searchEnabled(boolean searchEnabled) { return this; } + @Override public Builder autocompleteKeys(List keys) { + if (keys == null) throw new NullPointerException("keys == null"); + this.autocompleteKeys = keys; + return this; + } + /** Override to control how sessions are created. 
*/ public Builder sessionFactory(SessionFactory sessionFactory) { this.sessionFactory = checkNotNull(sessionFactory, "sessionFactory"); @@ -255,12 +266,14 @@ public CassandraStorage build() { final int indexFetchMultiplier; final boolean strictTraceId, searchEnabled; final LazySession session; + final List autocompleteKeys; /** close is typically called from a different thread */ volatile boolean closeCalled; volatile CassandraSpanConsumer spanConsumer; volatile CassandraSpanStore spanStore; + volatile CassandraAutocompleteTags tagStore; CassandraStorage(Builder b) { this.contactPoints = b.contactPoints; @@ -286,6 +299,7 @@ public CassandraStorage build() { this.indexCacheSpec = null; } this.indexFetchMultiplier = b.indexFetchMultiplier; + this.autocompleteKeys = b.autocompleteKeys; } /** Lazy initializes or returns the session in use by this storage component. */ @@ -306,6 +320,17 @@ public SpanStore spanStore() { return spanStore; } + @Override public AutocompleteTags autocompleteTags() { + if (tagStore == null) { + synchronized (this) { + if (tagStore == null) { + tagStore = new CassandraAutocompleteTags(this); + } + } + } + return tagStore; + } + /** {@inheritDoc} Memoized in order to avoid re-preparing statements */ @Override public SpanConsumer spanConsumer() { diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Indexer.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Indexer.java index 7c1908b86b0..08fed6648fe 100644 --- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Indexer.java +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Indexer.java @@ -118,7 +118,7 @@ public IndexCall clone() { } void index(List spans, List> calls) { - // First parse each span into partition keys used to support query requests + // First parse each span into partition getKeys used to support query requests Builder parsed = ImmutableSetMultimap.builder(); for (V1Span 
span : spans) { Long timestamp = guessTimestamp(span); diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/InsertAutocompleteValue.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/InsertAutocompleteValue.java new file mode 100644 index 00000000000..1870f8440a2 --- /dev/null +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/InsertAutocompleteValue.java @@ -0,0 +1,132 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package zipkin2.storage.cassandra.v1; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.base.Ticker; +import com.google.common.cache.CacheBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall; + +import static zipkin2.storage.cassandra.v1.Tables.TABLE_AUTOCOMPLETE_TAGS; + +final class InsertAutocompleteValue extends ResultSetFutureCall { + + static class Factory { + final Session session; + final PreparedStatement preparedStatement; + final ConcurrentMap, InsertAutocompleteValue> cache; + + Factory(Session session, long ttl) { + this.session = session; + this.preparedStatement = + session.prepare( + QueryBuilder.insertInto(TABLE_AUTOCOMPLETE_TAGS) + .value("key", QueryBuilder.bindMarker("key")) + .value("value", QueryBuilder.bindMarker("value"))); + this.cache = + CacheBuilder.newBuilder() + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .ticker( + new Ticker() { + @Override + public long read() { + return nanoTime(); + } + }) + // TODO: maximum size or weight + ., InsertAutocompleteValue>build() + .asMap(); + } + + // visible for testing, since nanoTime is weird and can return negative + long nanoTime() { + return System.nanoTime(); + } + + Call create(Map.Entry input) { + if (input == null) throw new NullPointerException("input == null"); + if (cache.containsKey(input)) return Call.create(null); + InsertAutocompleteValue realCall = new InsertAutocompleteValue(this, input); + if (cache.putIfAbsent(input, realCall) != null) return Call.create(null); + return realCall; + } + } + + final Factory factory; + final Map.Entry input; + + 
InsertAutocompleteValue(Factory factory, Map.Entry input) { + this.factory = factory; + this.input = input; + } + + @Override + protected ResultSetFuture newFuture() { + return factory.session.executeAsync(factory.preparedStatement.bind() + .setString("key", input.getKey()) + .setString("value", input.getValue())); + } + + @Override + protected ResultSet doExecute() throws IOException { + try { + return super.doExecute(); + } catch (IOException | RuntimeException | Error e) { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + throw e; + } + } + + @Override + protected void doEnqueue(Callback callback) { + super.doEnqueue( + new Callback() { + @Override + public void onSuccess(ResultSet value) { + callback.onSuccess(value); + } + + @Override + public void onError(Throwable t) { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + callback.onError(t); + } + }); + } + + @Override public String toString() { + return input.toString().replace("Input", "InsertAutocompleteValue"); + } + + @Override + protected void doCancel() { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + super.doCancel(); + } + + @Override public Call clone() { + return new InsertAutocompleteValue(factory, input); + } +} diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/SelectAutocompleteValues.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/SelectAutocompleteValues.java new file mode 100644 index 00000000000..5df4e463b0c --- /dev/null +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/SelectAutocompleteValues.java @@ -0,0 +1,89 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.cassandra.v1; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Supplier; +import zipkin2.Call; +import zipkin2.storage.cassandra.internal.call.AccumulateAllResults; +import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall; + +import static zipkin2.storage.cassandra.v1.Tables.TABLE_AUTOCOMPLETE_TAGS; + +final class SelectAutocompleteValues extends ResultSetFutureCall { + static class Factory { + final Session session; + final PreparedStatement preparedStatement; + final AccumulateAutocompleteValues accumulateAutocompleteValues; + + Factory(Session session) { + this.session = session; + this.preparedStatement = session.prepare( + QueryBuilder.select("value") + .from(TABLE_AUTOCOMPLETE_TAGS) + .where(QueryBuilder.eq("key", QueryBuilder.bindMarker("key"))) + .limit(QueryBuilder.bindMarker("limit_"))); + this.accumulateAutocompleteValues = new AccumulateAutocompleteValues(); + } + + Call> create(String key) { + return new SelectAutocompleteValues(this, key).flatMap(accumulateAutocompleteValues); + } + } + + final SelectAutocompleteValues.Factory factory; + final String key; + + SelectAutocompleteValues(SelectAutocompleteValues.Factory factory, String key) { + 
this.factory = factory; + this.key = key; + } + + @Override protected ResultSetFuture newFuture() { + return factory.session.executeAsync(factory.preparedStatement + .bind() + .setString("key", key) + .setInt("limit_", 1000)); // no one is ever going to browse so many tag values + } + + @Override public Call clone() { + return new SelectAutocompleteValues(factory, key); + } + + static class AccumulateAutocompleteValues extends AccumulateAllResults> { + @Override protected Supplier> supplier() { + return ArrayList::new; // list is ok because it is distinct results + } + + @Override protected BiConsumer> accumulator() { + return (row, list) -> { + String result = row.getString("value"); + if (!result.isEmpty()) list.add(result); + }; + } + + @Override + public String toString() { + return "AccumulateAutocompleteValues{}"; + } + } +} diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Tables.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Tables.java index cdab41fd457..0352e09dc65 100644 --- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Tables.java +++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/Tables.java @@ -15,6 +15,7 @@ import zipkin2.Endpoint; import zipkin2.Span; +import zipkin2.storage.AutocompleteTags; import zipkin2.storage.QueryRequest; import zipkin2.storage.SpanStore; @@ -75,11 +76,14 @@ final class Tables { * annotations. Also, binary annotation values longer than 256 characters are not indexed. * *

Lookups are by equals (not partial match), so it is expected that {@link zipkin2.Annotation} - * and {@link Span#tags() tag} keys and values will be low or bounded cardinality. To avoid hot + * and {@link Span#tags() tag} keys and values will be low or bounded cardinality. To avoid hot * partitions, the partition key is the annotation field with a bucket (random number between 0 * and 9). */ static final String ANNOTATIONS_INDEX = "annotations_index"; + /** This table supports {@link AutocompleteTags#getValues(String key)}. */ + static final String TABLE_AUTOCOMPLETE_TAGS = "autocomplete_tags"; + private Tables() {} } diff --git a/zipkin-storage/cassandra-v1/src/main/resources/cassandra-schema-cql3.txt b/zipkin-storage/cassandra-v1/src/main/resources/cassandra-schema-cql3.txt index 56b360a7740..4119f36f0b5 100644 --- a/zipkin-storage/cassandra-v1/src/main/resources/cassandra-schema-cql3.txt +++ b/zipkin-storage/cassandra-v1/src/main/resources/cassandra-schema-cql3.txt @@ -65,3 +65,11 @@ CREATE TABLE IF NOT EXISTS zipkin.traces ( ) WITH compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'} AND default_time_to_live = 604800; + +CREATE TABLE IF NOT EXISTS zipkin.autocomplete_tags ( + key text, + value text, + PRIMARY KEY (key, value) +) + WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} + AND default_time_to_live = 259200 diff --git a/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITCassandraStorage.java b/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITCassandraStorage.java index dc796b4c46a..580a813a31d 100644 --- a/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITCassandraStorage.java +++ b/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITCassandraStorage.java @@ -172,6 +172,19 @@ 
public static class ITStrictTraceIdFalse extends zipkin2.storage.ITStrictTraceId } } + public static class ITAutocompleteTags extends zipkin2.storage.ITAutocompleteTags { + @ClassRule public static CassandraStorageRule backend = classRule(); + @Rule public TestName testName = new TestName(); + + @Override protected StorageComponent.Builder storageBuilder() { + return backend.computeStorageBuilder().keyspace(keyspace(testName)); + } + + @Before @Override public void clear() { + dropKeyspace(backend.session(), keyspace(testName)); + } + } + public static class ITDependencies extends zipkin2.storage.ITDependencies { @ClassRule public static CassandraStorageRule backend = classRule(); @Rule public TestName testName = new TestName(); diff --git a/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITSpanConsumer.java b/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITSpanConsumer.java index 03f1ee49e59..de559148049 100644 --- a/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITSpanConsumer.java +++ b/zipkin-storage/cassandra-v1/src/test/java/zipkin2/storage/cassandra/v1/ITSpanConsumer.java @@ -30,7 +30,7 @@ abstract class ITSpanConsumer { @Before public void connect() { - storage = storageBuilder().keyspace(keyspace()).build(); + storage = storageBuilder().autocompleteKeys(asList("environment")).keyspace(keyspace()).build(); } abstract CassandraStorage.Builder storageBuilder(); @@ -101,4 +101,36 @@ long rowCount(String table) { .one() .getLong(0); } + + static String getTagValue(CassandraStorage storage, String key) { + return storage + .session() + .execute("SELECT value from " + Tables.TABLE_AUTOCOMPLETE_TAGS + " WHERE key='environment'") + .one() + .getString(0); + } + + @Test + public void insertTags_SelectTags_CalculateCount() throws IOException { + Span[] trace = new Span[2]; + trace[0] = TestObjects.CLIENT_SPAN; + + trace[1] = + Span.newBuilder() + .traceId(trace[0].traceId()) + 
.parentId(trace[0].id()) + .id(1) + .name("1") + .putTag("environment", "dev") + .putTag("a", "b") + .timestamp(trace[0].timestamp() * 1000) // child span timestamps happen 1 ms later + .addAnnotation(trace[0].annotations().get(0).timestamp() + 1000, "bar") + .build(); + accept(storage.spanConsumer(), trace); + + assertThat(rowCount(Tables.TABLE_AUTOCOMPLETE_TAGS)) + .isGreaterThanOrEqualTo(1L); + + assertThat(getTagValue(storage, "environment")).isEqualTo("dev"); + } } diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraAutocompleteTags.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraAutocompleteTags.java new file mode 100644 index 00000000000..0b333a0944f --- /dev/null +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraAutocompleteTags.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin2.storage.cassandra; + +import java.util.List; +import zipkin2.Call; +import zipkin2.storage.AutocompleteTags; + +class CassandraAutocompleteTags implements AutocompleteTags { + final boolean enabled; + final Call> keysCall; + final SelectAutocompleteValues.Factory valuesCallFactory; + + CassandraAutocompleteTags(CassandraStorage storage) { + enabled = storage.searchEnabled() && !storage.autocompleteKeys().isEmpty(); + keysCall = Call.create(storage.autocompleteKeys()); + valuesCallFactory = enabled ? new SelectAutocompleteValues.Factory(storage.session()) : null; + } + + @Override public Call> getKeys() { + if (!enabled) return Call.emptyList(); + return keysCall.clone(); + } + + @Override public Call> getValues(String key) { + if (key == null) throw new NullPointerException("key == null"); + if (key.isEmpty()) throw new IllegalArgumentException("key was empty"); + if (!enabled) return Call.emptyList(); + return valuesCallFactory.create(key); + } +} diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanConsumer.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanConsumer.java index 620452cf2e5..38d601372bc 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanConsumer.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanConsumer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import zipkin2.Annotation; @@ -33,19 +34,20 @@ class CassandraSpanConsumer implements SpanConsumer { // not final for testing private static final long WRITTEN_NAMES_TTL = - Long.getLong("zipkin2.storage.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000); - + Long.getLong("zipkin2.storage.cassandra.internal.writtenNamesTtl", 60 * 60 * 1000); private final Session session; private final boolean 
strictTraceId, searchEnabled; private final InsertSpan.Factory insertSpan; @Nullable final InsertTraceByServiceSpan.Factory insertTraceByServiceSpan; @Nullable private final InsertServiceSpan.Factory insertServiceSpanName; + @Nullable final InsertAutocompleteValue.Factory insertTags; + final Set autocompleteKeys; CassandraSpanConsumer(CassandraStorage storage) { session = storage.session(); strictTraceId = storage.strictTraceId(); searchEnabled = storage.searchEnabled(); - + autocompleteKeys = new LinkedHashSet<>(storage.autocompleteKeys()); // warns when schema problems exist Schema.readMetadata(session); @@ -53,9 +55,11 @@ class CassandraSpanConsumer implements SpanConsumer { // not final for testing if (searchEnabled) { insertTraceByServiceSpan = new InsertTraceByServiceSpan.Factory(session, strictTraceId); insertServiceSpanName = new InsertServiceSpan.Factory(session, WRITTEN_NAMES_TTL); + insertTags = new InsertAutocompleteValue.Factory(session, WRITTEN_NAMES_TTL); } else { insertTraceByServiceSpan = null; insertServiceSpanName = null; + insertTags = null; } } @@ -70,6 +74,7 @@ public Call accept(List input) { Set spans = new LinkedHashSet<>(); Set serviceSpans = new LinkedHashSet<>(); Set traceByServiceSpans = new LinkedHashSet<>(); + Set> autocompleteTags = new LinkedHashSet<>(); for (Span s : input) { // indexing occurs by timestamp, so derive one if not present. @@ -78,10 +83,10 @@ public Call accept(List input) { // fallback to current time on the ts_uuid for span data, so we know when it was inserted UUID ts_uuid = - new UUID( - UUIDs.startOf(ts_micro != 0L ? (ts_micro / 1000L) : System.currentTimeMillis()) - .getMostSignificantBits(), - UUIDs.random().getLeastSignificantBits()); + new UUID( + UUIDs.startOf(ts_micro != 0L ? 
(ts_micro / 1000L) : System.currentTimeMillis()) + .getMostSignificantBits(), + UUIDs.random().getLeastSignificantBits()); spans.add(insertSpan.newInput(s, ts_uuid)); @@ -90,7 +95,7 @@ public Call accept(List input) { // Empty values allow for api queries with blank service or span name String service = s.localServiceName() != null ? s.localServiceName() : ""; String span = - null != s.name() ? s.name() : ""; // Empty value allows for api queries without span name + null != s.name() ? s.name() : ""; // Empty value allows for api queries without span name // service span index is refreshed regardless of timestamp if (null != s.remoteServiceName()) { // allows getServices to return remote service names @@ -105,10 +110,13 @@ public Call accept(List input) { int bucket = durationIndexBucket(ts_micro); // duration index is milliseconds not microseconds long duration = s.durationAsLong() / 1000L; traceByServiceSpans.add( - insertTraceByServiceSpan.newInput(service, span, bucket, ts_uuid, s.traceId(), duration)); + insertTraceByServiceSpan.newInput(service, span, bucket, ts_uuid, s.traceId(), duration)); if (span.isEmpty()) continue; traceByServiceSpans.add( // Allows lookup without the span name - insertTraceByServiceSpan.newInput(service, "", bucket, ts_uuid, s.traceId(), duration)); + insertTraceByServiceSpan.newInput(service, "", bucket, ts_uuid, s.traceId(), duration)); + for (Map.Entry entry : s.tags().entrySet()) { + if (autocompleteKeys.contains(entry.getKey())) autocompleteTags.add(entry); + } } List> calls = new ArrayList<>(); for (InsertSpan.Input span : spans) { @@ -122,13 +130,16 @@ public Call accept(List input) { calls.add(insertTraceByServiceSpan.create(serviceSpan)); } } + for (Map.Entry entry : autocompleteTags) { + calls.add(insertTags.create(entry)); + } if (calls.size() == 1) return calls.get(0).map(r -> null); return new StoreSpansCall(calls); } static long guessTimestamp(Span span) { Preconditions.checkState( - 0L == span.timestampAsLong(), "method 
only for when span has no timestamp"); + 0L == span.timestampAsLong(), "method only for when span has no timestamp"); for (Annotation annotation : span.annotations()) { if (0L < annotation.timestamp()) { return annotation.timestamp(); diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java index ef1623071e0..e23dde2c2da 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java @@ -62,7 +62,6 @@ class CassandraSpanStore implements SpanStore { // not final for testing if (searchEnabled) { KeyspaceMetadata md = Schema.ensureKeyspaceMetadata(session, storage.keyspace()); indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive(); - spanNames = new SelectSpanNames.Factory(session); serviceNames = new SelectServiceNames.Factory(session).create(); traceIdsFromServiceSpan = new SelectTraceIdsFromServiceSpan.Factory(session); diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java index 1fcc3ff276b..64ff34ccfc4 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java @@ -19,8 +19,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.auto.value.AutoValue; import com.google.auto.value.extension.memoized.Memoized; +import java.util.ArrayList; +import java.util.List; import zipkin2.CheckResult; import zipkin2.internal.Nullable; +import zipkin2.storage.AutocompleteTags; import zipkin2.storage.QueryRequest; import zipkin2.storage.SpanConsumer; import zipkin2.storage.SpanStore; @@ -49,7 
+52,7 @@ public interface SessionFactory { } public static Builder newBuilder() { - return new AutoValue_CassandraStorage.Builder() + return new $AutoValue_CassandraStorage.Builder() .strictTraceId(true) .searchEnabled(true) .keyspace(Schema.DEFAULT_KEYSPACE) @@ -60,7 +63,8 @@ public static Builder newBuilder() { .useSsl(false) .maxTraceCols(100000) .indexFetchMultiplier(3) - .sessionFactory(SessionFactory.DEFAULT); + .sessionFactory(SessionFactory.DEFAULT) + .autocompleteKeys(new ArrayList<>()); } @AutoValue.Builder @@ -73,6 +77,10 @@ public abstract static class Builder extends StorageComponent.Builder { @Override public abstract Builder searchEnabled(boolean searchEnabled); + /** {@inheritDoc} */ + @Override + public abstract Builder autocompleteKeys(List autocompleteKeys); + /** Override to control how sessions are created. */ public abstract Builder sessionFactory(SessionFactory sessionFactory); @@ -168,6 +176,8 @@ public final Builder maxConnections(int maxConnections) { abstract boolean searchEnabled(); + abstract List autocompleteKeys(); + abstract SessionFactory sessionFactory(); /** session and close are typically called from different threads */ @@ -188,6 +198,13 @@ public SpanStore spanStore() { return new CassandraSpanStore(this); } + /** {@inheritDoc} Memoized in order to avoid re-preparing statements */ + @Memoized + @Override + public AutocompleteTags autocompleteTags() { + return new CassandraAutocompleteTags(this); + } + /** {@inheritDoc} Memoized in order to avoid re-preparing statements */ @Memoized @Override diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java index fa576f6e528..3a3fca2b74b 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java @@ -49,12 +49,12 @@ final class 
CassandraUtil { Long.getLong("zipkin.store.cassandra.internal.durationIndexBucket", 24 * 60 * 60); public static int durationIndexBucket(long ts_micro) { - // if the window constant has microsecond precision, the division produces negative values + // if the window constant has microsecond precision, the division produces negative values return (int) (ts_micro / (DURATION_INDEX_BUCKET_WINDOW_SECONDS * 1_000_000)); } /** - * Returns a set of annotation values and tags joined on equals, delimited by ░ + * Returns a set of annotation values and tags joined on equals, delimited by ░ * * @see QueryRequest#annotationQuery() */ diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/InsertAutocompleteValue.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/InsertAutocompleteValue.java new file mode 100644 index 00000000000..25dfd76020a --- /dev/null +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/InsertAutocompleteValue.java @@ -0,0 +1,132 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package zipkin2.storage.cassandra; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.base.Ticker; +import com.google.common.cache.CacheBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall; + +import static zipkin2.storage.cassandra.Schema.TABLE_AUTOCOMPLETE_TAGS; + +final class InsertAutocompleteValue extends ResultSetFutureCall { + + static class Factory { + final Session session; + final PreparedStatement preparedStatement; + final ConcurrentMap, InsertAutocompleteValue> cache; + + Factory(Session session, long ttl) { + this.session = session; + this.preparedStatement = + session.prepare( + QueryBuilder.insertInto(TABLE_AUTOCOMPLETE_TAGS) + .value("key", QueryBuilder.bindMarker("key")) + .value("value", QueryBuilder.bindMarker("value"))); + this.cache = + CacheBuilder.newBuilder() + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .ticker( + new Ticker() { + @Override + public long read() { + return nanoTime(); + } + }) + // TODO: maximum size or weight + ., InsertAutocompleteValue>build() + .asMap(); + } + + // visible for testing, since nanoTime is weird and can return negative + long nanoTime() { + return System.nanoTime(); + } + + Call create(Map.Entry input) { + if (input == null) throw new NullPointerException("input == null"); + if (cache.containsKey(input)) return Call.create(null); + InsertAutocompleteValue realCall = new InsertAutocompleteValue(this, input); + if (cache.putIfAbsent(input, realCall) != null) return Call.create(null); + return realCall; + } + } + + final Factory factory; + final Map.Entry input; + + 
InsertAutocompleteValue(Factory factory, Map.Entry input) { + this.factory = factory; + this.input = input; + } + + @Override + protected ResultSetFuture newFuture() { + return factory.session.executeAsync(factory.preparedStatement.bind() + .setString("key", input.getKey()) + .setString("value", input.getValue())); + } + + @Override + protected ResultSet doExecute() throws IOException { + try { + return super.doExecute(); + } catch (IOException | RuntimeException | Error e) { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + throw e; + } + } + + @Override + protected void doEnqueue(Callback callback) { + super.doEnqueue( + new Callback() { + @Override + public void onSuccess(ResultSet value) { + callback.onSuccess(value); + } + + @Override + public void onError(Throwable t) { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + callback.onError(t); + } + }); + } + + @Override public String toString() { + return input.toString().replace("Input", "InsertAutocompleteValue"); + } + + @Override + protected void doCancel() { + factory.cache.remove(input, InsertAutocompleteValue.this); // invalidate + super.doCancel(); + } + + @Override public Call clone() { + return new InsertAutocompleteValue(factory, input); + } +} diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java index 8ae2bdbf1c6..79f677fd340 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java @@ -49,6 +49,7 @@ final class Schema { static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span"; static final String TABLE_SERVICE_SPANS = "span_by_service"; static final String TABLE_DEPENDENCY = "dependency"; + static final String TABLE_AUTOCOMPLETE_TAGS = "autocomplete_tags"; static final String DEFAULT_KEYSPACE = 
"zipkin2"; private static final String SCHEMA_RESOURCE = "/zipkin2-schema.cql"; diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectAutocompleteValues.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectAutocompleteValues.java new file mode 100644 index 00000000000..d6a5b913d6f --- /dev/null +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectAutocompleteValues.java @@ -0,0 +1,90 @@ +/* + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package zipkin2.storage.cassandra; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Supplier; +import zipkin2.Call; +import zipkin2.storage.cassandra.internal.call.AccumulateAllResults; +import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall; + +import static zipkin2.storage.cassandra.Schema.TABLE_AUTOCOMPLETE_TAGS; + +final class SelectAutocompleteValues extends ResultSetFutureCall { + static class Factory { + final Session session; + final PreparedStatement preparedStatement; + final AccumulateAutocompleteValues accumulateAutocompleteValues; + + Factory(Session session) { + this.session = session; + this.preparedStatement = session.prepare( + QueryBuilder.select("value") + .from(TABLE_AUTOCOMPLETE_TAGS) + .where(QueryBuilder.eq("key", QueryBuilder.bindMarker("key"))) + .limit(QueryBuilder.bindMarker("limit_"))); + this.accumulateAutocompleteValues = new AccumulateAutocompleteValues(); + } + + Call> create(String key) { + return new SelectAutocompleteValues(this, key).flatMap(accumulateAutocompleteValues); + } + } + + final SelectAutocompleteValues.Factory factory; + final String key; + + SelectAutocompleteValues(SelectAutocompleteValues.Factory factory, String key) { + this.factory = factory; + this.key = key; + } + + @Override protected ResultSetFuture newFuture() { + return factory.session.executeAsync(factory.preparedStatement + .bind() + .setString("key", key) + .setInt("limit_", 1000)); // no one is ever going to browse so many tag values + } + + @Override public Call clone() { + return new SelectAutocompleteValues(factory, key); + } + + static class AccumulateAutocompleteValues extends 
AccumulateAllResults> { + @Override protected Supplier> supplier() { + return ArrayList::new; // list is ok because it is distinct results + } + + @Override protected BiConsumer> accumulator() { + return (row, list) -> { + String result = row.getString("value"); + if (!result.isEmpty()) list.add(result); + }; + } + + @Override + public String toString() { + return "AccumulateAutocompleteValues{}"; + } + } +} diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectFromSpan.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectFromSpan.java index c94c683661c..73fce39731e 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectFromSpan.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectFromSpan.java @@ -71,7 +71,7 @@ static class Factory { "shared", "debug") .from(TABLE_SPAN) - // when reading on the partition key, clustering keys are optional + // when reading on the partition key, clustering keys are optional .where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id"))) .limit(QueryBuilder.bindMarker("limit_"))); this.strictTraceId = strictTraceId; diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectSpanNames.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectSpanNames.java index 5a4507b6474..75fc6c05690 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectSpanNames.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/SelectSpanNames.java @@ -63,12 +63,10 @@ Call> create(String serviceName) { @Override protected ResultSetFuture newFuture() { - return factory.session.executeAsync( - factory - .preparedStatement - .bind() - .setString("service", service) - .setInt("limit_", 1000)); // no one is ever going to browse so many span names + return factory.session.executeAsync(factory.preparedStatement + .bind() + .setString("service", 
service) + .setInt("limit_", 1000)); // no one is ever going to browse so many span names } @Override diff --git a/zipkin-storage/cassandra/src/main/resources/zipkin2-schema-indexes.cql b/zipkin-storage/cassandra/src/main/resources/zipkin2-schema-indexes.cql index 1d31d56314e..640a2886261 100644 --- a/zipkin-storage/cassandra/src/main/resources/zipkin2-schema-indexes.cql +++ b/zipkin-storage/cassandra/src/main/resources/zipkin2-schema-indexes.cql @@ -47,3 +47,17 @@ CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin2.span (l_service) USING 'org.apache. CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin2.trace_by_service_span (duration) USING 'org.apache.cassandra.index.sasi.SASIIndex' WITH OPTIONS = {'mode': 'PREFIX'}; + +CREATE TABLE IF NOT EXISTS zipkin2.autocomplete_tags ( + key text, + value text, + PRIMARY KEY (key, value) +) + WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'} + AND caching = {'rows_per_partition': 'ALL'} + AND default_time_to_live = 259200 + AND gc_grace_seconds = 3600 + AND read_repair_chance = 0 + AND dclocal_read_repair_chance = 0 + AND speculative_retry = '95percentile' + AND comment = 'Secondary table for looking up tag key and values for a service'; diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITCassandraStorage.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITCassandraStorage.java index 072565f52ef..f9c8f21d099 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITCassandraStorage.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITCassandraStorage.java @@ -203,6 +203,19 @@ public static class ITStrictTraceIdFalse extends zipkin2.storage.ITStrictTraceId } } + public static class ITAutocompleteTags extends zipkin2.storage.ITAutocompleteTags { + @ClassRule public static CassandraStorageRule backend = classRule(); + @Rule public 
TestName testName = new TestName(); + + @Override protected StorageComponent.Builder storageBuilder() { + return backend.computeStorageBuilder().keyspace(keyspace(testName)); + } + + @Before @Override public void clear() { + dropKeyspace(backend.session(), keyspace(testName)); + } + } + public static class ITDependencies extends zipkin2.storage.ITDependencies { @ClassRule public static CassandraStorageRule backend = classRule(); @Rule public TestName testName = new TestName(); diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java index b9708bf2f36..658de6717bb 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java @@ -50,5 +50,6 @@ abstract class ITEnsureSchema { KeyspaceMetadata metadata = session().getCluster().getMetadata().getKeyspace(keyspace()); assertThat(metadata.getTable("trace_by_service_span")).isNotNull(); + assertThat(metadata.getTable("autocomplete_tags")).isNotNull(); } } diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITSpanConsumer.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITSpanConsumer.java index fdca472cf24..85829cf19d8 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITSpanConsumer.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITSpanConsumer.java @@ -31,7 +31,7 @@ abstract class ITSpanConsumer { private CassandraStorage storage; @Before public void connect() { - storage = storageBuilder().keyspace(keyspace()).build(); + storage = storageBuilder().autocompleteKeys(asList("environment")).keyspace(keyspace()).build(); } abstract CassandraStorage.Builder storageBuilder(); @@ -82,6 +82,35 @@ abstract class ITSpanConsumer { .isGreaterThanOrEqualTo(120L); // TODO: magic 
number assertThat(rowCountForTraceByServiceSpan(storage)) .isGreaterThanOrEqualTo(120L); + + } + + @Test + public void insertTags_SelectTags_CalculateCount() throws IOException { + Span[] trace = new Span[101]; + trace[0] = TestObjects.CLIENT_SPAN.toBuilder().kind(Span.Kind.SERVER).build(); + + IntStream.range(0, 100).forEach(i -> trace[i + 1] = Span.newBuilder() + .traceId(trace[0].traceId()) + .parentId(trace[0].id()) + .id(Long.toHexString(i)) + .name("get") + .kind(Span.Kind.CLIENT) + .localEndpoint(FRONTEND) + .putTag("environment", "dev") + .putTag("a", "b") + .timestamp( + trace[0].timestamp() + i * 1000) // all peer span timestamps happen a millisecond later + .duration(10L) + .build()); + + accept(storage.spanConsumer(), trace); + + assertThat(rowCountForTags(storage)) + .isEqualTo(1L); // Since tag {a,b} are not in the whitelist + + assertThat(getTagValue(storage, "environment")).isEqualTo("dev"); + } void accept(SpanConsumer consumer, Span... spans) throws IOException { @@ -95,4 +124,19 @@ static long rowCountForTraceByServiceSpan(CassandraStorage storage) { .one() .getLong(0); } + + static long rowCountForTags(CassandraStorage storage) { + return storage + .session() + .execute("SELECT COUNT(*) from " + Schema.TABLE_AUTOCOMPLETE_TAGS) + .one() + .getLong(0); + } + static String getTagValue(CassandraStorage storage, String key) { + return storage + .session() + .execute("SELECT value from " + Schema.TABLE_AUTOCOMPLETE_TAGS + " WHERE key='environment'") + .one() + .getString(0); + } }