From aadb002f14e06be2f16e4c371a3a67ef7c10e2dc Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 22 Aug 2017 20:53:48 +0800 Subject: [PATCH] Deprecates Collector.acceptSpans(List .. --- .../collector/kafka/KafkaStreamProcessor.java | 16 +- .../kafka10/KafkaCollectorWorker.java | 16 +- .../collector/scribe/ScribeSpanConsumer.java | 13 +- .../main/java/zipkin/collector/Collector.java | 123 +++----------- .../main/java/zipkin/internal/Collector.java | 150 +++++++++++++++++ .../zipkin/internal/DetectingSpanDecoder.java | 6 +- .../java/zipkin/collector/CollectorTest.java | 107 ++++-------- .../java/zipkin/internal/CollectorTest.java | 153 ++++++++++++++++++ 8 files changed, 386 insertions(+), 198 deletions(-) create mode 100644 zipkin/src/main/java/zipkin/internal/Collector.java create mode 100644 zipkin/src/test/java/zipkin/internal/CollectorTest.java diff --git a/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java b/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java index af3e36ae410..e7f2f9ce3e0 100644 --- a/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java +++ b/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java @@ -16,11 +16,12 @@ import java.util.Collections; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; +import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; import static zipkin.SpanDecoder.DETECTING_DECODER; -import static zipkin.SpanDecoder.THRIFT_DECODER; import static zipkin.storage.Callback.NOOP; /** Consumes spans from Kafka messages, ignoring malformed input */ @@ -48,10 +49,17 @@ public void run() { continue; } - if (bytes[0] == '[' /* json list */ || bytes[0] == 12 /* thrift list */) { + // If we received legacy single-span encoding, decode it into a singleton list + if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) { + try { + metrics.incrementBytes(bytes.length); + Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes); + collector.accept(Collections.singletonList(span), NOOP); + } catch (RuntimeException e) { + metrics.incrementMessagesDropped(); + } + } else { collector.acceptSpans(bytes, DETECTING_DECODER, NOOP); - } else { // assume legacy single-span encoding - collector.acceptSpans(Collections.singletonList(bytes), THRIFT_DECODER, NOOP); } } } diff --git a/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java b/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java index 7af3786feb3..6115fe0db4d 100644 --- a/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java +++ b/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java @@ -27,11 +27,12 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; import static zipkin.SpanDecoder.DETECTING_DECODER; -import static zipkin.SpanDecoder.THRIFT_DECODER; import static zipkin.storage.Callback.NOOP; /** Consumes spans from Kafka messages, ignoring malformed input */ @@ -75,10 +76,17 @@ public void run() { if (bytes.length == 0) { metrics.incrementMessagesDropped(); } else { - if (bytes[0] == '[' /* json list */ || bytes[0] == 12 /* thrift list */) { + // If we received legacy single-span encoding, decode it into a singleton list + if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) { + metrics.incrementBytes(bytes.length); + try { + Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes); + collector.accept(Collections.singletonList(span), NOOP); + } catch (RuntimeException e) { + metrics.incrementMessagesDropped(); + } + } else { collector.acceptSpans(bytes, DETECTING_DECODER, NOOP); - } else { // assume legacy single-span encoding - collector.acceptSpans(Collections.singletonList(bytes), THRIFT_DECODER, NOOP); } } } diff --git a/zipkin-collector/scribe/src/main/java/zipkin/collector/scribe/ScribeSpanConsumer.java b/zipkin-collector/scribe/src/main/java/zipkin/collector/scribe/ScribeSpanConsumer.java index 1859ae780bf..577773fe8bf 100644 --- a/zipkin-collector/scribe/src/main/java/zipkin/collector/scribe/ScribeSpanConsumer.java +++ b/zipkin-collector/scribe/src/main/java/zipkin/collector/scribe/ScribeSpanConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,7 +20,8 @@ import java.util.Base64; import java.util.List; import java.util.stream.Collectors; -import zipkin.Codec; +import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; import zipkin.internal.Nullable; @@ -40,12 +41,14 @@ public ScribeSpanConsumer(ScribeCollector.Builder builder) { @Override public ListenableFuture log(List messages) { metrics.incrementMessages(); - List thrifts; + List spans; try { - thrifts = messages.stream() + spans = messages.stream() .filter(m -> m.category.equals(category)) .map(m -> m.message.getBytes(StandardCharsets.ISO_8859_1)) .map(b -> Base64.getMimeDecoder().decode(b)) // finagle-zipkin uses mime encoding + .peek(b -> metrics.incrementBytes(b.length)) + .map(SpanDecoder.THRIFT_DECODER::readSpan) .collect(Collectors.toList()); } catch (RuntimeException e) { metrics.incrementMessagesDropped(); @@ -53,7 +56,7 @@ public ListenableFuture log(List messages) { } SettableFuture result = SettableFuture.create(); - collector.acceptSpans(thrifts, Codec.THRIFT, new Callback() { + collector.accept(spans, new Callback() { @Override public void onSuccess(@Nullable Void value) { result.set(ResultCode.OK); } diff --git a/zipkin/src/main/java/zipkin/collector/Collector.java b/zipkin/src/main/java/zipkin/collector/Collector.java index 0909d430194..fff30bdcea0 100644 --- a/zipkin/src/main/java/zipkin/collector/Collector.java +++ b/zipkin/src/main/java/zipkin/collector/Collector.java @@ -14,16 +14,15 @@ package zipkin.collector; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.logging.Logger; import zipkin.Span; import zipkin.SpanDecoder; +import zipkin.internal.DetectingSpanDecoder; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; -import static java.lang.String.format; -import static java.util.logging.Level.WARNING; +import static zipkin.internal.DetectingSpanDecoder.detectFormat; import static zipkin.internal.Util.checkNotNull; /** @@ -34,7 +33,7 @@ * before storage is attempted. This ensures that calling threads are disconnected from storage * threads. */ -public class Collector { // not final for mocking +public class Collector extends zipkin.internal.Collector { // not final for mock /** Needed to scope this to the correct logging category */ public static Builder builder(Class loggingClass) { @@ -74,31 +73,31 @@ public Collector build() { } } - final Logger logger; - final StorageComponent storage; final CollectorSampler sampler; - final CollectorMetrics metrics; + final StorageComponent storage; Collector(Builder builder) { - this.logger = checkNotNull(builder.logger, "logger"); + super(builder.logger, builder.metrics); this.storage = checkNotNull(builder.storage, "storage"); this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler; - this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics; } + @Override public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback callback) { - metrics.incrementBytes(serializedSpans.length); - List spans; try { - spans = decoder.readSpans(serializedSpans); + if (decoder instanceof DetectingSpanDecoder) decoder = detectFormat(serializedSpans); } catch (RuntimeException e) { + metrics.incrementBytes(serializedSpans.length); callback.onError(errorReading(e)); return; } - accept(spans, callback); + super.acceptSpans(serializedSpans, decoder, callback); } - public void acceptSpans(List serializedSpans, SpanDecoder decoder, + /** + * @deprecated All transports accept encoded lists of spans. Please update reporters to do so. + */ + @Deprecated public void acceptSpans(List serializedSpans, SpanDecoder decoder, Callback callback) { List spans = new ArrayList<>(serializedSpans.size()); try { @@ -115,101 +114,19 @@ public void acceptSpans(List serializedSpans, SpanDecoder decoder, accept(spans, callback); } - public void accept(List spans, Callback callback) { - if (spans.isEmpty()) { - callback.onSuccess(null); - return; - } - metrics.incrementSpans(spans.size()); - - List sampled = sample(spans); - if (sampled.isEmpty()) { - callback.onSuccess(null); - return; - } - - try { - storage.asyncSpanConsumer().accept(sampled, acceptSpansCallback(sampled)); - callback.onSuccess(null); - } catch (RuntimeException e) { - callback.onError(errorStoringSpans(sampled, e)); - return; - } + @Override protected List decodeList(SpanDecoder decoder, byte[] serialized) { + return decoder.readSpans(serialized); } - List sample(List input) { - List sampled = new ArrayList<>(input.size()); - for (Span s : input) { - if (sampler.isSampled(s.traceId, s.debug)) sampled.add(s); - } - int dropped = input.size() - sampled.size(); - if (dropped > 0) metrics.incrementSpansDropped(dropped); - return sampled; + @Override protected boolean isSampled(Span span) { + return sampler.isSampled(span.traceId, span.debug); } - Callback acceptSpansCallback(final List spans) { - return new Callback() { - @Override public void onSuccess(Void value) { - } - - @Override public void onError(Throwable t) { - errorStoringSpans(spans, t); - } - - @Override - public String toString() { - return appendSpanIds(spans, new StringBuilder("AcceptSpans(")).append(")").toString(); - } - }; - } - - RuntimeException errorReading(Throwable e) { - return errorReading("Cannot decode spans", e); - } - - RuntimeException errorReading(String message, Throwable e) { - metrics.incrementMessagesDropped(); - return doError(message, e); - } - - /** - * When storing spans, an exception can be raised before or after the fact. This adds context of - * span ids to give logs more relevance. - */ - RuntimeException errorStoringSpans(List spans, Throwable e) { - metrics.incrementSpansDropped(spans.size()); - // The exception could be related to a span being huge. Instead of filling logs, - // print trace id, span id pairs - StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans ")); - return doError(msg.toString(), e); - } - - RuntimeException doError(String message, Throwable e) { - String exceptionMessage = e.getMessage() != null ? e.getMessage() : ""; - if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) { - warn(exceptionMessage, e); - return (RuntimeException) e; - } else { - message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage); - warn(message, e); - return new RuntimeException(message, e); - } - } - - void warn(String message, Throwable e) { - logger.log(WARNING, message, e); - } - - StringBuilder appendSpanIds(List spans, StringBuilder message) { - message.append("["); - for (Iterator iterator = spans.iterator(); iterator.hasNext(); ) { - message.append(idString(iterator.next())); - if (iterator.hasNext()) message.append(", "); - } - return message.append("]"); + @Override protected void record(List sampled, Callback callback) { + storage.asyncSpanConsumer().accept(sampled, callback); } - String idString(Span span) { + @Override protected String idString(Span span) { return span.idString(); } } diff --git a/zipkin/src/main/java/zipkin/internal/Collector.java b/zipkin/src/main/java/zipkin/internal/Collector.java new file mode 100644 index 00000000000..f0a5c487fff --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/Collector.java @@ -0,0 +1,150 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Logger; +import zipkin.collector.CollectorMetrics; +import zipkin.storage.Callback; + +import static java.lang.String.format; +import static java.util.logging.Level.WARNING; +import static zipkin.internal.Util.checkNotNull; + +public abstract class Collector { + + protected final Logger logger; + protected final CollectorMetrics metrics; + + protected Collector(Logger logger, @Nullable CollectorMetrics metrics) { + this.logger = checkNotNull(logger, "logger"); + this.metrics = metrics == null ? CollectorMetrics.NOOP_METRICS : metrics; + } + + protected abstract List decodeList(D decoder, byte[] serialized); + + protected abstract boolean isSampled(S s); + + protected abstract void record(List spans, Callback callback); + + protected abstract String idString(S span); + + void warn(String message, Throwable e) { + logger.log(WARNING, message, e); + } + + protected void acceptSpans(byte[] serializedSpans, D decoder, Callback callback) { + metrics.incrementBytes(serializedSpans.length); + List spans; + try { + spans = decodeList(decoder, serializedSpans); + } catch (RuntimeException e) { + callback.onError(errorReading(e)); + return; + } + accept(spans, callback); + } + + public void accept(List spans, Callback callback) { + if (spans.isEmpty()) { + callback.onSuccess(null); + return; + } + metrics.incrementSpans(spans.size()); + + List sampled = sample(spans); + if (sampled.isEmpty()) { + callback.onSuccess(null); + return; + } + + try { + record(sampled, acceptSpansCallback(sampled)); + callback.onSuccess(null); + } catch (RuntimeException e) { + callback.onError(errorStoringSpans(sampled, e)); + return; + } + } + + List sample(List input) { + List sampled = new ArrayList<>(input.size()); + for (S s : input) { + if (isSampled(s)) sampled.add(s); + } + int dropped = input.size() - sampled.size(); + if (dropped > 0) metrics.incrementSpansDropped(dropped); + return sampled; + } + + Callback acceptSpansCallback(final List spans) { + return new Callback() { + @Override public void onSuccess(Void value) { + } + + @Override public void onError(Throwable t) { + errorStoringSpans(spans, t); + } + + @Override + public String toString() { + return appendSpanIds(spans, new StringBuilder("AcceptSpans(")).append(")").toString(); + } + }; + } + + protected RuntimeException errorReading(Throwable e) { + return errorReading("Cannot decode spans", e); + } + + RuntimeException errorReading(String message, Throwable e) { + metrics.incrementMessagesDropped(); + return doError(message, e); + } + + /** + * When storing spans, an exception can be raised before or after the fact. This adds context of + * span ids to give logs more relevance. + */ + RuntimeException errorStoringSpans(List spans, Throwable e) { + metrics.incrementSpansDropped(spans.size()); + // The exception could be related to a span being huge. Instead of filling logs, + // print trace id, span id pairs + StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans ")); + return doError(msg.toString(), e); + } + + RuntimeException doError(String message, Throwable e) { + String exceptionMessage = e.getMessage() != null ? e.getMessage() : ""; + if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) { + warn(exceptionMessage, e); + return (RuntimeException) e; + } else { + message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage); + warn(message, e); + return new RuntimeException(message, e); + } + } + + StringBuilder appendSpanIds(List spans, StringBuilder message) { + message.append("["); + for (Iterator iterator = spans.iterator(); iterator.hasNext(); ) { + message.append(idString(iterator.next())); + if (iterator.hasNext()) message.append(", "); + } + return message.append("]"); + } +} diff --git a/zipkin/src/main/java/zipkin/internal/DetectingSpanDecoder.java b/zipkin/src/main/java/zipkin/internal/DetectingSpanDecoder.java index 014274d5071..91fccccfad0 100644 --- a/zipkin/src/main/java/zipkin/internal/DetectingSpanDecoder.java +++ b/zipkin/src/main/java/zipkin/internal/DetectingSpanDecoder.java @@ -32,7 +32,7 @@ public final class DetectingSpanDecoder implements SpanDecoder { static final SpanDecoder JSON2_DECODER = new Span2JsonSpanDecoder(); @Override public Span readSpan(byte[] span) { - SpanDecoder decoder = detectJsonFormat(span); + SpanDecoder decoder = detectFormat(span); if (span[0] == 12 /* List[ThriftSpan] */ || span[0] == '[') { throw new IllegalArgumentException("Expected json or thrift object, not list encoding"); } @@ -40,7 +40,7 @@ public final class DetectingSpanDecoder implements SpanDecoder { } @Override public List readSpans(byte[] span) { - SpanDecoder decoder = detectJsonFormat(span); + SpanDecoder decoder = detectFormat(span); if (span[0] != 12 /* List[ThriftSpan] */ && span[0] != '[') { throw new IllegalArgumentException("Expected json or thrift list encoding"); } @@ -48,7 +48,7 @@ public final class DetectingSpanDecoder implements SpanDecoder { } /** @throws IllegalArgumentException if the input isn't a json or thrift list or object. */ - public static SpanDecoder detectJsonFormat(byte[] bytes) { + public static SpanDecoder detectFormat(byte[] bytes) { if (bytes[0] <= 16 /* the first byte is the TType, in a range 0-16 */) { return THRIFT_DECODER; } else if (bytes[0] != '[' && bytes[0] != '{') { diff --git a/zipkin/src/test/java/zipkin/collector/CollectorTest.java b/zipkin/src/test/java/zipkin/collector/CollectorTest.java index dee0875fd9f..4522a576646 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorTest.java @@ -16,14 +16,17 @@ import org.junit.Before; import org.junit.Test; import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.internal.ApplyTimestampAndDuration; -import zipkin.storage.Callback; +import zipkin.internal.DetectingSpanDecoder; +import zipkin.internal.Span2; +import zipkin.internal.Span2Converter; +import zipkin.internal.Util; +import zipkin.internal.v2.codec.Encoder; +import zipkin.internal.v2.codec.MessageEncoder; import zipkin.storage.StorageComponent; import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -35,86 +38,12 @@ public class CollectorTest { StorageComponent storage = mock(StorageComponent.class); Collector collector; Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); - Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); + Span2 span2_1 = Span2Converter.fromSpan(span1).get(0); @Before public void setup() throws Exception { collector = spy(Collector.builder(Collector.class) .storage(storage).build()); when(collector.idString(span1)).thenReturn("1"); // to make expectations easier to read - doAnswer(invocation -> null).when(collector).warn(any(String.class), any(Throwable.class)); - } - - @Test - public void acceptSpansCallback_toStringIncludesSpanIds() { - when(collector.idString(span2)).thenReturn("2"); - - assertThat(collector.acceptSpansCallback(asList(span1, span2))) - .hasToString("AcceptSpans([1, 2])"); - } - - @Test - public void acceptSpansCallback_onErrorWithNullMessage() { - Callback callback = collector.acceptSpansCallback(asList(span1)); - - RuntimeException exception = new RuntimeException(); - callback.onError(exception); - - verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception); - } - - @Test - public void acceptSpansCallback_onErrorWithMessage() { - Callback callback = collector.acceptSpansCallback(asList(span1)); - RuntimeException exception = new IllegalArgumentException("no beer"); - callback.onError(exception); - - verify(collector) - .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception); - } - - @Test - public void errorAcceptingSpans_onErrorWithNullMessage() { - String message = - collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); - - assertThat(message) - .isEqualTo("Cannot store spans [1] due to RuntimeException()"); - } - - @Test - public void errorAcceptingSpans_onErrorWithMessage() { - RuntimeException exception = new IllegalArgumentException("no beer"); - String message = collector.errorStoringSpans(asList(span1), exception).getMessage(); - - assertThat(message) - .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)"); - } - - @Test - public void errorDecoding_onErrorWithNullMessage() { - String message = collector.errorReading(new RuntimeException()).getMessage(); - - assertThat(message) - .isEqualTo("Cannot decode spans due to RuntimeException()"); - } - - @Test - public void errorDecoding_onErrorWithMessage() { - RuntimeException exception = new IllegalArgumentException("no beer"); - String message = collector.errorReading(exception).getMessage(); - - assertThat(message) - .isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)"); - } - - @Test - public void errorDecoding_doesntWrapMalformedException() { - RuntimeException exception = new IllegalArgumentException("Malformed reading spans"); - - String message = collector.errorReading(exception).getMessage(); - - assertThat(message) - .isEqualTo("Malformed reading spans"); } @Test public void unsampledSpansArentStored() { @@ -139,4 +68,24 @@ public void errorDecoding_doesntWrapMalformedException() { verify(sampler).isSampled(span1.traceId, span1.debug); } + + @Test public void errorDetectingFormat() { + CollectorMetrics metrics = mock(CollectorMetrics.class); + + collector = Collector.builder(Collector.class) + .metrics(metrics) + .storage(storage).build(); + + collector.acceptSpans("foo".getBytes(Util.UTF_8), new DetectingSpanDecoder(), NOOP); + + verify(metrics).incrementMessagesDropped(); + } + + @Test public void convertsSpan2Format() { + byte[] bytes = MessageEncoder.JSON_BYTES.encode(asList(Encoder.JSON.encode(span2_1))); + collector.acceptSpans(bytes, SpanDecoder.DETECTING_DECODER, NOOP); + + verify(collector).acceptSpans(bytes, SpanDecoder.DETECTING_DECODER, NOOP); + verify(collector).accept(asList(span1), NOOP); + } } diff --git a/zipkin/src/test/java/zipkin/internal/CollectorTest.java b/zipkin/src/test/java/zipkin/internal/CollectorTest.java new file mode 100644 index 00000000000..dcf5b4f1b92 --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/CollectorTest.java @@ -0,0 +1,153 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.List; +import java.util.logging.Logger; +import org.junit.Before; +import org.junit.Test; +import zipkin.Span; +import zipkin.SpanDecoder; +import zipkin.storage.Callback; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static zipkin.TestObjects.LOTS_OF_SPANS; +import static zipkin.storage.Callback.NOOP; + +public class CollectorTest { + Collector collector; + Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); + Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); + + @Before public void setup() throws Exception { + collector = spy(new Collector(mock(Logger.class), null) { + + @Override protected List decodeList(SpanDecoder decoder, byte[] serialized) { + return decoder.readSpans(serialized); + } + + @Override protected boolean isSampled(Span span) { + throw new UnsupportedOperationException(); + } + + @Override protected void record(List spans, Callback callback) { + throw new UnsupportedOperationException(); + } + + @Override protected String idString(Span span) { + return "1"; + } + + @Override void warn(String message, Throwable e) { + } + }); + } + + @Test + public void acceptSpansCallback_toStringIncludesSpanIds() { + when(collector.idString(span2)).thenReturn("2"); + + assertThat(collector.acceptSpansCallback(asList(span1, span2))) + .hasToString("AcceptSpans([1, 2])"); + } + + @Test + public void acceptSpansCallback_onErrorWithNullMessage() { + Callback callback = collector.acceptSpansCallback(asList(span1)); + + RuntimeException exception = new RuntimeException(); + callback.onError(exception); + + verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception); + } + + @Test + public void acceptSpansCallback_onErrorWithMessage() { + Callback callback = collector.acceptSpansCallback(asList(span1)); + RuntimeException exception = new IllegalArgumentException("no beer"); + callback.onError(exception); + + verify(collector) + .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception); + } + + @Test + public void errorAcceptingSpans_onErrorWithNullMessage() { + String message = + collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); + + assertThat(message) + .isEqualTo("Cannot store spans [1] due to RuntimeException()"); + } + + @Test + public void errorAcceptingSpans_onErrorWithMessage() { + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorStoringSpans(asList(span1), exception).getMessage(); + + assertThat(message) + .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)"); + } + + @Test + public void errorDecoding_onErrorWithNullMessage() { + String message = collector.errorReading(new RuntimeException()).getMessage(); + + assertThat(message) + .isEqualTo("Cannot decode spans due to RuntimeException()"); + } + + @Test + public void errorDecoding_onErrorWithMessage() { + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorReading(exception).getMessage(); + + assertThat(message) + .isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)"); + } + + @Test + public void errorDecoding_doesntWrapMalformedException() { + RuntimeException exception = new IllegalArgumentException("Malformed reading spans"); + + String message = collector.errorReading(exception).getMessage(); + + assertThat(message) + .isEqualTo("Malformed reading spans"); + } + + @Test public void sampledSpansAreStored() { + doReturn(true).when(collector).isSampled(span1); + + collector.accept(asList(span1), NOOP); + + verify(collector).record(eq(asList(span1)), any(Callback.class)); + } + + @Test public void unsampledSpansArentStored() { + doThrow(new AssertionError()).when(collector).record(any(List.class), any(Callback.class)); + doReturn(false).when(collector).isSampled(span1); + + collector.accept(asList(span1), NOOP); + } +}