From 93b015d33cf0a7664a91cf093f3f9f950e1a6c6d Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 8 Feb 2016 01:13:04 +0800 Subject: [PATCH] Updates to latest zipkin-scala and adds in-mem dependency aggregation Previously, the in-memory provider did not support dependency aggregation. This meant users couldn't play with zipkin unless they ran MySQL. This adds aggregation support through a new type: `DependencyLinker`. This new type also corects the dependency graph, particularly around uninstrumented services or paths with local spans intermediating calls. --- interop/pom.xml | 2 +- .../test/java/zipkin/DependenciesTest.java | 175 ++++++++++++++++++ .../zipkin/jdbc/JDBCDependenciesTest.java | 41 ++++ .../jdbc/JDBCScalaDependencyStoreTest.java | 7 + .../server/InMemoryDependenciesTest.java | 35 ++++ .../InMemoryScalaDependencyStoreTest.java | 46 +++++ .../java/zipkin/server/InMemorySpanStore.java | 47 ++++- .../jdbc/DependencyLinkSpanIterator.java | 117 ++++++++++++ .../main/java/zipkin/jdbc/JDBCSpanStore.java | 90 ++++----- .../zipkin/internal/DependencyLinkSpan.java | 106 +++++++++++ .../zipkin/internal/DependencyLinker.java | 100 ++++++++++ .../java/zipkin/internal/PeekingIterator.java | 107 +++++++++++ .../zipkin/internal/PeekingIteratorTest.java | 80 ++++++++ 13 files changed, 892 insertions(+), 61 deletions(-) create mode 100644 interop/src/test/java/zipkin/DependenciesTest.java create mode 100644 interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java create mode 100644 interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java create mode 100644 interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java create mode 100644 zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/DependencyLinkSpanIterator.java create mode 100644 zipkin/src/main/java/zipkin/internal/DependencyLinkSpan.java create mode 100644 zipkin/src/main/java/zipkin/internal/DependencyLinker.java create mode 100644 zipkin/src/main/java/zipkin/internal/PeekingIterator.java create mode 100644 zipkin/src/test/java/zipkin/internal/PeekingIteratorTest.java diff --git a/interop/pom.xml b/interop/pom.xml index 156adce3165..ab17ac8966e 100644 --- a/interop/pom.xml +++ b/interop/pom.xml @@ -29,7 +29,7 @@ ${project.basedir}/.. - 1.30.2 + 1.32.3 2.2.5 diff --git a/interop/src/test/java/zipkin/DependenciesTest.java b/interop/src/test/java/zipkin/DependenciesTest.java new file mode 100644 index 00000000000..f6236f558ae --- /dev/null +++ b/interop/src/test/java/zipkin/DependenciesTest.java @@ -0,0 +1,175 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin; + +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.Iterator; +import java.util.List; +import java.util.TimeZone; +import org.junit.Before; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base test for {@link SpanStore} implementations that support dependency aggregation. Subtypes + * should create a connection to a real backend, even if that backend is in-process. + * + *

