Skip to content

Commit

Permalink
Adds Endpoint.ipv6 field with supporting changes to MySQL and UI
Browse files Browse the repository at this point in the history
This adds `Endpoint.ipv6` as a fixed-width byte array (16 bytes). This
formalizes `Endpoint.ipv4 == 0` implying there's no ipv4 address.

In thrift, this remains a byte array (String) at field 4.
In json, this is normal string formatting.
 * normal java utilities are used for codec (Inet6Address)

In the UI, the ipv6 address is preferred and bracketed when present.
Ex. [2001:db8:0:0:0:0:0:c002]:8080

In MySQL, this is mapped to a `BINARY(16)` field named
`zipkin_annotations.endpoint_ipv6`. When this column is missing a
warning like below is printed:

```
Jul 11, 2016 4:08:53 PM zipkin.storage.mysql.HasIpv6 compute
WARNING: zipkin_annotations.ipv6 doesn't exist, so Endpoint.ipv6 is not supported. Execute: alter table zipkin_annotations add `endpoint_ipv6` BINARY(16)
```

Fixes #306
  • Loading branch information
Adrian Cole committed Jul 11, 2016
1 parent 9ad7f59 commit 0fad8fd
Show file tree
Hide file tree
Showing 23 changed files with 474 additions and 41 deletions.
26 changes: 25 additions & 1 deletion benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public byte[] writeClientSpan_thrift_java() {
return Codec.THRIFT.writeSpan(clientSpan);
}

static final byte[] rpcSpanJson = read("/span-client.json");
static final byte[] rpcSpanJson = read("/span-rpc.json");
static final Span rpcSpan = Codec.JSON.readSpan(rpcSpanJson);
static final byte[] rpcSpanThrift = Codec.THRIFT.writeSpan(rpcSpan);

Expand All @@ -122,6 +122,30 @@ public byte[] writeRpcSpan_thrift_java() {
return Codec.THRIFT.writeSpan(rpcSpan);
}

static final byte[] rpcV6SpanJson = read("/span-rpc-ipv6.json");
static final Span rpcV6Span = Codec.JSON.readSpan(rpcV6SpanJson);
static final byte[] rpcV6SpanThrift = Codec.THRIFT.writeSpan(rpcV6Span);

@Benchmark
public Span readRpcV6Span_json_java() {
return Codec.JSON.readSpan(rpcV6SpanJson);
}

@Benchmark
public Span readRpcV6Span_thrift_java() {
return Codec.THRIFT.readSpan(rpcV6SpanThrift);
}

@Benchmark
public byte[] writeRpcV6Span_json_java() {
return Codec.JSON.writeSpan(rpcV6Span);
}

@Benchmark
public byte[] writeRpcV6Span_thrift_java() {
return Codec.THRIFT.writeSpan(rpcV6Span);
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand Down
130 changes: 130 additions & 0 deletions benchmarks/src/main/resources/span-rpc-ipv6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"traceId": "83c138a4ee0f3de9",
"name": "get",
"id": "d673d7abe80d2f3f",
"parentId": "83c138a4ee0f3de9",
"timestamp": 1461750491274000,
"duration": 51000,
"annotations": [
{
"timestamp": 1461750491274000,
"value": "cs",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
},
{
"timestamp": 1461750491274000,
"value": "ws",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
},
{
"timestamp": 1461750491298000,
"value": "sr",
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002",
"port": 9411
}
},
{
"timestamp": 1461750491302000,
"value": "ss",
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002",
"port": 9411
}
},
{
"timestamp": 1461750491325000,
"value": "cr",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
},
{
"timestamp": 1461750491325000,
"value": "wr",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
}
],
"binaryAnnotations": [
{
"key": "http.path",
"value": "/api/v1/traces",
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002"
}
},
{
"key": "srv/finagle.version",
"value": "6.34.0",
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002"
}
},
{
"key": "sa",
"value": true,
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002",
"port": 9411
}
},
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002",
"port": 42291
}
},
{
"key": "http.uri",
"value": "/api/v1/traces",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
},
{
"key": "clnt/finagle.version",
"value": "6.34.0",
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001"
}
},
{
"key": "sa",
"value": true,
"endpoint": {
"serviceName": "zipkin-query",
"ipv6": "2001:db8::c002",
"port": 9411
}
},
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "zipkin-web",
"ipv6": "2001:db8::c001",
"port": 42291
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.mysql;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.exception.DataAccessException;
import zipkin.internal.Lazy;

