From f5a93eae1494e62f2ad787f679fcce74afd7ab2c Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Thu, 25 Feb 2016 12:55:30 +0800 Subject: [PATCH] Removes internal use of libthrift This implements TBinaryProtocol directly in the ThriftCodec. By doing the code, dependencies, and exceptions are simpler (as we are decoding via a buffer not a network). This uses Okio Buffer, which also simplified some code. The payment was having to implement a couple utilities from libthrift, namely the skip logic. Overall, the code is smaller and the binary shrunk from 275k to 225k. --- interop/pom.xml | 16 +- .../zipkin/interop/ScalaSpanStoreAdapter.java | 39 +- pom.xml | 13 - zipkin/pom.xml | 10 +- zipkin/src/main/java/zipkin/Span.java | 8 +- .../zipkin/internal/CorrectForClockSkew.java | 16 +- .../java/zipkin/internal/ThriftCodec.java | 736 ++++++++---------- .../src/test/java/zipkin/SpanStoreTest.java | 65 +- 8 files changed, 398 insertions(+), 505 deletions(-) diff --git a/interop/pom.xml b/interop/pom.xml index 5c31efcb004..d5c1de05959 100644 --- a/interop/pom.xml +++ b/interop/pom.xml @@ -29,7 +29,9 @@ ${project.basedir}/.. - 1.33.2 + + 0.9.3 + 1.34.0 2.2.5 @@ -51,6 +53,18 @@ ${zipkin-scala.version} + + io.zipkin + zipkin-scrooge + ${zipkin-scala.version} + + + + org.apache.thrift + libthrift + ${libthrift.version} + + ${project.groupId} spanstore-jdbc diff --git a/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java b/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java index 2d1cb4dfe4f..7713120880e 100644 --- a/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java +++ b/interop/src/main/java/zipkin/interop/ScalaSpanStoreAdapter.java @@ -13,17 +13,17 @@ */ package zipkin.interop; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.util.Future; import com.twitter.zipkin.common.Span; -import com.twitter.zipkin.json.JsonSpan; +import com.twitter.zipkin.conversions.thrift$; import com.twitter.zipkin.json.ZipkinJson$; import com.twitter.zipkin.storage.QueryRequest; -import java.io.IOException; import java.util.ArrayList; -import java.util.stream.Collectors; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TList; +import org.apache.thrift.protocol.TType; +import org.apache.thrift.transport.TMemoryBuffer; import scala.Tuple2; import scala.collection.Iterator; import scala.collection.JavaConversions; @@ -40,11 +40,9 @@ * Adapts {@link SpanStore} to a scala {@link com.twitter.zipkin.storage.SpanStore} in order to test * against its {@link com.twitter.zipkin.storage.SpanStoreSpec} for interoperability reasons. * - *

This implementation uses json to ensure structures are compatible. + *

