From be26b6d98ab57c4f72faf97c9ffdcfc256f4fe3b Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 1 Mar 2016 12:37:06 +0800 Subject: [PATCH] Adds codec methods needed for Cassandra compatibility This adds thrift read methods that accept ByteBuffers. This is in support of the underlying DataStax Cassandra driver, which returns data in ByteBuffers. The implementation takes care not to re-buffer input data. This also introduces an internal type, Dependencies, which is only used in cassandra: https://github.com/openzipkin/zipkin/issues/1008 See #89 --- .../src/main/java/zipkin/DependencyLink.java | 2 +- .../java/zipkin/internal/Dependencies.java | 150 ++++++++++++ .../java/zipkin/internal/ThriftCodec.java | 225 ++++++++++-------- .../zipkin/internal/DependenciesTest.java | 38 +++ .../java/zipkin/internal/ThriftCodecTest.java | 34 ++- 5 files changed, 347 insertions(+), 102 deletions(-) create mode 100644 zipkin/src/main/java/zipkin/internal/Dependencies.java create mode 100644 zipkin/src/test/java/zipkin/internal/DependenciesTest.java diff --git a/zipkin/src/main/java/zipkin/DependencyLink.java b/zipkin/src/main/java/zipkin/DependencyLink.java index ad1a2c8fafd..e0fb1d42cc7 100644 --- a/zipkin/src/main/java/zipkin/DependencyLink.java +++ b/zipkin/src/main/java/zipkin/DependencyLink.java @@ -29,7 +29,7 @@ public static DependencyLink create(String parent, String child, long callCount) /** child service name (callee) */ public final String child; - /** calls made during the duration (in microseconds) of this link */ + /** calls made during the duration (in milliseconds) of this link */ public final long callCount; DependencyLink(String parent, String child, long callCount) { diff --git a/zipkin/src/main/java/zipkin/internal/Dependencies.java b/zipkin/src/main/java/zipkin/internal/Dependencies.java new file mode 100644 index 00000000000..c9b156ad603 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/Dependencies.java @@ -0,0 +1,150 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.nio.ByteBuffer; +import java.util.List; +import okio.Buffer; +import zipkin.DependencyLink; + +import static zipkin.internal.ThriftCodec.DEPENDENCY_LINKS_ADAPTER; +import static zipkin.internal.ThriftCodec.Field; +import static zipkin.internal.ThriftCodec.TYPE_I64; +import static zipkin.internal.ThriftCodec.TYPE_LIST; +import static zipkin.internal.ThriftCodec.TYPE_STOP; +import static zipkin.internal.ThriftCodec.ThriftAdapter; +import static zipkin.internal.ThriftCodec.read; +import static zipkin.internal.ThriftCodec.skip; +import static zipkin.internal.ThriftCodec.write; +import static zipkin.internal.Util.checkNotNull; + +/** + * Internal as only cassandra serializes the start and end timestamps along with link data, and + * those serialized timestamps are never read. + * + * @deprecated See https://github.com/openzipkin/zipkin/issues/1008 + */ +@Deprecated +public final class Dependencies { + + /** Reads from bytes serialized in TBinaryProtocol */ + public static Dependencies fromThrift(ByteBuffer bytes) { + return read(THRIFT_ADAPTER, bytes); + } + + /** Writes the current instance in TBinaryProtocol */ + public ByteBuffer toThrift() { + return ByteBuffer.wrap(write(THRIFT_ADAPTER, this)); + } + + public static Dependencies create(long startTs, long endTs, List links) { + return new Dependencies(startTs, endTs, links); + } + + /** milliseconds from epoch */ + public final long startTs; + + /** milliseconds from epoch) */ + public final long endTs; + + /** link information for every dependent service */ + public final List links; + + Dependencies(long startTs, long endTs, List links) { + this.startTs = startTs; + this.endTs = endTs; + this.links = checkNotNull(links, "links"); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof Dependencies) { + Dependencies that = (Dependencies) o; + return (this.startTs == that.startTs) + && (this.endTs == that.endTs) + && (this.links.equals(that.links)); + } + return false; + } + + @Override + public int hashCode() { + int h = 1; + h *= 1000003; + h ^= (startTs >>> 32) ^ startTs; + h *= 1000003; + h ^= (endTs >>> 32) ^ endTs; + h *= 1000003; + h ^= links.hashCode(); + return h; + } + + /** @deprecated See https://github.com/openzipkin/zipkin/issues/1008 */ + @Deprecated + static final ThriftAdapter THRIFT_ADAPTER = new ThriftAdapter() { + + final Field START_TS = new Field(TYPE_I64, 1); + final Field END_TS = new Field(TYPE_I64, 2); + final Field LINKS = new Field(TYPE_LIST, 3); + + @Override + public Dependencies read(ByteBuffer bytes) { + long startTs = 0L; + long endTs = 0L; + List links = null; + + Field field; + + while (true) { + field = Field.read(bytes); + if (field.type == TYPE_STOP) break; + + if (field.equals(START_TS)) { + startTs = bytes.getLong(); + } else if (field.equals(END_TS)) { + endTs = bytes.getLong(); + } else if (field.equals(LINKS)) { + links = DEPENDENCY_LINKS_ADAPTER.read(bytes); + } else { + skip(bytes, field.type); + } + } + + return Dependencies.create(startTs, endTs, links); + } + + @Override + public void write(Dependencies value, Buffer buffer) { + + START_TS.write(buffer); + buffer.writeLong(value.startTs); + + END_TS.write(buffer); + buffer.writeLong(value.endTs); + + LINKS.write(buffer); + DEPENDENCY_LINKS_ADAPTER.write(value.links, buffer); + + buffer.writeByte(TYPE_STOP); + } + + @Override + public String toString() { + return "Dependencies"; + } + }; +} diff --git a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java index 728858130d4..2dc50433db9 100644 --- a/zipkin/src/main/java/zipkin/internal/ThriftCodec.java +++ b/zipkin/src/main/java/zipkin/internal/ThriftCodec.java @@ -14,10 +14,11 @@ package zipkin.internal; import java.io.EOFException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import okio.Buffer; -import okio.ByteString; import zipkin.Annotation; import zipkin.BinaryAnnotation; import zipkin.Codec; @@ -25,6 +26,7 @@ import zipkin.Endpoint; import zipkin.Span; +import static zipkin.internal.Util.UTF_8; import static zipkin.internal.Util.checkArgument; /** @@ -32,8 +34,8 @@ * core jar. The hard coding not only keeps us with a single data-model, it also allows the minified * core jar free of SLFJ classes otherwise included in generated types. * - *

