From 278d75abc83b72f626d57ac478df519c2ae093a7 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 17 Nov 2015 14:43:42 -0800 Subject: [PATCH] Moves codec ops to an interface in preparation of tracing Codec had a larger interface than needed, and instances were used directly. This moves to an interface for reasons including preparation of codec tracing via dependency injection. --- .../src/main/java/io/zipkin/Codec.java | 38 ++- .../java/io/zipkin/internal/JsonCodec.java | 128 ++++----- .../java/io/zipkin/internal/ThriftCodec.java | 247 ++++++++++-------- .../main/java/io/zipkin/internal/Util.java | 23 -- .../src/test/java/io/zipkin/CodecTest.java | 41 ++- .../java/io/zipkin/internal/UtilTest.java | 31 --- .../zipkin/interop/ScalaSpanStoreAdapter.java | 29 +- .../io/zipkin/server/ZipkinQueryApiV1.java | 33 +-- 8 files changed, 273 insertions(+), 297 deletions(-) diff --git a/zipkin-java-core/src/main/java/io/zipkin/Codec.java b/zipkin-java-core/src/main/java/io/zipkin/Codec.java index 770a7d1c7fe..90569af4266 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/Codec.java +++ b/zipkin-java-core/src/main/java/io/zipkin/Codec.java @@ -23,30 +23,46 @@ */ public interface Codec { + interface Factory { + /** Returns a codec for the given media type (ex. "application/json") or null if not found. */ + @Nullable + Codec get(String mediaType); + } + Codec JSON = new JsonCodec(); Codec THRIFT = new ThriftCodec(); - /** Returns null if the span couldn't be decoded */ - @Nullable - Span readSpan(byte[] bytes); + Factory FACTORY = new Factory() { + + @Override + public Codec get(String mediaType) { + if (mediaType.startsWith("application/json")) { + return JSON; + } else if (mediaType.startsWith("application/x-thrift")) { + return THRIFT; + } + return null; + } + }; - /** Returns null if the span couldn't be encoded */ - @Nullable - byte[] writeSpan(Span value); /** Returns null if the spans couldn't be decoded */ @Nullable List readSpans(byte[] bytes); - /** Returns null if the span couldn't be encoded */ + /** Returns null if the spans couldn't be encoded */ @Nullable byte[] writeSpans(List value); - /** Returns null if the dependency link couldn't be decoded */ + /** Returns null if the traces couldn't be encoded */ + @Nullable + byte[] writeTraces(List> value); + + /** Returns null if the dependency links couldn't be decoded */ @Nullable - DependencyLink readDependencyLink(byte[] bytes); + List readDependencyLinks(byte[] bytes); - /** Returns null if the dependency link couldn't be encoded */ + /** Returns null if the dependency links couldn't be encoded */ @Nullable - byte[] writeDependencyLink(DependencyLink value); + byte[] writeDependencyLinks(List value); } diff --git a/zipkin-java-core/src/main/java/io/zipkin/internal/JsonCodec.java b/zipkin-java-core/src/main/java/io/zipkin/internal/JsonCodec.java index 8a74f0de45e..4df33c9b6db 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/internal/JsonCodec.java +++ b/zipkin-java-core/src/main/java/io/zipkin/internal/JsonCodec.java @@ -24,6 +24,7 @@ import io.zipkin.Endpoint; import io.zipkin.Span; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.logging.Logger; @@ -63,11 +64,6 @@ public Long fromJson(JsonReader reader) throws IOException { public void toJson(JsonWriter writer, Long value) throws IOException { writer.value(String.format("%016x", value)); } - - @Override - public String toString() { - return "JsonAdapter(HexLong)"; - } }; public static final JsonAdapter ENDPOINT_ADAPTER = new JsonAdapter() { @@ -115,11 +111,6 @@ public void toJson(JsonWriter writer, Endpoint value) throws IOException { } writer.endObject(); } - - @Override - public String toString() { - return "JsonAdapter(Endpoint)"; - } }; public static final JsonAdapter ANNOTATION_ADAPTER = new Moshi.Builder() @@ -234,11 +225,6 @@ public void toJson(JsonWriter writer, BinaryAnnotation value) throws IOException } writer.endObject(); } - - @Override - public String toString() { - return "JsonAdapter(BinaryAnnotation)"; - } }; public static final JsonAdapter SPAN_ADAPTER = new JsonAdapter() { @@ -326,58 +312,36 @@ public void toJson(JsonWriter writer, Span value) throws IOException { } writer.endObject(); } - - @Override - public String toString() { - return "JsonAdapter(Span)"; - } }; @Override - public Span readSpan(byte[] bytes) { - return read(SPAN_ADAPTER, bytes); + public List readSpans(byte[] bytes) { + return readList(SPAN_ADAPTER, bytes); } @Override - public byte[] writeSpan(Span value) { - return write(SPAN_ADAPTER, value); + public byte[] writeSpans(List value) { + return writeList(SPAN_ADAPTER, value); } - public static final JsonAdapter> SPAN_LIST_ADAPTER = new JsonAdapter>() { - @Override - public List fromJson(JsonReader reader) throws IOException { - List spans = new LinkedList<>(); // cause we don't know how long it will be - reader.beginArray(); - while (reader.hasNext()) { - spans.add(SPAN_ADAPTER.fromJson(reader)); - } - reader.endArray(); - return spans; - } - - @Override - public void toJson(JsonWriter writer, List value) throws IOException { - writer.beginArray(); - for (int i = 0, length = value.size(); i < length; i++) { - SPAN_ADAPTER.toJson(writer, value.get(i)); + @Override + public byte[] writeTraces(List> traces) { + Buffer buffer = new Buffer(); + buffer.writeUtf8CodePoint('['); // start list of traces + for (Iterator> trace = traces.iterator(); trace.hasNext(); ) { + + buffer.writeUtf8CodePoint('['); // start trace + // write each span + for (Iterator span = trace.next().iterator(); span.hasNext(); ) { + if (!write(SPAN_ADAPTER, span.next(), buffer)) return null; + if (span.hasNext()) buffer.writeUtf8CodePoint(','); } - writer.endArray(); - } + buffer.writeUtf8CodePoint(']'); // stop trace - @Override - public String toString() { - return "JsonAdapter(List)"; + if (trace.hasNext()) buffer.writeUtf8CodePoint(','); } - }; - - @Override - public List readSpans(byte[] bytes) { - return read(SPAN_LIST_ADAPTER, bytes); - } - - @Override - public byte[] writeSpans(List value) { - return write(SPAN_LIST_ADAPTER, value); + buffer.writeUtf8CodePoint(']'); // stop list of traces + return buffer.readByteArray(); } public static final JsonAdapter DEPENDENCY_LINK_ADAPTER = new JsonAdapter() { @@ -413,45 +377,61 @@ public void toJson(JsonWriter writer, DependencyLink value) throws IOException { writer.name("callCount").value(value.callCount); writer.endObject(); } - - @Override - public String toString() { - return "JsonAdapter(DependencyLink)"; - } }; @Override - public DependencyLink readDependencyLink(byte[] bytes) { - return read(DEPENDENCY_LINK_ADAPTER, bytes); + public List readDependencyLinks(byte[] bytes) { + return readList(DEPENDENCY_LINK_ADAPTER, bytes); } @Override - public byte[] writeDependencyLink(DependencyLink value) { - return write(DEPENDENCY_LINK_ADAPTER, value); + public byte[] writeDependencyLinks(List value) { + return writeList(DEPENDENCY_LINK_ADAPTER, value); } - private T read(JsonAdapter adapter, byte[] bytes) { - Buffer buffer = new Buffer(); - buffer.write(bytes); + private static List readList(JsonAdapter adapter, byte[] bytes) { + JsonReader reader = JsonReader.of(new Buffer().write(bytes)); + List result = new LinkedList<>(); // cause we don't know how long it will be try { - return adapter.fromJson(buffer); + reader.beginArray(); + while (reader.hasNext()) { + T next = adapter.fromJson(reader); + if (next == null) return null; + result.add(next); + } + reader.endArray(); + return result; } catch (IOException e) { if (LOGGER.isLoggable(FINEST)) { - LOGGER.log(FINEST, adapter + " could not read " + new String(bytes, UTF_8), e); + LOGGER.log(FINEST, "Could not read " + adapter + " from json" + new String(bytes, UTF_8), e); } return null; } } - private byte[] write(JsonAdapter adapter, T value) { + /** Returns null if any element could not be written. */ + @Nullable + private static byte[] writeList(JsonAdapter adapter, List values) { Buffer buffer = new Buffer(); + buffer.writeUtf8CodePoint('['); + for (Iterator i = values.iterator(); i.hasNext(); ) { + if (!write(adapter, i.next(), buffer)) return null; + if (i.hasNext()) buffer.writeUtf8CodePoint(','); + } + buffer.writeUtf8CodePoint(']'); + return buffer.readByteArray(); + } + + /** Returns false when the value could not be written */ + private static boolean write(JsonAdapter adapter, T value, Buffer buffer) { try { - adapter.toJson(buffer, value); + adapter.toJson(JsonWriter.of(buffer), value); + return true; } catch (IOException e) { if (LOGGER.isLoggable(FINEST)) { - LOGGER.log(FINEST, adapter + " could not write " + value, e); + LOGGER.log(FINEST, "Could not write " + value + " as json", e); } + return false; } - return buffer.readByteArray(); } } diff --git a/zipkin-java-core/src/main/java/io/zipkin/internal/ThriftCodec.java b/zipkin-java-core/src/main/java/io/zipkin/internal/ThriftCodec.java index 428fc45d21e..ffd114357c1 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/internal/ThriftCodec.java +++ b/zipkin-java-core/src/main/java/io/zipkin/internal/ThriftCodec.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; +import okio.Buffer; import okio.ByteString; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -31,8 +32,8 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TStruct; import org.apache.thrift.protocol.TType; -import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; import static java.util.logging.Level.FINEST; import static org.apache.thrift.protocol.TProtocolUtil.skip; @@ -52,39 +53,37 @@ public final class ThriftCodec implements Codec { private static final Logger LOGGER = Logger.getLogger(ThriftCodec.class.getName()); @Override - public Span readSpan(byte[] bytes) { - return read(SpanAdapter.INSTANCE, bytes); + public List readSpans(byte[] bytes) { + return read(SPANS_ADAPTER, bytes); } @Override - public byte[] writeSpan(Span value) { - return write(SpanAdapter.INSTANCE, value); + public byte[] writeSpans(List value) { + return write(SPANS_ADAPTER, value); } @Override - public List readSpans(byte[] bytes) { - return read(SpanListAdapter.INSTANCE, bytes); + public byte[] writeTraces(List> value) { + return write(TRACES_ADAPTER, value); } - @Override - public byte[] writeSpans(List value) { - return write(SpanListAdapter.INSTANCE, value); + private interface ThriftWriter { + void write(T value, TProtocol oprot) throws TException; } - interface ThriftAdapter { + private interface ThriftReader { T read(TProtocol iprot) throws TException; - - void write(T value, TProtocol oprot) throws TException; } - enum EndpointAdapter implements ThriftAdapter { - INSTANCE; + private interface ThriftAdapter extends ThriftReader, ThriftWriter { + } - private static final TStruct STRUCT_DESC = new TStruct("Endpoint"); + static final ThriftAdapter ENDPOINT_ADAPTER = new ThriftAdapter() { - private static final TField IPV4_FIELD_DESC = new TField("ipv4", TType.I32, (short) 1); - private static final TField PORT_FIELD_DESC = new TField("port", TType.I16, (short) 2); - private static final TField SERVICE_NAME_FIELD_DESC = new TField("service_name", TType.STRING, (short) 3); + private final TStruct STRUCT_DESC = new TStruct("Endpoint"); + private final TField IPV4_FIELD_DESC = new TField("ipv4", TType.I32, (short) 1); + private final TField PORT_FIELD_DESC = new TField("port", TType.I16, (short) 2); + private final TField SERVICE_NAME_FIELD_DESC = new TField("service_name", TType.STRING, (short) 3); @Override public Endpoint read(TProtocol iprot) throws TException { @@ -144,15 +143,14 @@ public void write(Endpoint value, TProtocol oprot) throws TException { oprot.writeFieldStop(); oprot.writeStructEnd(); } - } + }; - enum AnnotationAdapter implements ThriftAdapter { - INSTANCE; + static final ThriftAdapter ANNOTATION_ADAPTER = new ThriftAdapter() { - private static final TStruct STRUCT_DESC = new TStruct("Annotation"); - private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 1); - private static final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); - private static final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 3); + private final TStruct STRUCT_DESC = new TStruct("Annotation"); + private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 1); + private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); + private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 3); @Override public Annotation read(TProtocol iprot) throws TException { @@ -181,7 +179,7 @@ public Annotation read(TProtocol iprot) throws TException { break; case 3: // HOST if (field.type == TType.STRUCT) { - result.endpoint(EndpointAdapter.INSTANCE.read(iprot)); + result.endpoint(ENDPOINT_ADAPTER.read(iprot)); } else { skip(iprot, field.type); } @@ -211,23 +209,22 @@ public void write(Annotation value, TProtocol oprot) throws TException { if (value.endpoint != null) { oprot.writeFieldBegin(HOST_FIELD_DESC); - EndpointAdapter.INSTANCE.write(value.endpoint, oprot); + ENDPOINT_ADAPTER.write(value.endpoint, oprot); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } - } + }; - enum BinaryAnnotationAdapter implements ThriftAdapter { - INSTANCE; + static final ThriftAdapter BINARY_ANNOTATION_ADAPTER = new ThriftAdapter() { - private static final TStruct STRUCT_DESC = new TStruct("BinaryAnnotation"); - private static final TField KEY_FIELD_DESC = new TField("key", TType.STRING, (short) 1); - private static final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); - private static final TField ANNOTATION_TYPE_FIELD_DESC = new TField("annotation_type", TType.I32, (short) 3); - private static final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 4); + private final TStruct STRUCT_DESC = new TStruct("BinaryAnnotation"); + private final TField KEY_FIELD_DESC = new TField("key", TType.STRING, (short) 1); + private final TField VALUE_FIELD_DESC = new TField("value", TType.STRING, (short) 2); + private final TField ANNOTATION_TYPE_FIELD_DESC = new TField("annotation_type", TType.I32, (short) 3); + private final TField HOST_FIELD_DESC = new TField("host", TType.STRUCT, (short) 4); @Override public BinaryAnnotation read(TProtocol iprot) throws TException { @@ -266,7 +263,7 @@ public BinaryAnnotation read(TProtocol iprot) throws TException { break; case 4: // HOST if (field.type == TType.STRUCT) { - result.endpoint(EndpointAdapter.INSTANCE.read(iprot)); + result.endpoint(ENDPOINT_ADAPTER.read(iprot)); } else { skip(iprot, field.type); } @@ -298,29 +295,27 @@ public void write(BinaryAnnotation value, TProtocol oprot) throws TException { if (value.endpoint != null) { oprot.writeFieldBegin(HOST_FIELD_DESC); - EndpointAdapter.INSTANCE.write(value.endpoint, oprot); + ENDPOINT_ADAPTER.write(value.endpoint, oprot); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } - } + }; - enum SpanAdapter implements ThriftAdapter { - INSTANCE; + static final ThriftAdapter SPAN_ADAPTER = new ThriftAdapter() { - private static final TStruct STRUCT_DESC = new TStruct("Span"); - - private static final TField TRACE_ID_FIELD_DESC = new TField("trace_id", TType.I64, (short) 1); - private static final TField NAME_FIELD_DESC = new TField("name", TType.STRING, (short) 3); - private static final TField ID_FIELD_DESC = new TField("id", TType.I64, (short) 4); - private static final TField PARENT_ID_FIELD_DESC = new TField("parent_id", TType.I64, (short) 5); - private static final TField ANNOTATIONS_FIELD_DESC = new TField("annotations", TType.LIST, (short) 6); - private static final TField BINARY_ANNOTATIONS_FIELD_DESC = new TField("binary_annotations", TType.LIST, (short) 8); - private static final TField DEBUG_FIELD_DESC = new TField("debug", TType.BOOL, (short) 9); - private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 10); - private static final TField DURATION_FIELD_DESC = new TField("duration", TType.I64, (short) 11); + private final TStruct STRUCT_DESC = new TStruct("Span"); + private final TField TRACE_ID_FIELD_DESC = new TField("trace_id", TType.I64, (short) 1); + private final TField NAME_FIELD_DESC = new TField("name", TType.STRING, (short) 3); + private final TField ID_FIELD_DESC = new TField("id", TType.I64, (short) 4); + private final TField PARENT_ID_FIELD_DESC = new TField("parent_id", TType.I64, (short) 5); + private final TField ANNOTATIONS_FIELD_DESC = new TField("annotations", TType.LIST, (short) 6); + private final TField BINARY_ANNOTATIONS_FIELD_DESC = new TField("binary_annotations", TType.LIST, (short) 8); + private final TField DEBUG_FIELD_DESC = new TField("debug", TType.BOOL, (short) 9); + private final TField TIMESTAMP_FIELD_DESC = new TField("timestamp", TType.I64, (short) 10); + private final TField DURATION_FIELD_DESC = new TField("duration", TType.I64, (short) 11); @Override public Span read(TProtocol iprot) throws TException { @@ -365,7 +360,7 @@ public Span read(TProtocol iprot) throws TException { if (field.type == TType.LIST) { TList annotations = iprot.readListBegin(); for (int i = 0; i < annotations.size; i++) { - result.addAnnotation(AnnotationAdapter.INSTANCE.read(iprot)); + result.addAnnotation(ANNOTATION_ADAPTER.read(iprot)); } iprot.readListEnd(); } else { @@ -376,7 +371,7 @@ public Span read(TProtocol iprot) throws TException { if (field.type == TType.LIST) { TList binaryAnnotations = iprot.readListBegin(); for (int i = 0; i < binaryAnnotations.size; i++) { - result.addBinaryAnnotation(BinaryAnnotationAdapter.INSTANCE.read(iprot)); + result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.read(iprot)); } iprot.readListEnd(); } else { @@ -438,7 +433,7 @@ public void write(Span value, TProtocol oprot) throws TException { oprot.writeFieldBegin(ANNOTATIONS_FIELD_DESC); oprot.writeListBegin(new TList(TType.STRUCT, value.annotations.size())); for (int i = 0, length = value.annotations.size(); i < length; i++) { - AnnotationAdapter.INSTANCE.write(value.annotations.get(i), oprot); + ANNOTATION_ADAPTER.write(value.annotations.get(i), oprot); } oprot.writeListEnd(); oprot.writeFieldEnd(); @@ -446,7 +441,7 @@ public void write(Span value, TProtocol oprot) throws TException { oprot.writeFieldBegin(BINARY_ANNOTATIONS_FIELD_DESC); oprot.writeListBegin(new TList(TType.STRUCT, value.binaryAnnotations.size())); for (int i = 0, length = value.binaryAnnotations.size(); i < length; i++) { - BinaryAnnotationAdapter.INSTANCE.write(value.binaryAnnotations.get(i), oprot); + BINARY_ANNOTATION_ADAPTER.write(value.binaryAnnotations.get(i), oprot); } oprot.writeListEnd(); oprot.writeFieldEnd(); @@ -472,42 +467,17 @@ public void write(Span value, TProtocol oprot) throws TException { oprot.writeFieldStop(); oprot.writeStructEnd(); } - } - - enum SpanListAdapter implements ThriftAdapter> { - INSTANCE; - - @Override - public List read(TProtocol iprot) throws TException { - TList spans = iprot.readListBegin(); - if (spans.size > 10000) { // don't allocate massive arrays - throw new IllegalArgumentException(spans.size + " > 10000: possibly malformed thrift"); - } - List result = new ArrayList<>(spans.size); - for (int i = 0; i < spans.size; i++) { - result.add(SpanAdapter.INSTANCE.read(iprot)); - } - iprot.readListEnd(); - return result; - } + }; - @Override - public void write(List value, TProtocol oprot) throws TException { - oprot.writeListBegin(new TList(TType.STRUCT, value.size())); - for (int i = 0, length = value.size(); i < length; i++) { - SpanAdapter.INSTANCE.write(value.get(i), oprot); - } - oprot.writeListEnd(); - } - } + static final ThriftAdapter> SPANS_ADAPTER = new ListAdapter<>(SPAN_ADAPTER); + static final ThriftAdapter>> TRACES_ADAPTER = new ListAdapter<>(SPANS_ADAPTER); - enum DependencyLinkAdapter implements ThriftAdapter { - INSTANCE; + static final ThriftAdapter DEPENDENCY_LINK_ADAPTER = new ThriftAdapter() { - private static final TStruct STRUCT_DESC = new TStruct("DependencyLink"); - private static final TField PARENT_FIELD_DESC = new TField("parent", TType.STRING, (short) 1); - private static final TField CHILD_FIELD_DESC = new TField("child", TType.STRING, (short) 2); - private static final TField CALL_COUNT_FIELD_DESC = new TField("call_count", TType.I64, (short) 4); + private final TStruct STRUCT_DESC = new TStruct("DependencyLink"); + private final TField PARENT_FIELD_DESC = new TField("parent", TType.STRING, (short) 1); + private final TField CHILD_FIELD_DESC = new TField("child", TType.STRING, (short) 2); + private final TField CALL_COUNT_FIELD_DESC = new TField("call_count", TType.I64, (short) 4); @Override public DependencyLink read(TProtocol iprot) throws TException { @@ -569,40 +539,113 @@ public void write(DependencyLink value, TProtocol oprot) throws TException { oprot.writeFieldStop(); oprot.writeStructEnd(); } - } + }; + + static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER = new ListAdapter<>(DEPENDENCY_LINK_ADAPTER); @Override - public DependencyLink readDependencyLink(byte[] bytes) { - return read(DependencyLinkAdapter.INSTANCE, bytes); + public List readDependencyLinks(byte[] bytes) { + return read(DEPENDENCY_LINKS_ADAPTER, bytes); } @Override - public byte[] writeDependencyLink(DependencyLink value) { - return write(DependencyLinkAdapter.INSTANCE, value); + public byte[] writeDependencyLinks(List value) { + return write(DEPENDENCY_LINKS_ADAPTER, value); } - private static T read(ThriftAdapter adapter, byte[] bytes) { + private static T read(ThriftReader reader, byte[] bytes) { try { - return adapter.read(new TBinaryProtocol(new TMemoryInputTransport(bytes))); + return reader.read(new TBinaryProtocol(new TMemoryInputTransport(bytes))); } catch (Exception e) { if (LOGGER.isLoggable(FINEST)) { - LOGGER.log(FINEST, adapter + " could not read " + ByteString.of(bytes).base64(), e); + LOGGER.log(FINEST, "Could not read " + reader + " from TBinary " + ByteString.of(bytes).base64(), e); } return null; } } - private static byte[] write(ThriftAdapter adapter, T value) { - TMemoryBuffer transport = new TMemoryBuffer(0); + private static byte[] write(ThriftWriter writer, T value) { + BufferTransport transport = new BufferTransport(); TBinaryProtocol protocol = new TBinaryProtocol(transport); try { - adapter.write(value, protocol); + writer.write(value, protocol); } catch (Exception e) { if (LOGGER.isLoggable(FINEST)) { - LOGGER.log(FINEST, adapter + " could not write " + value, e); + LOGGER.log(FINEST, "Could not write " + value + " as TBinary", e); } return null; } - return transport.getArray(); + return transport.buffer.readByteArray(); + } + + private static List readList(ThriftReader reader, TProtocol iprot) throws TException { + TList spans = iprot.readListBegin(); + if (spans.size > 10000) { // don't allocate massive arrays + throw new IllegalArgumentException(spans.size + " > 10000: possibly malformed thrift"); + } + List result = new ArrayList<>(spans.size); + for (int i = 0; i < spans.size; i++) { + result.add(reader.read(iprot)); + } + iprot.readListEnd(); + return result; + } + + private static void writeList(ThriftWriter writer, List value, TProtocol oprot) throws TException { + oprot.writeListBegin(new TList(TType.STRUCT, value.size())); + for (int i = 0, length = value.size(); i < length; i++) { + writer.write(value.get(i), oprot); + } + oprot.writeListEnd(); + } + + private static final class ListAdapter implements ThriftAdapter> { + private final ThriftAdapter adapter; + + ListAdapter(ThriftAdapter adapter) { + this.adapter = adapter; + } + + @Override + public List read(TProtocol iprot) throws TException { + return readList(adapter, iprot); + } + + @Override + public void write(List value, TProtocol oprot) throws TException { + writeList(adapter, value, oprot); + } + + @Override + public String toString() { + return "List<" + adapter + ">"; + } + } + + private static final class BufferTransport extends TTransport { + final Buffer buffer = new Buffer(); + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public int read(byte[] buf, int off, int len) { + return buffer.read(buf, off, len); + } + + @Override + public void write(byte[] buf, int off, int len) { + buffer.write(buf, off, len); + } } } diff --git a/zipkin-java-core/src/main/java/io/zipkin/internal/Util.java b/zipkin-java-core/src/main/java/io/zipkin/internal/Util.java index 2b85687244f..20968552f40 100644 --- a/zipkin-java-core/src/main/java/io/zipkin/internal/Util.java +++ b/zipkin-java-core/src/main/java/io/zipkin/internal/Util.java @@ -18,12 +18,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import okio.Buffer; public final class Util { public static final Charset UTF_8 = Charset.forName("UTF-8"); @@ -95,27 +93,6 @@ public static List merge(Collection spans) { return result; } - /** Only here as this is a Java 7 module. */ - public interface Serializer { - byte[] apply(T in); - } - - public static Serializer> writeJsonList(final Serializer elementWriter) { - return new Serializer>() { - @Override - public byte[] apply(List input) { - Buffer buffer = new Buffer(); - buffer.writeUtf8CodePoint('['); - for (Iterator i = input.iterator(); i.hasNext(); ) { - buffer.write(elementWriter.apply(i.next())); - if (i.hasNext()) buffer.writeUtf8CodePoint(','); - } - buffer.writeUtf8CodePoint(']'); - return buffer.readByteArray(); - } - }; - } - private Util() { } } diff --git a/zipkin-java-core/src/test/java/io/zipkin/CodecTest.java b/zipkin-java-core/src/test/java/io/zipkin/CodecTest.java index f7759b696b4..547b16f3fba 100644 --- a/zipkin-java-core/src/test/java/io/zipkin/CodecTest.java +++ b/zipkin-java-core/src/test/java/io/zipkin/CodecTest.java @@ -14,29 +14,16 @@ package io.zipkin; import java.io.IOException; +import java.util.List; import org.junit.Test; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; public abstract class CodecTest { protected abstract Codec codec(); - @Test - public void spanRoundTrip() throws IOException { - for (Span span : TestObjects.TRACE) { - byte[] bytes = codec().writeSpan(span); - assertThat(codec().readSpan(bytes)) - .isEqualTo(span); - } - } - - @Test - public void spanDecodesToNullOnEmpty() throws IOException { - assertThat(codec().readSpan(new byte[0])) - .isNull(); - } - @Test public void spansRoundTrip() throws IOException { byte[] bytes = codec().writeSpans(TestObjects.TRACE); @@ -44,6 +31,13 @@ public void spansRoundTrip() throws IOException { .isEqualTo(TestObjects.TRACE); } + @Test + public void writeTraces() throws IOException { + byte[] bytes = codec().writeTraces(asList(TestObjects.TRACE, TestObjects.TRACE)); + assertThat(codec().writeSpans(TestObjects.TRACE).length * 2) + .isLessThan(bytes.length); + } + @Test public void spansDecodeToNullOnEmpty() throws IOException { assertThat(codec().readSpans(new byte[0])) @@ -60,16 +54,19 @@ public void spansDecodeToNullOnMalformed() throws IOException { } @Test - public void dependencyLinkRoundTrip() throws IOException { - DependencyLink link = DependencyLink.create("tfe", "mobileweb", 6); - byte[] bytes = codec().writeDependencyLink(link); - assertThat(codec().readDependencyLink(bytes)) - .isEqualTo(link); + public void dependencyLinksRoundTrip() throws IOException { + List links = asList( + DependencyLink.create("foo", "bar", 2), + DependencyLink.create("bar", "baz", 3) + ); + byte[] bytes = codec().writeDependencyLinks(links); + assertThat(codec().readDependencyLinks(bytes)) + .isEqualTo(links); } @Test - public void dependencyLinkDecodesToNullOnEmpty() throws IOException { - assertThat(codec().readDependencyLink(new byte[0])) + public void dependencyLinksDecodeToNullOnEmpty() throws IOException { + assertThat(codec().readDependencyLinks(new byte[0])) .isNull(); } } diff --git a/zipkin-java-core/src/test/java/io/zipkin/internal/UtilTest.java b/zipkin-java-core/src/test/java/io/zipkin/internal/UtilTest.java index 7caf51f0923..1e35253e0ea 100644 --- a/zipkin-java-core/src/test/java/io/zipkin/internal/UtilTest.java +++ b/zipkin-java-core/src/test/java/io/zipkin/internal/UtilTest.java @@ -13,15 +13,9 @@ */ package io.zipkin.internal; -import io.zipkin.internal.Util.Serializer; -import java.util.Arrays; -import java.util.List; import org.junit.Test; -import static io.zipkin.internal.Util.UTF_8; import static io.zipkin.internal.Util.equal; -import static io.zipkin.internal.Util.writeJsonList; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -34,29 +28,4 @@ public void equalTest() { assertFalse(equal("1", null)); assertFalse(equal("1", "2")); } - - Serializer> intsSerializer = writeJsonList(new Serializer() { - @Override - public byte[] apply(Integer in) { - return Integer.toString(in).getBytes(UTF_8); - } - }); - - @Test - public void writeJsonList_empty() { - byte[] bytes = intsSerializer.apply(Arrays.asList()); - assertThat(new String(bytes, UTF_8)).isEqualTo("[]"); - } - - @Test - public void writeJsonList_one() { - byte[] bytes = intsSerializer.apply(Arrays.asList(1)); - assertThat(new String(bytes, UTF_8)).isEqualTo("[1]"); - } - - @Test - public void writeJsonList_two() { - byte[] bytes = intsSerializer.apply(Arrays.asList(1, 2)); - assertThat(new String(bytes, UTF_8)).isEqualTo("[1,2]"); - } } diff --git a/zipkin-java-interop/src/main/java/io/zipkin/interop/ScalaSpanStoreAdapter.java b/zipkin-java-interop/src/main/java/io/zipkin/interop/ScalaSpanStoreAdapter.java index 3209812e914..3b05c967f8f 100644 --- a/zipkin-java-interop/src/main/java/io/zipkin/interop/ScalaSpanStoreAdapter.java +++ b/zipkin-java-interop/src/main/java/io/zipkin/interop/ScalaSpanStoreAdapter.java @@ -14,6 +14,7 @@ package io.zipkin.interop; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.twitter.util.Future; import com.twitter.zipkin.common.Span; @@ -25,6 +26,7 @@ import io.zipkin.internal.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.stream.Collectors; import scala.Tuple2; import scala.collection.Iterator; import scala.collection.JavaConversions; @@ -81,13 +83,7 @@ public Future>> getTracesByIds(Seq input) { static Future>> toSeqFuture(java.util.List> traces) { ArrayList> result = new ArrayList<>(traces.size()); for (java.util.List trace : traces) { - ArrayList spans = new ArrayList<>(trace.size()); - for (io.zipkin.Span span : trace) { - Span converted = convert(span); - if (converted != null) { - spans.add(converted); - } - } + java.util.List spans = convert(trace); result.add(JavaConversions.asScalaBuffer(spans).toList()); } return Future.value(JavaConversions.asScalaBuffer(result)); @@ -105,12 +101,7 @@ public Future> getSpanNames(String service) { @Override public Future apply(Seq input) { - java.util.List spans = JavaConversions.asJavaCollection(input).stream() - .map(ScalaSpanStoreAdapter::invert) - .filter(i -> i != null) - .collect(toList()); - - this.spanStore.accept(spans); + this.spanStore.accept(ScalaSpanStoreAdapter.invert(input)); return Future.Unit(); } @@ -120,10 +111,12 @@ public void close() { } @Nullable - static Span convert(io.zipkin.Span input) { - byte[] bytes = Codec.JSON.writeSpan(input); + static java.util.List convert(java.util.List input) { + byte[] bytes = Codec.JSON.writeSpans(input); try { - return JsonSpan.invert(scalaCodec.readValue(bytes, JsonSpan.class)); + TypeReference> ref = new TypeReference>(){}; + java.util.List read = scalaCodec.readValue(bytes, ref); + return read.stream().map(JsonSpan::invert).collect(Collectors.toList()); } catch (IOException e) { e.printStackTrace(); return null; @@ -131,10 +124,10 @@ static Span convert(io.zipkin.Span input) { } @Nullable - static io.zipkin.Span invert(Span input) { + static java.util.List invert(Seq input) { try { byte[] bytes = scalaCodec.writeValueAsBytes(input); - return Codec.JSON.readSpan(bytes); + return Codec.JSON.readSpans(bytes); } catch (JsonProcessingException e) { e.printStackTrace(); return null; diff --git a/zipkin-java-server/src/main/java/io/zipkin/server/ZipkinQueryApiV1.java b/zipkin-java-server/src/main/java/io/zipkin/server/ZipkinQueryApiV1.java index dbb540694e3..defb82aa36e 100644 --- a/zipkin-java-server/src/main/java/io/zipkin/server/ZipkinQueryApiV1.java +++ b/zipkin-java-server/src/main/java/io/zipkin/server/ZipkinQueryApiV1.java @@ -14,11 +14,9 @@ package io.zipkin.server; import io.zipkin.Codec; -import io.zipkin.DependencyLink; import io.zipkin.QueryRequest; import io.zipkin.Span; import io.zipkin.SpanStore; -import io.zipkin.internal.Util.Serializer; import java.util.Collections; import java.util.List; import okio.Buffer; @@ -33,7 +31,7 @@ import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; -import static io.zipkin.internal.Util.writeJsonList; +import static io.zipkin.internal.Util.checkNotNull; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; /** @@ -45,13 +43,16 @@ @RequestMapping("/api/v1") public class ZipkinQueryApiV1 { - static final Serializer>> TRACES_TO_JSON = writeJsonList(Codec.JSON::writeSpans); - static final Serializer> DEPENDENCY_LINKS_TO_JSON = writeJsonList(Codec.JSON::writeDependencyLink); - private static final String THRIFT = "application/x-thrift"; + private static final String APPLICATION_THRIFT = "application/x-thrift"; + private static final String DEFAULT_LOOKBACK = "86400000"; // 7 days in millis - private SpanStore spanStore; - private ZipkinSpanWriter spanWriter; - private final static String DEFAULT_LOOKBACK = "86400000"; // 7 days in millis + @Autowired(required = false) + Codec.Factory codecFactory = Codec.FACTORY; + Codec jsonCodec = checkNotNull(codecFactory.get(APPLICATION_JSON_VALUE), APPLICATION_JSON_VALUE); + Codec thriftCodec = checkNotNull(codecFactory.get(APPLICATION_THRIFT), APPLICATION_THRIFT); + + private final SpanStore spanStore; + private final ZipkinSpanWriter spanWriter; @Autowired public ZipkinQueryApiV1(SpanStore spanStore, ZipkinSpanWriter spanWriter) { @@ -62,7 +63,7 @@ public ZipkinQueryApiV1(SpanStore spanStore, ZipkinSpanWriter spanWriter) { @RequestMapping(value = "/dependencies", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) public byte[] getDependencies(@RequestParam(value = "endTs", required = true) long endTs, @RequestParam(value = "lookback", required = false, defaultValue = DEFAULT_LOOKBACK) long lookback) { - return DEPENDENCY_LINKS_TO_JSON.apply(this.spanStore.getDependencies(endTs, lookback)); + return this.jsonCodec.writeDependencyLinks(this.spanStore.getDependencies(endTs, lookback)); } @RequestMapping(value = "/services", method = RequestMethod.GET) @@ -79,16 +80,16 @@ public List getSpanNames( @RequestMapping(value = "/spans", method = RequestMethod.POST) @ResponseStatus(HttpStatus.ACCEPTED) public void uploadSpansJson(@RequestBody byte[] body) { - List spans = Codec.JSON.readSpans(body); + List spans = this.jsonCodec.readSpans(body); if (spans == null) throw new MalformedSpansException(APPLICATION_JSON_VALUE); this.spanWriter.write(this.spanStore, spans); } - @RequestMapping(value = "/spans", method = RequestMethod.POST, consumes = THRIFT) + @RequestMapping(value = "/spans", method = RequestMethod.POST, consumes = APPLICATION_THRIFT) @ResponseStatus(HttpStatus.ACCEPTED) public void uploadSpansThrift(@RequestBody byte[] body) { - List spans = Codec.THRIFT.readSpans(body); - if (spans == null) throw new MalformedSpansException(THRIFT); + List spans = this.thriftCodec.readSpans(body); + if (spans == null) throw new MalformedSpansException(APPLICATION_THRIFT); this.spanWriter.write(this.spanStore, spans); } @@ -124,7 +125,7 @@ public byte[] getTraces( } } } - return TRACES_TO_JSON.apply(this.spanStore.getTraces(builder.build())); + return this.jsonCodec.writeTraces(this.spanStore.getTraces(builder.build())); } @RequestMapping(value = "/trace/{traceId}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) @@ -136,7 +137,7 @@ public byte[] getTrace(@PathVariable String traceId) { if (traces.isEmpty()) { throw new TraceNotFoundException(traceId, id); } - return Codec.JSON.writeSpans(traces.get(0)); + return this.jsonCodec.writeSpans(traces.get(0)); } @ExceptionHandler(TraceNotFoundException.class)