diff --git a/interop/pom.xml b/interop/pom.xml
index 5c31efcb004..d5c1de05959 100644
--- a/interop/pom.xml
+++ b/interop/pom.xml
@@ -29,7 +29,9 @@
${project.basedir}/..
- 1.33.2
+
+ 0.9.3
+ 1.34.0
2.2.5
@@ -51,6 +53,18 @@
${zipkin-scala.version}
+
+ io.zipkin
+ zipkin-scrooge
+ ${zipkin-scala.version}
+
+
+
+ org.apache.thrift
+ libthrift
+ ${libthrift.version}
+
+
${project.groupId}
spanstore-jdbc
diff --git a/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java b/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java
index 2d1cb4dfe4f..7713120880e 100644
--- a/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java
+++ b/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java
@@ -13,17 +13,17 @@
*/
package zipkin.interop;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.twitter.util.Future;
import com.twitter.zipkin.common.Span;
-import com.twitter.zipkin.json.JsonSpan;
+import com.twitter.zipkin.conversions.thrift$;
import com.twitter.zipkin.json.ZipkinJson$;
import com.twitter.zipkin.storage.QueryRequest;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.stream.Collectors;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TType;
+import org.apache.thrift.transport.TMemoryBuffer;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
@@ -40,11 +40,9 @@
* Adapts {@link SpanStore} to a scala {@link com.twitter.zipkin.storage.SpanStore} in order to test
* against its {@link com.twitter.zipkin.storage.SpanStoreSpec} for interoperability reasons.
*
- * This implementation uses json to ensure structures are compatible.
+ * This implementation uses thrift TBinaryProtocol to ensure structures are compatible.
*/
public final class ScalaSpanStoreAdapter extends com.twitter.zipkin.storage.SpanStore {
- private static final ObjectMapper scalaCodec = ZipkinJson$.MODULE$;
-
private final SpanStore spanStore;
public ScalaSpanStoreAdapter(SpanStore spanStore) {
@@ -111,12 +109,11 @@ public void close() {
@Nullable
private static java.util.List convert(java.util.List input) {
- byte[] bytes = Codec.JSON.writeSpans(input);
+ byte[] bytes = Codec.THRIFT.writeSpans(input);
try {
- TypeReference> ref = new TypeReference>(){};
- java.util.List read = scalaCodec.readValue(bytes, ref);
- return read.stream().map(JsonSpan::invert).collect(Collectors.toList());
- } catch (IOException e) {
+ List read = thrift$.MODULE$.thriftListToSpans(bytes);
+ return JavaConversions.seqAsJavaList(read);
+ } catch (RuntimeException e) {
e.printStackTrace();
return null;
}
@@ -125,9 +122,19 @@ private static java.util.List convert(java.util.List input) {
@Nullable
private static java.util.List invert(Seq input) {
try {
- byte[] bytes = scalaCodec.writeValueAsBytes(input);
- return Codec.JSON.readSpans(bytes);
- } catch (JsonProcessingException e) {
+ TMemoryBuffer transport = new TMemoryBuffer(0);
+ TBinaryProtocol oproto = new TBinaryProtocol(transport);
+ oproto.writeListBegin(new TList(TType.STRUCT, input.size()));
+ Iterator iterator = input.iterator();
+ while (iterator.hasNext()) {
+ com.twitter.zipkin.thriftscala.Span thriftSpan =
+ thrift$.MODULE$.spanToThriftSpan(iterator.next()).toThrift();
+ thriftSpan.write(oproto);
+ }
+ oproto.writeListEnd();
+ byte[] bytes = transport.getArray();
+ return Codec.THRIFT.readSpans(bytes);
+ } catch (Exception e) {
e.printStackTrace();
return null;
}
diff --git a/pom.xml b/pom.xml
index 127fc78aa4f..3bae2b58512 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,6 @@
1.1.0
1.6.0
1.3.2.RELEASE
- 0.9.3
+
maven-shade-plugin
@@ -87,10 +83,6 @@
com.squareup.moshi
zipkin.internal.moshi
-
- org.apache.thrift
- zipkin.internal.libthrift
-
diff --git a/zipkin/src/main/java/zipkin/Span.java b/zipkin/src/main/java/zipkin/Span.java
index fe21213b78a..de16ca0ec36 100644
--- a/zipkin/src/main/java/zipkin/Span.java
+++ b/zipkin/src/main/java/zipkin/Span.java
@@ -256,9 +256,9 @@ public Builder duration(@Nullable Long duration) {
*
* @see Span#annotations
*/
- public Builder annotations(Annotation... annotations) {
+ public Builder annotations(Collection annotations) {
this.annotations.clear();
- Collections.addAll(this.annotations, annotations);
+ this.annotations.addAll(annotations);
return this;
}
@@ -273,9 +273,9 @@ public Builder addAnnotation(Annotation annotation) {
*
* @see Span#binaryAnnotations
*/
- public Builder binaryAnnotations(BinaryAnnotation... binaryAnnotations) {
+ public Builder binaryAnnotations(Collection binaryAnnotations) {
this.binaryAnnotations.clear();
- Collections.addAll(this.binaryAnnotations, binaryAnnotations);
+ this.binaryAnnotations.addAll(binaryAnnotations);
return this;
}
diff --git a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java
index f8f09da5e13..bffa2f0b483 100644
--- a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java
+++ b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java
@@ -78,25 +78,25 @@ private static void adjust(Node node, @Nullable ClockSkew skewFromParent)
}
/** If any annotation has an IP with skew associated, adjust accordingly. */
- private static Span adjustTimestamps(Span span, ClockSkew clockSkew) {
- Annotation[] annotations = null;
+ private static Span adjustTimestamps(Span span, ClockSkew skew) {
+ List annotations = null;
for (int i = 0, length = span.annotations.size(); i < length; i++) {
Annotation a = span.annotations.get(i);
if (a.endpoint == null) continue;
- if (clockSkew.endpoint.ipv4 == a.endpoint.ipv4) {
- if (annotations == null) annotations = span.annotations.toArray(new Annotation[length]);
- annotations[i] = new Annotation.Builder(a).timestamp(a.timestamp - clockSkew.skew).build();
+ if (skew.endpoint.ipv4 == a.endpoint.ipv4) {
+ if (annotations == null) annotations = new ArrayList<>(span.annotations);
+ annotations.set(i, new Annotation.Builder(a).timestamp(a.timestamp - skew.skew).build());
}
}
if (annotations != null) {
- return new Span.Builder(span).timestamp(annotations[0].timestamp).annotations(annotations).build();
+ return new Span.Builder(span).timestamp(annotations.get(0).timestamp).annotations(annotations).build();
}
// Search for a local span on the skewed endpoint
for (int i = 0, length = span.binaryAnnotations.size(); i < length; i++) {
BinaryAnnotation b = span.binaryAnnotations.get(i);
if (b.endpoint == null) continue;
- if (b.key.equals(Constants.LOCAL_COMPONENT) && clockSkew.endpoint.ipv4 == b.endpoint.ipv4) {
- return new Span.Builder(span).timestamp(span.timestamp - clockSkew.skew).build();
+ if (b.key.equals(Constants.LOCAL_COMPONENT) && skew.endpoint.ipv4 == b.endpoint.ipv4) {
+ return new Span.Builder(span).timestamp(span.timestamp - skew.skew).build();
}
}
return span;
diff --git a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java
index 693d38a7f2e..728858130d4 100644
--- a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java
+++ b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java
@@ -13,21 +13,11 @@
*/
package zipkin.internal;
-import java.nio.ByteBuffer;
+import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;
import okio.Buffer;
import okio.ByteString;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TField;
-import org.apache.thrift.protocol.TList;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TStruct;
-import org.apache.thrift.protocol.TType;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Codec;
@@ -35,7 +25,6 @@
import zipkin.Endpoint;
import zipkin.Span;
-import static org.apache.thrift.protocol.TProtocolUtil.skip;
import static zipkin.internal.Util.checkArgument;
/**
@@ -43,13 +32,29 @@
* core jar. The hard coding not only keeps us with a single data-model, it also allows the minified
* core jar free of SLFJ classes otherwise included in generated types.
*
- * The implementation appears scarier than it is. It was made by mechanically copying in the
- * method bodies of generated thrift classes, specifically {@link org.apache.thrift.TBase#read} and
- * {@link org.apache.thrift.TBase#write}. Zipkin uses thrifts that have been very stable in the last
- * few years. Moreover, users consolidate to {@link TBinaryProtocol} at rest, even if the structs
- * are later compressed with snappy.
+ * This is an Okio-native TBinaryProtocol codec. Natively doing this reduces dependencies and
+ * array duplication.
*/
public final class ThriftCodec implements Codec {
+ // break vs decode huge structs, like > 1MB strings or 10k spans in a trace.
+ private static final int STRING_LENGTH_LIMIT = 1 * 1024 * 1024;
+ private static final int CONTAINER_LENGTH_LIMIT = 10 * 1000;
+ // break vs recursing infinitely when skipping data
+ private static int MAX_SKIP_DEPTH = 2147483647;
+
+ // taken from org.apache.thrift.protocol.TType
+ private static final byte TYPE_STOP = 0;
+ private static final byte TYPE_BOOL = 2;
+ private static final byte TYPE_BYTE = 3;
+ private static final byte TYPE_DOUBLE = 4;
+ private static final byte TYPE_I16 = 6;
+ private static final byte TYPE_I32 = 8;
+ private static final byte TYPE_I64 = 10;
+ private static final byte TYPE_STRING = 11;
+ private static final byte TYPE_STRUCT = 12;
+ private static final byte TYPE_MAP = 13;
+ private static final byte TYPE_SET = 14;
+ private static final byte TYPE_LIST = 15;
@Override
public Span readSpan(byte[] bytes) {
@@ -77,11 +82,11 @@ public byte[] writeTraces(List> value) {
}
interface ThriftWriter {
- void write(T value, TProtocol oprot) throws TException;
+ void write(T value, Buffer buffer);
}
interface ThriftReader {
- T read(TProtocol iprot) throws TException;
+ T read(Buffer buffer) throws EOFException;
}
interface ThriftAdapter extends ThriftReader, ThriftWriter {
@@ -89,392 +94,233 @@ interface ThriftAdapter extends ThriftReader, ThriftWriter {
static final ThriftAdapter ENDPOINT_ADAPTER = new ThriftAdapter() {
- private final TStruct STRUCT_DESC = new TStruct("Endpoint");
- private final TField IPV4_FIELD_DESC = new TField("ipv4", TType.I32, (short) 1);
- private final TField PORT_FIELD_DESC = new TField("port", TType.I16, (short) 2);
- private final TField SERVICE_NAME_FIELD_DESC = new TField("service_name", TType.STRING, (short) 3);
+ final Field IPV4 = new Field(TYPE_I32, 1);
+ final Field PORT = new Field(TYPE_I16, 2);
+ final Field SERVICE_NAME = new Field(TYPE_STRING, 3);
@Override
- public Endpoint read(TProtocol iprot) throws TException {
+ public Endpoint read(Buffer buffer) throws EOFException {
Endpoint.Builder result = new Endpoint.Builder();
- TField field;
- iprot.readStructBegin();
+ Field field;
+
while (true) {
- field = iprot.readFieldBegin();
- if (field.type == TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // IPV4
- if (field.type == TType.I32) {
- result.ipv4(iprot.readI32());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 2: // PORT
- if (field.type == TType.I16) {
- result.port(iprot.readI16());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 3: // SERVICE_NAME
- if (field.type == TType.STRING) {
- result.serviceName(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- default:
- skip(iprot, field.type);
+ field = Field.read(buffer);
+ if (field.type == TYPE_STOP) break;
+
+ if (field.equals(IPV4)) {
+ result.ipv4(buffer.readInt());
+ } else if (field.equals(PORT)) {
+ result.port(buffer.readShort());
+ } else if (field.equals(SERVICE_NAME)) {
+ result.serviceName(readUtf8(buffer));
+ } else {
+ skip(buffer, field.type);
}
- iprot.readFieldEnd();
}
- iprot.readStructEnd();
return result.build();
}
@Override
- public void write(Endpoint value, TProtocol oprot) throws TException {
- oprot.writeStructBegin(STRUCT_DESC);
-
- oprot.writeFieldBegin(IPV4_FIELD_DESC);
- oprot.writeI32(value.ipv4);
- oprot.writeFieldEnd();
- oprot.writeFieldBegin(PORT_FIELD_DESC);
- oprot.writeI16(value.port == null ? 0 : value.port);
- oprot.writeFieldEnd();
- oprot.writeFieldBegin(SERVICE_NAME_FIELD_DESC);
- oprot.writeString(value.serviceName);
- oprot.writeFieldEnd();
-
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ public void write(Endpoint value, Buffer buffer) {
+ IPV4.write(buffer);
+ buffer.writeInt(value.ipv4);
+
+ PORT.write(buffer);
+ buffer.writeShort(value.port == null ? 0 : value.port);
+
+ SERVICE_NAME.write(buffer);
+ writeUtf8(buffer, value.serviceName);
+
+ buffer.writeByte(TYPE_STOP);
}
};
static final ThriftAdapter ANNOTATION_ADAPTER = new ThriftAdapter() {
- private final TStruct STRUCT_DESC = new TStruct("Annotation");
- private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 1);
- private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2);
- private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 3);
+ final Field TIMESTAMP = new Field(TYPE_I64, 1);
+ final Field VALUE = new Field(TYPE_STRING, 2);
+ final Field ENDPOINT = new Field(TYPE_STRUCT, 3);
@Override
- public Annotation read(TProtocol iprot) throws TException {
+ public Annotation read(Buffer buffer) throws EOFException {
Annotation.Builder result = new Annotation.Builder();
- TField field;
- iprot.readStructBegin();
+ Field field;
while (true) {
- field = iprot.readFieldBegin();
- if (field.type == TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // TIMESTAMP
- if (field.type == TType.I64) {
- result.timestamp(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 2: // VALUE
- if (field.type == TType.STRING) {
- result.value(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 3: // HOST
- if (field.type == TType.STRUCT) {
- result.endpoint(ENDPOINT_ADAPTER.read(iprot));
- } else {
- skip(iprot, field.type);
- }
- break;
- default:
- skip(iprot, field.type);
+ field = Field.read(buffer);
+ if (field.type == TYPE_STOP) break;
+
+ if (field.equals(TIMESTAMP)) {
+ result.timestamp(buffer.readLong());
+ } else if (field.equals(VALUE)) {
+ result.value(readUtf8(buffer));
+ } else if (field.equals(ENDPOINT)) {
+ result.endpoint(ENDPOINT_ADAPTER.read(buffer));
+ } else {
+ skip(buffer, field.type);
}
- iprot.readFieldEnd();
}
- iprot.readStructEnd();
return result.build();
}
@Override
- public void write(Annotation value, TProtocol oprot) throws TException {
- oprot.writeStructBegin(STRUCT_DESC);
-
- oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
- oprot.writeI64(value.timestamp);
- oprot.writeFieldEnd();
+ public void write(Annotation value, Buffer buffer) {
+ TIMESTAMP.write(buffer);
+ buffer.writeLong(value.timestamp);
if (value.value != null) {
- oprot.writeFieldBegin(VALUE_FIELD_DESC);
- oprot.writeString(value.value);
- oprot.writeFieldEnd();
+ VALUE.write(buffer);
+ writeUtf8(buffer, value.value);
}
if (value.endpoint != null) {
- oprot.writeFieldBegin(HOST_FIELD_DESC);
- ENDPOINT_ADAPTER.write(value.endpoint, oprot);
- oprot.writeFieldEnd();
+ ENDPOINT.write(buffer);
+ ENDPOINT_ADAPTER.write(value.endpoint, buffer);
}
-
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ buffer.writeByte(TYPE_STOP);
}
};
static final ThriftAdapter BINARY_ANNOTATION_ADAPTER = new ThriftAdapter() {
- private final TStruct STRUCT_DESC = new TStruct("BinaryAnnotation");
- private final TField KEY_FIELD_DESC = new TField("key", TType.STRING, (short) 1);
- private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2);
- private final TField ANNOTATION_TYPE_FIELD_DESC = new TField("annotation_type", TType.I32, (short) 3);
- private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 4);
+ final Field KEY = new Field(TYPE_STRING, 1);
+ final Field VALUE = new Field(TYPE_STRING, 2);
+ final Field TYPE = new Field(TYPE_I32, 3);
+ final Field ENDPOINT = new Field(TYPE_STRUCT, 4);
@Override
- public BinaryAnnotation read(TProtocol iprot) throws TException {
+ public BinaryAnnotation read(Buffer buffer) throws EOFException {
BinaryAnnotation.Builder result = new BinaryAnnotation.Builder();
- TField field;
- iprot.readStructBegin();
+ Field field;
+
while (true) {
- field = iprot.readFieldBegin();
- if (field.type == TType.STOP) {
- break;
+ field = Field.read(buffer);
+ if (field.type == TYPE_STOP) break;
+
+ if (field.equals(KEY)) {
+ result.key(readUtf8(buffer));
+ } else if (field.equals(VALUE)) {
+ result.value(readBytes(buffer));
+ } else if (field.equals(TYPE)) {
+ result.type(BinaryAnnotation.Type.fromValue(buffer.readInt()));
+ } else if (field.equals(ENDPOINT)) {
+ result.endpoint(ENDPOINT_ADAPTER.read(buffer));
+ } else {
+ skip(buffer, field.type);
}
- switch (field.id) {
- case 1: // KEY
- if (field.type == TType.STRING) {
- result.key(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 2: // VALUE
- if (field.type == TType.STRING) {
- ByteBuffer buffer = iprot.readBinary();
- byte[] value = new byte[buffer.remaining()];
- buffer.get(value);
- result.value(value);
- } else {
- skip(iprot, field.type);
- }
- break;
- case 3: // ANNOTATION_TYPE
- if (field.type == TType.I32) {
- result.type(BinaryAnnotation.Type.fromValue(iprot.readI32()));
- } else {
- skip(iprot, field.type);
- }
- break;
- case 4: // HOST
- if (field.type == TType.STRUCT) {
- result.endpoint(ENDPOINT_ADAPTER.read(iprot));
- } else {
- skip(iprot, field.type);
- }
- break;
- default:
- skip(iprot, field.type);
- }
- iprot.readFieldEnd();
}
- iprot.readStructEnd();
return result.build();
}
@Override
- public void write(BinaryAnnotation value, TProtocol oprot) throws TException {
- oprot.writeStructBegin(STRUCT_DESC);
-
- oprot.writeFieldBegin(KEY_FIELD_DESC);
- oprot.writeString(value.key);
- oprot.writeFieldEnd();
+ public void write(BinaryAnnotation value, Buffer buffer) {
+ KEY.write(buffer);
+ writeUtf8(buffer, value.key);
- oprot.writeFieldBegin(VALUE_FIELD_DESC);
- oprot.writeBinary(ByteBuffer.wrap(value.value));
- oprot.writeFieldEnd();
+ VALUE.write(buffer);
+ buffer.writeInt(value.value.length);
+ buffer.write(value.value);
- oprot.writeFieldBegin(ANNOTATION_TYPE_FIELD_DESC);
- oprot.writeI32(value.type.value);
- oprot.writeFieldEnd();
+ TYPE.write(buffer);
+ buffer.writeInt(value.type.value);
if (value.endpoint != null) {
- oprot.writeFieldBegin(HOST_FIELD_DESC);
- ENDPOINT_ADAPTER.write(value.endpoint, oprot);
- oprot.writeFieldEnd();
+ ENDPOINT.write(buffer);
+ ENDPOINT_ADAPTER.write(value.endpoint, buffer);
}
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ buffer.writeByte(TYPE_STOP);
}
};
+ static final ThriftAdapter> ANNOTATIONS_ADAPTER = new ListAdapter<>(ANNOTATION_ADAPTER);
+ static final ThriftAdapter> BINARY_ANNOTATIONS_ADAPTER = new ListAdapter<>(BINARY_ANNOTATION_ADAPTER);
+
static final ThriftAdapter SPAN_ADAPTER = new ThriftAdapter() {
- private final TStruct STRUCT_DESC = new TStruct("Span");
- private final TField TRACE_ID_FIELD_DESC = new TField("trace_id", TType.I64, (short) 1);
- private final TField NAME_FIELD_DESC = new TField("name", TType.STRING, (short) 3);
- private final TField ID_FIELD_DESC = new TField("id", TType.I64, (short) 4);
- private final TField PARENT_ID_FIELD_DESC = new TField("parent_id", TType.I64, (short) 5);
- private final TField ANNOTATIONS_FIELD_DESC = new TField("annotations", TType.LIST, (short) 6);
- private final TField BINARY_ANNOTATIONS_FIELD_DESC = new TField("binary_annotations", TType.LIST, (short) 8);
- private final TField DEBUG_FIELD_DESC = new TField("debug", TType.BOOL, (short) 9);
- private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 10);
- private final TField DURATION_FIELD_DESC = new TField("duration", TType.I64, (short) 11);
+ final Field TRACE_ID = new Field(TYPE_I64, 1);
+ final Field NAME = new Field(TYPE_STRING, 3);
+ final Field ID = new Field(TYPE_I64, 4);
+ final Field PARENT_ID = new Field(TYPE_I64, 5);
+ final Field ANNOTATIONS = new Field(TYPE_LIST, 6);
+ final Field BINARY_ANNOTATIONS = new Field(TYPE_LIST, 8);
+ final Field DEBUG = new Field(TYPE_BOOL, 9);
+ final Field TIMESTAMP = new Field(TYPE_I64, 10);
+ final Field DURATION = new Field(TYPE_I64, 11);
@Override
- public Span read(TProtocol iprot) throws TException {
+ public Span read(Buffer buffer) throws EOFException {
Span.Builder result = new Span.Builder();
- TField field;
- iprot.readStructBegin();
+ Field field;
+
while (true) {
- field = iprot.readFieldBegin();
- if (field.type == TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // TRACE_ID
- if (field.type == TType.I64) {
- result.traceId(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 3: // NAME
- if (field.type == TType.STRING) {
- result.name(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 4: // ID
- if (field.type == TType.I64) {
- result.id(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 5: // PARENT_ID
- if (field.type == TType.I64) {
- result.parentId(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 6: // ANNOTATIONS
- if (field.type == TType.LIST) {
- TList annotations = iprot.readListBegin();
- for (int i = 0; i < annotations.size; i++) {
- result.addAnnotation(ANNOTATION_ADAPTER.read(iprot));
- }
- iprot.readListEnd();
- } else {
- skip(iprot, field.type);
- }
- break;
- case 8: // BINARY_ANNOTATIONS
- if (field.type == TType.LIST) {
- TList binaryAnnotations = iprot.readListBegin();
- for (int i = 0; i < binaryAnnotations.size; i++) {
- result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.read(iprot));
- }
- iprot.readListEnd();
- } else {
- skip(iprot, field.type);
- }
- break;
- case 9: // DEBUG
- if (field.type == TType.BOOL) {
- result.debug(iprot.readBool());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 10: // TIMESTAMP
- if (field.type == TType.I64) {
- result.timestamp(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 11: // DURATION
- if (field.type == TType.I64) {
- result.duration(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- default:
- skip(iprot, field.type);
+ field = Field.read(buffer);
+ if (field.type == TYPE_STOP) break;
+
+ if (field.equals(TRACE_ID)) {
+ result.traceId(buffer.readLong());
+ } else if (field.equals(NAME)) {
+ result.name(readUtf8(buffer));
+ } else if (field.equals(ID)) {
+ result.id(buffer.readLong());
+ } else if (field.equals(PARENT_ID)) {
+ result.parentId(buffer.readLong());
+ } else if (field.equals(ANNOTATIONS)) {
+ result.annotations(ANNOTATIONS_ADAPTER.read(buffer));
+ } else if (field.equals(BINARY_ANNOTATIONS)) {
+ result.binaryAnnotations(BINARY_ANNOTATIONS_ADAPTER.read(buffer));
+ } else if (field.equals(DEBUG)) {
+ result.debug(buffer.readByte() == 1);
+ } else if (field.equals(TIMESTAMP)) {
+ result.timestamp(buffer.readLong());
+ } else if (field.equals(DURATION)) {
+ result.duration(buffer.readLong());
+ } else {
+ skip(buffer, field.type);
}
- iprot.readFieldEnd();
}
- iprot.readStructEnd();
+
return result.build();
}
@Override
- public void write(Span value, TProtocol oprot) throws TException {
- oprot.writeStructBegin(STRUCT_DESC);
+ public void write(Span value, Buffer buffer) {
- oprot.writeFieldBegin(TRACE_ID_FIELD_DESC);
- oprot.writeI64(value.traceId);
- oprot.writeFieldEnd();
+ TRACE_ID.write(buffer);
+ buffer.writeLong(value.traceId);
- oprot.writeFieldBegin(NAME_FIELD_DESC);
- oprot.writeString(value.name);
- oprot.writeFieldEnd();
+ NAME.write(buffer);
+ writeUtf8(buffer, value.name);
- oprot.writeFieldBegin(ID_FIELD_DESC);
- oprot.writeI64(value.id);
- oprot.writeFieldEnd();
+ ID.write(buffer);
+ buffer.writeLong(value.id);
if (value.parentId != null) {
- oprot.writeFieldBegin(PARENT_ID_FIELD_DESC);
- oprot.writeI64(value.parentId);
- oprot.writeFieldEnd();
+ PARENT_ID.write(buffer);
+ buffer.writeLong(value.parentId);
}
- oprot.writeFieldBegin(ANNOTATIONS_FIELD_DESC);
- oprot.writeListBegin(new TList(TType.STRUCT, value.annotations.size()));
- for (int i = 0, length = value.annotations.size(); i < length; i++) {
- ANNOTATION_ADAPTER.write(value.annotations.get(i), oprot);
- }
- oprot.writeListEnd();
- oprot.writeFieldEnd();
+ ANNOTATIONS.write(buffer);
+ ANNOTATIONS_ADAPTER.write(value.annotations, buffer);
- oprot.writeFieldBegin(BINARY_ANNOTATIONS_FIELD_DESC);
- oprot.writeListBegin(new TList(TType.STRUCT, value.binaryAnnotations.size()));
- for (int i = 0, length = value.binaryAnnotations.size(); i < length; i++) {
- BINARY_ANNOTATION_ADAPTER.write(value.binaryAnnotations.get(i), oprot);
- }
- oprot.writeListEnd();
- oprot.writeFieldEnd();
+ BINARY_ANNOTATIONS.write(buffer);
+ BINARY_ANNOTATIONS_ADAPTER.write(value.binaryAnnotations, buffer);
if (value.debug != null) {
- oprot.writeFieldBegin(DEBUG_FIELD_DESC);
- oprot.writeBool(value.debug);
- oprot.writeFieldEnd();
+ DEBUG.write(buffer);
+ buffer.writeByte(value.debug ? 1 : 0);
}
if (value.timestamp != null) {
- oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
- oprot.writeI64(value.timestamp);
- oprot.writeFieldEnd();
+ TIMESTAMP.write(buffer);
+ buffer.writeLong(value.timestamp);
}
if (value.duration != null) {
- oprot.writeFieldBegin(DURATION_FIELD_DESC);
- oprot.writeI64(value.duration);
- oprot.writeFieldEnd();
+ DURATION.write(buffer);
+ buffer.writeLong(value.duration);
}
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ buffer.writeByte(TYPE_STOP);
}
@Override
@@ -488,70 +334,45 @@ public String toString() {
static final ThriftAdapter DEPENDENCY_LINK_ADAPTER = new ThriftAdapter() {
- private final TStruct STRUCT_DESC = new TStruct("DependencyLink");
- private final TField PARENT_FIELD_DESC = new TField("parent", TType.STRING, (short) 1);
- private final TField CHILD_FIELD_DESC = new TField("child", TType.STRING, (short) 2);
- private final TField CALL_COUNT_FIELD_DESC = new TField("call_count", TType.I64, (short) 4);
+ final Field PARENT = new Field(TYPE_STRING, 1);
+ final Field CHILD = new Field(TYPE_STRING, 2);
+ final Field CALL_COUNT = new Field(TYPE_I64, 4);
@Override
- public DependencyLink read(TProtocol iprot) throws TException {
+ public DependencyLink read(Buffer buffer) throws EOFException {
DependencyLink.Builder result = new DependencyLink.Builder();
- TField field;
- iprot.readStructBegin();
+ Field field;
+
while (true) {
- field = iprot.readFieldBegin();
- if (field.type == TType.STOP) {
- break;
+ field = Field.read(buffer);
+ if (field.type == TYPE_STOP) break;
+
+ if (field.equals(PARENT)) {
+ result.parent(readUtf8(buffer));
+ } else if (field.equals(CHILD)) {
+ result.child(readUtf8(buffer));
+ } else if (field.equals(CALL_COUNT)) {
+ result.callCount(buffer.readLong());
+ } else {
+ skip(buffer, field.type);
}
- switch (field.id) {
- case 1: // PARENT
- if (field.type == TType.STRING) {
- result.parent(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 2: // CHILD
- if (field.type == TType.STRING) {
- result.child(iprot.readString());
- } else {
- skip(iprot, field.type);
- }
- break;
- case 4: // CALL_COUNT
- if (field.type == TType.I64) {
- result.callCount(iprot.readI64());
- } else {
- skip(iprot, field.type);
- }
- break;
- default:
- skip(iprot, field.type);
- }
- iprot.readFieldEnd();
}
- iprot.readStructEnd();
+
return result.build();
}
@Override
- public void write(DependencyLink value, TProtocol oprot) throws TException {
- oprot.writeStructBegin(STRUCT_DESC);
-
- oprot.writeFieldBegin(PARENT_FIELD_DESC);
- oprot.writeString(value.parent);
- oprot.writeFieldEnd();
+ public void write(DependencyLink value, Buffer buffer) {
+ PARENT.write(buffer);
+ writeUtf8(buffer, value.parent);
- oprot.writeFieldBegin(CHILD_FIELD_DESC);
- oprot.writeString(value.child);
- oprot.writeFieldEnd();
+ CHILD.write(buffer);
+ writeUtf8(buffer, value.child);
- oprot.writeFieldBegin(CALL_COUNT_FIELD_DESC);
- oprot.writeI64(value.callCount);
- oprot.writeFieldEnd();
+ CALL_COUNT.write(buffer);
+ buffer.writeLong(value.callCount);
- oprot.writeFieldStop();
- oprot.writeStructEnd();
+ buffer.writeByte(TYPE_STOP);
}
@Override
@@ -560,8 +381,7 @@ public String toString() {
}
};
- static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER =
- new ListAdapter<>(DEPENDENCY_LINK_ADAPTER);
+ static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER = new ListAdapter<>(DEPENDENCY_LINK_ADAPTER);
@Override
public List readDependencyLinks(byte[] bytes) {
@@ -573,64 +393,58 @@ public byte[] writeDependencyLinks(List value) {
return write(DEPENDENCY_LINKS_ADAPTER, value);
}
- private static T read(ThriftReader reader, byte[] bytes) {
+ static T read(ThriftReader reader, byte[] bytes) {
checkArgument(bytes.length > 0, "Empty input reading %s", reader);
try {
- return reader.read(new TBinaryProtocol(new TMemoryInputTransport(bytes)));
- } catch (TException | RuntimeException e) {
+ return reader.read(new Buffer().write(bytes));
+ } catch (EOFException | RuntimeException e) {
throw exceptionReading(reader.toString(), bytes, e);
}
}
/** Inability to encode is a programming bug. */
- private static byte[] write(ThriftWriter writer, T value) {
- BufferTransport transport = new BufferTransport();
- TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ static byte[] write(ThriftWriter writer, T value) {
+ Buffer buffer = new Buffer();
try {
- writer.write(value, protocol);
- } catch (TException | RuntimeException e) {
+ writer.write(value, buffer);
+ } catch (RuntimeException e) {
throw new AssertionError("Could not write " + value + " as TBinary", e);
}
- return transport.buffer.readByteArray();
+ return buffer.readByteArray();
}
- static List readList(ThriftReader reader, TProtocol iprot) throws TException {
- TList peekLength = iprot.readListBegin();
- if (peekLength.size > 10000) { // don't allocate massive arrays
- throw new TException(peekLength.size + " > 10000: possibly malformed thrift");
+ static List readList(ThriftReader reader, Buffer buffer) throws EOFException {
+ byte ignoredType = buffer.readByte();
+ int length = guardLength(buffer, CONTAINER_LENGTH_LIMIT);
+ List result = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ result.add(reader.read(buffer));
}
- List result = new ArrayList<>(peekLength.size);
- for (int i = 0; i < peekLength.size; i++) {
- result.add(reader.read(iprot));
- }
- iprot.readListEnd();
return result;
}
- static void writeList(ThriftWriter writer, List value, TProtocol oprot)
- throws TException {
- oprot.writeListBegin(new TList(TType.STRUCT, value.size()));
+ static void writeList(ThriftWriter writer, List value, Buffer buffer) {
+ writeListBegin(buffer, value.size());
for (int i = 0, length = value.size(); i < length; i++) {
- writer.write(value.get(i), oprot);
+ writer.write(value.get(i), buffer);
}
- oprot.writeListEnd();
}
- private static final class ListAdapter implements ThriftAdapter> {
- private final ThriftAdapter adapter;
+ static final class ListAdapter implements ThriftAdapter> {
+ final ThriftAdapter adapter;
ListAdapter(ThriftAdapter adapter) {
this.adapter = adapter;
}
@Override
- public List read(TProtocol iprot) throws TException {
- return readList(adapter, iprot);
+ public List read(Buffer buffer) throws EOFException {
+ return readList(adapter, buffer);
}
@Override
- public void write(List value, TProtocol oprot) throws TException {
- writeList(adapter, value, oprot);
+ public void write(List value, Buffer buffer) {
+ writeList(adapter, value, buffer);
}
@Override
@@ -639,38 +453,114 @@ public String toString() {
}
}
- static final class BufferTransport extends TTransport {
- final Buffer buffer = new Buffer();
+ static IllegalArgumentException exceptionReading(String type, byte[] bytes, Exception e) {
+ String cause = e.getMessage() == null ? "Error" : e.getMessage();
+ if (e instanceof EOFException) cause = "EOF";
+ if (e instanceof IllegalStateException) cause = "Malformed";
+ String message =
+ String.format("%s reading %s from TBinary: %s", cause, type, ByteString.of(bytes).base64());
+ throw new IllegalArgumentException(message, e);
+ }
+
+ static final class Field {
+ final byte type;
+ final int id;
- @Override
- public boolean isOpen() {
- return true;
+ Field(byte type, int id) {
+ this.type = type;
+ this.id = id;
}
- @Override
- public void open() {
+ void write(Buffer buffer) {
+ buffer.writeByte(type);
+ buffer.writeShort(id);
}
- @Override
- public void close() {
+ static Field read(Buffer buffer) {
+ byte type = buffer.readByte();
+ return new Field(type, type == TYPE_STOP ? TYPE_STOP : buffer.readShort());
}
- @Override
- public int read(byte[] buf, int off, int len) {
- return buffer.read(buf, off, len);
+ boolean equals(Field that) {
+ return this.type == that.type && this.id == that.id;
}
+ }
- @Override
- public void write(byte[] buf, int off, int len) {
- buffer.write(buf, off, len);
+ static void skip(Buffer buffer, byte type) throws EOFException {
+ skip(buffer, type, MAX_SKIP_DEPTH);
+ }
+
+ static void skip(Buffer buffer, byte type, int maxDepth) throws EOFException {
+ if (maxDepth <= 0) throw new EOFException("Maximum skip depth exceeded");
+ switch (type) {
+ case TYPE_BOOL:
+ case TYPE_BYTE:
+ buffer.skip(1);
+ break;
+ case TYPE_I16:
+ buffer.skip(2);
+ break;
+ case TYPE_I32:
+ buffer.skip(4);
+ break;
+ case TYPE_DOUBLE:
+ case TYPE_I64:
+ buffer.skip(8);
+ break;
+ case TYPE_STRING:
+ int size = guardLength(buffer, STRING_LENGTH_LIMIT);
+ buffer.skip(size);
+ break;
+ case TYPE_STRUCT:
+ while (true) {
+ Field field = Field.read(buffer);
+ if (field.type == TYPE_STOP) return;
+ skip(buffer, field.type, maxDepth - 1);
+ }
+ case TYPE_MAP:
+ byte keyType = buffer.readByte();
+ byte valueType = buffer.readByte();
+ for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) {
+ skip(buffer, keyType, maxDepth - 1);
+ skip(buffer, valueType, maxDepth - 1);
+ }
+ break;
+ case TYPE_SET:
+ case TYPE_LIST:
+ byte elemType = buffer.readByte();
+ for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) {
+ skip(buffer, elemType, maxDepth - 1);
+ }
+ break;
+ default: // types that don't need explicit skipping
+ break;
}
}
- static IllegalArgumentException exceptionReading(String type, byte[] bytes, Exception e) {
- String cause = e.getMessage() == null ? "Error" : e.getMessage();
- if (e instanceof TTransportException || cause.indexOf("malformed") != -1) cause = "Malformed";
- String message =
- String.format("%s reading %s from TBinary: %s", cause, type, ByteString.of(bytes).base64());
- throw new IllegalArgumentException(message, e);
+ static byte[] readBytes(Buffer buffer) throws EOFException {
+ return buffer.readByteArray(guardLength(buffer, STRING_LENGTH_LIMIT));
+ }
+
+ static String readUtf8(Buffer buffer) throws EOFException {
+ return buffer.readUtf8(guardLength(buffer, STRING_LENGTH_LIMIT));
+ }
+
+ static int guardLength(Buffer buffer, int limit) {
+ int length = buffer.readInt();
+ if (length > limit) { // don't allocate massive arrays
+ throw new IllegalStateException(length + " > " + limit + ": possibly malformed thrift");
+ }
+ return length;
+ }
+
+ static void writeListBegin(Buffer buffer, int size) {
+ buffer.writeByte(TYPE_STRUCT);
+ buffer.writeInt(size);
+ }
+
+ static void writeUtf8(Buffer buffer, String string) {
+ Buffer temp = new Buffer().writeUtf8(string);
+ buffer.writeInt((int) temp.size());
+ buffer.write(temp, temp.size());
}
}
diff --git a/zipkin/src/test/java/zipkin/SpanStoreTest.java b/zipkin/src/test/java/zipkin/SpanStoreTest.java
index a8acbbceaeb..c4fc3aadc41 100644
--- a/zipkin/src/test/java/zipkin/SpanStoreTest.java
+++ b/zipkin/src/test/java/zipkin/SpanStoreTest.java
@@ -25,6 +25,7 @@
import org.junit.Test;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.Constants.LOCAL_COMPONENT;
@@ -74,39 +75,39 @@ private static long midnight(){
.name("methodcall")
.id(spanId)
.timestamp(ann1.timestamp).duration(9000L)
- .annotations(ann1, ann3)
- .binaryAnnotations(BinaryAnnotation.create("BAH", "BEH", ep)).build();
+ .annotations(asList(ann1, ann3))
+ .addBinaryAnnotation(BinaryAnnotation.create("BAH", "BEH", ep)).build();
Span span2 = new Span.Builder()
.traceId(456)
.name("methodcall")
.id(spanId)
.timestamp(ann2.timestamp)
- .annotations(ann2)
- .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+ .addAnnotation(ann2)
+ .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
Span span3 = new Span.Builder()
.traceId(789)
.name("methodcall")
.id(spanId)
.timestamp(ann2.timestamp).duration(18000L)
- .annotations(ann2, ann3, ann4)
- .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+ .annotations(asList(ann2, ann3, ann4))
+ .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
Span span4 = new Span.Builder()
.traceId(999)
.name("methodcall")
.id(spanId)
.timestamp(ann6.timestamp).duration(1000L)
- .annotations(ann6, ann7).build();
+ .annotations(asList(ann6, ann7)).build();
Span span5 = new Span.Builder()
.traceId(999)
.name("methodcall")
.id(spanId)
.timestamp(ann5.timestamp).duration(3000L)
- .annotations(ann5, ann8)
- .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
+ .annotations(asList(ann5, ann8))
+ .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build();
Span spanEmptySpanName = new Span.Builder()
.traceId(123)
@@ -114,7 +115,7 @@ private static long midnight(){
.id(spanId)
.parentId(1L)
.timestamp(ann1.timestamp).duration(1000L)
- .annotations(ann1, ann2).build();
+ .annotations(asList(ann1, ann2)).build();
Span spanEmptyServiceName = new Span.Builder()
.traceId(123)
@@ -139,7 +140,7 @@ public void getSpansByTraceIds() {
*/
@Test
public void tracesRetrieveInOrderDesc() {
- store.accept(iterator(span2, new Span.Builder(span1).annotations(ann3, ann1).build()));
+ store.accept(iterator(span2, new Span.Builder(span1).annotations(asList(ann3, ann1)).build()));
assertThat(store.getTracesByIds(asList(span2.traceId, span1.traceId)))
.containsOnly(asList(span2), asList(span1));
@@ -170,7 +171,7 @@ public void getSpanNames() {
@Test
public void getAllServiceNames() {
BinaryAnnotation yak = BinaryAnnotation.address("sa", Endpoint.create("yak", 127 << 24 | 1, 8080));
- store.accept(iterator(new Span.Builder(span1).binaryAnnotations(yak).build(), span4));
+ store.accept(iterator(new Span.Builder(span1).addBinaryAnnotation(yak).build(), span4));
// should be in order
assertThat(store.getServiceNames()).containsExactly("service", "yak");
@@ -221,7 +222,7 @@ public void getTraces_spanName() {
public void getTraces_serviceNameInBinaryAnnotation() {
List localTrace = asList(new Span.Builder().traceId(1L).name("targz").id(1L)
.timestamp(today * 1000 + 100L).duration(200L)
- .binaryAnnotations(BinaryAnnotation.create(LOCAL_COMPONENT, "archiver", ep)).build());
+ .addBinaryAnnotation(BinaryAnnotation.create(LOCAL_COMPONENT, "archiver", ep)).build());
store.accept(localTrace.iterator());
@@ -242,19 +243,19 @@ public void getTraces_duration() {
BinaryAnnotation archiver3 = component.endpoint(service3).build();
Span targz = new Span.Builder().traceId(1L).id(1L)
- .name("targz").timestamp(today * 1000 + 100L).duration(200L).binaryAnnotations(archiver1).build();
+ .name("targz").timestamp(today * 1000 + 100L).duration(200L).addBinaryAnnotation(archiver1).build();
Span tar = new Span.Builder().traceId(1L).id(2L).parentId(1L)
- .name("tar").timestamp(today * 1000 + 200L).duration(150L).binaryAnnotations(archiver2).build();
+ .name("tar").timestamp(today * 1000 + 200L).duration(150L).addBinaryAnnotation(archiver2).build();
Span gz = new Span.Builder().traceId(1L).id(3L).parentId(1L)
- .name("gz").timestamp(today * 1000 + 250L).duration(50L).binaryAnnotations(archiver3).build();
+ .name("gz").timestamp(today * 1000 + 250L).duration(50L).addBinaryAnnotation(archiver3).build();
Span zip = new Span.Builder().traceId(3L).id(3L)
- .name("zip").timestamp(today * 1000 + 130L).duration(50L).binaryAnnotations(archiver2).build();
+ .name("zip").timestamp(today * 1000 + 130L).duration(50L).addBinaryAnnotation(archiver2).build();
List trace1 = asList(targz, tar, gz);
List trace2 = asList(
- new Span.Builder(targz).traceId(2L).timestamp(today * 1000 + 110L).binaryAnnotations(archiver3).build(),
- new Span.Builder(tar).traceId(2L).timestamp(today * 1000 + 210L).binaryAnnotations(archiver2).build(),
- new Span.Builder(gz).traceId(2L).timestamp(today * 1000 + 260L).binaryAnnotations(archiver1).build());
+ new Span.Builder(targz).traceId(2L).timestamp(today * 1000 + 110L).binaryAnnotations(asList(archiver3)).build(),
+ new Span.Builder(tar).traceId(2L).timestamp(today * 1000 + 210L).binaryAnnotations(asList(archiver2)).build(),
+ new Span.Builder(gz).traceId(2L).timestamp(today * 1000 + 260L).binaryAnnotations(asList(archiver1)).build());
List trace3 = asList(zip);
store.accept(trace1.iterator());
@@ -292,13 +293,13 @@ public void getTraces_duration() {
@Test
public void getTraces_absentWhenNoTimestamp() {
// store the binary annotations
- store.accept(iterator(new Span.Builder(span1).timestamp(null).duration(null).annotations().build()));
+ store.accept(iterator(new Span.Builder(span1).timestamp(null).duration(null).annotations(emptyList()).build()));
assertThat(store.getTraces(new QueryRequest.Builder("service").build())).isEmpty();
assertThat(store.getTraces(new QueryRequest.Builder("service").serviceName("methodcall").build())).isEmpty();
// now store the timestamped annotations
- store.accept(iterator(new Span.Builder(span1).binaryAnnotations().build()));
+ store.accept(iterator(new Span.Builder(span1).binaryAnnotations(emptyList()).build()));
assertThat(store.getTraces(new QueryRequest.Builder("service").build()))
.containsExactly(asList(span1));
@@ -323,20 +324,22 @@ public void getTraces_annotation() {
public void getTraces_multipleAnnotationsBecomeAndFilter() {
Span foo = new Span.Builder().traceId(1).name("call1").id(1)
.timestamp((today + 1) * 1000)
- .annotations(Annotation.create((today + 1) * 1000, "foo", ep)).build();
+ .addAnnotation(Annotation.create((today + 1) * 1000, "foo", ep)).build();
// would be foo bar, except lexicographically bar precedes foo
Span barAndFoo = new Span.Builder().traceId(2).name("call2").id(2)
.timestamp((today + 2) * 1000)
- .annotations(Annotation.create((today + 2) * 1000, "bar", ep), Annotation.create((today + 2) * 1000, "foo", ep)).build();
+ .addAnnotation(Annotation.create((today + 2) * 1000, "bar", ep))
+ .addAnnotation(Annotation.create((today + 2) * 1000, "foo", ep)).build();
Span fooAndBazAndQux = new Span.Builder().traceId(3).name("call3").id(3)
.timestamp((today + 3) * 1000)
- .annotations(Annotation.create((today + 3) * 1000, "foo", ep))
- .binaryAnnotations(BinaryAnnotation.create("baz", "qux", ep))
+ .addAnnotation(Annotation.create((today + 3) * 1000, "foo", ep))
+ .addBinaryAnnotation(BinaryAnnotation.create("baz", "qux", ep))
.build();
Span barAndFooAndBazAndQux = new Span.Builder().traceId(4).name("call4").id(4)
.timestamp((today + 4) * 1000)
- .annotations(Annotation.create((today + 4) * 1000, "bar", ep), Annotation.create((today + 4) * 1000, "foo", ep))
- .binaryAnnotations(BinaryAnnotation.create("baz", "qux", ep))
+ .addAnnotation(Annotation.create((today + 4) * 1000, "bar", ep))
+ .addAnnotation(Annotation.create((today + 4) * 1000, "foo", ep))
+ .addBinaryAnnotation(BinaryAnnotation.create("baz", "qux", ep))
.build();
store.accept(iterator(foo, barAndFoo, fooAndBazAndQux, barAndFooAndBazAndQux));
@@ -365,8 +368,8 @@ public void getTraces_mergesSpans() {
Span merged = new Span.Builder(span4)
.timestamp(mergedAnnotations.first().timestamp)
.duration(mergedAnnotations.last().timestamp - mergedAnnotations.first().timestamp)
- .annotations(mergedAnnotations.toArray(new Annotation[0]))
- .binaryAnnotations(span5.binaryAnnotations.toArray(new BinaryAnnotation[0])).build();
+ .annotations(mergedAnnotations)
+ .binaryAnnotations(span5.binaryAnnotations).build();
assertThat(store.getTraces(new QueryRequest.Builder("service").build()))
.containsExactly(asList(merged), asList(span1));
@@ -487,7 +490,7 @@ public void correctsClockSkew() {
.id(778)
.parentId(666L)
.timestamp((today + 101) * 1000).duration(50L)
- .binaryAnnotations(BinaryAnnotation.create(LOCAL_COMPONENT, "framey", frontend)).build();
+ .addBinaryAnnotation(BinaryAnnotation.create(LOCAL_COMPONENT, "framey", frontend)).build();
List skewed = asList(parent, remoteChild, localChild);