Skip to content

Commit

Permalink
Changes v2 IP addresses to strings and fleshed v2 model
Browse files Browse the repository at this point in the history
This adds decoupled Annotation and Endpoint variants for v2.

This also fleshes out codec for use in benchmarks, spark and query apis.
  • Loading branch information
Adrian Cole committed Sep 2, 2017
1 parent 5bc51b1 commit 458c74a
Show file tree
Hide file tree
Showing 57 changed files with 1,439 additions and 712 deletions.
30 changes: 19 additions & 11 deletions benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
Expand All @@ -42,9 +41,8 @@
import zipkin.Codec;
import zipkin.Endpoint;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesDecoder;
import zipkin.internal.v2.codec.BytesEncoder;

/**
* This compares the speed of the bundled java codec with the approach used in the scala
Expand Down Expand Up @@ -158,19 +156,29 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
return serialize(clientSpanLibThrift);
}

static final Span span2 = Decoder.JSON.decodeList(read("/span2.json")).get(0);
static final byte[] tenClientSpan2sJson = MessageEncoder.JSON_BYTES.encode(
Collections.nCopies(10, span2).stream().map(Encoder.JSON::encode).collect(Collectors.toList())
);
static final byte[] span2Json = read("/span2.json");
static final Span span2 = BytesDecoder.JSON.decode(span2Json);
static final List<Span> tenSpan2s = Collections.nCopies(10, span2);
static final byte[] tenSpan2sJson = BytesEncoder.JSON.encodeList(tenSpan2s);

@Benchmark
public Span readClientSpan_json_span2() {
return BytesDecoder.JSON.decode(span2Json);
}

@Benchmark
public List<Span> readTenClientSpans_json_span2() {
return Decoder.JSON.decodeList(tenClientSpan2sJson);
return BytesDecoder.JSON.decodeList(tenSpan2sJson);
}

@Benchmark
public byte[] writeClientSpan_json_span2() {
return Encoder.JSON.encode(span2);
return BytesEncoder.JSON.encode(span2);
}

@Benchmark
public byte[] writeTenClientSpans_json_span2() {
return BytesEncoder.JSON.encodeList(tenSpan2s);
}

static final byte[] rpcSpanJson = read("/span-rpc.json");
Expand Down Expand Up @@ -246,7 +254,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include("CodecBenchmarks.readTenClientSpans_json_span2")
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*ClientSpan.*")
.build();

new Runner(opt).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import zipkin.internal.V2SpanConverter;
import zipkin.internal.Util;

import static zipkin.internal.V2SpanConverter.convert;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
Expand Down Expand Up @@ -88,15 +90,15 @@ public class Span2ConverterBenchmarks {
.addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, frontend))
.build();

Span server2 = Span.builder()
Span server2 = Span.newBuilder()
.traceId("7180c278b62e8f6a216a2aea45d08fc9")
.parentId("6b221d5bc9e6496c")
.id("5b4185666d50f68b")
.name("get")
.kind(Span.Kind.SERVER)
.shared(true)
.localEndpoint(backend)
.remoteEndpoint(frontend)
.localEndpoint(convert(backend))
.remoteEndpoint(convert(frontend))
.timestamp(1472470996250000L)
.duration(100000L)
.putTag(TraceKeys.HTTP_PATH, "/backend")
Expand Down
18 changes: 13 additions & 5 deletions benchmarks/src/main/java/zipkin/benchmarks/SpanBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import zipkin.Constants;
import zipkin.Endpoint;
import zipkin.TraceKeys;
import zipkin.internal.v2.Span;
import zipkin.internal.Util;
import zipkin.internal.v2.Span;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
Expand Down Expand Up @@ -79,7 +79,15 @@ public zipkin.Span buildLocalSpan() {
.ipv4(192 << 24 | 168 << 16 | 99 << 8 | 101)
.port(9000)
.build();

static final zipkin.internal.v2.Endpoint frontend2 = zipkin.internal.v2.Endpoint.newBuilder()
.serviceName("frontend")
.ip("127.0.0.1")
.build();
static final zipkin.internal.v2.Endpoint backend2 = zipkin.internal.v2.Endpoint.newBuilder()
.serviceName("backend")
.ip("192.168.99.101")
.port(9000)
.build();
@Benchmark
public zipkin.Span buildClientOnlySpan() {
return buildClientOnlySpan(zipkin.Span.builder());
Expand Down Expand Up @@ -110,7 +118,7 @@ public zipkin.Span buildClientOnlySpan_clear() {

@Benchmark
public Span buildClientOnlySpan2() {
return buildClientOnlySpan2(Span.builder());
return buildClientOnlySpan2(Span.newBuilder());
}

static Span buildClientOnlySpan2(Span.Builder builder) {
Expand All @@ -120,8 +128,8 @@ static Span buildClientOnlySpan2(Span.Builder builder) {
.id(spanIdHex)
.name("get")
.kind(Span.Kind.CLIENT)
.localEndpoint(frontend)
.remoteEndpoint(backend)
.localEndpoint(frontend2)
.remoteEndpoint(backend2)
.timestamp(1472470996199000L)
.duration(207000L)
.addAnnotation(1472470996238000L, Constants.WIRE_SEND)
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/main/resources/span-client.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"traceId": "86154a4ba6e91385",
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"name": "get",
"id": "4d1e00c0db9010db",
"parentId": "86154a4ba6e91385",
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/src/main/resources/span2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[{
"traceId": "86154a4ba6e91385",
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91385",
"id": "4d1e00c0db9010db",
"kind": "CLIENT",
Expand All @@ -8,7 +8,7 @@
"duration": 207000,
"localEndpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
"ipv6": "7::0.128.128.127"
},
"remoteEndpoint": {
"serviceName": "backend",
Expand All @@ -29,4 +29,4 @@
"http.path": "/api",
"clnt/finagle.version": "6.45.0"
}
}]
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -147,9 +146,9 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

producer.send(new KeyedMessage<>(builder.topic, message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -201,9 +200,9 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

produceSpans(message, builder.topic);
Expand Down
27 changes: 3 additions & 24 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package zipkin.junit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand All @@ -32,7 +31,7 @@
import zipkin.collector.CollectorMetrics;
import zipkin.internal.V2JsonSpanDecoder;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.internal.v2.internal.Platform;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
Expand Down Expand Up @@ -132,37 +131,17 @@ MockResponse queryV2(HttpUrl url) throws IOException {
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v2/traces")) {
List<List<zipkin.internal.v2.Span>> traces = store2.getTraces(toQueryRequest2(url)).execute();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
bout.write('[');
for (int i = 0, length = traces.size(); i < length; ) {
List<zipkin.internal.v2.Span> trace = traces.get(i);
writeTrace(bout, trace);
if (++i < length) bout.write(',');
}
bout.write(']');
return jsonResponse(bout.toByteArray());
return jsonResponse(BytesEncoder.JSON.encodeNestedList(traces));
} else if (url.encodedPath().startsWith("/api/v2/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v2/trace/", "");
List<zipkin.internal.v2.Span> trace = store2.getTrace(normalizeTraceId(traceIdHex)).execute();
if (!trace.isEmpty()) {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writeTrace(bout, trace);
return jsonResponse(bout.toByteArray());
return jsonResponse(BytesEncoder.JSON.encodeList(trace));
}
}
return new MockResponse().setResponseCode(404);
}

static void writeTrace(ByteArrayOutputStream bout, List<zipkin.internal.v2.Span> trace)
throws IOException {
bout.write('[');
for (int i = 0, length = trace.size(); i < length; ) {
bout.write(Encoder.JSON.encode(trace.get(i)));
if (++i < length) bout.write(',');
}
bout.write(']');
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
Expand Down
5 changes: 0 additions & 5 deletions zipkin-junit/src/test/java/zipkin/junit/ITHttpStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@

import java.io.IOException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import zipkin.Span;
import zipkin.TestObjects;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Enclosed.class)
public class ITHttpStorage {
Expand Down
9 changes: 4 additions & 5 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -69,9 +68,9 @@ public void getTraces_storedViaPostVersion2() throws IOException {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

// write the span to the zipkin using http api v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.internal.v2.storage.SpanConsumer;

/** Implements the span consumer interface by forwarding requests over http. */
Expand All @@ -36,7 +36,7 @@ final class HttpV2SpanConsumer implements SpanConsumer {
Buffer json = new Buffer();
json.writeByte('[');
for (int i = 0, length = spans.size(); i < length; ) {
json.write(Encoder.JSON.encode(spans.get(i)));
json.write(BytesEncoder.JSON.encode(spans.get(i)));
if (++i < length) json.writeByte(',');
}
json.writeByte(']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import zipkin.DependencyLink;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.BytesDecoder;
import zipkin.internal.v2.storage.QueryRequest;
import zipkin.internal.v2.storage.SpanStore;

Expand All @@ -46,13 +46,13 @@ final class HttpV2SpanStore implements SpanStore {
maybeAddQueryParam(url, "lookback", request.lookback());
maybeAddQueryParam(url, "limit", request.limit());
return factory.newCall(new Request.Builder().url(url.build()).build(),
content -> Decoder.JSON.decodeNestedList(content.readByteArray()));
content -> BytesDecoder.JSON.decodeNestedList(content.readByteArray()));
}

@Override public Call<List<Span>> getTrace(String traceId) {
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/trace/" + Span.normalizeTraceId(traceId)))
.build(), content -> Decoder.JSON.decodeList(content.readByteArray()))
.build(), content -> BytesDecoder.JSON.decodeList(content.readByteArray()))
.handleError(((error, callback) -> {
if (error instanceof HttpException && ((HttpException) error).code == 404) {
callback.onSuccess(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
import zipkin.autoconfigure.ui.ZipkinUiAutoConfiguration;
import zipkin.server.brave.BraveConfiguration;

@Target(ElementType.TYPE)
Expand Down
Loading

0 comments on commit 458c74a

Please sign in to comment.