diff --git a/benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java b/benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java index ff08ce9579d..9640ff60670 100644 --- a/benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java +++ b/benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java @@ -98,7 +98,7 @@ public byte[] writeClientSpan_thrift_java() { return Codec.THRIFT.writeSpan(clientSpan); } - static final byte[] rpcSpanJson = read("/span-client.json"); + static final byte[] rpcSpanJson = read("/span-rpc.json"); static final Span rpcSpan = Codec.JSON.readSpan(rpcSpanJson); static final byte[] rpcSpanThrift = Codec.THRIFT.writeSpan(rpcSpan); @@ -122,6 +122,30 @@ public byte[] writeRpcSpan_thrift_java() { return Codec.THRIFT.writeSpan(rpcSpan); } + static final byte[] rpcV6SpanJson = read("/span-rpc-ipv6.json"); + static final Span rpcV6Span = Codec.JSON.readSpan(rpcV6SpanJson); + static final byte[] rpcV6SpanThrift = Codec.THRIFT.writeSpan(rpcV6Span); + + @Benchmark + public Span readRpcV6Span_json_java() { + return Codec.JSON.readSpan(rpcV6SpanJson); + } + + @Benchmark + public Span readRpcV6Span_thrift_java() { + return Codec.THRIFT.readSpan(rpcV6SpanThrift); + } + + @Benchmark + public byte[] writeRpcV6Span_json_java() { + return Codec.JSON.writeSpan(rpcV6Span); + } + + @Benchmark + public byte[] writeRpcV6Span_thrift_java() { + return Codec.THRIFT.writeSpan(rpcV6Span); + } + // Convenience main entry-point public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() diff --git a/benchmarks/src/main/resources/span-rpc-ipv6.json b/benchmarks/src/main/resources/span-rpc-ipv6.json new file mode 100644 index 00000000000..f3096319636 --- /dev/null +++ b/benchmarks/src/main/resources/span-rpc-ipv6.json @@ -0,0 +1,130 @@ +{ + "traceId": "83c138a4ee0f3de9", + "name": "get", + "id": "d673d7abe80d2f3f", + "parentId": "83c138a4ee0f3de9", + "timestamp": 1461750491274000, + "duration": 51000, + "annotations": [ + { + "timestamp": 1461750491274000, + "value": "cs", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + }, + { + "timestamp": 1461750491274000, + "value": "ws", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + }, + { + "timestamp": 1461750491298000, + "value": "sr", + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002", + "port": 9411 + } + }, + { + "timestamp": 1461750491302000, + "value": "ss", + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002", + "port": 9411 + } + }, + { + "timestamp": 1461750491325000, + "value": "cr", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + }, + { + "timestamp": 1461750491325000, + "value": "wr", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + } + ], + "binaryAnnotations": [ + { + "key": "http.path", + "value": "/api/v1/traces", + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002" + } + }, + { + "key": "srv/finagle.version", + "value": "6.34.0", + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002" + } + }, + { + "key": "sa", + "value": true, + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002", + "port": 9411 + } + }, + { + "key": "ca", + "value": true, + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002", + "port": 42291 + } + }, + { + "key": "http.uri", + "value": "/api/v1/traces", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + }, + { + "key": "clnt/finagle.version", + "value": "6.34.0", + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001" + } + }, + { + "key": "sa", + "value": true, + "endpoint": { + "serviceName": "zipkin-query", + "ipv6": "2001:db8::c002", + "port": 9411 + } + }, + { + "key": "ca", + "value": true, + "endpoint": { + "serviceName": "zipkin-web", + "ipv6": "2001:db8::c001", + "port": 42291 + } + } + ] +} diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/HasIpv6.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/HasIpv6.java new file mode 100644 index 00000000000..eb767f34b07 --- /dev/null +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/HasIpv6.java @@ -0,0 +1,59 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.mysql; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.sql.DataSource; +import org.jooq.DSLContext; +import org.jooq.exception.DataAccessException; +import zipkin.internal.Lazy; + +import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS; + +final class HasIpv6 extends Lazy { + private static final Logger LOG = Logger.getLogger(HasIpv6.class.getName()); + + final DataSource datasource; + final DSLContexts context; + + HasIpv6(DataSource datasource, DSLContexts context) { + this.datasource = datasource; + this.context = context; + } + + @Override protected Boolean compute() { + try (Connection conn = datasource.getConnection()) { + DSLContext dsl = context.get(conn); + dsl.select(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6).from(ZIPKIN_ANNOTATIONS).limit(1).fetchAny(); + return true; + } catch (DataAccessException e) { + if (e.sqlState().equals("42S22")) { + LOG.warning("zipkin_annotations.ipv6 doesn't exist, so Endpoint.ipv6 is not supported. " + + "Execute: alter table zipkin_annotations add `endpoint_ipv6` BINARY(16)"); + return false; + } + problemReading(e); + } catch (SQLException | RuntimeException e) { + problemReading(e); + } + return false; + } + + static void problemReading(Exception e) { + LOG.log(Level.WARNING, "problem reading zipkin_annotations.ipv6", e); + } +} diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanConsumer.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanConsumer.java index 5dff66b3050..1aecce0284e 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanConsumer.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanConsumer.java @@ -29,6 +29,7 @@ import zipkin.BinaryAnnotation; import zipkin.Span; import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Lazy; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.StorageAdapters; @@ -38,10 +39,12 @@ final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer { private final DataSource datasource; private final DSLContexts context; + private final Lazy hasIpv6; - MySQLSpanConsumer(DataSource datasource, DSLContexts context) { + MySQLSpanConsumer(DataSource datasource, DSLContexts context, Lazy hasIpv6) { this.datasource = datasource; this.context = context; + this.hasIpv6 = hasIpv6; } /** Blocking version of {@link AsyncSpanConsumer#accept} */ @@ -94,6 +97,9 @@ final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer { if (annotation.endpoint != null) { insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME, annotation.endpoint.serviceName); insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4, annotation.endpoint.ipv4); + if (annotation.endpoint.ipv6 != null && hasIpv6.get()) { + insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6, annotation.endpoint.ipv6); + } insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT, annotation.endpoint.port); } inserts.add(insert.onDuplicateKeyIgnore()); @@ -110,6 +116,9 @@ final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer { if (annotation.endpoint != null) { insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME, annotation.endpoint.serviceName); insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4, annotation.endpoint.ipv4); + if (annotation.endpoint.ipv6 != null && hasIpv6.get()) { + insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6, annotation.endpoint.ipv6); + } insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT, annotation.endpoint.port); } inserts.add(insert.onDuplicateKeyIgnore()); diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java index 84f53cb86dc..ff9cfb9c419 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLSpanStore.java @@ -16,6 +16,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -26,6 +27,7 @@ import org.jooq.Condition; import org.jooq.Cursor; import org.jooq.DSLContext; +import org.jooq.Field; import org.jooq.Record; import org.jooq.Record1; import org.jooq.Record5; @@ -42,6 +44,7 @@ import zipkin.internal.CorrectForClockSkew; import zipkin.internal.DependencyLinkSpan; import zipkin.internal.DependencyLinker; +import zipkin.internal.Lazy; import zipkin.internal.Nullable; import zipkin.internal.Pair; import zipkin.storage.QueryRequest; @@ -59,23 +62,33 @@ import static zipkin.storage.mysql.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS; final class MySQLSpanStore implements SpanStore { + static final Field[] ANNOTATION_FIELDS_WITHOUT_IPV6; + + static { + ArrayList> list = new ArrayList(Arrays.asList(ZIPKIN_ANNOTATIONS.fields())); + list.remove(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6); + list.trimToSize(); + ANNOTATION_FIELDS_WITHOUT_IPV6 = list.toArray(new Field[list.size()]); + } + private final DataSource datasource; private final DSLContexts context; + private final Lazy hasIpv6; - MySQLSpanStore(DataSource datasource, DSLContexts context) { + MySQLSpanStore(DataSource datasource, DSLContexts context, Lazy hasIpv6) { this.datasource = datasource; this.context = context; + this.hasIpv6 = hasIpv6; } - static Endpoint endpoint(Record a) { + private Endpoint endpoint(Record a) { String serviceName = a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME); - if (serviceName == null) { - return null; - } - Short port = a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT); - return port != null ? - Endpoint.create(serviceName, a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4), port.intValue()) - : Endpoint.create(serviceName, a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4)); + if (serviceName == null) return null; + return Endpoint.builder() + .serviceName(serviceName) + .port(a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT)) + .ipv4(a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4)) + .ipv6(hasIpv6.get() ? a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6) : null).build(); } private static SelectOffsetStep> toTraceIdQuery(DSLContext context, @@ -159,7 +172,8 @@ List> getTraces(@Nullable QueryRequest request, @Nullable Long traceI groupingBy((Span s) -> s.traceId, LinkedHashMap::new, Collectors.toList())); dbAnnotations = context.get(conn) - .selectFrom(ZIPKIN_ANNOTATIONS) + .select(hasIpv6.get() ? ZIPKIN_ANNOTATIONS.fields() : ANNOTATION_FIELDS_WITHOUT_IPV6) + .from(ZIPKIN_ANNOTATIONS) .where(ZIPKIN_ANNOTATIONS.TRACE_ID.in(spansWithoutAnnotations.keySet())) .orderBy(ZIPKIN_ANNOTATIONS.A_TIMESTAMP.asc(), ZIPKIN_ANNOTATIONS.A_KEY.asc()) .stream() diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLStorage.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLStorage.java index 5e4b9a856d4..6f8810355cb 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLStorage.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/MySQLStorage.java @@ -19,6 +19,7 @@ import javax.sql.DataSource; import org.jooq.ExecuteListenerProvider; import org.jooq.conf.Settings; +import zipkin.internal.Lazy; import zipkin.internal.Nullable; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.AsyncSpanStore; @@ -78,6 +79,7 @@ public MySQLStorage build() { private final DataSource datasource; private final Executor executor; private final DSLContexts context; + final Lazy hasIpv6; private final SpanStore spanStore; private final AsyncSpanStore asyncSpanStore; private final AsyncSpanConsumer asyncSpanConsumer; @@ -86,9 +88,10 @@ public MySQLStorage build() { this.datasource = checkNotNull(builder.datasource, "datasource"); this.executor = checkNotNull(builder.executor, "executor"); this.context = new DSLContexts(builder.settings, builder.listenerProvider); - this.spanStore = new MySQLSpanStore(datasource, context); + this.hasIpv6 = new HasIpv6(datasource, context); + this.spanStore = new MySQLSpanStore(datasource, context, hasIpv6); this.asyncSpanStore = blockingToAsync(spanStore, executor); - this.asyncSpanConsumer = blockingToAsync(new MySQLSpanConsumer(datasource, context), executor); + this.asyncSpanConsumer = blockingToAsync(new MySQLSpanConsumer(datasource, context, hasIpv6), executor); } /** Returns the session in use by this storage component. */ @@ -128,7 +131,7 @@ void clear() { try (Connection conn = datasource.getConnection()) { context.get(conn).truncate(ZIPKIN_SPANS).execute(); context.get(conn).truncate(ZIPKIN_ANNOTATIONS).execute(); - } catch (SQLException e) { + } catch (SQLException | RuntimeException e) { throw new AssertionError(e); } } diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/DefaultCatalog.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/DefaultCatalog.java index a351e02c185..c35462ffd01 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/DefaultCatalog.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/DefaultCatalog.java @@ -33,14 +33,14 @@ @Generated( value = { "http://www.jooq.org", - "jOOQ version:3.8.0" + "jOOQ version:3.8.3" }, comments = "This class is generated by jOOQ" ) @SuppressWarnings({ "all", "unchecked", "rawtypes" }) public class DefaultCatalog extends CatalogImpl { - private static final long serialVersionUID = 899141124; + private static final long serialVersionUID = -1715313401; /** * The reference instance of diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Tables.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Tables.java index e22914008bd..187aa895987 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Tables.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Tables.java @@ -29,7 +29,7 @@ @Generated( value = { "http://www.jooq.org", - "jOOQ version:3.8.0" + "jOOQ version:3.8.3" }, comments = "This class is generated by jOOQ" ) diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Zipkin.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Zipkin.java index cf0ca174c2d..16ecabbd698 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Zipkin.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/Zipkin.java @@ -37,14 +37,14 @@ @Generated( value = { "http://www.jooq.org", - "jOOQ version:3.8.0" + "jOOQ version:3.8.3" }, comments = "This class is generated by jOOQ" ) @SuppressWarnings({ "all", "unchecked", "rawtypes" }) public class Zipkin extends SchemaImpl { - private static final long serialVersionUID = 2103762729; + private static final long serialVersionUID = 1459649196; /** * The reference instance of zipkin diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinAnnotations.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinAnnotations.java index ae8f6c2840a..59ab8cff0ac 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinAnnotations.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinAnnotations.java @@ -35,14 +35,14 @@ @Generated( value = { "http://www.jooq.org", - "jOOQ version:3.8.0" + "jOOQ version:3.8.3" }, comments = "This class is generated by jOOQ" ) @SuppressWarnings({ "all", "unchecked", "rawtypes" }) public class ZipkinAnnotations extends TableImpl { - private static final long serialVersionUID = 1926736748; + private static final long serialVersionUID = 234778620; /** * The reference instance of zipkin.zipkin_annotations @@ -92,6 +92,11 @@ public Class getRecordType() { */ public final TableField ENDPOINT_IPV4 = createField("endpoint_ipv4", org.jooq.impl.SQLDataType.INTEGER, this, "Null when Binary/Annotation.endpoint is null"); + /** + * The column zipkin.zipkin_annotations.endpoint_ipv6. Null when Binary/Annotation.endpoint is null, or no IPv6 address + */ + public final TableField ENDPOINT_IPV6 = createField("endpoint_ipv6", org.jooq.impl.SQLDataType.BINARY.length(16), this, "Null when Binary/Annotation.endpoint is null, or no IPv6 address"); + /** * The column zipkin.zipkin_annotations.endpoint_port. Null when Binary/Annotation.endpoint is null */ diff --git a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinSpans.java b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinSpans.java index cb1bd3f9d5c..b177fb348de 100644 --- a/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinSpans.java +++ b/zipkin-storage/mysql/src/main/java/zipkin/storage/mysql/internal/generated/tables/ZipkinSpans.java @@ -35,14 +35,14 @@ @Generated( value = { "http://www.jooq.org", - "jOOQ version:3.8.0" + "jOOQ version:3.8.3" }, comments = "This class is generated by jOOQ" ) @SuppressWarnings({ "all", "unchecked", "rawtypes" }) public class ZipkinSpans extends TableImpl { - private static final long serialVersionUID = 1015565636; + private static final long serialVersionUID = -1910588191; /** * The reference instance of zipkin.zipkin_spans diff --git a/zipkin-storage/mysql/src/main/resources/mysql.sql b/zipkin-storage/mysql/src/main/resources/mysql.sql index 7bdfe123019..7bb6348bcfb 100644 --- a/zipkin-storage/mysql/src/main/resources/mysql.sql +++ b/zipkin-storage/mysql/src/main/resources/mysql.sql @@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS zipkin_annotations ( `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation', `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp', `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null', + `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address', `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null' ) ENGINE=InnoDB ROW_FORMAT=COMPRESSED; @@ -32,4 +33,3 @@ ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`) COMMENT 'for getTraces/ById ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames'; ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces'; ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces'; -ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_ipv4`) COMMENT 'for getTraces ordering'; diff --git a/zipkin-storage/mysql/src/test/java/zipkin/storage/mysql/MySQLStorageTest.java b/zipkin-storage/mysql/src/test/java/zipkin/storage/mysql/MySQLStorageTest.java index b50afd92cf2..54438be5272 100644 --- a/zipkin-storage/mysql/src/test/java/zipkin/storage/mysql/MySQLStorageTest.java +++ b/zipkin-storage/mysql/src/test/java/zipkin/storage/mysql/MySQLStorageTest.java @@ -14,7 +14,9 @@ package zipkin.storage.mysql; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import javax.sql.DataSource; +import org.jooq.exception.DataAccessException; import org.junit.Test; import zipkin.Component.CheckResult; @@ -38,4 +40,47 @@ public void check_failsInsteadOfThrowing() throws SQLException { assertThat(result.exception) .isInstanceOf(SQLException.class); } + + @Test + public void hasIpv6_falseWhenKnownSQLState() throws SQLException { + SQLSyntaxErrorException sqlException = new SQLSyntaxErrorException( + "Unknown column 'zipkin_annotations.endpoint_ipv6' in 'field list'", + "42S22", 1054); + DataSource dataSource = mock(DataSource.class); + + // cheats to lower mock count: this exception is really thrown during execution of the query + when(dataSource.getConnection()).thenThrow( + new DataAccessException(sqlException.getMessage(), sqlException)); + + Boolean result = MySQLStorage.builder() + .executor(Runnable::run) + .datasource(dataSource) + .build().hasIpv6.get(); + + assertThat(result).isFalse(); + } + + /** + * This returns false instead of failing when the SQLState code doesn't imply the column is + * missing. This is to prevent zipkin from crashing due to scenarios we haven't thought up, yet. + * The root error goes into the log in this case. + */ + @Test + public void hasIpv6_falseWhenUnknownSQLState() throws SQLException { + SQLSyntaxErrorException sqlException = new SQLSyntaxErrorException( + "java.sql.SQLSyntaxErrorException: Table 'zipkin.zipkin_annotations' doesn't exist", + "42S02", 1146); + DataSource dataSource = mock(DataSource.class); + + // cheats to lower mock count: this exception is really thrown during execution of the query + when(dataSource.getConnection()).thenThrow( + new DataAccessException(sqlException.getMessage(), sqlException)); + + Boolean result = MySQLStorage.builder() + .executor(Runnable::run) + .datasource(dataSource) + .build().hasIpv6.get(); + + assertThat(result).isFalse(); + } } diff --git a/zipkin-ui/js/component_ui/traceSummary.js b/zipkin-ui/js/component_ui/traceSummary.js index 94772dbde25..c2cab1f79c9 100644 --- a/zipkin-ui/js/component_ui/traceSummary.js +++ b/zipkin-ui/js/component_ui/traceSummary.js @@ -143,7 +143,8 @@ export function getTraceErrorType(spans) { } function endpointEquals(e1, e2) { - return e1.ipv4 === e2.ipv4 && e1.port === e2.port && e1.serviceName === e2.serviceName; + return (e1.ipv4 === e2.ipv4 || e1.ipv6 === e2.ipv6) + && e1.port === e2.port && e1.serviceName === e2.serviceName; } export function traceSummary(spans = []) { diff --git a/zipkin-ui/js/component_ui/traceToMustache.js b/zipkin-ui/js/component_ui/traceToMustache.js index e926ac34cab..a8c2bd3bb0f 100644 --- a/zipkin-ui/js/component_ui/traceToMustache.js +++ b/zipkin-ui/js/component_ui/traceToMustache.js @@ -79,10 +79,16 @@ function toSpanDepths(spans) { return treeDepths(entry, 1); } -export function formatEndpoint({ipv4, port = 0, serviceName = ''}) { +export function formatEndpoint({ipv4, ipv6, port = 0, serviceName = ''}) { if (serviceName) { + if (ipv6) { + return `[${ipv6}]:${port} (${serviceName})`; + } return `${ipv4}:${port} (${serviceName})`; } + if (ipv6) { + return `[${ipv6}]:${port}`; + } return `${ipv4}:${port}`; } diff --git a/zipkin-ui/test/component_ui/traceToMustache.test.js b/zipkin-ui/test/component_ui/traceToMustache.test.js index 1c62b978dfe..bbbf62f96e3 100644 --- a/zipkin-ui/test/component_ui/traceToMustache.test.js +++ b/zipkin-ui/test/component_ui/traceToMustache.test.js @@ -145,4 +145,14 @@ describe('formatEndpoint', () => { '150.151.152.153:9042' ); }); + + it('should put ipv6 in brackets', () => { + formatEndpoint({ipv6: '2001:db8::c001', port: 9042, serviceName: 'cassandra'}).should.equal( + '[2001:db8::c001]:9042 (cassandra)' + ); + + formatEndpoint({ipv6: '2001:db8::c001', port: 9042}).should.equal( + '[2001:db8::c001]:9042' + ); + }); }); diff --git a/zipkin/src/main/java/zipkin/Endpoint.java b/zipkin/src/main/java/zipkin/Endpoint.java index b020bd51d65..d7ce54b75b7 100644 --- a/zipkin/src/main/java/zipkin/Endpoint.java +++ b/zipkin/src/main/java/zipkin/Endpoint.java @@ -14,11 +14,13 @@ package zipkin; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Locale; import zipkin.internal.JsonCodec; import zipkin.internal.Nullable; import zipkin.internal.Util; +import static zipkin.internal.Util.checkArgument; import static zipkin.internal.Util.checkNotNull; /** @@ -32,11 +34,11 @@ public final class Endpoint { public static Endpoint create(String serviceName, int ipv4, int port) { - return new Endpoint(serviceName, ipv4, port == 0 ? null : (short) (port & 0xffff)); + return new Endpoint(serviceName, ipv4, null, port == 0 ? null : (short) (port & 0xffff)); } public static Endpoint create(String serviceName, int ipv4) { - return new Endpoint(serviceName, ipv4, null); + return new Endpoint(serviceName, ipv4, null, null); } /** @@ -57,7 +59,7 @@ public static Endpoint create(String serviceName, int ipv4) { public final String serviceName; /** - * IPv4 endpoint address packed into 4 bytes. + * IPv4 endpoint address packed into 4 bytes or zero if unknown. * *