This is a replacement for {@code com.twitter.zipkin.storage.DependencyStoreSpec}. + */ +public abstract class DependenciesTest { + + /** Should maintain state between multiple calls within a test. */ + protected final T store; + + protected DependenciesTest(T store) { + this.store = store; + } + + /** Clears the span store between tests. */ + @Before + public abstract void clear(); + + /** + * Implementations should at least {@link SpanStore#accept(Iterator) store} the input. If + * dependency processing is a separate job, it should complete before returning from this method. + */ + protected abstract void processDependencies(List spans); + + + /** Notably, the cassandra implementation has day granularity */ + private static long midnight(){ + Calendar date = new GregorianCalendar(TimeZone.getTimeZone("GMT")); + // reset hour, minutes, seconds and millis + date.set(Calendar.HOUR_OF_DAY, 0); + date.set(Calendar.MINUTE, 0); + date.set(Calendar.SECOND, 0); + date.set(Calendar.MILLISECOND, 0); + return date.getTimeInMillis(); + } + + // Use real time, as most span-stores have TTL logic which looks back several days. + long today = midnight(); + + Endpoint zipkinWeb = Endpoint.create("zipkin-web", 172 << 24 | 17 << 16 | 3, 8080); + Endpoint zipkinQuery = Endpoint.create("zipkin-query", 172 << 24 | 17 << 16 | 2, 9411); + Endpoint zipkinJdbc = Endpoint.create("zipkin-jdbc", 172 << 24 | 17 << 16 | 2, 0); + + /** This test confirms that core ("sr", "cs", "cr", "ss") annotations are not required. */ + @Test + public void getDependencies_noCoreAnnotations() { + Endpoint someClient = Endpoint.create("some-client", 172 << 24 | 17 << 16 | 4, 80); + List trace = asList( + new Span.Builder().traceId(20L).id(20L).name("get") + .timestamp(today * 1000).duration(350L * 1000) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, someClient)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinWeb)).build(), + new Span.Builder().traceId(20L).parentId(20L).id(21L).name("get") + .timestamp((today + 50) * 1000).duration(250L * 1000) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, zipkinWeb)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinQuery)).build(), + new Span.Builder().traceId(20L).parentId(21L).id(22L).name("get") + .timestamp((today + 150) * 1000).duration(50L * 1000) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, zipkinQuery)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinJdbc)).build() + ); + + processDependencies(trace); + + assertThat(store.getDependencies(today * 1000, null)).containsOnly( + new DependencyLink("some-client", "zipkin-web", 1), + new DependencyLink("zipkin-web", "zipkin-query", 1), + new DependencyLink("zipkin-query", "zipkin-jdbc", 1) + ); + } + + /** + * This test confirms that the span store can process trace with intermediate + * spans like the below properly. + * + * span1: SR SS + * span2: intermediate call + * span3: CS SR SS CR: Dependency 1 + */ + @Test + public void getDependencies_intermediateSpans() { + List trace = asList( + new Span.Builder().traceId(20L).id(20L).name("get") + .timestamp(today * 1000).duration(350L * 1000) + .addAnnotation(Annotation.create(today * 1000, Constants.SERVER_RECV, zipkinWeb)) + .addAnnotation(Annotation.create((today + 350) * 1000, Constants.SERVER_SEND, zipkinWeb)).build(), + new Span.Builder().traceId(20L).parentId(20L).id(21L).name("call") + .timestamp((today + 25) * 1000).duration(325L * 1000) + .addBinaryAnnotation(BinaryAnnotation.create(Constants.LOCAL_COMPONENT, "depth2", zipkinWeb)).build(), + new Span.Builder().traceId(20L).parentId(21L).id(22L).name("get") + .timestamp((today + 50) * 1000).duration(250L * 1000) + .addAnnotation(Annotation.create((today + 50) * 1000, Constants.CLIENT_SEND, zipkinWeb)) + .addAnnotation(Annotation.create((today + 100) * 1000, Constants.SERVER_RECV, zipkinQuery)) + .addAnnotation(Annotation.create((today + 250) * 1000, Constants.SERVER_SEND, zipkinQuery)) + .addAnnotation(Annotation.create((today + 300) * 1000, Constants.CLIENT_RECV, zipkinWeb)).build(), + new Span.Builder().traceId(20L).parentId(22L).id(23L).name("call") + .timestamp((today + 110) * 1000).duration(130L * 1000) + .addBinaryAnnotation(BinaryAnnotation.create(Constants.LOCAL_COMPONENT, "depth4", zipkinQuery)).build(), + new Span.Builder().traceId(20L).parentId(23L).id(24L).name("call") + .timestamp((today + 125) * 1000).duration(105L * 1000) + .addBinaryAnnotation(BinaryAnnotation.create(Constants.LOCAL_COMPONENT, "depth5", zipkinQuery)).build(), + new Span.Builder().traceId(20L).parentId(24L).id(25L).name("get") + .timestamp((today + 150) * 1000).duration(50L * 1000) + .addAnnotation(Annotation.create((today + 150) * 1000, Constants.CLIENT_SEND, zipkinQuery)) + .addAnnotation(Annotation.create((today + 200) * 1000, Constants.CLIENT_RECV, zipkinQuery)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinJdbc)).build() + ); + + processDependencies(trace); + + assertThat(store.getDependencies(today * 1000, null)).containsOnly( + new DependencyLink("zipkin-web", "zipkin-query", 1), + new DependencyLink("zipkin-query", "zipkin-jdbc", 1) + ); + } + + /** + * This test confirms that the span store can process trace with intermediate + * spans like the below properly. + * + * span1: SR SS + * span2: intermediate call + * span3: CS SR SS CR: Dependency 1 + */ + @Test + public void getDependencies_duplicateAddress() { + List trace = asList( + new Span.Builder().traceId(20L).id(20L).name("get") + .timestamp(today * 1000).duration(350L * 1000) + .addAnnotation(Annotation.create(today * 1000, Constants.SERVER_RECV, zipkinWeb)) + .addAnnotation(Annotation.create((today + 350) * 1000, Constants.SERVER_SEND, zipkinWeb)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, zipkinWeb)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinWeb)).build(), + new Span.Builder().traceId(20L).parentId(21L).id(22L).name("get") + .timestamp((today + 50) * 1000).duration(250L * 1000) + .addAnnotation(Annotation.create((today + 50) * 1000, Constants.CLIENT_SEND, zipkinWeb)) + .addAnnotation(Annotation.create((today + 300) * 1000, Constants.CLIENT_RECV, zipkinWeb)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, zipkinQuery)) + .addBinaryAnnotation(BinaryAnnotation.address(Constants.SERVER_ADDR, zipkinQuery)).build() + ); + + processDependencies(trace); + + assertThat(store.getDependencies(today * 1000, null)).containsOnly( + new DependencyLink("zipkin-web", "zipkin-query", 1) + ); + } +} diff --git a/interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java b/interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java new file mode 100644 index 00000000000..050e903bad8 --- /dev/null +++ b/interop/src/test/java/zipkin/jdbc/JDBCDependenciesTest.java @@ -0,0 +1,41 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.jdbc; + +import java.sql.SQLException; +import java.util.List; +import zipkin.DependenciesTest; +import zipkin.Span; +import zipkin.SpanStoreTest; + +public class JDBCDependenciesTest extends DependenciesTest { + + public JDBCDependenciesTest() throws SQLException { + super(new JDBCTestGraph().spanStore); + } + + @Override + public void clear() { + try { + store.clear(); + } catch (SQLException e) { + throw new AssertionError(e); + } + } + + @Override + protected void processDependencies(List spans) { + store.accept(spans.iterator()); + } +} diff --git a/interop/src/test/java/zipkin/jdbc/JDBCScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/jdbc/JDBCScalaDependencyStoreTest.java index d98087d9825..e3bab32fc5e 100644 --- a/interop/src/test/java/zipkin/jdbc/JDBCScalaDependencyStoreTest.java +++ b/interop/src/test/java/zipkin/jdbc/JDBCScalaDependencyStoreTest.java @@ -18,6 +18,7 @@ import com.twitter.zipkin.storage.DependencyStoreSpec; import java.sql.SQLException; import org.junit.BeforeClass; +import org.junit.Ignore; import scala.collection.immutable.List; import zipkin.interop.ScalaDependencyStoreAdapter; import zipkin.interop.ScalaSpanStoreAdapter; @@ -46,4 +47,10 @@ public void clear() { throw new AssertionError(e); } } + + @Override + @Ignore // TODO: re-enable in 1.32.4 per https://github.com/openzipkin/zipkin/pull/947 + public void canSearchForIntervalsBesidesToday() { + super.canSearchForIntervalsBesidesToday(); + } } diff --git a/interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java b/interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java new file mode 100644 index 00000000000..c6bed472bf1 --- /dev/null +++ b/interop/src/test/java/zipkin/server/InMemoryDependenciesTest.java @@ -0,0 +1,35 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.server; + +import java.util.List; +import zipkin.DependenciesTest; +import zipkin.Span; + +public class InMemoryDependenciesTest extends DependenciesTest { + + public InMemoryDependenciesTest() { + super(new InMemorySpanStore()); + } + + @Override + public void clear() { + store.clear(); + } + + @Override + protected void processDependencies(List spans) { + store.accept(spans.iterator()); + } +} diff --git a/interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java new file mode 100644 index 00000000000..74e97fc7fc9 --- /dev/null +++ b/interop/src/test/java/zipkin/server/InMemoryScalaDependencyStoreTest.java @@ -0,0 +1,46 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.server; + +import com.twitter.zipkin.common.Span; +import com.twitter.zipkin.storage.DependencyStore; +import com.twitter.zipkin.storage.DependencyStoreSpec; +import org.junit.Ignore; +import scala.collection.immutable.List; +import zipkin.interop.ScalaDependencyStoreAdapter; +import zipkin.interop.ScalaSpanStoreAdapter; + +public class InMemoryScalaDependencyStoreTest extends DependencyStoreSpec { + private InMemorySpanStore mem = new InMemorySpanStore(); + + @Override + public DependencyStore store() { + return new ScalaDependencyStoreAdapter(mem); + } + + @Override + public void processDependencies(List spans) { + new ScalaSpanStoreAdapter(mem).apply(spans); + } + + public void clear() { + mem.clear(); + } + + @Override + @Ignore // TODO: re-enable in 1.32.4 per https://github.com/openzipkin/zipkin/pull/947 + public void canSearchForIntervalsBesidesToday() { + super.canSearchForIntervalsBesidesToday(); + } +} diff --git a/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java b/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java index 477f55ba671..b166c3bebd7 100644 --- a/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java +++ b/zipkin-server/src/main/java/zipkin/server/InMemorySpanStore.java @@ -26,13 +26,17 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; +import zipkin.Annotation; import zipkin.BinaryAnnotation; +import zipkin.Constants; import zipkin.DependencyLink; import zipkin.QueryRequest; import zipkin.Span; import zipkin.SpanStore; import zipkin.internal.ApplyTimestampAndDuration; import zipkin.internal.CorrectForClockSkew; +import zipkin.internal.DependencyLinkSpan; +import zipkin.internal.DependencyLinker; import zipkin.internal.MergeById; import zipkin.internal.Nullable; @@ -101,7 +105,46 @@ public synchronized List getSpanNames(String service) { @Override public List getDependencies(long endTs, @Nullable Long lookback) { - return Collections.emptyList(); + endTs *= 1000; + if (lookback == null) { + lookback = endTs; + } else { + lookback *= 1000; + } + + DependencyLinker linksBuilder = new DependencyLinker(); + + for (Collection trace : traceIdToSpans.delegate.values()) { + if (trace.isEmpty()) continue; + + List linkSpans = new LinkedList<>(); + for (Span s : trace) { + Long timestamp = s.timestamp; + if (timestamp == null || + timestamp < (endTs - lookback) || + timestamp > endTs) { + continue; + } + DependencyLinkSpan.Builder linkSpan = new DependencyLinkSpan.Builder(s.parentId, s.id); + for (BinaryAnnotation a : s.binaryAnnotations) { + if (a.key.equals(Constants.CLIENT_ADDR) && a.endpoint != null) { + linkSpan.caService(a.endpoint.serviceName); + } else if (a.key.equals(Constants.SERVER_ADDR) && a.endpoint != null) { + linkSpan.saService(a.endpoint.serviceName); + } + } + for (Annotation a : s.annotations) { + if (a.value.equals(Constants.SERVER_RECV) && a.endpoint != null) { + linkSpan.srService(a.endpoint.serviceName); + break; + } + } + linkSpans.add(linkSpan.build()); + } + + linksBuilder.putTrace(linkSpans.iterator()); + } + return linksBuilder.link(); } private static Predicate> spansPredicate(QueryRequest request) { @@ -116,7 +159,7 @@ private static Predicate> spansPredicate(QueryRequest request) { Predicate durationPredicate = null; if (request.minDuration != null && request.maxDuration != null) { durationPredicate = d -> d >= request.minDuration && d <= request.maxDuration; - } else if (request.minDuration != null){ + } else if (request.minDuration != null) { durationPredicate = d -> d >= request.minDuration; } String spanName = request.spanName; diff --git a/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/DependencyLinkSpanIterator.java b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/DependencyLinkSpanIterator.java new file mode 100644 index 00000000000..7197363fa81 --- /dev/null +++ b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/DependencyLinkSpanIterator.java @@ -0,0 +1,117 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.jdbc; + +import java.util.Iterator; +import org.jooq.Record5; +import zipkin.internal.DependencyLinkSpan; +import zipkin.internal.PeekingIterator; + +import static zipkin.Constants.CLIENT_ADDR; +import static zipkin.Constants.SERVER_ADDR; +import static zipkin.Constants.SERVER_RECV; +import static zipkin.jdbc.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS; +import static zipkin.jdbc.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS; + +/** Convenience that lazy converts rows into {@linkplain DependencyLinkSpan} objects. */ +final class DependencyLinkSpanIterator implements Iterator { + + /** Assumes the input records are sorted by trace id, span id */ + static final class ByTraceId implements Iterator> { + final PeekingIterator> delegate; + + Long currentTraceId; + + ByTraceId(Iterator> delegate) { + this.delegate = new PeekingIterator<>(delegate); + } + + @Override public boolean hasNext() { + return delegate.hasNext(); + } + + @Override public Iterator next() { + currentTraceId = delegate.peek().getValue(ZIPKIN_SPANS.TRACE_ID); + return new DependencyLinkSpanIterator(delegate, currentTraceId); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + } + + final PeekingIterator> delegate; + final Long traceId; + + DependencyLinkSpanIterator(PeekingIterator> delegate, + Long traceId) { + this.delegate = delegate; + this.traceId = traceId; + } + + @Override + public boolean hasNext() { + return delegate.hasNext() && delegate.peek().getValue(ZIPKIN_SPANS.TRACE_ID).equals(traceId); + } + + @Override + public DependencyLinkSpan next() { + Record5 row = delegate.next(); + + DependencyLinkSpan.Builder result = new DependencyLinkSpan.Builder( + row.getValue(ZIPKIN_SPANS.PARENT_ID), + row.getValue(ZIPKIN_SPANS.ID) + ); + parseClientAndServerNames( + result, + row.getValue(ZIPKIN_ANNOTATIONS.A_KEY), + row.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME)); + + while (hasNext()) { + Record5 next = delegate.peek(); + if (next == null) { + continue; + } + if (row.getValue(ZIPKIN_SPANS.ID).equals(next.getValue(ZIPKIN_SPANS.ID))) { + delegate.next(); // advance the iterator since we are in the same span id + parseClientAndServerNames( + result, + next.getValue(ZIPKIN_ANNOTATIONS.A_KEY), + next.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME)); + } else { + break; + } + } + return result.build(); + } + + void parseClientAndServerNames(DependencyLinkSpan.Builder span, String key, String value) { + if (key == null) return; // neither client nor server + switch (key) { + case CLIENT_ADDR: + span.caService(value); + break; + case SERVER_ADDR: + span.saService(value); + break; + case SERVER_RECV: + span.srService(value); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java index 889c41fc804..7549901d01a 100644 --- a/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java +++ b/zipkin-spanstores/jdbc/src/main/java/zipkin/jdbc/JDBCSpanStore.java @@ -22,17 +22,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.logging.Logger; import java.util.stream.Collectors; import javax.sql.DataSource; +import org.jooq.Cursor; import org.jooq.DSLContext; import org.jooq.ExecuteListenerProvider; import org.jooq.InsertSetMoreStep; import org.jooq.Query; import org.jooq.Record; import org.jooq.Record1; -import org.jooq.Record3; +import org.jooq.Record5; import org.jooq.SelectConditionStep; import org.jooq.SelectOffsetStep; import org.jooq.Table; @@ -51,20 +50,23 @@ import zipkin.SpanStore; import zipkin.internal.ApplyTimestampAndDuration; import zipkin.internal.CorrectForClockSkew; +import zipkin.internal.DependencyLinkSpan; +import zipkin.internal.DependencyLinker; import zipkin.internal.Nullable; import zipkin.internal.Pair; import zipkin.jdbc.internal.generated.tables.ZipkinAnnotations; import static java.util.Collections.emptyList; -import static java.util.logging.Level.FINEST; import static java.util.stream.Collectors.groupingBy; import static zipkin.BinaryAnnotation.Type.STRING; +import static zipkin.Constants.CLIENT_ADDR; +import static zipkin.Constants.SERVER_ADDR; +import static zipkin.Constants.SERVER_RECV; import static zipkin.internal.Util.checkNotNull; import static zipkin.jdbc.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS; import static zipkin.jdbc.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS; public final class JDBCSpanStore implements SpanStore { - private static final Logger LOGGER = Logger.getLogger(JDBCSpanStore.class.getName()); private static final Charset UTF_8 = Charset.forName("UTF-8"); static { @@ -287,56 +289,40 @@ public List getSpanNames(String serviceName) { public List getDependencies(long endTs, @Nullable Long lookback) { endTs = endTs * 1000; try (Connection conn = datasource.getConnection()) { - Map>> parentChild = context(conn) - .select(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.PARENT_ID, ZIPKIN_SPANS.ID) - .from(ZIPKIN_SPANS) + // Lazy fetching the cursor prevents us from buffering the whole dataset in memory. + Cursor> cursor = context(conn) + .selectDistinct(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.PARENT_ID, ZIPKIN_SPANS.ID, + ZIPKIN_ANNOTATIONS.A_KEY, ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME) + // left joining allows us to keep a mapping of all span ids, not just ones that have + // special annotations. We need all span ids to reconstruct the trace tree. We need + // the whole trace tree so that we can accurately skip local spans. + .from(ZIPKIN_SPANS.leftJoin(ZIPKIN_ANNOTATIONS) + .on(ZIPKIN_SPANS.TRACE_ID.eq(ZIPKIN_ANNOTATIONS.TRACE_ID).and( + ZIPKIN_SPANS.ID.eq(ZIPKIN_ANNOTATIONS.SPAN_ID))) + .and(ZIPKIN_ANNOTATIONS.A_KEY.in(CLIENT_ADDR, SERVER_RECV, SERVER_ADDR))) .where(lookback == null ? ZIPKIN_SPANS.START_TS.lessOrEqual(endTs) : ZIPKIN_SPANS.START_TS.between(endTs - lookback * 1000, endTs)) - .and(ZIPKIN_SPANS.PARENT_ID.isNotNull()) - .stream().collect(Collectors.groupingBy(Record3::value1)); - - Map, String> traceSpanServiceName = traceSpanServiceName(conn, parentChild.keySet()); - - // links are merged by mapping to parent/child and summing corresponding links - Map, Long> linkMap = new LinkedHashMap<>(); - - parentChild.values().stream().flatMap(List::stream).forEach(r -> { - String parent = lookup(traceSpanServiceName, Pair.create(r.value1(), r.value2())); - // can be null if a root span is missing, or the root's span id doesn't eq the trace id - if (parent != null) { - String child = lookup(traceSpanServiceName, Pair.create(r.value1(), r.value3())); - if (child != null) { - Pair key = Pair.create(parent, child); - if (linkMap.containsKey(key)) { - linkMap.put(key, linkMap.get(key) + 1); - } else { - linkMap.put(key, 1L); - } - } - } - }); - List result = new ArrayList<>(linkMap.size()); - for (Map.Entry, Long> entry : linkMap.entrySet()) { - result.add(DependencyLink.create(entry.getKey()._1, entry.getKey()._2, entry.getValue())); + // Grouping so that later code knows when a span or trace is finished. + .groupBy(ZIPKIN_SPANS.TRACE_ID, ZIPKIN_SPANS.ID, ZIPKIN_ANNOTATIONS.A_KEY).fetchLazy(); + + Iterator> traces = + new DependencyLinkSpanIterator.ByTraceId(cursor.iterator()); + + if (!traces.hasNext()) return Collections.emptyList(); + + DependencyLinker linker = new DependencyLinker(); + + while (traces.hasNext()) { + linker.putTrace(traces.next()); } - return result; + + return linker.link(); } catch (SQLException e) { throw new RuntimeException("Error querying dependencies for endTs " + endTs + " and lookback " + lookback + ": " + e.getMessage()); } } - private Map, String> traceSpanServiceName(Connection conn, Set traceIds) { - return context(conn) - .selectDistinct(ZIPKIN_ANNOTATIONS.TRACE_ID, ZIPKIN_ANNOTATIONS.SPAN_ID, ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME) - .from(ZIPKIN_ANNOTATIONS) - .where(ZIPKIN_ANNOTATIONS.TRACE_ID.in(traceIds)) - .and(ZIPKIN_ANNOTATIONS.A_KEY.in("sr", "sa")) - .and(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME.isNotNull()) - .groupBy(ZIPKIN_ANNOTATIONS.TRACE_ID, ZIPKIN_ANNOTATIONS.SPAN_ID) - .fetchMap(r -> Pair.create(r.value1(), r.value2()), Record3::value3); - } - private static Endpoint endpoint(Record a) { String serviceName = a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME); if (serviceName == null) { @@ -396,16 +382,4 @@ private static Table join(Table table, ZipkinAnnotations joinTable, String .and(joinTable.A_TYPE.eq(type)) .and(joinTable.A_KEY.eq(key)); } - - private static String lookup(Map, String> table, Pair key) { - String value = table.get(key); - if (value == null && LOGGER.isLoggable(FINEST)) { - if (key._1.equals(key._2)) { - LOGGER.log(FINEST, "could not find service name of root span " + key._1); - } else { - LOGGER.log(FINEST, "could not find service name of span " + key); - } - } - return value; - } } diff --git a/zipkin/src/main/java/zipkin/internal/DependencyLinkSpan.java b/zipkin/src/main/java/zipkin/internal/DependencyLinkSpan.java new file mode 100644 index 00000000000..bd927b15f6e --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/DependencyLinkSpan.java @@ -0,0 +1,106 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import static zipkin.internal.Util.checkNotNull; +import static zipkin.internal.Util.equal; + +/** + * Internal type used by {@link DependencyLinker linker} that holds the minimum state needed to + * aggregate {@link zipkin.DependencyLink dependency links}. + */ +// fields not exposed as public to further discourage use as a general type +public final class DependencyLinkSpan { + + /** + * Indicates the primary span type. + */ + enum Kind { + CLIENT, + /** The span includes a {@link zipkin.Constants#SERVER_RECV}. */ + SERVER, + UNKNOWN + } + + final Kind kind; + @Nullable + final Long parentId; + final long spanId; + @Nullable + final String service; + @Nullable + final String peerService; + + DependencyLinkSpan(Kind kind, Long parentId, long spanId, String service, String peerService) { + this.kind = checkNotNull(kind, "kind"); + this.parentId = parentId; + this.spanId = spanId; + this.service = service; + this.peerService = peerService; + } + + public static final class Builder { + private final Long parentId; + private final long spanId; + private String srService; + private String caService; + private String saService; + + public Builder(Long parentId, long spanId) { + this.spanId = spanId; + this.parentId = parentId; + } + + /** + * {@link zipkin.Constants#SERVER_RECV} is the preferred name of server, and this is a + * traditional span. + */ + public Builder srService(String srService) { + this.srService = srService; + return this; + } + + /** + * {@link zipkin.Constants#CLIENT_ADDR} is read to see calls into the root span from + * uninstrumented clients. + */ + public Builder caService(String caService) { + this.caService = caService; + return this; + } + + /** + * {@link zipkin.Constants#SERVER_ADDR} is only read at the leaf, when a client calls an + * un-instrumented server. + */ + public Builder saService(String saService) { + this.saService = saService; + return this; + } + + public DependencyLinkSpan build() { + // Finagle labels two sides of the same socket ("ca", "sa") with the same name. + // Skip the client side, so it isn't mistaken for a loopback request + if (equal(saService, caService)) { + caService = null; + } + if (srService != null) { + return new DependencyLinkSpan(Kind.SERVER, parentId, spanId, srService, caService); + } else if (saService != null) { + return new DependencyLinkSpan(Kind.CLIENT, parentId, spanId, caService, saService); + } + return new DependencyLinkSpan(Kind.UNKNOWN, parentId, spanId, null, null); + } + } +} \ No newline at end of file diff --git a/zipkin/src/main/java/zipkin/internal/DependencyLinker.java b/zipkin/src/main/java/zipkin/internal/DependencyLinker.java new file mode 100644 index 00000000000..cebf0588dad --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/DependencyLinker.java @@ -0,0 +1,100 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import zipkin.DependencyLink; + +/** + * This parses a span tree into dependency links used by Web UI. Ex. http://zipkin/dependency + * + *

This implementation traverses the tree, and only creates links between {@link + * DependencyLinkSpan.Kind#SERVER server} spans. One exception is at the bottom of the trace tree. + * {@link DependencyLinkSpan.Kind#CLIENT client} spans that record their {@link + * DependencyLinkSpan#peerService peer} are included, as this accounts for uninstrumented + * services. + */ +public final class DependencyLinker { + + private final Map, Long> linkMap = new LinkedHashMap<>(); + + /** + * @param spans spans where all spans have the same trace id + */ + public void putTrace(Iterator spans) { + if (!spans.hasNext()) return; + + Node.TreeBuilder builder = new Node.TreeBuilder<>(); + while (spans.hasNext()) { + DependencyLinkSpan next = spans.next(); + builder.addNode(next.parentId, next.spanId, next); + } + Node tree = builder.build(); + + // find any nodes who have + for (Iterator> i = tree.traverse(); i.hasNext(); ) { + Node current = i.next(); + String server; + String client; + switch (current.value().kind) { + case SERVER: + server = current.value().service; + client = current.value().peerService; + if (current == tree) { // we are the root-most span. + if (client == null) { + continue; // skip if we can't read the root's uninstrumented client + } + } + break; + case CLIENT: + server = current.value().peerService; + client = current.value().service; + break; + default: + continue; // skip if we are missing the server's name + } + + // Local spans may be between the current node and its remote ancestor + // Look up the stack until we see a service name, and assume that's the client + Node parent = current.parent(); + while (parent != null && client == null) { + if (parent.value().kind == DependencyLinkSpan.Kind.SERVER) { + client = parent.value().service; + } + parent = parent.parent(); + } + if (client == null) continue; // skip if no ancestors were servers + + Pair key = Pair.create(client, server); + if (linkMap.containsKey(key)) { + linkMap.put(key, linkMap.get(key) + 1); + } else { + linkMap.put(key, 1L); + } + } + } + + public List link() { + // links are merged by mapping to parent/child and summing corresponding links + List result = new ArrayList<>(linkMap.size()); + for (Map.Entry, Long> entry : linkMap.entrySet()) { + result.add(DependencyLink.create(entry.getKey()._1, entry.getKey()._2, entry.getValue())); + } + return result; + } +} diff --git a/zipkin/src/main/java/zipkin/internal/PeekingIterator.java b/zipkin/src/main/java/zipkin/internal/PeekingIterator.java new file mode 100644 index 00000000000..6a210b6e2d6 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/PeekingIterator.java @@ -0,0 +1,107 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static zipkin.internal.Util.checkNotNull; + +/** + * adapted from guava's {@code com.google.common.collect.AbstractIterator}. + */ +public class PeekingIterator implements Iterator { + + private final Iterator delegate; + private PeekingIterator.State state = State.NOT_READY; + private T next; + + /** + * Constructor for use by subclasses. + */ + public PeekingIterator(Iterator delegate) { + this.delegate = checkNotNull(delegate, "delegate"); + } + + protected T computeNext() { + if (delegate.hasNext()) { + return delegate.next(); + } + return endOfData(); + } + + protected final T endOfData() { + state = State.DONE; + return null; + } + + @Override + public final boolean hasNext() { + switch (state) { + case DONE: + return false; + case READY: + return true; + default: + } + return tryToComputeNext(); + } + + private boolean tryToComputeNext() { + next = computeNext(); + if (state != State.DONE) { + state = State.READY; + return true; + } + return false; + } + + @Override + public final T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + state = State.NOT_READY; + return next; + } + + public T peek() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private enum State { + /** + * We have computed the next element and haven't returned it yet. + */ + READY, + + /** + * We haven't yet computed or have already returned the element. + */ + NOT_READY, + + /** + * We have reached the end of the data and are finished. + */ + DONE, + } +} diff --git a/zipkin/src/test/java/zipkin/internal/PeekingIteratorTest.java b/zipkin/src/test/java/zipkin/internal/PeekingIteratorTest.java new file mode 100644 index 00000000000..47e815c533a --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/PeekingIteratorTest.java @@ -0,0 +1,80 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +public class PeekingIteratorTest { + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void unmodifiable() { + thrown.expect(UnsupportedOperationException.class); + + PeekingIterator it = TrueThenDone.INSTANCE.iterator(); + assertThat(it).containsExactly(true); + it.remove(); + } + + @Test + public void next() { + thrown.expect(NoSuchElementException.class); + + PeekingIterator it = TrueThenDone.INSTANCE.iterator(); + assertThat(it).containsExactly(true); + it.next(); + } + + @Test + public void peek() { + thrown.expect(NoSuchElementException.class); + + PeekingIterator it = TrueThenDone.INSTANCE.iterator(); + assertTrue(it.peek()); + assertThat(it).containsExactly(true); + it.peek(); + } + + enum TrueThenDone implements Iterable { + INSTANCE; + + @Override + public PeekingIterator iterator() { + return new PeekingIterator<>(new Iterator() { + boolean val = true; + + @Override public boolean hasNext() { + return val; + } + + @Override public Boolean next() { + if (val) { + val = false; + return true; + } + return false; + } + }); + } + } +}