Skip to content

Commit

Permalink
Changes v2 IDs and IPs to validated Strings (openzipkin#1721)
Browse files Browse the repository at this point in the history
  • Loading branch information
adriancole authored and abesto committed Sep 10, 2019
1 parent 2ed46c9 commit 1a62537
Show file tree
Hide file tree
Showing 64 changed files with 1,696 additions and 994 deletions.
30 changes: 19 additions & 11 deletions benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
Expand All @@ -42,9 +41,8 @@
import zipkin.Codec;
import zipkin.Endpoint;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesDecoder;
import zipkin.internal.v2.codec.BytesEncoder;

/**
* This compares the speed of the bundled java codec with the approach used in the scala
Expand Down Expand Up @@ -158,19 +156,29 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
return serialize(clientSpanLibThrift);
}

static final Span span2 = Decoder.JSON.decodeList(read("/span2.json")).get(0);
static final byte[] tenClientSpan2sJson = MessageEncoder.JSON_BYTES.encode(
Collections.nCopies(10, span2).stream().map(Encoder.JSON::encode).collect(Collectors.toList())
);
static final byte[] span2Json = read("/span2.json");
static final Span span2 = BytesDecoder.JSON.decode(span2Json);
static final List<Span> tenSpan2s = Collections.nCopies(10, span2);
static final byte[] tenSpan2sJson = BytesEncoder.JSON.encodeList(tenSpan2s);

@Benchmark
public Span readClientSpan_json_span2() {
return BytesDecoder.JSON.decode(span2Json);
}

@Benchmark
public List<Span> readTenClientSpans_json_span2() {
return Decoder.JSON.decodeList(tenClientSpan2sJson);
return BytesDecoder.JSON.decodeList(tenSpan2sJson);
}

@Benchmark
public byte[] writeClientSpan_json_span2() {
return Encoder.JSON.encode(span2);
return BytesEncoder.JSON.encode(span2);
}

@Benchmark
public byte[] writeTenClientSpans_json_span2() {
return BytesEncoder.JSON.encodeList(tenSpan2s);
}

static final byte[] rpcSpanJson = read("/span-rpc.json");
Expand Down Expand Up @@ -246,7 +254,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include("CodecBenchmarks.readTenClientSpans_json_span2")
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*ClientSpan.*")
.build();

new Runner(opt).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import zipkin.internal.V2SpanConverter;
import zipkin.internal.Util;

import static zipkin.internal.V2SpanConverter.convert;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
Expand Down Expand Up @@ -88,15 +90,15 @@ public class Span2ConverterBenchmarks {
.addBinaryAnnotation(BinaryAnnotation.address(Constants.CLIENT_ADDR, frontend))
.build();

Span server2 = Span.builder()
Span server2 = Span.newBuilder()
.traceId("7180c278b62e8f6a216a2aea45d08fc9")
.parentId("6b221d5bc9e6496c")
.id("5b4185666d50f68b")
.name("get")
.kind(Span.Kind.SERVER)
.shared(true)
.localEndpoint(backend)
.remoteEndpoint(frontend)
.localEndpoint(convert(backend))
.remoteEndpoint(convert(frontend))
.timestamp(1472470996250000L)
.duration(100000L)
.putTag(TraceKeys.HTTP_PATH, "/backend")
Expand Down
30 changes: 20 additions & 10 deletions benchmarks/src/main/java/zipkin/benchmarks/SpanBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import zipkin.Constants;
import zipkin.Endpoint;
import zipkin.TraceKeys;
import zipkin.internal.v2.Span;
import zipkin.internal.Util;
import zipkin.internal.v2.Span;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
Expand Down Expand Up @@ -69,15 +69,25 @@ public zipkin.Span buildLocalSpan() {
.build();
}

static final long traceId = Util.lowerHexToUnsignedLong("86154a4ba6e91385");
static final long spanId = Util.lowerHexToUnsignedLong("4d1e00c0db9010db");
static final String traceIdHex = "86154a4ba6e91385";
static final String spanIdHex = "4d1e00c0db9010db";
static final long traceId = Util.lowerHexToUnsignedLong(traceIdHex);
static final long spanId = Util.lowerHexToUnsignedLong(spanIdHex);
static final Endpoint frontend = Endpoint.create("frontend", 127 << 24 | 1);
static final Endpoint backend = Endpoint.builder()
.serviceName("backend")
.ipv4(192 << 24 | 168 << 16 | 99 << 8 | 101)
.port(9000)
.build();

static final zipkin.internal.v2.Endpoint frontend2 = zipkin.internal.v2.Endpoint.newBuilder()
.serviceName("frontend")
.ip("127.0.0.1")
.build();
static final zipkin.internal.v2.Endpoint backend2 = zipkin.internal.v2.Endpoint.newBuilder()
.serviceName("backend")
.ip("192.168.99.101")
.port(9000)
.build();
@Benchmark
public zipkin.Span buildClientOnlySpan() {
return buildClientOnlySpan(zipkin.Span.builder());
Expand Down Expand Up @@ -108,18 +118,18 @@ public zipkin.Span buildClientOnlySpan_clear() {

@Benchmark
public Span buildClientOnlySpan2() {
return buildClientOnlySpan2(Span.builder());
return buildClientOnlySpan2(Span.newBuilder());
}

static Span buildClientOnlySpan2(Span.Builder builder) {
return builder
.traceId(traceId)
.parentId(traceId)
.id(spanId)
.traceId(traceIdHex)
.parentId(traceIdHex)
.id(spanIdHex)
.name("get")
.kind(Span.Kind.CLIENT)
.localEndpoint(frontend)
.remoteEndpoint(backend)
.localEndpoint(frontend2)
.remoteEndpoint(backend2)
.timestamp(1472470996199000L)
.duration(207000L)
.addAnnotation(1472470996238000L, Constants.WIRE_SEND)
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/main/resources/span-client.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"traceId": "86154a4ba6e91385",
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"name": "get",
"id": "4d1e00c0db9010db",
"parentId": "86154a4ba6e91385",
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/src/main/resources/span2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[{
"traceId": "86154a4ba6e91385",
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91385",
"id": "4d1e00c0db9010db",
"kind": "CLIENT",
Expand All @@ -8,7 +8,7 @@
"duration": 207000,
"localEndpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
"ipv6": "7::0.128.128.127"
},
"remoteEndpoint": {
"serviceName": "backend",
Expand All @@ -29,4 +29,4 @@
"http.path": "/api",
"clnt/finagle.version": "6.45.0"
}
}]
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -147,9 +146,9 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

producer.send(new KeyedMessage<>(builder.topic, message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -201,9 +200,9 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

produceSpans(message, builder.topic);
Expand Down
32 changes: 5 additions & 27 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package zipkin.junit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand All @@ -32,13 +31,14 @@
import zipkin.collector.CollectorMetrics;
import zipkin.internal.V2JsonSpanDecoder;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.internal.v2.internal.Platform;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;

import static zipkin.internal.Util.lowerHexToUnsignedLong;
import static zipkin.internal.v2.Span.normalizeTraceId;

final class ZipkinDispatcher extends Dispatcher {
static final long DEFAULT_LOOKBACK = 86400000L; // 1 day in millis
Expand Down Expand Up @@ -131,39 +131,17 @@ MockResponse queryV2(HttpUrl url) throws IOException {
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v2/traces")) {
List<List<zipkin.internal.v2.Span>> traces = store2.getTraces(toQueryRequest2(url)).execute();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
bout.write('[');
for (int i = 0, length = traces.size(); i < length; ) {
List<zipkin.internal.v2.Span> trace = traces.get(i);
writeTrace(bout, trace);
if (++i < length) bout.write(',');
}
bout.write(']');
return jsonResponse(bout.toByteArray());
return jsonResponse(BytesEncoder.JSON.encodeNestedList(traces));
} else if (url.encodedPath().startsWith("/api/v2/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v2/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<zipkin.internal.v2.Span> trace = store2.getTrace(traceIdHigh, traceIdLow).execute();
List<zipkin.internal.v2.Span> trace = store2.getTrace(normalizeTraceId(traceIdHex)).execute();
if (!trace.isEmpty()) {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writeTrace(bout, trace);
return jsonResponse(bout.toByteArray());
return jsonResponse(BytesEncoder.JSON.encodeList(trace));
}
}
return new MockResponse().setResponseCode(404);
}

static void writeTrace(ByteArrayOutputStream bout, List<zipkin.internal.v2.Span> trace)
throws IOException {
bout.write('[');
for (int i = 0, length = trace.size(); i < length; ) {
bout.write(Encoder.JSON.encode(trace.get(i)));
if (++i < length) bout.write(',');
}
bout.write(']');
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
Expand Down
5 changes: 0 additions & 5 deletions zipkin-junit/src/test/java/zipkin/junit/ITHttpStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@

import java.io.IOException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import zipkin.Span;
import zipkin.TestObjects;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Enclosed.class)
public class ITHttpStorage {
Expand Down
9 changes: 4 additions & 5 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -69,9 +68,9 @@ public void getTraces_storedViaPostVersion2() throws IOException {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(0)).get(0)),
Encoder.JSON.encode(V2SpanConverter.fromSpan(spans.get(1)).get(0))
byte[] message = BytesEncoder.JSON.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));

// write the span to the zipkin using http api v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.BytesEncoder;
import zipkin.internal.v2.storage.SpanConsumer;

/** Implements the span consumer interface by forwarding requests over http. */
Expand All @@ -36,7 +36,7 @@ final class HttpV2SpanConsumer implements SpanConsumer {
Buffer json = new Buffer();
json.writeByte('[');
for (int i = 0, length = spans.size(); i < length; ) {
json.write(Encoder.JSON.encode(spans.get(i)));
json.write(BytesEncoder.JSON.encode(spans.get(i)));
if (++i < length) json.writeByte(',');
}
json.writeByte(']');
Expand Down
12 changes: 5 additions & 7 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import okhttp3.Request;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.internal.Util;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.BytesDecoder;
import zipkin.internal.v2.storage.QueryRequest;
import zipkin.internal.v2.storage.SpanStore;

Expand All @@ -47,14 +46,13 @@ final class HttpV2SpanStore implements SpanStore {
maybeAddQueryParam(url, "lookback", request.lookback());
maybeAddQueryParam(url, "limit", request.limit());
return factory.newCall(new Request.Builder().url(url.build()).build(),
content -> Decoder.JSON.decodeNestedList(content.readByteArray()));
content -> BytesDecoder.JSON.decodeNestedList(content.readByteArray()));
}

@Override public Call<List<Span>> getTrace(long traceIdHigh, long traceIdLow) {
String traceIdHex = Util.toLowerHex(traceIdHigh, traceIdLow);
@Override public Call<List<Span>> getTrace(String traceId) {
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/trace/" + traceIdHex))
.build(), content -> Decoder.JSON.decodeList(content.readByteArray()))
.url(factory.baseUrl.resolve("/api/v2/trace/" + Span.normalizeTraceId(traceId)))
.build(), content -> BytesDecoder.JSON.decodeList(content.readByteArray()))
.handleError(((error, callback) -> {
if (error instanceof HttpException && ((HttpException) error).code == 404) {
callback.onSuccess(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
import zipkin.autoconfigure.ui.ZipkinUiAutoConfiguration;
import zipkin.server.brave.BraveConfiguration;

@Target(ElementType.TYPE)
Expand Down
Loading

0 comments on commit 1a62537

Please sign in to comment.