From 9c38e310c37ac669471cdbc236672a6646061847 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 15 Feb 2016 18:32:18 +0800 Subject: [PATCH] Decouples InMemorySpanStore and tests from zipkin-server This refactors InMemorySpanStore to not use Java 8 features. Now, it is inside the core jar vs being stuck in the zipkin-server package. The first consumer of this will be MockZipkinServer (#64) --- interop/pom.xml | 2 +- .../InMemoryScalaDependencyStoreTest.java | 2 +- .../InMemoryScalaSpanStoreTest.java | 2 +- pom.xml | 7 + .../java/zipkin/server/InMemorySpanStore.java | 241 --------------- .../server/ZipkinServerConfiguration.java | 1 + .../server/brave/TraceWritesSpanStore.java | 3 +- .../zipkin/server/ZipkinSpanWriterTest.java | 1 + .../brave/SpanStoreSpanCollectorTest.java | 2 +- zipkin-spanstores/jdbc/pom.xml | 13 + .../main/java/zipkin/jdbc/JDBCSpanStore.java | 5 +- .../zipkin/jdbc/JDBCDependenciesTest.java | 0 .../java/zipkin/jdbc/JDBCSpanStoreTest.java | 0 .../test/java/zipkin/jdbc/JDBCTestGraph.java | 50 ++++ zipkin/pom.xml | 11 + .../main/java/zipkin/InMemorySpanStore.java | 275 ++++++++++++++++++ zipkin/src/main/java/zipkin/SpanStore.java | 3 +- .../test/java/zipkin/DependenciesTest.java | 0 .../zipkin}/InMemoryDependenciesTest.java | 4 +- .../java/zipkin}/InMemorySpanStoreTest.java | 4 +- .../src/test/java/zipkin/SpanStoreTest.java | 0 21 files changed, 371 insertions(+), 255 deletions(-) rename interop/src/test/java/zipkin/{server => }/InMemoryScalaDependencyStoreTest.java (98%) rename interop/src/test/java/zipkin/{server => }/InMemoryScalaSpanStoreTest.java (98%) delete mode 100644 zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java rename {interop => zipkin-spanstores/jdbc}/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java (100%) rename {interop => zipkin-spanstores/jdbc}/src/test/java/zipkin/jdbc/JDBCSpanStoreTest.java (100%) create mode 100644 zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCTestGraph.java create mode 100644 zipkin/src/main/java/zipkin/InMemorySpanStore.java rename {interop => zipkin}/src/test/java/zipkin/DependenciesTest.java (100%) rename {interop/src/test/java/zipkin/server => zipkin/src/test/java/zipkin}/InMemoryDependenciesTest.java (92%) rename {interop/src/test/java/zipkin/server => zipkin/src/test/java/zipkin}/InMemorySpanStoreTest.java (93%) rename {interop => zipkin}/src/test/java/zipkin/SpanStoreTest.java (100%) diff --git a/interop/pom.xml b/interop/pom.xml index cbb52a919e5..702649d3d16 100644 --- a/interop/pom.xml +++ b/interop/pom.xml @@ -41,7 +41,7 @@ ${project.groupId} - zipkin-server + spanstore-jdbc diff --git a/interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/InMemoryScalaDependencyStoreTest.java similarity index 98% rename from interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java rename to interop/src/test/java/zipkin/InMemoryScalaDependencyStoreTest.java index d48a038596a..1afc9f05812 100644 --- a/interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java +++ b/interop/src/test/java/zipkin/InMemoryScalaDependencyStoreTest.java @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.server; +package zipkin; import com.twitter.zipkin.common.Span; import com.twitter.zipkin.storage.DependencyStore; diff --git a/interop/src/test/java/zipkin/server/InMemoryScalaSpanStoreTest.java b/interop/src/test/java/zipkin/InMemoryScalaSpanStoreTest.java similarity index 98% rename from interop/src/test/java/zipkin/server/InMemoryScalaSpanStoreTest.java rename to interop/src/test/java/zipkin/InMemoryScalaSpanStoreTest.java index 4ba32fc3eba..9f30beeed10 100644 --- a/interop/src/test/java/zipkin/server/InMemoryScalaSpanStoreTest.java +++ b/interop/src/test/java/zipkin/InMemoryScalaSpanStoreTest.java @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.server; +package zipkin; import com.twitter.zipkin.storage.SpanStore; import com.twitter.zipkin.storage.SpanStoreSpec; diff --git a/pom.xml b/pom.xml index 77dd3166a6c..8d01cf3cb9e 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,13 @@ ${project.version} + + ${project.groupId} + zipkin + test-jar + ${project.version} + + ${project.groupId} spanstore-jdbc diff --git a/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java b/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java deleted file mode 100644 index b166c3bebd7..00000000000 --- a/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Copyright 2015-2016 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin.server; - -import java.nio.charset.Charset; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Stream; -import zipkin.Annotation; -import zipkin.BinaryAnnotation; -import zipkin.Constants; -import zipkin.DependencyLink; -import zipkin.QueryRequest; -import zipkin.Span; -import zipkin.SpanStore; -import zipkin.internal.ApplyTimestampAndDuration; -import zipkin.internal.CorrectForClockSkew; -import zipkin.internal.DependencyLinkSpan; -import zipkin.internal.DependencyLinker; -import zipkin.internal.MergeById; -import zipkin.internal.Nullable; - -import static java.util.stream.Collectors.toList; -import static zipkin.internal.Util.sortedList; - -// TODO: make this not use Java 8, so we can put it into the core jar. -public final class InMemorySpanStore implements SpanStore { - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - private final Multimap traceIdToSpans = new Multimap<>(LinkedList::new); - private final Multimap serviceToTraceIds = new Multimap<>(LinkedHashSet::new); - private final Multimap serviceToSpanNames = new Multimap<>(LinkedHashSet::new); - - @Override - public synchronized void accept(Iterator spans) { - while (spans.hasNext()) { - Span span = ApplyTimestampAndDuration.apply(spans.next()); - long traceId = span.traceId; - String spanName = span.name; - traceIdToSpans.put(span.traceId, span); - Stream.concat(span.annotations.stream().map(a -> a.endpoint), - span.binaryAnnotations.stream().map(a -> a.endpoint)) - .filter(e -> e != null && !e.serviceName.isEmpty()) - .map(e -> e.serviceName) - .distinct() - .forEach(serviceName -> { - serviceToTraceIds.put(serviceName, traceId); - serviceToSpanNames.put(serviceName, spanName); - }); - } - } - - synchronized void clear() { - traceIdToSpans.clear(); - serviceToTraceIds.clear(); - } - - @Override - public synchronized List> getTraces(QueryRequest request) { - Collection traceIds = serviceToTraceIds.get(request.serviceName); - if (traceIds == null || traceIds.isEmpty()) return Collections.emptyList(); - - return toSortedTraces(traceIds.stream().map(traceIdToSpans::get)).stream() - .filter(spansPredicate(request)) - .limit(request.limit).collect(toList()); - } - - @Override - public synchronized List> getTracesByIds(List traceIds) { - if (traceIds.isEmpty()) return Collections.emptyList(); - return toSortedTraces(traceIds.stream().map(traceIdToSpans::get)); - } - - @Override - public synchronized List getServiceNames() { - return sortedList(serviceToTraceIds.keySet()); - } - - @Override - public synchronized List getSpanNames(String service) { - if (service == null) return Collections.emptyList(); - service = service.toLowerCase(); // service names are always lowercase! - return sortedList(serviceToSpanNames.get(service)); - } - - @Override - public List getDependencies(long endTs, @Nullable Long lookback) { - endTs *= 1000; - if (lookback == null) { - lookback = endTs; - } else { - lookback *= 1000; - } - - DependencyLinker linksBuilder = new DependencyLinker(); - - for (Collection trace : traceIdToSpans.delegate.values()) { - if (trace.isEmpty()) continue; - - List linkSpans = new LinkedList<>(); - for (Span s : trace) { - Long timestamp = s.timestamp; - if (timestamp == null || - timestamp < (endTs - lookback) || - timestamp > endTs) { - continue; - } - DependencyLinkSpan.Builder linkSpan = new DependencyLinkSpan.Builder(s.parentId, s.id); - for (BinaryAnnotation a : s.binaryAnnotations) { - if (a.key.equals(Constants.CLIENT_ADDR) && a.endpoint != null) { - linkSpan.caService(a.endpoint.serviceName); - } else if (a.key.equals(Constants.SERVER_ADDR) && a.endpoint != null) { - linkSpan.saService(a.endpoint.serviceName); - } - } - for (Annotation a : s.annotations) { - if (a.value.equals(Constants.SERVER_RECV) && a.endpoint != null) { - linkSpan.srService(a.endpoint.serviceName); - break; - } - } - linkSpans.add(linkSpan.build()); - } - - linksBuilder.putTrace(linkSpans.iterator()); - } - return linksBuilder.link(); - } - - private static Predicate> spansPredicate(QueryRequest request) { - return spans -> { - Long timestamp = spans.get(0).timestamp; - if (timestamp == null || - timestamp < (request.endTs - request.lookback) * 1000 || - timestamp > request.endTs * 1000) { - return false; - } - Set serviceNames = new LinkedHashSet<>(); - Predicate durationPredicate = null; - if (request.minDuration != null && request.maxDuration != null) { - durationPredicate = d -> d >= request.minDuration && d <= request.maxDuration; - } else if (request.minDuration != null) { - durationPredicate = d -> d >= request.minDuration; - } - String spanName = request.spanName; - Set annotations = new LinkedHashSet<>(request.annotations); - Map binaryAnnotations = new LinkedHashMap<>(request.binaryAnnotations); - - Set currentServiceNames = new LinkedHashSet<>(); - for (Span span : spans) { - currentServiceNames.clear(); - - span.annotations.forEach(a -> { - annotations.remove(a.value); - if (a.endpoint != null) { - serviceNames.add(a.endpoint.serviceName); - currentServiceNames.add(a.endpoint.serviceName); - } - }); - - span.binaryAnnotations.forEach(b -> { - if (b.type == BinaryAnnotation.Type.STRING && binaryAnnotations.containsKey(b.key)) { - binaryAnnotations.remove(b.key, new String(b.value, UTF_8)); - } - if (b.endpoint != null) { - serviceNames.add(b.endpoint.serviceName); - currentServiceNames.add(b.endpoint.serviceName); - } - }); - - if (currentServiceNames.contains(request.serviceName) && durationPredicate != null) { - if (durationPredicate.test(span.duration)) { - durationPredicate = null; - } - } - - if (span.name.equals(spanName)) { - spanName = null; - } - } - return serviceNames.contains(request.serviceName) - && spanName == null - && annotations.isEmpty() - && binaryAnnotations.isEmpty() - && durationPredicate == null; - }; - } - - private static List> toSortedTraces(Stream> unfiltered) { - return unfiltered.filter(spans -> spans != null && !spans.isEmpty()) - .map(MergeById::apply) - .map(CorrectForClockSkew::apply) - .sorted((left, right) -> right.get(0).compareTo(left.get(0))) - .collect(toList()); - } - - static final class Multimap { - private final Map> delegate = new LinkedHashMap<>(); - private final Supplier> collectionFunction; - - public Multimap(Supplier> collectionFunction) { - this.collectionFunction = collectionFunction; - } - - public Set keySet() { - return delegate.keySet(); - } - - void put(K key, V value) { - delegate.computeIfAbsent(key, (k) -> collectionFunction.get()).add(value); - } - - void clear() { - delegate.clear(); - } - - Collection get(K key) { - return delegate.get(key); - } - } -} diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java index a20a2ca92b7..570f3f89938 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java @@ -29,6 +29,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import zipkin.Codec; +import zipkin.InMemorySpanStore; import zipkin.Sampler; import zipkin.SpanStore; import zipkin.jdbc.JDBCSpanStore; diff --git a/zipkin-server/src/main/java/zipkin/server/brave/TraceWritesSpanStore.java b/zipkin-server/src/main/java/zipkin/server/brave/TraceWritesSpanStore.java index ed6d14900ef..bdbbca2f8d6 100644 --- a/zipkin-server/src/main/java/zipkin/server/brave/TraceWritesSpanStore.java +++ b/zipkin-server/src/main/java/zipkin/server/brave/TraceWritesSpanStore.java @@ -15,6 +15,7 @@ import com.github.kristofa.brave.Brave; import com.github.kristofa.brave.LocalTracer; +import java.util.Collection; import java.util.Iterator; import java.util.List; import zipkin.DependencyLink; @@ -52,7 +53,7 @@ public List> getTraces(QueryRequest request) { } @Override - public List> getTracesByIds(List traceIds) { + public List> getTracesByIds(Collection traceIds) { tracer.startNewSpan(component, "get-traces-by-ids"); try { return delegate.getTracesByIds(traceIds); diff --git a/zipkin-server/src/test/java/zipkin/server/ZipkinSpanWriterTest.java b/zipkin-server/src/test/java/zipkin/server/ZipkinSpanWriterTest.java index 2dcdb7eb272..c999225c80b 100644 --- a/zipkin-server/src/test/java/zipkin/server/ZipkinSpanWriterTest.java +++ b/zipkin-server/src/test/java/zipkin/server/ZipkinSpanWriterTest.java @@ -17,6 +17,7 @@ import zipkin.BinaryAnnotation; import zipkin.Constants; import zipkin.Endpoint; +import zipkin.InMemorySpanStore; import zipkin.Sampler; import zipkin.Span; diff --git a/zipkin-server/src/test/java/zipkin/server/brave/SpanStoreSpanCollectorTest.java b/zipkin-server/src/test/java/zipkin/server/brave/SpanStoreSpanCollectorTest.java index 8a5c98b17df..a14ca76b4f3 100644 --- a/zipkin-server/src/test/java/zipkin/server/brave/SpanStoreSpanCollectorTest.java +++ b/zipkin-server/src/test/java/zipkin/server/brave/SpanStoreSpanCollectorTest.java @@ -19,7 +19,7 @@ import zipkin.Constants; import zipkin.Endpoint; import zipkin.Span; -import zipkin.server.InMemorySpanStore; +import zipkin.InMemorySpanStore; import static java.util.Collections.singletonList; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; diff --git a/zipkin-spanstores/jdbc/pom.xml b/zipkin-spanstores/jdbc/pom.xml index 92fca26d31b..4aee4c9a6fd 100644 --- a/zipkin-spanstores/jdbc/pom.xml +++ b/zipkin-spanstores/jdbc/pom.xml @@ -43,6 +43,19 @@ ${jooq.version} + + ${project.groupId} + zipkin + test-jar + test + + + + org.mariadb.jdbc + mariadb-java-client + test + + diff --git a/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java index 7549901d01a..f95312468ec 100644 --- a/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java +++ b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java @@ -17,6 +17,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -166,7 +167,7 @@ public void accept(Iterator spans) { } } - private List> getTraces(@Nullable QueryRequest request, @Nullable List traceIds) { + List> getTraces(@Nullable QueryRequest request, @Nullable Collection traceIds) { final Map> spansWithoutAnnotations; final Map, List> dbAnnotations; try (Connection conn = datasource.getConnection()) { @@ -248,7 +249,7 @@ private DSLContext context(Connection conn) { } @Override - public List> getTracesByIds(List traceIds) { + public List> getTracesByIds(Collection traceIds) { return traceIds.isEmpty() ? emptyList() : getTraces(null, traceIds); } diff --git a/interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java b/zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java similarity index 100% rename from interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java rename to zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java diff --git a/interop/src/test/java/zipkin/jdbc/JDBCSpanStoreTest.java b/zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCSpanStoreTest.java similarity index 100% rename from interop/src/test/java/zipkin/jdbc/JDBCSpanStoreTest.java rename to zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCSpanStoreTest.java diff --git a/zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCTestGraph.java b/zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCTestGraph.java new file mode 100644 index 00000000000..b944e98dcad --- /dev/null +++ b/zipkin-spanstores/jdbc/src/test/java/zipkin/jdbc/JDBCTestGraph.java @@ -0,0 +1,50 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.jdbc; + +import java.sql.SQLException; +import org.jooq.conf.Settings; +import org.junit.AssumptionViolatedException; +import org.mariadb.jdbc.MariaDbDataSource; +import zipkin.internal.Nullable; + +import static zipkin.internal.Util.envOr; + +final class JDBCTestGraph { + + final JDBCSpanStore spanStore; + + JDBCTestGraph() throws SQLException { + String mysqlUrl = mysqlUrlFromEnv(); + if (mysqlUrl == null) { + throw new AssumptionViolatedException("Minimally, the environment variable MYSQL_USER must be set"); + } + MariaDbDataSource dataSource = new MariaDbDataSource(); + dataSource.setUrl(mysqlUrl); + this.spanStore = new JDBCSpanStore(dataSource, new Settings().withRenderSchema(false), null); + } + + @Nullable + private static String mysqlUrlFromEnv() { + if (System.getenv("MYSQL_USER") == null) return null; + String mysqlHost = envOr("MYSQL_HOST", "localhost"); + int mysqlPort = envOr("MYSQL_TCP_PORT", 3306); + String mysqlUser = envOr("MYSQL_USER", ""); + String mysqlPass = envOr("MYSQL_PASS", ""); + String mysqlDb = envOr("MYSQL_DB", "zipkin"); + + return String.format("jdbc:mysql://%s:%s/%s?user=%s&password=%s&autoReconnect=true", + mysqlHost, mysqlPort, mysqlDb, mysqlUser, mysqlPass); + } +} diff --git a/zipkin/pom.xml b/zipkin/pom.xml index 1c889e82b99..e44a551f93b 100644 --- a/zipkin/pom.xml +++ b/zipkin/pom.xml @@ -116,6 +116,17 @@ + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + diff --git a/zipkin/src/main/java/zipkin/InMemorySpanStore.java b/zipkin/src/main/java/zipkin/InMemorySpanStore.java new file mode 100644 index 00000000000..46cc3150487 --- /dev/null +++ b/zipkin/src/main/java/zipkin/InMemorySpanStore.java @@ -0,0 +1,275 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.CorrectForClockSkew; +import zipkin.internal.DependencyLinkSpan; +import zipkin.internal.DependencyLinker; +import zipkin.internal.MergeById; +import zipkin.internal.Nullable; + +import static zipkin.internal.Util.sortedList; + +public final class InMemorySpanStore implements SpanStore { + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + private final Multimap traceIdToSpans = new LinkedListMultimap<>(); + private final Multimap serviceToTraceIds = new LinkedHashSetMultimap<>(); + private final Multimap serviceToSpanNames = new LinkedHashSetMultimap<>(); + + @Override + public synchronized void accept(Iterator spans) { + while (spans.hasNext()) { + Span span = ApplyTimestampAndDuration.apply(spans.next()); + long traceId = span.traceId; + String spanName = span.name; + traceIdToSpans.put(span.traceId, span); + + for (String serviceName : serviceNames(span)) { + serviceToTraceIds.put(serviceName, traceId); + serviceToSpanNames.put(serviceName, spanName); + } + } + } + + synchronized void clear() { + traceIdToSpans.clear(); + serviceToTraceIds.clear(); + } + + @Override + public synchronized List> getTraces(QueryRequest request) { + Collection traceIds = serviceToTraceIds.get(request.serviceName); + if (traceIds == null || traceIds.isEmpty()) return Collections.emptyList(); + + List> result = new ArrayList<>(); + + List> unfiltered = getTracesByIds(traceIds); + for (int i = 0; i < unfiltered.size() && result.size() < request.limit; i++) { + List next = unfiltered.get(i); + if (test(request, next)) { + result.add(next); + } + } + return result; + } + + @Override + public synchronized List> getTracesByIds(Collection traceIds) { + if (traceIds.isEmpty()) return Collections.emptyList(); + List> result = new LinkedList<>(); + for (Long traceId : traceIds) { + Collection spans = traceIdToSpans.get(traceId); + if (spans == null || spans.isEmpty()) continue; + result.add(CorrectForClockSkew.apply(MergeById.apply(spans))); + } + Collections.sort(result, TRACE_DESCENDING); + return result; + } + + private static final Comparator> TRACE_DESCENDING = new Comparator>() { + @Override + public int compare(List left, List right) { + return right.get(0).compareTo(left.get(0)); + } + }; + + @Override + public synchronized List getServiceNames() { + return sortedList(serviceToTraceIds.keySet()); + } + + @Override + public synchronized List getSpanNames(String service) { + if (service == null) return Collections.emptyList(); + service = service.toLowerCase(); // service names are always lowercase! + return sortedList(serviceToSpanNames.get(service)); + } + + @Override + public List getDependencies(long endTs, @Nullable Long lookback) { + endTs *= 1000; + if (lookback == null) { + lookback = endTs; + } else { + lookback *= 1000; + } + + DependencyLinker linksBuilder = new DependencyLinker(); + + for (Collection trace : traceIdToSpans.delegate.values()) { + if (trace.isEmpty()) continue; + + List linkSpans = new LinkedList<>(); + for (Span s : trace) { + Long timestamp = s.timestamp; + if (timestamp == null || + timestamp < (endTs - lookback) || + timestamp > endTs) { + continue; + } + DependencyLinkSpan.Builder linkSpan = new DependencyLinkSpan.Builder(s.parentId, s.id); + for (BinaryAnnotation a : s.binaryAnnotations) { + if (a.key.equals(Constants.CLIENT_ADDR) && a.endpoint != null) { + linkSpan.caService(a.endpoint.serviceName); + } else if (a.key.equals(Constants.SERVER_ADDR) && a.endpoint != null) { + linkSpan.saService(a.endpoint.serviceName); + } + } + for (Annotation a : s.annotations) { + if (a.value.equals(Constants.SERVER_RECV) && a.endpoint != null) { + linkSpan.srService(a.endpoint.serviceName); + break; + } + } + linkSpans.add(linkSpan.build()); + } + + linksBuilder.putTrace(linkSpans.iterator()); + } + return linksBuilder.link(); + } + + private static boolean test(QueryRequest request, List spans) { + Long timestamp = spans.get(0).timestamp; + if (timestamp == null || + timestamp < (request.endTs - request.lookback) * 1000 || + timestamp > request.endTs * 1000) { + return false; + } + Set serviceNames = new LinkedHashSet<>(); + boolean testedDuration = request.minDuration == null && request.maxDuration == null; + + String spanName = request.spanName; + Set annotations = new LinkedHashSet<>(request.annotations); + Map binaryAnnotations = new LinkedHashMap<>(request.binaryAnnotations); + + Set currentServiceNames = new LinkedHashSet<>(); + for (Span span : spans) { + currentServiceNames.clear(); + + for (Annotation a: span.annotations) { + annotations.remove(a.value); + if (a.endpoint != null) { + serviceNames.add(a.endpoint.serviceName); + currentServiceNames.add(a.endpoint.serviceName); + } + } + + for (BinaryAnnotation b: span.binaryAnnotations) { + if (b.type == BinaryAnnotation.Type.STRING && + new String(b.value, UTF_8).equals(binaryAnnotations.get(b.key))) { + binaryAnnotations.remove(b.key); + } + if (b.endpoint != null) { + serviceNames.add(b.endpoint.serviceName); + currentServiceNames.add(b.endpoint.serviceName); + } + } + + if (currentServiceNames.contains(request.serviceName) && !testedDuration) { + if (request.minDuration != null && request.maxDuration != null) { + testedDuration = span.duration >= request.minDuration && span.duration <= request.maxDuration; + } else if (request.minDuration != null) { + testedDuration = span.duration >= request.minDuration; + } + } + + if (span.name.equals(spanName)) { + spanName = null; + } + } + return serviceNames.contains(request.serviceName) + && spanName == null + && annotations.isEmpty() + && binaryAnnotations.isEmpty() + && testedDuration; + } + + static final class LinkedListMultimap extends Multimap { + + @Override + Collection valueContainer() { + return new LinkedList<>(); + } + } + + static final class LinkedHashSetMultimap extends Multimap { + + @Override + Collection valueContainer() { + return new LinkedHashSet<>(); + } + } + + static abstract class Multimap { + private final Map> delegate = new LinkedHashMap<>(); + + abstract Collection valueContainer(); + + Set keySet() { + return delegate.keySet(); + } + + void put(K key, V value) { + Collection valueContainer = delegate.get(key); + if (valueContainer == null) { + synchronized (delegate) { + if (!delegate.containsKey(key)) { + valueContainer = valueContainer(); + delegate.put(key, valueContainer); + } + } + } + valueContainer.add(value); + } + + // not synchronized as only used for for testing + void clear() { + delegate.clear(); + } + + Collection get(K key) { + return delegate.get(key); + } + } + + static Set serviceNames(Span span) { + Set result = new LinkedHashSet<>(); + for (Annotation a : span.annotations) { + if (a.endpoint == null) continue; + if (a.endpoint.serviceName.isEmpty()) continue; + result.add(a.endpoint.serviceName); + } + for (BinaryAnnotation a : span.binaryAnnotations) { + if (a.endpoint == null) continue; + if (a.endpoint.serviceName.isEmpty()) continue; + result.add(a.endpoint.serviceName); + } + return result; + } +} diff --git a/zipkin/src/main/java/zipkin/SpanStore.java b/zipkin/src/main/java/zipkin/SpanStore.java index 292550093ed..4876d6fe89e 100644 --- a/zipkin/src/main/java/zipkin/SpanStore.java +++ b/zipkin/src/main/java/zipkin/SpanStore.java @@ -13,6 +13,7 @@ */ package zipkin; +import java.util.Collection; import java.util.Iterator; import java.util.List; import zipkin.internal.Nullable; @@ -41,7 +42,7 @@ public interface SpanStore { *

Results are sorted in order of the first span's timestamp, and contain less elements than * trace IDs when corresponding traces aren't available. */ - List> getTracesByIds(List traceIds); + List> getTracesByIds(Collection traceIds); /** * Get all the {@link Endpoint#serviceName service names}. diff --git a/interop/src/test/java/zipkin/DependenciesTest.java b/zipkin/src/test/java/zipkin/DependenciesTest.java similarity index 100% rename from interop/src/test/java/zipkin/DependenciesTest.java rename to zipkin/src/test/java/zipkin/DependenciesTest.java diff --git a/interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java b/zipkin/src/test/java/zipkin/InMemoryDependenciesTest.java similarity index 92% rename from interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java rename to zipkin/src/test/java/zipkin/InMemoryDependenciesTest.java index c6bed472bf1..69791ef8964 100644 --- a/interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java +++ b/zipkin/src/test/java/zipkin/InMemoryDependenciesTest.java @@ -11,11 +11,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.server; +package zipkin; import java.util.List; -import zipkin.DependenciesTest; -import zipkin.Span; public class InMemoryDependenciesTest extends DependenciesTest { diff --git a/interop/src/test/java/zipkin/server/InMemorySpanStoreTest.java b/zipkin/src/test/java/zipkin/InMemorySpanStoreTest.java similarity index 93% rename from interop/src/test/java/zipkin/server/InMemorySpanStoreTest.java rename to zipkin/src/test/java/zipkin/InMemorySpanStoreTest.java index 68a5ffd7e04..ffcc0c56475 100644 --- a/interop/src/test/java/zipkin/server/InMemorySpanStoreTest.java +++ b/zipkin/src/test/java/zipkin/InMemorySpanStoreTest.java @@ -11,9 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.server; - -import zipkin.SpanStoreTest; +package zipkin; public class InMemorySpanStoreTest extends SpanStoreTest { public InMemorySpanStoreTest() { diff --git a/interop/src/test/java/zipkin/SpanStoreTest.java b/zipkin/src/test/java/zipkin/SpanStoreTest.java similarity index 100% rename from interop/src/test/java/zipkin/SpanStoreTest.java rename to zipkin/src/test/java/zipkin/SpanStoreTest.java