import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;

final class HasIpv6 extends Lazy<Boolean> {
private static final Logger LOG = Logger.getLogger(HasIpv6.class.getName());

final DataSource datasource;
final DSLContexts context;

HasIpv6(DataSource datasource, DSLContexts context) {
this.datasource = datasource;
this.context = context;
}

@Override protected Boolean compute() {
try (Connection conn = datasource.getConnection()) {
DSLContext dsl = context.get(conn);
dsl.select(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6).from(ZIPKIN_ANNOTATIONS).limit(1).fetchAny();
return true;
} catch (DataAccessException e) {
if (e.sqlState().equals("42S22")) {
LOG.warning("zipkin_annotations.ipv6 doesn't exist, so Endpoint.ipv6 is not supported. " +
"Execute: alter table zipkin_annotations add `endpoint_ipv6` BINARY(16)");
return false;
}
problemReading(e);
} catch (SQLException | RuntimeException e) {
problemReading(e);
}
return false;
}

static void problemReading(Exception e) {
LOG.log(Level.WARNING, "problem reading zipkin_annotations.ipv6", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import zipkin.BinaryAnnotation;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Lazy;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.StorageAdapters;

Expand All @@ -38,10 +39,12 @@
final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer {
private final DataSource datasource;
private final DSLContexts context;
private final Lazy<Boolean> hasIpv6;

MySQLSpanConsumer(DataSource datasource, DSLContexts context) {
MySQLSpanConsumer(DataSource datasource, DSLContexts context, Lazy<Boolean> hasIpv6) {
this.datasource = datasource;
this.context = context;
this.hasIpv6 = hasIpv6;
}

/** Blocking version of {@link AsyncSpanConsumer#accept} */
Expand Down Expand Up @@ -94,6 +97,9 @@ final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer {
if (annotation.endpoint != null) {
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME, annotation.endpoint.serviceName);
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4, annotation.endpoint.ipv4);
if (annotation.endpoint.ipv6 != null && hasIpv6.get()) {
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6, annotation.endpoint.ipv6);
}
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT, annotation.endpoint.port);
}
inserts.add(insert.onDuplicateKeyIgnore());
Expand All @@ -110,6 +116,9 @@ final class MySQLSpanConsumer implements StorageAdapters.SpanConsumer {
if (annotation.endpoint != null) {
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME, annotation.endpoint.serviceName);
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4, annotation.endpoint.ipv4);
if (annotation.endpoint.ipv6 != null && hasIpv6.get()) {
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6, annotation.endpoint.ipv6);
}
insert.set(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT, annotation.endpoint.port);
}
inserts.add(insert.onDuplicateKeyIgnore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand All @@ -26,6 +27,7 @@
import org.jooq.Condition;
import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record5;
Expand All @@ -42,6 +44,7 @@
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.DependencyLinkSpan;
import zipkin.internal.DependencyLinker;
import zipkin.internal.Lazy;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
import zipkin.storage.QueryRequest;
Expand All @@ -59,23 +62,33 @@
import static zipkin.storage.mysql.internal.generated.tables.ZipkinSpans.ZIPKIN_SPANS;

final class MySQLSpanStore implements SpanStore {
static final Field<?>[] ANNOTATION_FIELDS_WITHOUT_IPV6;

static {
ArrayList<Field<?>> list = new ArrayList(Arrays.asList(ZIPKIN_ANNOTATIONS.fields()));
list.remove(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6);
list.trimToSize();
ANNOTATION_FIELDS_WITHOUT_IPV6 = list.toArray(new Field<?>[list.size()]);
}

private final DataSource datasource;
private final DSLContexts context;
private final Lazy<Boolean> hasIpv6;

MySQLSpanStore(DataSource datasource, DSLContexts context) {
MySQLSpanStore(DataSource datasource, DSLContexts context, Lazy<Boolean> hasIpv6) {
this.datasource = datasource;
this.context = context;
this.hasIpv6 = hasIpv6;
}

static Endpoint endpoint(Record a) {
private Endpoint endpoint(Record a) {
String serviceName = a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_SERVICE_NAME);
if (serviceName == null) {
return null;
}
Short port = a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT);
return port != null ?
Endpoint.create(serviceName, a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4), port.intValue())
: Endpoint.create(serviceName, a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4));
if (serviceName == null) return null;
return Endpoint.builder()
.serviceName(serviceName)
.port(a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_PORT))
.ipv4(a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV4))
.ipv6(hasIpv6.get() ? a.getValue(ZIPKIN_ANNOTATIONS.ENDPOINT_IPV6) : null).build();
}

