Skip to content

Commit

Permalink
Accepts Zipkin v2 Span format in all current transports
Browse files Browse the repository at this point in the history
This accepts the json format from #1499 on current transports. It does
so by generalizing format detection from the two Kafka libraries, and
a new `SpanDecoder` interface. Types are still internal, but this allows
us to proceed with other work in #1644, including implementing reporters
in any language.

Concretely, you can send a json list of span2 format as a Kafka or Http
message. If using http, use the /api/v2/spans endpoint like so:

```bash
$ curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d'[{
  "timestamp_millis": 1502101460678,
  "traceId": "9032b04972e475c5",
  "id": "9032b04972e475c5",
  "kind": "SERVER",
  "name": "get",
  "timestamp": 1502101460678880,
  "duration": 612898,
  "localEndpoint": {
    "serviceName": "brave-webmvc-example",
    "ipv4": "192.168.1.113"
  },
  "remoteEndpoint": {
    "serviceName": "",
    "ipv4": "127.0.0.1",
    "port": 60149
  },
  "tags": {
    "error": "500 Internal Server Error",
    "http.path": "/a"
  }
}]'
```
  • Loading branch information
Adrian Cole committed Aug 12, 2017
1 parent f4a333e commit a0e7f7d
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,10 +13,8 @@
*/
package zipkin.collector.kafka;

import java.util.Collections;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -47,24 +45,7 @@ public void run() {
continue;
}

// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,6 +13,7 @@
*/
package zipkin.collector.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -29,12 +30,16 @@
import zipkin.TestObjects;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -130,6 +135,32 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TestObjects.TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

producer.send(new KeyedMessage<>(builder.topic, bytes));

try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
assertThat(recvdSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -74,24 +73,7 @@ public void run() {
if (bytes.length == 0) {
metrics.incrementMessagesDropped();
} else {
// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.github.charithe.kafka.KafkaJunitRule;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -36,12 +37,16 @@
import zipkin.Span;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -184,6 +189,33 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

produceSpans(bytes, builder.topic);

try (KafkaCollector collector = builder.build()) {
collector.start();
assertThat(receivedSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
70 changes: 41 additions & 29 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -25,8 +25,10 @@
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.Span2JsonDecoder;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
Expand All @@ -35,6 +37,8 @@
import static zipkin.internal.Util.lowerHexToUnsignedLong;

final class ZipkinDispatcher extends Dispatcher {
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();

private final SpanStore store;
private final Collector consumer;
private final CollectorMetrics metrics;
Expand Down Expand Up @@ -77,42 +81,50 @@ public MockResponse dispatch(RecordedRequest request) {
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}
String type = request.getHeader("Content-Type");
Codec codec = type != null && type.contains("/x-thrift") ? Codec.THRIFT : Codec.JSON;

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, codec, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
SpanDecoder decoder = type != null && type.contains("/x-thrift")
? SpanDecoder.THRIFT_DECODER
: SpanDecoder.JSON_DECODER;
return acceptSpans(request, decoder);
} else if (url.encodedPath().equals("/api/v2/spans")) {
return acceptSpans(request, JSON2_DECODER);
}
} else { // unsupported method
return new MockResponse().setResponseCode(405);
}
return new MockResponse().setResponseCode(404);
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, decoder, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
}

static QueryRequest toQueryRequest(HttpUrl url) {
return QueryRequest.builder().serviceName(url.queryParameter("serviceName"))
.spanName(url.queryParameter("spanName"))
Expand Down
27 changes: 27 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import zipkin.Annotation;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -58,6 +61,30 @@ public void getTraces_storedViaPost() throws IOException {
.containsOnly(trace);
}

@Test
public void getTraces_storedViaPostVersion2() throws IOException {
List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

// write the span to the zipkin using http api v2
Response response = client.newCall(new Request.Builder()
.url(zipkin.httpUrl() + "/api/v2/spans")
.post(RequestBody.create(MediaType.parse("application/json"), bytes)).build()
).execute();
assertThat(response.code()).isEqualTo(202);

// read the traces directly
assertThat(zipkin.getTraces())
.containsOnly(asList(spans.get(0)), asList(spans.get(1)));
}

/** The rule is here to help debugging. Even partial spans should be returned */
@Test
public void getTraces_whenMissingTimestamps() throws IOException {
Expand Down
Loading

0 comments on commit a0e7f7d

Please sign in to comment.