From 0268691a975469f0a675f9be5fdaee2d148e92cb Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 22 Sep 2015 20:48:09 -0700 Subject: [PATCH 1/2] Refines behavior of java implementation vs upstream Some behaviors of zipkin proper were not captured in the java port. In other cases, dead-code was carried over. This corrects both points. --- .../src/main/java/io/zipkin/Constants.java | 25 +++++++++++++++++ .../src/main/java/io/zipkin/Span.java | 27 ------------------- .../io/zipkin/query/InMemoryZipkinQuery.java | 15 +++++++---- .../java/io/zipkin/query/QueryRequest.java | 7 +++++ .../zipkin/query/InMemoryZipkinQueryTest.java | 3 ++- .../java/io/zipkin/query/ZipkinQueryTest.java | 17 +++++++++--- .../io/zipkin/scribe/ScribeSpanConsumer.java | 7 +++-- 7 files changed, 61 insertions(+), 40 deletions(-) diff --git a/zipkin-java-core/src/main/java/io/zipkin/Constants.java b/zipkin-java-core/src/main/java/io/zipkin/Constants.java index 8fdd96c2dc5..23f34e32d20 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/Constants.java +++ b/zipkin-java-core/src/main/java/io/zipkin/Constants.java @@ -13,6 +13,11 @@ */ package io.zipkin; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; + public final class Constants { /* Common annotation values */ public static final String CLIENT_RECV = "cr"; @@ -27,12 +32,32 @@ public final class Constants { * Annotation#host()} */ public static final String CLIENT_ADDR = "ca"; + /** * The endpoint associated with "SERVER_" annotations is not necessarily {@link * Annotation#host()} */ public static final String SERVER_ADDR = "sa"; + /** + * Zipkin Span Storage implementations should not expect users to search by core annotations. + * + * When storage implementations separate annotation indexing from storage, filter out these + * values. + */ + public static final Set CORE_ANNOTATIONS = Collections.unmodifiableSet( + new LinkedHashSet<>(Arrays.asList( + CLIENT_RECV, + CLIENT_SEND, + SERVER_RECV, + SERVER_SEND, + WIRE_SEND, + WIRE_RECV, + CLIENT_ADDR, + SERVER_ADDR + )) + ); + private Constants() { } } diff --git a/zipkin-java-core/src/main/java/io/zipkin/Span.java b/zipkin-java-core/src/main/java/io/zipkin/Span.java index a8394a65854..190fe13a056 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/Span.java +++ b/zipkin-java-core/src/main/java/io/zipkin/Span.java @@ -18,9 +18,7 @@ import com.facebook.swift.codec.ThriftStruct; import com.google.auto.value.AutoValue; import io.zipkin.internal.Nullable; -import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; import static com.facebook.swift.codec.ThriftField.Requiredness.OPTIONAL; @@ -59,31 +57,6 @@ public static Builder builder(Span source) { @ThriftField(value = 9, requiredness = OPTIONAL) public abstract Boolean debug(); - /** - * Assuming this is an RPC span, is it from the client side? - */ - public boolean isClientSide() { - for (Annotation a : annotations()) { - if (a.value().equals(Constants.CLIENT_SEND) || a.value().equals(Constants.CLIENT_RECV)) { - return true; - } - } - return false; - } - - /** - * Returns the lower-cased set of service names this span has annotations for. - */ - public Set serviceNames() { - Set result = new LinkedHashSet<>(); - for (Annotation a : annotations()) { - if (a.host() != null) { - result.add(a.host().serviceName().toLowerCase()); - } - } - return result; - } - @AutoValue.Builder public interface Builder { diff --git a/zipkin-java-query/src/main/java/io/zipkin/query/InMemoryZipkinQuery.java b/zipkin-java-query/src/main/java/io/zipkin/query/InMemoryZipkinQuery.java index 88cd5814b61..0d7b44ba00d 100644 --- a/zipkin-java-query/src/main/java/io/zipkin/query/InMemoryZipkinQuery.java +++ b/zipkin-java-query/src/main/java/io/zipkin/query/InMemoryZipkinQuery.java @@ -15,6 +15,7 @@ import io.zipkin.Annotation; import io.zipkin.BinaryAnnotation; +import io.zipkin.Constants; import io.zipkin.Span; import io.zipkin.Trace; import io.zipkin.internal.Nullable; @@ -28,18 +29,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -public final class InMemoryZipkinQuery implements ZipkinQuery { +public final class InMemoryZipkinQuery implements ZipkinQuery, Consumer> { private static final Charset UTF_8 = Charset.forName("UTF-8"); private final Multimap traceIdToSpans = new Multimap<>(LinkedList::new); private final Multimap serviceToTraceIds = new Multimap<>(LinkedHashSet::new); private final Multimap serviceToSpanNames = new Multimap<>(LinkedHashSet::new); - public synchronized void accept(Iterable spans) { + @Override + public synchronized void accept(List spans) { spans.forEach(span -> { long traceId = span.traceId(); traceIdToSpans.put(span.traceId(), span); @@ -125,14 +128,16 @@ static Predicate spanPredicate(QueryRequest request) { spanPredicate = spanPredicate.and(s -> s.name().equals(request.spanName())); } if (request.annotations() != null && !request.annotations().isEmpty()) { - spanPredicate = spanPredicate.and(s -> s.annotations().stream().map( - Annotation::value).allMatch(v -> request.annotations().contains(v))); + spanPredicate = spanPredicate.and(s -> s.annotations().stream() + .map(Annotation::value) + .filter(v -> !Constants.CORE_ANNOTATIONS.contains(v)) // don't return core annotations + .anyMatch(v -> request.annotations().contains(v))); } if (request.binaryAnnotations() != null && !request.binaryAnnotations().isEmpty()) { spanPredicate = spanPredicate.and(s -> s.binaryAnnotations().stream() .filter(b -> b.type() == BinaryAnnotation.Type.STRING) .filter(b -> request.binaryAnnotations().containsKey(b.key())) - .allMatch( + .anyMatch( b -> request.binaryAnnotations().get(b.key()).equals(new String(b.value(), UTF_8)))); } return spanPredicate; diff --git a/zipkin-java-query/src/main/java/io/zipkin/query/QueryRequest.java b/zipkin-java-query/src/main/java/io/zipkin/query/QueryRequest.java index cd744f8c563..b989bda9f57 100644 --- a/zipkin-java-query/src/main/java/io/zipkin/query/QueryRequest.java +++ b/zipkin-java-query/src/main/java/io/zipkin/query/QueryRequest.java @@ -17,6 +17,7 @@ import com.facebook.swift.codec.ThriftField; import com.facebook.swift.codec.ThriftStruct; import com.google.auto.value.AutoValue; +import io.zipkin.BinaryAnnotation; import io.zipkin.internal.Nullable; import java.util.List; import java.util.Map; @@ -38,10 +39,16 @@ public static Builder builder() { @ThriftField(value = 2, requiredness = OPTIONAL) public abstract String spanName(); + /** + * Custom-defined annotation values to search for. + */ @Nullable @ThriftField(value = 3, requiredness = OPTIONAL) public abstract List annotations(); + /** + * Binary annotations of type {@link BinaryAnnotation.Type#STRING} to search for. + */ @Nullable @ThriftField(value = 8, requiredness = OPTIONAL) public abstract Map binaryAnnotations(); diff --git a/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQueryTest.java b/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQueryTest.java index f09dae2d70c..def79e0adc0 100644 --- a/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQueryTest.java +++ b/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQueryTest.java @@ -14,6 +14,7 @@ package io.zipkin.query; import io.zipkin.Span; +import java.util.List; public class InMemoryZipkinQueryTest extends ZipkinQueryTest { InMemoryZipkinQuery query = new InMemoryZipkinQuery(); @@ -22,7 +23,7 @@ public class InMemoryZipkinQueryTest extends ZipkinQueryTest { return query; } - @Override protected void reload(Iterable spans) { + @Override protected void reload(List spans) { query.clear(); query.accept(spans); } diff --git a/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQueryTest.java b/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQueryTest.java index e77dfc32a9f..7359451192f 100644 --- a/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQueryTest.java +++ b/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQueryTest.java @@ -42,7 +42,7 @@ public abstract class ZipkinQueryTest { protected abstract ZipkinQuery query(); - protected abstract void reload(Iterable spans); + protected abstract void reload(List spans); @Before public void load() { @@ -64,6 +64,7 @@ public void load() { .serviceName("db").build(); Annotation webSR = Annotation.builder().timestamp(10).value(SERVER_RECV).host(web).build(); + Annotation webCustom = Annotation.builder().timestamp(10).value("finagle.retry").host(web).build(); BinaryAnnotation httpUri = BinaryAnnotation.builder() .key("http.uri").value("/foo".getBytes()) .type(BinaryAnnotation.Type.STRING).host(web).build(); @@ -76,7 +77,7 @@ public void load() { .traceId(1L) .name("GET") .id(1L) - .annotations(asList(webSR)) + .annotations(asList(webSR, webCustom)) .binaryAnnotations(asList(httpUri)) .build(), Span.builder() @@ -214,12 +215,22 @@ public void getTraces_service_name() { public void getTraces_annotation_name() { assertThat(query().getTraces(QueryRequest.builder() .serviceName("web") - .annotations(asList(SERVER_SEND)) + .annotations(asList(webCustom.value())) .binaryAnnotations(emptyMap()) .endTs(afterWebSS) .limit(100).build())).containsExactly(trace1); } + @Test + public void getTraces_annotation_name_core_ignored() { + assertThat(query().getTraces(QueryRequest.builder() + .serviceName("web") + .annotations(asList(SERVER_SEND)) + .binaryAnnotations(emptyMap()) + .endTs(afterWebSS) + .limit(100).build())).isEmpty(); + } + @Test public void getTraces_annotation_name_and_value() { Map binaryAnnotations = new LinkedHashMap<>(); diff --git a/zipkin-java-scribe/src/main/java/io/zipkin/scribe/ScribeSpanConsumer.java b/zipkin-java-scribe/src/main/java/io/zipkin/scribe/ScribeSpanConsumer.java index 389a097c68c..9c52e14670c 100644 --- a/zipkin-java-scribe/src/main/java/io/zipkin/scribe/ScribeSpanConsumer.java +++ b/zipkin-java-scribe/src/main/java/io/zipkin/scribe/ScribeSpanConsumer.java @@ -26,10 +26,10 @@ public final class ScribeSpanConsumer implements Scribe { - private final Consumer> consumer; + private final Consumer> consumer; private final ThriftCodec spanCodec = new ThriftCodecManager().getCodec(Span.class); - public ScribeSpanConsumer(Consumer> consumer) { + public ScribeSpanConsumer(Consumer> consumer) { this.consumer = consumer; } @@ -48,8 +48,7 @@ public ResultCode log(List messages) { return null; } }) - .filter(s -> s != null) - .filter(s -> !(s.isClientSide() && s.serviceNames().contains("client"))); + .filter(s -> s != null); consumer.accept(spansToStore.collect(Collectors.toList())); return ResultCode.OK; } From f9a707b99e404ba940d6b0e3caf3ca9aee782520 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 22 Sep 2015 20:49:27 -0700 Subject: [PATCH 2/2] Adds SpanStore adapter and spec from the Scala project Eventhough the conversion is arduous, there's at least temporary value in running the authoritative tests from the scala side of OpenZipkin. --- zipkin-java-query/pom.xml | 60 +++++++ .../InMemoryZipkinQuerySpanStoreTest.java | 29 ++++ .../query/ZipkinQuerySpanStoreAdapter.java | 158 ++++++++++++++++++ 3 files changed, 247 insertions(+) create mode 100644 zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQuerySpanStoreTest.java create mode 100644 zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQuerySpanStoreAdapter.java diff --git a/zipkin-java-query/pom.xml b/zipkin-java-query/pom.xml index 5d41dc58f9f..b207667cea2 100644 --- a/zipkin-java-query/pom.xml +++ b/zipkin-java-query/pom.xml @@ -29,6 +29,12 @@ Zipkin Java Query Zipkin Java Query + + + 1.9.0 + 2.2.5 + + ${project.groupId} @@ -39,5 +45,59 @@ com.facebook.swift swift-annotations + + + + ${project.groupId} + swift-codec + + + + + io.zipkin + zipkin-common + ${zipkin-scala.version} + test + + + org.apache.thrift + libthrift + + + + + + io.zipkin + zipkin-scrooge + ${zipkin-scala.version} + test + + + org.apache.thrift + libthrift + + + + + + io.zipkin + zipkin-common + ${zipkin-scala.version} + test + test + + + org.apache.thrift + libthrift + + + + + + org.scalatest + scalatest_2.10 + ${scalatest.version} + test + diff --git a/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQuerySpanStoreTest.java b/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQuerySpanStoreTest.java new file mode 100644 index 00000000000..ba5749f2d68 --- /dev/null +++ b/zipkin-java-query/src/test/java/io/zipkin/query/InMemoryZipkinQuerySpanStoreTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2015 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.zipkin.query; + +import com.twitter.zipkin.storage.SpanStore; +import com.twitter.zipkin.storage.SpanStoreSpec; + +public class InMemoryZipkinQuerySpanStoreTest extends SpanStoreSpec { + private ZipkinQuerySpanStoreAdapter mem; + + public SpanStore store() { + return mem; + } + + public void clear() { + mem = new ZipkinQuerySpanStoreAdapter<>(new InMemoryZipkinQuery()); + } +} diff --git a/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQuerySpanStoreAdapter.java b/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQuerySpanStoreAdapter.java new file mode 100644 index 00000000000..6578a77b288 --- /dev/null +++ b/zipkin-java-query/src/test/java/io/zipkin/query/ZipkinQuerySpanStoreAdapter.java @@ -0,0 +1,158 @@ +/** + * Copyright 2015 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.zipkin.query; + +import com.facebook.swift.codec.ThriftCodec; +import com.facebook.swift.codec.ThriftCodecManager; +import com.twitter.util.Future; +import com.twitter.zipkin.common.Span; +import com.twitter.zipkin.conversions.thrift; +import com.twitter.zipkin.storage.IndexedTraceId; +import com.twitter.zipkin.storage.SpanStore; +import com.twitter.zipkin.thriftscala.Span$; +import io.zipkin.Annotation; +import io.zipkin.Trace; +import io.zipkin.internal.Nullable; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TMemoryBuffer; +import scala.Option; +import scala.collection.Iterator; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.collection.immutable.Set; +import scala.runtime.BoxedUnit; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; + +/** + * Adapts {@link ZipkinQuery} to a scala/scrooge {@link com.twitter.zipkin.storage.SpanStore} in + * order to test against its {@link com.twitter.zipkin.storage.SpanStoreSpec} for interoperability + * reasons. + */ +public final class ZipkinQuerySpanStoreAdapter>> extends SpanStore { + private static final ThriftCodec spanCodec = new ThriftCodecManager().getCodec(io.zipkin.Span.class); + + private final T spanStore; + + public ZipkinQuerySpanStoreAdapter(T spanStore) { + this.spanStore = spanStore; + } + + @Override + public Future>> getSpansByTraceIds(Seq traceIds) { + final List input = new ArrayList<>(traceIds.size()); + for (Iterator i = traceIds.iterator(); i.hasNext(); input.add((Long) i.next())) ; + return Future.value( + JavaConversions.asScalaBuffer(spanStore.getTracesByIds(input, false).stream() + .map(t -> JavaConversions.asScalaBuffer( + t.spans().stream() + .map(ZipkinQuerySpanStoreAdapter::convert) + .filter(s -> s != null) + .collect(Collectors.toList())) + ).collect(Collectors.toList()))); + } + + @Override + public Future> getTraceIdsByName(String serviceName, Option spanName, long endTs, int limit) { + QueryRequest request = QueryRequest.builder() + .serviceName(serviceName) + .spanName(spanName.isDefined() ? spanName.get() : null) + .endTs(endTs) + .limit(limit) + .build(); + return indexedTraceIdFuture(spanStore.getTraces(request)); + } + + @Override + public Future> getTraceIdsByAnnotation(String serviceName, String annotation, Option value, long endTs, int limit) { + QueryRequest request = QueryRequest.builder() + .serviceName(serviceName) + .annotations(value.isEmpty() ? singletonList(annotation) : emptyList()) + .binaryAnnotations(value.isDefined() ? singletonMap(annotation, new String(value.get().array(), Charset.forName("UTF-8"))) : emptyMap()) + .endTs(endTs) + .limit(limit) + .build(); + return indexedTraceIdFuture(spanStore.getTraces(request)); + } + + @Override + public Future> getAllServiceNames() { + return Future.value(JavaConversions.asScalaSet(spanStore.getServiceNames()).toSet()); + } + + @Override + public Future> getSpanNames(String service) { + return Future.value(JavaConversions.asScalaSet(spanStore.getSpanNames(service)).toSet()); + } + + @Override + public Future apply(Seq input) { + List spans = JavaConversions.asJavaCollection(input).stream() + .map(ZipkinQuerySpanStoreAdapter::invert) + .filter(s -> s != null) + .collect(Collectors.toList()); + spanStore.accept(spans); + return Future.Unit(); + } + + @Override + public void close() { + } + + static Future> indexedTraceIdFuture(List matchingSpans) { + return Future.value(JavaConversions.asScalaBuffer(matchingSpans.stream().map(trace -> { + long traceId = trace.spans().get(0).traceId(); + long maxTimestamp = trace.spans().stream() + .map(io.zipkin.Span::annotations).flatMap(List::stream) + .mapToLong(Annotation::timestamp).max().getAsLong(); + return IndexedTraceId.apply(traceId, maxTimestamp); + }).collect(Collectors.toList()))); + } + + @Nullable + static Span convert(io.zipkin.Span input) { + try { + TMemoryBuffer transport = new TMemoryBuffer(0); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + spanCodec.write(input, protocol); + com.twitter.zipkin.thriftscala.Span scroogeThrift = Span$.MODULE$.decode(protocol); + return new thrift.WrappedSpan(scroogeThrift).toSpan(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + @Nullable + static io.zipkin.Span invert(Span input) { + try { + TMemoryBuffer transport = new TMemoryBuffer(0); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + Span$.MODULE$.encode(new thrift.ThriftSpan(input).toThrift(), protocol); + return spanCodec.read(protocol); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } +} \ No newline at end of file