private static SelectOffsetStep<Record1<Long>> toTraceIdQuery(DSLContext context,
Expand Down Expand Up @@ -159,7 +172,8 @@ List<List<Span>> getTraces(@Nullable QueryRequest request, @Nullable Long traceI
groupingBy((Span s) -> s.traceId, LinkedHashMap::new, Collectors.<Span>toList()));

dbAnnotations = context.get(conn)
.selectFrom(ZIPKIN_ANNOTATIONS)
.select(hasIpv6.get() ? ZIPKIN_ANNOTATIONS.fields() : ANNOTATION_FIELDS_WITHOUT_IPV6)
.from(ZIPKIN_ANNOTATIONS)
.where(ZIPKIN_ANNOTATIONS.TRACE_ID.in(spansWithoutAnnotations.keySet()))
.orderBy(ZIPKIN_ANNOTATIONS.A_TIMESTAMP.asc(), ZIPKIN_ANNOTATIONS.A_KEY.asc())
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.sql.DataSource;
import org.jooq.ExecuteListenerProvider;
import org.jooq.conf.Settings;
import zipkin.internal.Lazy;
import zipkin.internal.Nullable;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
Expand Down Expand Up @@ -78,6 +79,7 @@ public MySQLStorage build() {
private final DataSource datasource;
private final Executor executor;
private final DSLContexts context;
final Lazy<Boolean> hasIpv6;
private final SpanStore spanStore;
private final AsyncSpanStore asyncSpanStore;
private final AsyncSpanConsumer asyncSpanConsumer;
Expand All @@ -86,9 +88,10 @@ public MySQLStorage build() {
this.datasource = checkNotNull(builder.datasource, "datasource");
this.executor = checkNotNull(builder.executor, "executor");
this.context = new DSLContexts(builder.settings, builder.listenerProvider);
this.spanStore = new MySQLSpanStore(datasource, context);
this.hasIpv6 = new HasIpv6(datasource, context);
this.spanStore = new MySQLSpanStore(datasource, context, hasIpv6);
this.asyncSpanStore = blockingToAsync(spanStore, executor);
this.asyncSpanConsumer = blockingToAsync(new MySQLSpanConsumer(datasource, context), executor);
this.asyncSpanConsumer = blockingToAsync(new MySQLSpanConsumer(datasource, context, hasIpv6), executor);
}

/** Returns the session in use by this storage component. */
Expand Down Expand Up @@ -128,7 +131,7 @@ void clear() {
try (Connection conn = datasource.getConnection()) {
context.get(conn).truncate(ZIPKIN_SPANS).execute();
context.get(conn).truncate(ZIPKIN_ANNOTATIONS).execute();
} catch (SQLException e) {
} catch (SQLException | RuntimeException e) {
throw new AssertionError(e);
}
}
Expand Down
Loading

0 comments on commit 0fad8fd

Please sign in to comment.