This implementation uses thrift TBinaryProtocol to ensure structures are compatible. */ public final class ScalaSpanStoreAdapter extends com.twitter.zipkin.storage.SpanStore { - private static final ObjectMapper scalaCodec = ZipkinJson$.MODULE$; - private final SpanStore spanStore; public ScalaSpanStoreAdapter(SpanStore spanStore) { @@ -111,12 +109,11 @@ public void close() { @Nullable private static java.util.List convert(java.util.List input) { - byte[] bytes = Codec.JSON.writeSpans(input); + byte[] bytes = Codec.THRIFT.writeSpans(input); try { - TypeReference> ref = new TypeReference>(){}; - java.util.List read = scalaCodec.readValue(bytes, ref); - return read.stream().map(JsonSpan::invert).collect(Collectors.toList()); - } catch (IOException e) { + List read = thrift$.MODULE$.thriftListToSpans(bytes); + return JavaConversions.seqAsJavaList(read); + } catch (RuntimeException e) { e.printStackTrace(); return null; } @@ -125,9 +122,19 @@ private static java.util.List convert(java.util.List input) { @Nullable private static java.util.List invert(Seq input) { try { - byte[] bytes = scalaCodec.writeValueAsBytes(input); - return Codec.JSON.readSpans(bytes); - } catch (JsonProcessingException e) { + TMemoryBuffer transport = new TMemoryBuffer(0); + TBinaryProtocol oproto = new TBinaryProtocol(transport); + oproto.writeListBegin(new TList(TType.STRUCT, input.size())); + Iterator iterator = input.iterator(); + while (iterator.hasNext()) { + com.twitter.zipkin.thriftscala.Span thriftSpan = + thrift$.MODULE$.spanToThriftSpan(iterator.next()).toThrift(); + thriftSpan.write(oproto); + } + oproto.writeListEnd(); + byte[] bytes = transport.getArray(); + return Codec.THRIFT.readSpans(bytes); + } catch (Exception e) { e.printStackTrace(); return null; } diff --git a/pom.xml b/pom.xml index 127fc78aa4f..3bae2b58512 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,6 @@ 1.1.0 1.6.0 1.3.2.RELEASE - 0.9.3 + maven-shade-plugin @@ -87,10 +83,6 @@ com.squareup.moshi zipkin.internal.moshi - - org.apache.thrift - zipkin.internal.libthrift - diff --git a/zipkin/src/main/java/zipkin/Span.java b/zipkin/src/main/java/zipkin/Span.java index fe21213b78a..de16ca0ec36 100644 --- a/zipkin/src/main/java/zipkin/Span.java +++ b/zipkin/src/main/java/zipkin/Span.java @@ -256,9 +256,9 @@ public Builder duration(@Nullable Long duration) { * * @see Span#annotations */ - public Builder annotations(Annotation... annotations) { + public Builder annotations(Collection annotations) { this.annotations.clear(); - Collections.addAll(this.annotations, annotations); + this.annotations.addAll(annotations); return this; } @@ -273,9 +273,9 @@ public Builder addAnnotation(Annotation annotation) { * * @see Span#binaryAnnotations */ - public Builder binaryAnnotations(BinaryAnnotation... binaryAnnotations) { + public Builder binaryAnnotations(Collection binaryAnnotations) { this.binaryAnnotations.clear(); - Collections.addAll(this.binaryAnnotations, binaryAnnotations); + this.binaryAnnotations.addAll(binaryAnnotations); return this; } diff --git a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java index f8f09da5e13..bffa2f0b483 100644 --- a/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java +++ b/zipkin/src/main/java/zipkin/internal/CorrectForClockSkew.java @@ -78,25 +78,25 @@ private static void adjust(Node node, @Nullable ClockSkew skewFromParent) } /** If any annotation has an IP with skew associated, adjust accordingly. */ - private static Span adjustTimestamps(Span span, ClockSkew clockSkew) { - Annotation[] annotations = null; + private static Span adjustTimestamps(Span span, ClockSkew skew) { + List annotations = null; for (int i = 0, length = span.annotations.size(); i < length; i++) { Annotation a = span.annotations.get(i); if (a.endpoint == null) continue; - if (clockSkew.endpoint.ipv4 == a.endpoint.ipv4) { - if (annotations == null) annotations = span.annotations.toArray(new Annotation[length]); - annotations[i] = new Annotation.Builder(a).timestamp(a.timestamp - clockSkew.skew).build(); + if (skew.endpoint.ipv4 == a.endpoint.ipv4) { + if (annotations == null) annotations = new ArrayList<>(span.annotations); + annotations.set(i, new Annotation.Builder(a).timestamp(a.timestamp - skew.skew).build()); } } if (annotations != null) { - return new Span.Builder(span).timestamp(annotations[0].timestamp).annotations(annotations).build(); + return new Span.Builder(span).timestamp(annotations.get(0).timestamp).annotations(annotations).build(); } // Search for a local span on the skewed endpoint for (int i = 0, length = span.binaryAnnotations.size(); i < length; i++) { BinaryAnnotation b = span.binaryAnnotations.get(i); if (b.endpoint == null) continue; - if (b.key.equals(Constants.LOCAL_COMPONENT) && clockSkew.endpoint.ipv4 == b.endpoint.ipv4) { - return new Span.Builder(span).timestamp(span.timestamp - clockSkew.skew).build(); + if (b.key.equals(Constants.LOCAL_COMPONENT) && skew.endpoint.ipv4 == b.endpoint.ipv4) { + return new Span.Builder(span).timestamp(span.timestamp - skew.skew).build(); } } return span; diff --git a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java index 693d38a7f2e..728858130d4 100644 --- a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java +++ b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java @@ -13,21 +13,11 @@ */ package zipkin.internal; -import java.nio.ByteBuffer; +import java.io.EOFException; import java.util.ArrayList; import java.util.List; import okio.Buffer; import okio.ByteString; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TField; -import org.apache.thrift.protocol.TList; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TStruct; -import org.apache.thrift.protocol.TType; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import zipkin.Annotation; import zipkin.BinaryAnnotation; import zipkin.Codec; @@ -35,7 +25,6 @@ import zipkin.Endpoint; import zipkin.Span; -import static org.apache.thrift.protocol.TProtocolUtil.skip; import static zipkin.internal.Util.checkArgument; /** @@ -43,13 +32,29 @@ * core jar. The hard coding not only keeps us with a single data-model, it also allows the minified * core jar free of SLFJ classes otherwise included in generated types. * - *

The implementation appears scarier than it is. It was made by mechanically copying in the - * method bodies of generated thrift classes, specifically {@link org.apache.thrift.TBase#read} and - * {@link org.apache.thrift.TBase#write}. Zipkin uses thrifts that have been very stable in the last - * few years. Moreover, users consolidate to {@link TBinaryProtocol} at rest, even if the structs - * are later compressed with snappy. + *

This is an Okio-native TBinaryProtocol codec. Natively doing this reduces dependencies and + * array duplication. */ public final class ThriftCodec implements Codec { + // break vs decode huge structs, like > 1MB strings or 10k spans in a trace. + private static final int STRING_LENGTH_LIMIT = 1 * 1024 * 1024; + private static final int CONTAINER_LENGTH_LIMIT = 10 * 1000; + // break vs recursing infinitely when skipping data + private static int MAX_SKIP_DEPTH = 2147483647; + + // taken from org.apache.thrift.protocol.TType + private static final byte TYPE_STOP = 0; + private static final byte TYPE_BOOL = 2; + private static final byte TYPE_BYTE = 3; + private static final byte TYPE_DOUBLE = 4; + private static final byte TYPE_I16 = 6; + private static final byte TYPE_I32 = 8; + private static final byte TYPE_I64 = 10; + private static final byte TYPE_STRING = 11; + private static final byte TYPE_STRUCT = 12; + private static final byte TYPE_MAP = 13; + private static final byte TYPE_SET = 14; + private static final byte TYPE_LIST = 15; @Override public Span readSpan(byte[] bytes) { @@ -77,11 +82,11 @@ public byte[] writeTraces(List> value) { } interface ThriftWriter { - void write(T value, TProtocol oprot) throws TException; + void write(T value, Buffer buffer); } interface ThriftReader { - T read(TProtocol iprot) throws TException; + T read(Buffer buffer) throws EOFException; } interface ThriftAdapter extends ThriftReader, ThriftWriter { @@ -89,392 +94,233 @@ interface ThriftAdapter extends ThriftReader, ThriftWriter { static final ThriftAdapter ENDPOINT_ADAPTER = new ThriftAdapter() { - private final TStruct STRUCT_DESC = new TStruct("Endpoint"); - private final TField IPV4_FIELD_DESC = new TField("ipv4", TType.I32, (short) 1); - private final TField PORT_FIELD_DESC = new TField("port", TType.I16, (short) 2); - private final TField SERVICE_NAME_FIELD_DESC = new TField("service_name", TType.STRING, (short) 3); + final Field IPV4 = new Field(TYPE_I32, 1); + final Field PORT = new Field(TYPE_I16, 2); + final Field SERVICE_NAME = new Field(TYPE_STRING, 3); @Override - public Endpoint read(TProtocol iprot) throws TException { + public Endpoint read(Buffer buffer) throws EOFException { Endpoint.Builder result = new Endpoint.Builder(); - TField field; - iprot.readStructBegin(); + Field field; + while (true) { - field = iprot.readFieldBegin(); - if (field.type == TType.STOP) { - break; - } - switch (field.id) { - case 1: // IPV4 - if (field.type == TType.I32) { - result.ipv4(iprot.readI32()); - } else { - skip(iprot, field.type); - } - break; - case 2: // PORT - if (field.type == TType.I16) { - result.port(iprot.readI16()); - } else { - skip(iprot, field.type); - } - break; - case 3: // SERVICE_NAME - if (field.type == TType.STRING) { - result.serviceName(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - default: - skip(iprot, field.type); + field = Field.read(buffer); + if (field.type == TYPE_STOP) break; + + if (field.equals(IPV4)) { + result.ipv4(buffer.readInt()); + } else if (field.equals(PORT)) { + result.port(buffer.readShort()); + } else if (field.equals(SERVICE_NAME)) { + result.serviceName(readUtf8(buffer)); + } else { + skip(buffer, field.type); } - iprot.readFieldEnd(); } - iprot.readStructEnd(); return result.build(); } @Override - public void write(Endpoint value, TProtocol oprot) throws TException { - oprot.writeStructBegin(STRUCT_DESC); - - oprot.writeFieldBegin(IPV4_FIELD_DESC); - oprot.writeI32(value.ipv4); - oprot.writeFieldEnd(); - oprot.writeFieldBegin(PORT_FIELD_DESC); - oprot.writeI16(value.port == null ? 0 : value.port); - oprot.writeFieldEnd(); - oprot.writeFieldBegin(SERVICE_NAME_FIELD_DESC); - oprot.writeString(value.serviceName); - oprot.writeFieldEnd(); - - oprot.writeFieldStop(); - oprot.writeStructEnd(); + public void write(Endpoint value, Buffer buffer) { + IPV4.write(buffer); + buffer.writeInt(value.ipv4); + + PORT.write(buffer); + buffer.writeShort(value.port == null ? 0 : value.port); + + SERVICE_NAME.write(buffer); + writeUtf8(buffer, value.serviceName); + + buffer.writeByte(TYPE_STOP); } }; static final ThriftAdapter ANNOTATION_ADAPTER = new ThriftAdapter() { - private final TStruct STRUCT_DESC = new TStruct("Annotation"); - private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 1); - private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); - private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 3); + final Field TIMESTAMP = new Field(TYPE_I64, 1); + final Field VALUE = new Field(TYPE_STRING, 2); + final Field ENDPOINT = new Field(TYPE_STRUCT, 3); @Override - public Annotation read(TProtocol iprot) throws TException { + public Annotation read(Buffer buffer) throws EOFException { Annotation.Builder result = new Annotation.Builder(); - TField field; - iprot.readStructBegin(); + Field field; while (true) { - field = iprot.readFieldBegin(); - if (field.type == TType.STOP) { - break; - } - switch (field.id) { - case 1: // TIMESTAMP - if (field.type == TType.I64) { - result.timestamp(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - case 2: // VALUE - if (field.type == TType.STRING) { - result.value(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - case 3: // HOST - if (field.type == TType.STRUCT) { - result.endpoint(ENDPOINT_ADAPTER.read(iprot)); - } else { - skip(iprot, field.type); - } - break; - default: - skip(iprot, field.type); + field = Field.read(buffer); + if (field.type == TYPE_STOP) break; + + if (field.equals(TIMESTAMP)) { + result.timestamp(buffer.readLong()); + } else if (field.equals(VALUE)) { + result.value(readUtf8(buffer)); + } else if (field.equals(ENDPOINT)) { + result.endpoint(ENDPOINT_ADAPTER.read(buffer)); + } else { + skip(buffer, field.type); } - iprot.readFieldEnd(); } - iprot.readStructEnd(); return result.build(); } @Override - public void write(Annotation value, TProtocol oprot) throws TException { - oprot.writeStructBegin(STRUCT_DESC); - - oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC); - oprot.writeI64(value.timestamp); - oprot.writeFieldEnd(); + public void write(Annotation value, Buffer buffer) { + TIMESTAMP.write(buffer); + buffer.writeLong(value.timestamp); if (value.value != null) { - oprot.writeFieldBegin(VALUE_FIELD_DESC); - oprot.writeString(value.value); - oprot.writeFieldEnd(); + VALUE.write(buffer); + writeUtf8(buffer, value.value); } if (value.endpoint != null) { - oprot.writeFieldBegin(HOST_FIELD_DESC); - ENDPOINT_ADAPTER.write(value.endpoint, oprot); - oprot.writeFieldEnd(); + ENDPOINT.write(buffer); + ENDPOINT_ADAPTER.write(value.endpoint, buffer); } - - oprot.writeFieldStop(); - oprot.writeStructEnd(); + buffer.writeByte(TYPE_STOP); } }; static final ThriftAdapter BINARY_ANNOTATION_ADAPTER = new ThriftAdapter() { - private final TStruct STRUCT_DESC = new TStruct("BinaryAnnotation"); - private final TField KEY_FIELD_DESC = new TField("key", TType.STRING, (short) 1); - private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); - private final TField ANNOTATION_TYPE_FIELD_DESC = new TField("annotation_type", TType.I32, (short) 3); - private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 4); + final Field KEY = new Field(TYPE_STRING, 1); + final Field VALUE = new Field(TYPE_STRING, 2); + final Field TYPE = new Field(TYPE_I32, 3); + final Field ENDPOINT = new Field(TYPE_STRUCT, 4); @Override - public BinaryAnnotation read(TProtocol iprot) throws TException { + public BinaryAnnotation read(Buffer buffer) throws EOFException { BinaryAnnotation.Builder result = new BinaryAnnotation.Builder(); - TField field; - iprot.readStructBegin(); + Field field; + while (true) { - field = iprot.readFieldBegin(); - if (field.type == TType.STOP) { - break; + field = Field.read(buffer); + if (field.type == TYPE_STOP) break; + + if (field.equals(KEY)) { + result.key(readUtf8(buffer)); + } else if (field.equals(VALUE)) { + result.value(readBytes(buffer)); + } else if (field.equals(TYPE)) { + result.type(BinaryAnnotation.Type.fromValue(buffer.readInt())); + } else if (field.equals(ENDPOINT)) { + result.endpoint(ENDPOINT_ADAPTER.read(buffer)); + } else { + skip(buffer, field.type); } - switch (field.id) { - case 1: // KEY - if (field.type == TType.STRING) { - result.key(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - case 2: // VALUE - if (field.type == TType.STRING) { - ByteBuffer buffer = iprot.readBinary(); - byte[] value = new byte[buffer.remaining()]; - buffer.get(value); - result.value(value); - } else { - skip(iprot, field.type); - } - break; - case 3: // ANNOTATION_TYPE - if (field.type == TType.I32) { - result.type(BinaryAnnotation.Type.fromValue(iprot.readI32())); - } else { - skip(iprot, field.type); - } - break; - case 4: // HOST - if (field.type == TType.STRUCT) { - result.endpoint(ENDPOINT_ADAPTER.read(iprot)); - } else { - skip(iprot, field.type); - } - break; - default: - skip(iprot, field.type); - } - iprot.readFieldEnd(); } - iprot.readStructEnd(); return result.build(); } @Override - public void write(BinaryAnnotation value, TProtocol oprot) throws TException { - oprot.writeStructBegin(STRUCT_DESC); - - oprot.writeFieldBegin(KEY_FIELD_DESC); - oprot.writeString(value.key); - oprot.writeFieldEnd(); + public void write(BinaryAnnotation value, Buffer buffer) { + KEY.write(buffer); + writeUtf8(buffer, value.key); - oprot.writeFieldBegin(VALUE_FIELD_DESC); - oprot.writeBinary(ByteBuffer.wrap(value.value)); - oprot.writeFieldEnd(); + VALUE.write(buffer); + buffer.writeInt(value.value.length); + buffer.write(value.value); - oprot.writeFieldBegin(ANNOTATION_TYPE_FIELD_DESC); - oprot.writeI32(value.type.value); - oprot.writeFieldEnd(); + TYPE.write(buffer); + buffer.writeInt(value.type.value); if (value.endpoint != null) { - oprot.writeFieldBegin(HOST_FIELD_DESC); - ENDPOINT_ADAPTER.write(value.endpoint, oprot); - oprot.writeFieldEnd(); + ENDPOINT.write(buffer); + ENDPOINT_ADAPTER.write(value.endpoint, buffer); } - oprot.writeFieldStop(); - oprot.writeStructEnd(); + buffer.writeByte(TYPE_STOP); } }; + static final ThriftAdapter> ANNOTATIONS_ADAPTER = new ListAdapter<>(ANNOTATION_ADAPTER); + static final ThriftAdapter> BINARY_ANNOTATIONS_ADAPTER = new ListAdapter<>(BINARY_ANNOTATION_ADAPTER); + static final ThriftAdapter SPAN_ADAPTER = new ThriftAdapter() { - private final TStruct STRUCT_DESC = new TStruct("Span"); - private final TField TRACE_ID_FIELD_DESC = new TField("trace_id", TType.I64, (short) 1); - private final TField NAME_FIELD_DESC = new TField("name", TType.STRING, (short) 3); - private final TField ID_FIELD_DESC = new TField("id", TType.I64, (short) 4); - private final TField PARENT_ID_FIELD_DESC = new TField("parent_id", TType.I64, (short) 5); - private final TField ANNOTATIONS_FIELD_DESC = new TField("annotations", TType.LIST, (short) 6); - private final TField BINARY_ANNOTATIONS_FIELD_DESC = new TField("binary_annotations", TType.LIST, (short) 8); - private final TField DEBUG_FIELD_DESC = new TField("debug", TType.BOOL, (short) 9); - private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 10); - private final TField DURATION_FIELD_DESC = new TField("duration", TType.I64, (short) 11); + final Field TRACE_ID = new Field(TYPE_I64, 1); + final Field NAME = new Field(TYPE_STRING, 3); + final Field ID = new Field(TYPE_I64, 4); + final Field PARENT_ID = new Field(TYPE_I64, 5); + final Field ANNOTATIONS = new Field(TYPE_LIST, 6); + final Field BINARY_ANNOTATIONS = new Field(TYPE_LIST, 8); + final Field DEBUG = new Field(TYPE_BOOL, 9); + final Field TIMESTAMP = new Field(TYPE_I64, 10); + final Field DURATION = new Field(TYPE_I64, 11); @Override - public Span read(TProtocol iprot) throws TException { + public Span read(Buffer buffer) throws EOFException { Span.Builder result = new Span.Builder(); - TField field; - iprot.readStructBegin(); + Field field; + while (true) { - field = iprot.readFieldBegin(); - if (field.type == TType.STOP) { - break; - } - switch (field.id) { - case 1: // TRACE_ID - if (field.type == TType.I64) { - result.traceId(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - case 3: // NAME - if (field.type == TType.STRING) { - result.name(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - case 4: // ID - if (field.type == TType.I64) { - result.id(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - case 5: // PARENT_ID - if (field.type == TType.I64) { - result.parentId(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - case 6: // ANNOTATIONS - if (field.type == TType.LIST) { - TList annotations = iprot.readListBegin(); - for (int i = 0; i < annotations.size; i++) { - result.addAnnotation(ANNOTATION_ADAPTER.read(iprot)); - } - iprot.readListEnd(); - } else { - skip(iprot, field.type); - } - break; - case 8: // BINARY_ANNOTATIONS - if (field.type == TType.LIST) { - TList binaryAnnotations = iprot.readListBegin(); - for (int i = 0; i < binaryAnnotations.size; i++) { - result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.read(iprot)); - } - iprot.readListEnd(); - } else { - skip(iprot, field.type); - } - break; - case 9: // DEBUG - if (field.type == TType.BOOL) { - result.debug(iprot.readBool()); - } else { - skip(iprot, field.type); - } - break; - case 10: // TIMESTAMP - if (field.type == TType.I64) { - result.timestamp(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - case 11: // DURATION - if (field.type == TType.I64) { - result.duration(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - default: - skip(iprot, field.type); + field = Field.read(buffer); + if (field.type == TYPE_STOP) break; + + if (field.equals(TRACE_ID)) { + result.traceId(buffer.readLong()); + } else if (field.equals(NAME)) { + result.name(readUtf8(buffer)); + } else if (field.equals(ID)) { + result.id(buffer.readLong()); + } else if (field.equals(PARENT_ID)) { + result.parentId(buffer.readLong()); + } else if (field.equals(ANNOTATIONS)) { + result.annotations(ANNOTATIONS_ADAPTER.read(buffer)); + } else if (field.equals(BINARY_ANNOTATIONS)) { + result.binaryAnnotations(BINARY_ANNOTATIONS_ADAPTER.read(buffer)); + } else if (field.equals(DEBUG)) { + result.debug(buffer.readByte() == 1); + } else if (field.equals(TIMESTAMP)) { + result.timestamp(buffer.readLong()); + } else if (field.equals(DURATION)) { + result.duration(buffer.readLong()); + } else { + skip(buffer, field.type); } - iprot.readFieldEnd(); } - iprot.readStructEnd(); + return result.build(); } @Override - public void write(Span value, TProtocol oprot) throws TException { - oprot.writeStructBegin(STRUCT_DESC); + public void write(Span value, Buffer buffer) { - oprot.writeFieldBegin(TRACE_ID_FIELD_DESC); - oprot.writeI64(value.traceId); - oprot.writeFieldEnd(); + TRACE_ID.write(buffer); + buffer.writeLong(value.traceId); - oprot.writeFieldBegin(NAME_FIELD_DESC); - oprot.writeString(value.name); - oprot.writeFieldEnd(); + NAME.write(buffer); + writeUtf8(buffer, value.name); - oprot.writeFieldBegin(ID_FIELD_DESC); - oprot.writeI64(value.id); - oprot.writeFieldEnd(); + ID.write(buffer); + buffer.writeLong(value.id); if (value.parentId != null) { - oprot.writeFieldBegin(PARENT_ID_FIELD_DESC); - oprot.writeI64(value.parentId); - oprot.writeFieldEnd(); + PARENT_ID.write(buffer); + buffer.writeLong(value.parentId); } - oprot.writeFieldBegin(ANNOTATIONS_FIELD_DESC); - oprot.writeListBegin(new TList(TType.STRUCT, value.annotations.size())); - for (int i = 0, length = value.annotations.size(); i < length; i++) { - ANNOTATION_ADAPTER.write(value.annotations.get(i), oprot); - } - oprot.writeListEnd(); - oprot.writeFieldEnd(); + ANNOTATIONS.write(buffer); + ANNOTATIONS_ADAPTER.write(value.annotations, buffer); - oprot.writeFieldBegin(BINARY_ANNOTATIONS_FIELD_DESC); - oprot.writeListBegin(new TList(TType.STRUCT, value.binaryAnnotations.size())); - for (int i = 0, length = value.binaryAnnotations.size(); i < length; i++) { - BINARY_ANNOTATION_ADAPTER.write(value.binaryAnnotations.get(i), oprot); - } - oprot.writeListEnd(); - oprot.writeFieldEnd(); + BINARY_ANNOTATIONS.write(buffer); + BINARY_ANNOTATIONS_ADAPTER.write(value.binaryAnnotations, buffer); if (value.debug != null) { - oprot.writeFieldBegin(DEBUG_FIELD_DESC); - oprot.writeBool(value.debug); - oprot.writeFieldEnd(); + DEBUG.write(buffer); + buffer.writeByte(value.debug ? 1 : 0); } if (value.timestamp != null) { - oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC); - oprot.writeI64(value.timestamp); - oprot.writeFieldEnd(); + TIMESTAMP.write(buffer); + buffer.writeLong(value.timestamp); } if (value.duration != null) { - oprot.writeFieldBegin(DURATION_FIELD_DESC); - oprot.writeI64(value.duration); - oprot.writeFieldEnd(); + DURATION.write(buffer); + buffer.writeLong(value.duration); } - oprot.writeFieldStop(); - oprot.writeStructEnd(); + buffer.writeByte(TYPE_STOP); } @Override @@ -488,70 +334,45 @@ public String toString() { static final ThriftAdapter DEPENDENCY_LINK_ADAPTER = new ThriftAdapter() { - private final TStruct STRUCT_DESC = new TStruct("DependencyLink"); - private final TField PARENT_FIELD_DESC = new TField("parent", TType.STRING, (short) 1); - private final TField CHILD_FIELD_DESC = new TField("child", TType.STRING, (short) 2); - private final TField CALL_COUNT_FIELD_DESC = new TField("call_count", TType.I64, (short) 4); + final Field PARENT = new Field(TYPE_STRING, 1); + final Field CHILD = new Field(TYPE_STRING, 2); + final Field CALL_COUNT = new Field(TYPE_I64, 4); @Override - public DependencyLink read(TProtocol iprot) throws TException { + public DependencyLink read(Buffer buffer) throws EOFException { DependencyLink.Builder result = new DependencyLink.Builder(); - TField field; - iprot.readStructBegin(); + Field field; + while (true) { - field = iprot.readFieldBegin(); - if (field.type == TType.STOP) { - break; + field = Field.read(buffer); + if (field.type == TYPE_STOP) break; + + if (field.equals(PARENT)) { + result.parent(readUtf8(buffer)); + } else if (field.equals(CHILD)) { + result.child(readUtf8(buffer)); + } else if (field.equals(CALL_COUNT)) { + result.callCount(buffer.readLong()); + } else { + skip(buffer, field.type); } - switch (field.id) { - case 1: // PARENT - if (field.type == TType.STRING) { - result.parent(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - case 2: // CHILD - if (field.type == TType.STRING) { - result.child(iprot.readString()); - } else { - skip(iprot, field.type); - } - break; - case 4: // CALL_COUNT - if (field.type == TType.I64) { - result.callCount(iprot.readI64()); - } else { - skip(iprot, field.type); - } - break; - default: - skip(iprot, field.type); - } - iprot.readFieldEnd(); } - iprot.readStructEnd(); + return result.build(); } @Override - public void write(DependencyLink value, TProtocol oprot) throws TException { - oprot.writeStructBegin(STRUCT_DESC); - - oprot.writeFieldBegin(PARENT_FIELD_DESC); - oprot.writeString(value.parent); - oprot.writeFieldEnd(); + public void write(DependencyLink value, Buffer buffer) { + PARENT.write(buffer); + writeUtf8(buffer, value.parent); - oprot.writeFieldBegin(CHILD_FIELD_DESC); - oprot.writeString(value.child); - oprot.writeFieldEnd(); + CHILD.write(buffer); + writeUtf8(buffer, value.child); - oprot.writeFieldBegin(CALL_COUNT_FIELD_DESC); - oprot.writeI64(value.callCount); - oprot.writeFieldEnd(); + CALL_COUNT.write(buffer); + buffer.writeLong(value.callCount); - oprot.writeFieldStop(); - oprot.writeStructEnd(); + buffer.writeByte(TYPE_STOP); } @Override @@ -560,8 +381,7 @@ public String toString() { } }; - static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER = - new ListAdapter<>(DEPENDENCY_LINK_ADAPTER); + static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER = new ListAdapter<>(DEPENDENCY_LINK_ADAPTER); @Override public List readDependencyLinks(byte[] bytes) { @@ -573,64 +393,58 @@ public byte[] writeDependencyLinks(List value) { return write(DEPENDENCY_LINKS_ADAPTER, value); } - private static T read(ThriftReader reader, byte[] bytes) { + static T read(ThriftReader reader, byte[] bytes) { checkArgument(bytes.length > 0, "Empty input reading %s", reader); try { - return reader.read(new TBinaryProtocol(new TMemoryInputTransport(bytes))); - } catch (TException | RuntimeException e) { + return reader.read(new Buffer().write(bytes)); + } catch (EOFException | RuntimeException e) { throw exceptionReading(reader.toString(), bytes, e); } } /** Inability to encode is a programming bug. */ - private static byte[] write(ThriftWriter writer, T value) { - BufferTransport transport = new BufferTransport(); - TBinaryProtocol protocol = new TBinaryProtocol(transport); + static byte[] write(ThriftWriter writer, T value) { + Buffer buffer = new Buffer(); try { - writer.write(value, protocol); - } catch (TException | RuntimeException e) { + writer.write(value, buffer); + } catch (RuntimeException e) { throw new AssertionError("Could not write " + value + " as TBinary", e); } - return transport.buffer.readByteArray(); + return buffer.readByteArray(); } - static List readList(ThriftReader reader, TProtocol iprot) throws TException { - TList peekLength = iprot.readListBegin(); - if (peekLength.size > 10000) { // don't allocate massive arrays - throw new TException(peekLength.size + " > 10000: possibly malformed thrift"); + static List readList(ThriftReader reader, Buffer buffer) throws EOFException { + byte ignoredType = buffer.readByte(); + int length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); + List result = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + result.add(reader.read(buffer)); } - List result = new ArrayList<>(peekLength.size); - for (int i = 0; i < peekLength.size; i++) { - result.add(reader.read(iprot)); - } - iprot.readListEnd(); return result; } - static void writeList(ThriftWriter writer, List value, TProtocol oprot) - throws TException { - oprot.writeListBegin(new TList(TType.STRUCT, value.size())); + static void writeList(ThriftWriter writer, List value, Buffer buffer) { + writeListBegin(buffer, value.size()); for (int i = 0, length = value.size(); i < length; i++) { - writer.write(value.get(i), oprot); + writer.write(value.get(i), buffer); } - oprot.writeListEnd(); } - private static final class ListAdapter implements ThriftAdapter> { - private final ThriftAdapter adapter; + static final class ListAdapter implements ThriftAdapter> { + final ThriftAdapter adapter; ListAdapter(ThriftAdapter adapter) { this.adapter = adapter; } @Override - public List read(TProtocol iprot) throws TException { - return readList(adapter, iprot); + public List read(Buffer buffer) throws EOFException { + return readList(adapter, buffer); } @Override - public void write(List value, TProtocol oprot) throws TException { - writeList(adapter, value, oprot); + public void write(List value, Buffer buffer) { + writeList(adapter, value, buffer); } @Override @@ -639,38 +453,114 @@ public String toString() { } } - static final class BufferTransport extends TTransport { - final Buffer buffer = new Buffer(); + static IllegalArgumentException exceptionReading(String type, byte[] bytes, Exception e) { + String cause = e.getMessage() == null ? "Error" : e.getMessage(); + if (e instanceof EOFException) cause = "EOF"; + if (e instanceof IllegalStateException) cause = "Malformed"; + String message = + String.format("%s reading %s from TBinary: %s", cause, type, ByteString.of(bytes).base64()); + throw new IllegalArgumentException(message, e); + } + + static final class Field { + final byte type; + final int id; - @Override - public boolean isOpen() { - return true; + Field(byte type, int id) { + this.type = type; + this.id = id; } - @Override - public void open() { + void write(Buffer buffer) { + buffer.writeByte(type); + buffer.writeShort(id); } - @Override - public void close() { + static Field read(Buffer buffer) { + byte type = buffer.readByte(); + return new Field(type, type == TYPE_STOP ? TYPE_STOP : buffer.readShort()); } - @Override - public int read(byte[] buf, int off, int len) { - return buffer.read(buf, off, len); + boolean equals(Field that) { + return this.type == that.type && this.id == that.id; } + } - @Override - public void write(byte[] buf, int off, int len) { - buffer.write(buf, off, len); + static void skip(Buffer buffer, byte type) throws EOFException { + skip(buffer, type, MAX_SKIP_DEPTH); + } + + static void skip(Buffer buffer, byte type, int maxDepth) throws EOFException { + if (maxDepth <= 0) throw new EOFException("Maximum skip depth exceeded"); + switch (type) { + case TYPE_BOOL: + case TYPE_BYTE: + buffer.skip(1); + break; + case TYPE_I16: + buffer.skip(2); + break; + case TYPE_I32: + buffer.skip(4); + break; + case TYPE_DOUBLE: + case TYPE_I64: + buffer.skip(8); + break; + case TYPE_STRING: + int size = guardLength(buffer, STRING_LENGTH_LIMIT); + buffer.skip(size); + break; + case TYPE_STRUCT: + while (true) { + Field field = Field.read(buffer); + if (field.type == TYPE_STOP) return; + skip(buffer, field.type, maxDepth - 1); + } + case TYPE_MAP: + byte keyType = buffer.readByte(); + byte valueType = buffer.readByte(); + for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) { + skip(buffer, keyType, maxDepth - 1); + skip(buffer, valueType, maxDepth - 1); + } + break; + case TYPE_SET: + case TYPE_LIST: + byte elemType = buffer.readByte(); + for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) { + skip(buffer, elemType, maxDepth - 1); + } + break; + default: // types that don't need explicit skipping + break; } } - static IllegalArgumentException exceptionReading(String type, byte[] bytes, Exception e) { - String cause = e.getMessage() == null ? "Error" : e.getMessage(); - if (e instanceof TTransportException || cause.indexOf("malformed") != -1) cause = "Malformed"; - String message = - String.format("%s reading %s from TBinary: %s", cause, type, ByteString.of(bytes).base64()); - throw new IllegalArgumentException(message, e); + static byte[] readBytes(Buffer buffer) throws EOFException { + return buffer.readByteArray(guardLength(buffer, STRING_LENGTH_LIMIT)); + } + + static String readUtf8(Buffer buffer) throws EOFException { + return buffer.readUtf8(guardLength(buffer, STRING_LENGTH_LIMIT)); + } + + static int guardLength(Buffer buffer, int limit) { + int length = buffer.readInt(); + if (length > limit) { // don't allocate massive arrays + throw new IllegalStateException(length + " > " + limit + ": possibly malformed thrift"); + } + return length; + } + + static void writeListBegin(Buffer buffer, int size) { + buffer.writeByte(TYPE_STRUCT); + buffer.writeInt(size); + } + + static void writeUtf8(Buffer buffer, String string) { + Buffer temp = new Buffer().writeUtf8(string); + buffer.writeInt((int) temp.size()); + buffer.write(temp, temp.size()); } } diff --git a/zipkin/src/test/java/zipkin/SpanStoreTest.java b/zipkin/src/test/java/zipkin/SpanStoreTest.java index a8acbbceaeb..c4fc3aadc41 100644 --- a/zipkin/src/test/java/zipkin/SpanStoreTest.java +++ b/zipkin/src/test/java/zipkin/SpanStoreTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static org.assertj.core.api.Assertions.assertThat; import static zipkin.Constants.LOCAL_COMPONENT; @@ -74,39 +75,39 @@ private static long midnight(){ .name("methodcall") .id(spanId) .timestamp(ann1.timestamp).duration(9000L) - .annotations(ann1, ann3) - .binaryAnnotations(BinaryAnnotation.create("BAH", "BEH", ep)).build(); + .annotations(asList(ann1, ann3)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH", "BEH", ep)).build(); Span span2 = new Span.Builder() .traceId(456) .name("methodcall") .id(spanId) .timestamp(ann2.timestamp) - .annotations(ann2) - .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + .addAnnotation(ann2) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); Span span3 = new Span.Builder() .traceId(789) .name("methodcall") .id(spanId) .timestamp(ann2.timestamp).duration(18000L) - .annotations(ann2, ann3, ann4) - .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + .annotations(asList(ann2, ann3, ann4)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); Span span4 = new Span.Builder() .traceId(999) .name("methodcall") .id(spanId) .timestamp(ann6.timestamp).duration(1000L) - .annotations(ann6, ann7).build(); + .annotations(asList(ann6, ann7)).build(); Span span5 = new Span.Builder() .traceId(999) .name("methodcall") .id(spanId) .timestamp(ann5.timestamp).duration(3000L) - .annotations(ann5, ann8) - .binaryAnnotations(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); + .annotations(asList(ann5, ann8)) + .addBinaryAnnotation(BinaryAnnotation.create("BAH2", "BEH2", ep)).build(); Span spanEmptySpanName = new Span.Builder() .traceId(123) @@ -114,7 +115,7 @@ private static long midnight(){ .id(spanId) .parentId(1L) .timestamp(ann1.timestamp).duration(1000L) - .annotations(ann1, ann2).build(); + .annotations(asList(ann1, ann2)).build(); Span spanEmptyServiceName = new Span.Builder() .traceId(123) @@ -139,7 +140,7 @@ public void getSpansByTraceIds() { */ @Test public void tracesRetrieveInOrderDesc() { - store.accept(iterator(span2, new Span.Builder(span1).annotations(ann3, ann1).build())); + store.accept(iterator(span2, new Span.Builder(span1).annotations(asList(ann3, ann1)).build())); assertThat(store.getTracesByIds(asList(span2.traceId, span1.traceId))) .containsOnly(asList(span2), asList(span1)); @@ -170,7 +171,7 @@ public void getSpanNames() { @Test public void getAllServiceNames() { BinaryAnnotation yak = BinaryAnnotation.address("sa", Endpoint.create("yak", 127 << 24 | 1, 8080)); - store.accept(iterator(new Span.Builder(span1).binaryAnnotations(yak).build(), span4)); + store.accept(iterator(new Span.Builder(span1).addBinaryAnnotation(yak).build(), span4)); // should be in order assertThat(store.getServiceNames()).containsExactly("service", "yak"); @@ -221,7 +222,7 @@ public void getTraces_spanName() { public void getTraces_serviceNameInBinaryAnnotation() { List localTrace = asList(new Span.Builder().traceId(1L).name("targz").id(1L) .timestamp(today * 1000 + 100L).duration(200L) - .binaryAnnotations(BinaryAnnotation.create(LOCAL_COMPONENT, "archiver", ep)).build()); + .addBinaryAnnotation(BinaryAnnotation.create(LOCAL_COMPONENT, "archiver", ep)).build()); store.accept(localTrace.iterator()); @@ -242,19 +243,19 @@ public void getTraces_duration() { BinaryAnnotation archiver3 = component.endpoint(service3).build(); Span targz = new Span.Builder().traceId(1L).id(1L) - .name("targz").timestamp(today * 1000 + 100L).duration(200L).binaryAnnotations(archiver1).build(); + .name("targz").timestamp(today * 1000 + 100L).duration(200L).addBinaryAnnotation(archiver1).build(); Span tar = new Span.Builder().traceId(1L).id(2L).parentId(1L) - .name("tar").timestamp(today * 1000 + 200L).duration(150L).binaryAnnotations(archiver2).build(); + .name("tar").timestamp(today * 1000 + 200L).duration(150L).addBinaryAnnotation(archiver2).build(); Span gz = new Span.Builder().traceId(1L).id(3L).parentId(1L) - .name("gz").timestamp(today * 1000 + 250L).duration(50L).binaryAnnotations(archiver3).build(); + .name("gz").timestamp(today * 1000 + 250L).duration(50L).addBinaryAnnotation(archiver3).build(); Span zip = new Span.Builder().traceId(3L).id(3L) - .name("zip").timestamp(today * 1000 + 130L).duration(50L).binaryAnnotations(archiver2).build(); + .name("zip").timestamp(today * 1000 + 130L).duration(50L).addBinaryAnnotation(archiver2).build(); List trace1 = asList(targz, tar, gz); List trace2 = asList( - new Span.Builder(targz).traceId(2L).timestamp(today * 1000 + 110L).binaryAnnotations(archiver3).build(), - new Span.Builder(tar).traceId(2L).timestamp(today * 1000 + 210L).binaryAnnotations(archiver2).build(), - new Span.Builder(gz).traceId(2L).timestamp(today * 1000 + 260L).binaryAnnotations(archiver1).build()); + new Span.Builder(targz).traceId(2L).timestamp(today * 1000 + 110L).binaryAnnotations(asList(archiver3)).build(), + new Span.Builder(tar).traceId(2L).timestamp(today * 1000 + 210L).binaryAnnotations(asList(archiver2)).build(), + new Span.Builder(gz).traceId(2L).timestamp(today * 1000 + 260L).binaryAnnotations(asList(archiver1)).build()); List trace3 = asList(zip); store.accept(trace1.iterator()); @@ -292,13 +293,13 @@ public void getTraces_duration() { @Test public void getTraces_absentWhenNoTimestamp() { // store the binary annotations - store.accept(iterator(new Span.Builder(span1).timestamp(null).duration(null).annotations().build())); + store.accept(iterator(new Span.Builder(span1).timestamp(null).duration(null).annotations(emptyList()).build())); assertThat(store.getTraces(new QueryRequest.Builder("service").build())).isEmpty(); assertThat(store.getTraces(new QueryRequest.Builder("service").serviceName("methodcall").build())).isEmpty(); // now store the timestamped annotations - store.accept(iterator(new Span.Builder(span1).binaryAnnotations().build())); + store.accept(iterator(new Span.Builder(span1).binaryAnnotations(emptyList()).build())); assertThat(store.getTraces(new QueryRequest.Builder("service").build())) .containsExactly(asList(span1)); @@ -323,20 +324,22 @@ public void getTraces_annotation() { public void getTraces_multipleAnnotationsBecomeAndFilter() { Span foo = new Span.Builder().traceId(1).name("call1").id(1) .timestamp((today + 1) * 1000) - .annotations(Annotation.create((today + 1) * 1000, "foo", ep)).build(); + .addAnnotation(Annotation.create((today + 1) * 1000, "foo", ep)).build(); // would be foo bar, except lexicographically bar precedes foo Span barAndFoo = new Span.Builder().traceId(2).name("call2").id(2) .timestamp((today + 2) * 1000) - .annotations(Annotation.create((today + 2) * 1000, "bar", ep), Annotation.create((today + 2) * 1000, "foo", ep)).build(); + .addAnnotation(Annotation.create((today + 2) * 1000, "bar", ep)) + .addAnnotation(Annotation.create((today + 2) * 1000, "foo", ep)).build(); Span fooAndBazAndQux = new Span.Builder().traceId(3).name("call3").id(3) .timestamp((today + 3) * 1000) - .annotations(Annotation.create((today + 3) * 1000, "foo", ep)) - .binaryAnnotations(BinaryAnnotation.create("baz", "qux", ep)) + .addAnnotation(Annotation.create((today + 3) * 1000, "foo", ep)) + .addBinaryAnnotation(BinaryAnnotation.create("baz", "qux", ep)) .build(); Span barAndFooAndBazAndQux = new Span.Builder().traceId(4).name("call4").id(4) .timestamp((today + 4) * 1000) - .annotations(Annotation.create((today + 4) * 1000, "bar", ep), Annotation.create((today + 4) * 1000, "foo", ep)) - .binaryAnnotations(BinaryAnnotation.create("baz", "qux", ep)) + .addAnnotation(Annotation.create((today + 4) * 1000, "bar", ep)) + .addAnnotation(Annotation.create((today + 4) * 1000, "foo", ep)) + .addBinaryAnnotation(BinaryAnnotation.create("baz", "qux", ep)) .build(); store.accept(iterator(foo, barAndFoo, fooAndBazAndQux, barAndFooAndBazAndQux)); @@ -365,8 +368,8 @@ public void getTraces_mergesSpans() { Span merged = new Span.Builder(span4) .timestamp(mergedAnnotations.first().timestamp) .duration(mergedAnnotations.last().timestamp - mergedAnnotations.first().timestamp) - .annotations(mergedAnnotations.toArray(new Annotation[0])) - .binaryAnnotations(span5.binaryAnnotations.toArray(new BinaryAnnotation[0])).build(); + .annotations(mergedAnnotations) + .binaryAnnotations(span5.binaryAnnotations).build(); assertThat(store.getTraces(new QueryRequest.Builder("service").build())) .containsExactly(asList(merged), asList(span1)); @@ -487,7 +490,7 @@ public void correctsClockSkew() { .id(778) .parentId(666L) .timestamp((today + 101) * 1000).duration(50L) - .binaryAnnotations(BinaryAnnotation.create(LOCAL_COMPONENT, "framey", frontend)).build(); + .addBinaryAnnotation(BinaryAnnotation.create(LOCAL_COMPONENT, "framey", frontend)).build(); List skewed = asList(parent, remoteChild, localChild);