This is an Okio-native TBinaryProtocol codec. Natively doing this reduces dependencies and - * array duplication. + *

This directly implements TBinaryProtocol so as to reduce dependencies and array duplication. + * While reads internally use {@link ByteBuffer}, writes use {@link Buffer} as the latter can grow. */ public final class ThriftCodec implements Codec { // break vs decode huge structs, like > 1MB strings or 10k spans in a trace. @@ -43,22 +45,32 @@ public final class ThriftCodec implements Codec { private static int MAX_SKIP_DEPTH = 2147483647; // taken from org.apache.thrift.protocol.TType - private static final byte TYPE_STOP = 0; - private static final byte TYPE_BOOL = 2; - private static final byte TYPE_BYTE = 3; - private static final byte TYPE_DOUBLE = 4; - private static final byte TYPE_I16 = 6; - private static final byte TYPE_I32 = 8; - private static final byte TYPE_I64 = 10; - private static final byte TYPE_STRING = 11; - private static final byte TYPE_STRUCT = 12; - private static final byte TYPE_MAP = 13; - private static final byte TYPE_SET = 14; - private static final byte TYPE_LIST = 15; + static final byte TYPE_STOP = 0; + static final byte TYPE_BOOL = 2; + static final byte TYPE_BYTE = 3; + static final byte TYPE_DOUBLE = 4; + static final byte TYPE_I16 = 6; + static final byte TYPE_I32 = 8; + static final byte TYPE_I64 = 10; + static final byte TYPE_STRING = 11; + static final byte TYPE_STRUCT = 12; + static final byte TYPE_MAP = 13; + static final byte TYPE_SET = 14; + static final byte TYPE_LIST = 15; + + /** + * Added for DataStax Cassandra driver, which returns data in ByteBuffers. The implementation + * takes care not to re-buffer the data. + * + * @throws {@linkplain IllegalArgumentException} if the span couldn't be decoded + */ + public Span readSpan(ByteBuffer bytes) { + return read(SPAN_ADAPTER, bytes); + } @Override public Span readSpan(byte[] bytes) { - return read(SPAN_ADAPTER, bytes); + return read(SPAN_ADAPTER, ByteBuffer.wrap(bytes)); } @Override @@ -68,7 +80,7 @@ public byte[] writeSpan(Span value) { @Override public List readSpans(byte[] bytes) { - return read(SPANS_ADAPTER, bytes); + return read(SPANS_ADAPTER, ByteBuffer.wrap(bytes)); } @Override @@ -86,7 +98,7 @@ interface ThriftWriter { } interface ThriftReader { - T read(Buffer buffer) throws EOFException; + T read(ByteBuffer bytes); } interface ThriftAdapter extends ThriftReader, ThriftWriter { @@ -99,22 +111,22 @@ interface ThriftAdapter extends ThriftReader, ThriftWriter { final Field SERVICE_NAME = new Field(TYPE_STRING, 3); @Override - public Endpoint read(Buffer buffer) throws EOFException { + public Endpoint read(ByteBuffer bytes) { Endpoint.Builder result = new Endpoint.Builder(); Field field; while (true) { - field = Field.read(buffer); + field = Field.read(bytes); if (field.type == TYPE_STOP) break; if (field.equals(IPV4)) { - result.ipv4(buffer.readInt()); + result.ipv4(bytes.getInt()); } else if (field.equals(PORT)) { - result.port(buffer.readShort()); + result.port(bytes.getShort()); } else if (field.equals(SERVICE_NAME)) { - result.serviceName(readUtf8(buffer)); + result.serviceName(readUtf8(bytes)); } else { - skip(buffer, field.type); + skip(bytes, field.type); } } return result.build(); @@ -142,21 +154,21 @@ public void write(Endpoint value, Buffer buffer) { final Field ENDPOINT = new Field(TYPE_STRUCT, 3); @Override - public Annotation read(Buffer buffer) throws EOFException { + public Annotation read(ByteBuffer bytes) { Annotation.Builder result = new Annotation.Builder(); Field field; while (true) { - field = Field.read(buffer); + field = Field.read(bytes); if (field.type == TYPE_STOP) break; if (field.equals(TIMESTAMP)) { - result.timestamp(buffer.readLong()); + result.timestamp(bytes.getLong()); } else if (field.equals(VALUE)) { - result.value(readUtf8(buffer)); + result.value(readUtf8(bytes)); } else if (field.equals(ENDPOINT)) { - result.endpoint(ENDPOINT_ADAPTER.read(buffer)); + result.endpoint(ENDPOINT_ADAPTER.read(bytes)); } else { - skip(buffer, field.type); + skip(bytes, field.type); } } return result.build(); @@ -188,24 +200,24 @@ public void write(Annotation value, Buffer buffer) { final Field ENDPOINT = new Field(TYPE_STRUCT, 4); @Override - public BinaryAnnotation read(Buffer buffer) throws EOFException { + public BinaryAnnotation read(ByteBuffer bytes) { BinaryAnnotation.Builder result = new BinaryAnnotation.Builder(); Field field; while (true) { - field = Field.read(buffer); + field = Field.read(bytes); if (field.type == TYPE_STOP) break; if (field.equals(KEY)) { - result.key(readUtf8(buffer)); + result.key(readUtf8(bytes)); } else if (field.equals(VALUE)) { - result.value(readBytes(buffer)); + result.value(readByteArray(bytes)); } else if (field.equals(TYPE)) { - result.type(BinaryAnnotation.Type.fromValue(buffer.readInt())); + result.type(BinaryAnnotation.Type.fromValue(bytes.getInt())); } else if (field.equals(ENDPOINT)) { - result.endpoint(ENDPOINT_ADAPTER.read(buffer)); + result.endpoint(ENDPOINT_ADAPTER.read(bytes)); } else { - skip(buffer, field.type); + skip(bytes, field.type); } } return result.build(); @@ -248,34 +260,34 @@ public void write(BinaryAnnotation value, Buffer buffer) { final Field DURATION = new Field(TYPE_I64, 11); @Override - public Span read(Buffer buffer) throws EOFException { + public Span read(ByteBuffer bytes) { Span.Builder result = new Span.Builder(); Field field; while (true) { - field = Field.read(buffer); + field = Field.read(bytes); if (field.type == TYPE_STOP) break; if (field.equals(TRACE_ID)) { - result.traceId(buffer.readLong()); + result.traceId(bytes.getLong()); } else if (field.equals(NAME)) { - result.name(readUtf8(buffer)); + result.name(readUtf8(bytes)); } else if (field.equals(ID)) { - result.id(buffer.readLong()); + result.id(bytes.getLong()); } else if (field.equals(PARENT_ID)) { - result.parentId(buffer.readLong()); + result.parentId(bytes.getLong()); } else if (field.equals(ANNOTATIONS)) { - result.annotations(ANNOTATIONS_ADAPTER.read(buffer)); + result.annotations(ANNOTATIONS_ADAPTER.read(bytes)); } else if (field.equals(BINARY_ANNOTATIONS)) { - result.binaryAnnotations(BINARY_ANNOTATIONS_ADAPTER.read(buffer)); + result.binaryAnnotations(BINARY_ANNOTATIONS_ADAPTER.read(bytes)); } else if (field.equals(DEBUG)) { - result.debug(buffer.readByte() == 1); + result.debug(bytes.get() == 1); } else if (field.equals(TIMESTAMP)) { - result.timestamp(buffer.readLong()); + result.timestamp(bytes.getLong()); } else if (field.equals(DURATION)) { - result.duration(buffer.readLong()); + result.duration(bytes.getLong()); } else { - skip(buffer, field.type); + skip(bytes, field.type); } } @@ -339,22 +351,22 @@ public String toString() { final Field CALL_COUNT = new Field(TYPE_I64, 4); @Override - public DependencyLink read(Buffer buffer) throws EOFException { + public DependencyLink read(ByteBuffer bytes) { DependencyLink.Builder result = new DependencyLink.Builder(); Field field; while (true) { - field = Field.read(buffer); + field = Field.read(bytes); if (field.type == TYPE_STOP) break; if (field.equals(PARENT)) { - result.parent(readUtf8(buffer)); + result.parent(readUtf8(bytes)); } else if (field.equals(CHILD)) { - result.child(readUtf8(buffer)); + result.child(readUtf8(bytes)); } else if (field.equals(CALL_COUNT)) { - result.callCount(buffer.readLong()); + result.callCount(bytes.getLong()); } else { - skip(buffer, field.type); + skip(bytes, field.type); } } @@ -383,9 +395,19 @@ public String toString() { static final ThriftAdapter> DEPENDENCY_LINKS_ADAPTER = new ListAdapter<>(DEPENDENCY_LINK_ADAPTER); + /** + * Added for DataStax Cassandra driver, which returns data in ByteBuffers. The implementation + * takes care not to re-buffer the data. + * + * @throws {@linkplain IllegalArgumentException} if the links couldn't be decoded + */ + public List readDependencyLinks(ByteBuffer bytes) { + return read(DEPENDENCY_LINKS_ADAPTER, bytes); + } + @Override public List readDependencyLinks(byte[] bytes) { - return read(DEPENDENCY_LINKS_ADAPTER, bytes); + return read(DEPENDENCY_LINKS_ADAPTER, ByteBuffer.wrap(bytes)); } @Override @@ -393,11 +415,11 @@ public byte[] writeDependencyLinks(List value) { return write(DEPENDENCY_LINKS_ADAPTER, value); } - static T read(ThriftReader reader, byte[] bytes) { - checkArgument(bytes.length > 0, "Empty input reading %s", reader); + static T read(ThriftReader reader, ByteBuffer bytes) { + checkArgument(bytes.remaining() > 0, "Empty input reading %s", reader); try { - return reader.read(new Buffer().write(bytes)); - } catch (EOFException | RuntimeException e) { + return reader.read(bytes); + } catch (RuntimeException e) { throw exceptionReading(reader.toString(), bytes, e); } } @@ -413,12 +435,12 @@ static byte[] write(ThriftWriter writer, T value) { return buffer.readByteArray(); } - static List readList(ThriftReader reader, Buffer buffer) throws EOFException { - byte ignoredType = buffer.readByte(); - int length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); + static List readList(ThriftReader reader, ByteBuffer bytes) { + byte ignoredType = bytes.get(); + int length = guardLength(bytes, CONTAINER_LENGTH_LIMIT); List result = new ArrayList<>(length); for (int i = 0; i < length; i++) { - result.add(reader.read(buffer)); + result.add(reader.read(bytes)); } return result; } @@ -438,8 +460,8 @@ static final class ListAdapter implements ThriftAdapter> { } @Override - public List read(Buffer buffer) throws EOFException { - return readList(adapter, buffer); + public List read(ByteBuffer bytes) { + return readList(adapter, bytes); } @Override @@ -453,12 +475,11 @@ public String toString() { } } - static IllegalArgumentException exceptionReading(String type, byte[] bytes, Exception e) { + static IllegalArgumentException exceptionReading(String type, ByteBuffer bytes, Exception e) { String cause = e.getMessage() == null ? "Error" : e.getMessage(); if (e instanceof EOFException) cause = "EOF"; - if (e instanceof IllegalStateException) cause = "Malformed"; - String message = - String.format("%s reading %s from TBinary: %s", cause, type, ByteString.of(bytes).base64()); + if (e instanceof IllegalStateException || e instanceof BufferUnderflowException) cause = "Malformed"; + String message = String.format("%s reading %s from TBinary: ", cause, type, bytes); throw new IllegalArgumentException(message, e); } @@ -476,9 +497,9 @@ void write(Buffer buffer) { buffer.writeShort(id); } - static Field read(Buffer buffer) { - byte type = buffer.readByte(); - return new Field(type, type == TYPE_STOP ? TYPE_STOP : buffer.readShort()); + static Field read(ByteBuffer bytes) { + byte type = bytes.get(); + return new Field(type, type == TYPE_STOP ? TYPE_STOP : bytes.getShort()); } boolean equals(Field that) { @@ -486,50 +507,50 @@ boolean equals(Field that) { } } - static void skip(Buffer buffer, byte type) throws EOFException { - skip(buffer, type, MAX_SKIP_DEPTH); + static void skip(ByteBuffer bytes, byte type) { + skip(bytes, type, MAX_SKIP_DEPTH); } - static void skip(Buffer buffer, byte type, int maxDepth) throws EOFException { - if (maxDepth <= 0) throw new EOFException("Maximum skip depth exceeded"); + static void skip(ByteBuffer bytes, byte type, int maxDepth) { + if (maxDepth <= 0) throw new IllegalStateException("Maximum skip depth exceeded"); switch (type) { case TYPE_BOOL: case TYPE_BYTE: - buffer.skip(1); + skip(bytes, 1); break; case TYPE_I16: - buffer.skip(2); + skip(bytes, 2); break; case TYPE_I32: - buffer.skip(4); + skip(bytes, 4); break; case TYPE_DOUBLE: case TYPE_I64: - buffer.skip(8); + skip(bytes, 8); break; case TYPE_STRING: - int size = guardLength(buffer, STRING_LENGTH_LIMIT); - buffer.skip(size); + int size = guardLength(bytes, STRING_LENGTH_LIMIT); + skip(bytes, size); break; case TYPE_STRUCT: while (true) { - Field field = Field.read(buffer); + Field field = Field.read(bytes); if (field.type == TYPE_STOP) return; - skip(buffer, field.type, maxDepth - 1); + skip(bytes, field.type, maxDepth - 1); } case TYPE_MAP: - byte keyType = buffer.readByte(); - byte valueType = buffer.readByte(); - for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) { - skip(buffer, keyType, maxDepth - 1); - skip(buffer, valueType, maxDepth - 1); + byte keyType = bytes.get(); + byte valueType = bytes.get(); + for (int i = 0, length = guardLength(bytes, CONTAINER_LENGTH_LIMIT); i < length; i++) { + skip(bytes, keyType, maxDepth - 1); + skip(bytes, valueType, maxDepth - 1); } break; case TYPE_SET: case TYPE_LIST: - byte elemType = buffer.readByte(); - for (int i = 0, length = guardLength(buffer, CONTAINER_LENGTH_LIMIT); i < length; i++) { - skip(buffer, elemType, maxDepth - 1); + byte elemType = bytes.get(); + for (int i = 0, length = guardLength(bytes, CONTAINER_LENGTH_LIMIT); i < length; i++) { + skip(bytes, elemType, maxDepth - 1); } break; default: // types that don't need explicit skipping @@ -537,16 +558,22 @@ static void skip(Buffer buffer, byte type, int maxDepth) throws EOFException { } } - static byte[] readBytes(Buffer buffer) throws EOFException { - return buffer.readByteArray(guardLength(buffer, STRING_LENGTH_LIMIT)); + static void skip(ByteBuffer bytes, int count) { + bytes.position(bytes.position() + count); + } + + static byte[] readByteArray(ByteBuffer bytes) { + byte[] result = new byte[guardLength(bytes, STRING_LENGTH_LIMIT)]; + bytes.get(result); + return result; } - static String readUtf8(Buffer buffer) throws EOFException { - return buffer.readUtf8(guardLength(buffer, STRING_LENGTH_LIMIT)); + static String readUtf8(ByteBuffer bytes) { + return new String(readByteArray(bytes), UTF_8); } - static int guardLength(Buffer buffer, int limit) { - int length = buffer.readInt(); + static int guardLength(ByteBuffer bytes, int limit) { + int length = bytes.getInt(); if (length > limit) { // don't allocate massive arrays throw new IllegalStateException(length + " > " + limit + ": possibly malformed thrift"); } diff --git a/zipkin/src/test/java/zipkin/internal/DependenciesTest.java b/zipkin/src/test/java/zipkin/internal/DependenciesTest.java new file mode 100644 index 00000000000..787a92c85ca --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/DependenciesTest.java @@ -0,0 +1,38 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.junit.Test; +import zipkin.DependencyLink; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +public final class DependenciesTest { + @Test + public void dependenciesRoundTrip() throws IOException { + Dependencies dependencies = Dependencies.create(1L, 2L, asList( + DependencyLink.create("foo", "bar", 2), + DependencyLink.create("bar", "baz", 3) + )); + + ByteBuffer bytes = dependencies.toThrift(); + assertThat(Dependencies.fromThrift(bytes)) + .isEqualTo(dependencies); + + assertThat(bytes.remaining()).isZero(); + } +} diff --git a/zipkin/src/test/java/zipkin/internal/ThriftCodecTest.java b/zipkin/src/test/java/zipkin/internal/ThriftCodecTest.java index b80bf758d77..ff4692a8abb 100644 --- a/zipkin/src/test/java/zipkin/internal/ThriftCodecTest.java +++ b/zipkin/src/test/java/zipkin/internal/ThriftCodecTest.java @@ -13,14 +13,44 @@ */ package zipkin.internal; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.junit.Test; import zipkin.Codec; import zipkin.CodecTest; +import zipkin.DependencyLink; +import zipkin.Span; +import zipkin.TestObjects; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; public final class ThriftCodecTest extends CodecTest { - private final Codec codec = Codec.THRIFT; + private final ThriftCodec codec = new ThriftCodec(); @Override - protected Codec codec() { + protected ThriftCodec codec() { return codec; } + + @Test + public void readSpanFromByteBuffer() throws IOException { + for (Span span : TestObjects.TRACE) { + byte[] bytes = codec().writeSpan(span); + assertThat(codec().readSpan(ByteBuffer.wrap(bytes))) + .isEqualTo(span); + } + } + + @Test + public void readDependencyLinksFromByteBuffer() throws IOException { + List links = asList( + DependencyLink.create("foo", "bar", 2), + DependencyLink.create("bar", "baz", 3) + ); + byte[] bytes = codec().writeDependencyLinks(links); + assertThat(codec().readDependencyLinks(ByteBuffer.wrap(bytes))) + .isEqualTo(links); + } }