Ex for the IP 1.2.3.4, it would be {@code (1 << 24) | (2 << 16) | (3 << 8) | 4} * @@ -65,6 +67,14 @@ public static Endpoint create(String serviceName, int ipv4) { */ public final int ipv4; + /** + * IPv6 endpoint address packed into 16 bytes or null if unknown. + * + * @see java.net.Inet6Address#getAddress() + */ + @Nullable + public final byte[] ipv6; + /** * IPv4 port or null, if not known. * @@ -75,10 +85,11 @@ public static Endpoint create(String serviceName, int ipv4) { @Nullable public final Short port; - Endpoint(String serviceName, int ipv4, Short port) { + Endpoint(String serviceName, int ipv4, byte[] ipv6, Short port) { this.serviceName = checkNotNull(serviceName, "serviceName").isEmpty() ? "" : serviceName.toLowerCase(Locale.ROOT); this.ipv4 = ipv4; + this.ipv6 = ipv6; this.port = port; } @@ -93,6 +104,7 @@ public static Builder builder(){ public static final class Builder { private String serviceName; private Integer ipv4; + private byte[] ipv6; private Short port; Builder() { @@ -116,6 +128,15 @@ public Builder ipv4(int ipv4) { return this; } + /** @see Endpoint#ipv6 */ + public Builder ipv6(byte[] ipv6) { + if (ipv6 != null) { + checkArgument(ipv6.length == 16, "ipv6 addresses are 16 bytes: " + ipv6.length); + this.ipv6 = ipv6; + } + return this; + } + /** @see Endpoint#port */ public Builder port(Short port) { if (port == null || port != 0) { @@ -125,7 +146,7 @@ public Builder port(Short port) { } public Endpoint build() { - return new Endpoint(serviceName, checkNotNull(ipv4, "ipv4"), port); + return new Endpoint(serviceName, ipv4 == null ? 0 : ipv4, ipv6, port); } } @@ -143,6 +164,7 @@ public boolean equals(Object o) { Endpoint that = (Endpoint) o; return (this.serviceName.equals(that.serviceName)) && (this.ipv4 == that.ipv4) + && (Arrays.equals(this.ipv6, that.ipv6)) && Util.equal(this.port, that.port); } return false; @@ -156,6 +178,8 @@ public int hashCode() { h *= 1000003; h ^= ipv4; h *= 1000003; + h ^= Arrays.hashCode(ipv6); + h *= 1000003; h ^= (port == null) ? 0 : port.hashCode(); return h; } diff --git a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java index 10a6e1b511a..a2a3ab545d1 100644 --- a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java +++ b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java @@ -14,6 +14,7 @@ package zipkin.internal; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -83,7 +84,7 @@ static Span adjustTimestamps(Span span, ClockSkew skew) { for (int i = 0, length = span.annotations.size(); i < length; i++) { Annotation a = span.annotations.get(i); if (a.endpoint == null) continue; - if (skew.endpoint.ipv4 == a.endpoint.ipv4) { + if (ipsMatch(skew.endpoint, a.endpoint)) { if (annotations == null) annotations = new ArrayList<>(span.annotations); annotations.set(i, a.toBuilder().timestamp(a.timestamp - skew.skew).build()); } @@ -95,13 +96,18 @@ static Span adjustTimestamps(Span span, ClockSkew skew) { for (int i = 0, length = span.binaryAnnotations.size(); i < length; i++) { BinaryAnnotation b = span.binaryAnnotations.get(i); if (b.endpoint == null) continue; - if (b.key.equals(Constants.LOCAL_COMPONENT) && skew.endpoint.ipv4 == b.endpoint.ipv4) { + if (b.key.equals(Constants.LOCAL_COMPONENT) && ipsMatch(skew.endpoint, b.endpoint)) { return span.toBuilder().timestamp(span.timestamp - skew.skew).build(); } } return span; } + static boolean ipsMatch(Endpoint skew, Endpoint that) { + return (skew.ipv6 != null && Arrays.equals(skew.ipv6, that.ipv6)) + || (skew.ipv4 != 0 && skew.ipv4 == that.ipv4); + } + /** Use client/server annotations to determine if there's clock skew. */ @Nullable static ClockSkew getClockSkew(Span span) { diff --git a/zipkin/src/main/java/zipkin/internal/JsonCodec.java b/zipkin/src/main/java/zipkin/internal/JsonCodec.java index faa94f157a4..91e29415e1d 100644 --- a/zipkin/src/main/java/zipkin/internal/JsonCodec.java +++ b/zipkin/src/main/java/zipkin/internal/JsonCodec.java @@ -19,6 +19,7 @@ import com.squareup.moshi.JsonWriter; import com.squareup.moshi.Moshi; import java.io.IOException; +import java.net.InetAddress; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -82,6 +83,12 @@ public Endpoint fromJson(JsonReader reader) throws IOException { } result.ipv4(ipv4); break; + case "ipv6": + String input = reader.nextString(); + // Shouldn't hit DNS, because it's an IP string literal. + byte[] ipv6 = InetAddress.getByName(input).getAddress(); + result.ipv6(ipv6); + break; case "port": result.port((short) reader.nextInt()); break; @@ -108,6 +115,9 @@ public void toJson(JsonWriter writer, Endpoint value) throws IOException { int port = value.port & 0xffff; if (port != 0) writer.name("port").value(port); } + if (value.ipv6 != null) { + writer.name("ipv6").value(InetAddress.getByAddress(value.ipv6).getHostAddress()); + } writer.endObject(); } }.nullSafe(); diff --git a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java index e7508b90d84..925135f1018 100644 --- a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java +++ b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java @@ -110,6 +110,7 @@ interface ThriftAdapter extends ThriftReader, ThriftWriter { final Field IPV4 = new Field(TYPE_I32, 1); final Field PORT = new Field(TYPE_I16, 2); final Field SERVICE_NAME = new Field(TYPE_STRING, 3); + final Field IPV6 = new Field(TYPE_STRING, 4); @Override public Endpoint read(ByteBuffer bytes) { @@ -126,6 +127,8 @@ public Endpoint read(ByteBuffer bytes) { result.port(bytes.getShort()); } else if (field.isEqualTo(SERVICE_NAME)) { result.serviceName(readUtf8(bytes)); + } else if (field.isEqualTo(IPV6)) { + result.ipv6(readByteArray(bytes)); } else { skip(bytes, field.type); } @@ -144,6 +147,13 @@ public void write(Endpoint value, Buffer buffer) { SERVICE_NAME.write(buffer); writeUtf8(buffer, value.serviceName); + if (value.ipv6 != null) { + IPV6.write(buffer); + assert value.ipv6.length == 16; + buffer.writeInt(value.ipv6.length); + buffer.write(value.ipv6); + } + buffer.writeByte(TYPE_STOP); } }; diff --git a/zipkin/src/test/java/zipkin/EndpointTest.java b/zipkin/src/test/java/zipkin/EndpointTest.java index fe4fe0cfab5..bd97a7043c8 100644 --- a/zipkin/src/test/java/zipkin/EndpointTest.java +++ b/zipkin/src/test/java/zipkin/EndpointTest.java @@ -32,11 +32,9 @@ public void messageWhenMissingServiceName() { } @Test - public void messageWhenMissingIpv4() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("ipv4"); - - Endpoint.builder().serviceName("foo").build(); + public void missingIpv4CoercesTo0() { + assertThat(Endpoint.builder().serviceName("foo").build().ipv4) + .isEqualTo(0); } @Test diff --git a/zipkin/src/test/java/zipkin/TestObjects.java b/zipkin/src/test/java/zipkin/TestObjects.java index debb6d92b21..5be419f2f94 100644 --- a/zipkin/src/test/java/zipkin/TestObjects.java +++ b/zipkin/src/test/java/zipkin/TestObjects.java @@ -37,8 +37,12 @@ public final class TestObjects { // Use real time, as most span-stores have TTL logic which looks back several days. public static final long TODAY = midnightUTC(System.currentTimeMillis()); - public static final Endpoint WEB_ENDPOINT = - Endpoint.create("web", 124 << 24 | 13 << 16 | 90 << 8 | 3); + public static final Endpoint WEB_ENDPOINT = Endpoint.builder() + .serviceName("web") + .ipv4(124 << 24 | 13 << 16 | 90 << 8 | 3) + // Cheat so we don't have to catch an exception here + .ipv6(sun.net.util.IPAddressUtil.textToNumericFormatV6("2001:db8::c001")) + .port((short) 80).build(); public static final Endpoint APP_ENDPOINT = Endpoint.create("app", 172 << 24 | 17 << 16 | 2, 8080); public static final Endpoint DB_ENDPOINT = diff --git a/zipkin/src/test/java/zipkin/internal/CorrectForClockSkewTest.java b/zipkin/src/test/java/zipkin/internal/CorrectForClockSkewTest.java new file mode 100644 index 00000000000..bf20b1f55fa --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/CorrectForClockSkewTest.java @@ -0,0 +1,75 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import org.junit.Test; +import zipkin.Endpoint; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static zipkin.internal.CorrectForClockSkew.ipsMatch; + +public class CorrectForClockSkewTest { + Endpoint ipv6 = Endpoint.builder() + .serviceName("web") + // Cheat so we don't have to catch an exception here + .ipv6(sun.net.util.IPAddressUtil.textToNumericFormatV6("2001:db8::c001")) + .build(); + + Endpoint ipv4 = Endpoint.builder() + .serviceName("web") + .ipv4(124 << 24 | 13 << 16 | 90 << 8 | 2) + .build(); + + Endpoint both = ipv4.toBuilder().ipv6(ipv6.ipv6).build(); + + @Test + public void ipsMatch_falseWhenNoIp() { + Endpoint noIp = Endpoint.builder().serviceName("foo").build(); + assertFalse(ipsMatch(noIp, ipv4)); + assertFalse(ipsMatch(noIp, ipv6)); + assertFalse(ipsMatch(ipv4, noIp)); + assertFalse(ipsMatch(ipv6, noIp)); + } + + @Test + public void ipsMatch_falseWhenIpv4Different() { + Endpoint different = ipv4.toBuilder() + .ipv4(124 << 24 | 13 << 16 | 90 << 8 | 3).build(); + assertFalse(ipsMatch(different, ipv4)); + assertFalse(ipsMatch(ipv4, different)); + } + + @Test + public void ipsMatch_falseWhenIpv6Different() { + Endpoint different = ipv6.toBuilder() + .ipv6(sun.net.util.IPAddressUtil.textToNumericFormatV6("2001:db8::c002")).build(); + assertFalse(ipsMatch(different, ipv6)); + assertFalse(ipsMatch(ipv6, different)); + } + + @Test + public void ipsMatch_whenIpv6Match() { + assertTrue(ipsMatch(ipv6, ipv6)); + assertTrue(ipsMatch(both, ipv6)); + assertTrue(ipsMatch(ipv6, both)); + } + + @Test + public void ipsMatch_whenIpv4Match() { + assertTrue(ipsMatch(ipv4, ipv4)); + assertTrue(ipsMatch(both, ipv4)); + assertTrue(ipsMatch(ipv4, both)); + } +}