diff --git a/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java b/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java index 62a390d880b..2f8d8ecff73 100644 --- a/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java +++ b/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import zipkin.Span; import zipkin.collector.CollectorSampler; +import zipkin.internal.Nullable; import static com.google.common.base.Preconditions.checkState; import static zipkin.internal.Util.UTF_8; @@ -43,8 +44,9 @@ * zipkin storage layer. It works by coordinating a sample rate based on multiple instances vs a * target storage rate in spans/minute. * - *

This assumes that each instance is storing every span it {@link #isSampled(Span) samples}, and - * that the store rate is a useful metric (ex spans have relatively the same size and depth. + *

This assumes that each instance is storing every span it {@link #isSampled(long, Boolean) + * samples}, and that the store rate is a useful metric (ex spans have relatively the same size and + * depth. * *

If the storage layer is capable of 10k spans/minute, you'd set the target rate in ZooKeeper to * 10000. With this in mind, 10 balanced collectors writing 10k spans/minute would eventually see a @@ -52,8 +54,8 @@ * *

Implementation notes

* - *

This object spawns a single scheduling thread that reports its rate of {@link #isSampled(Span) - * spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}. + *

This object spawns a single scheduling thread that reports its rate of {@link #isSampled(long, + * Boolean) spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}. * *

When a leader, this object summarizes recent sample rates and compares them against a target. * @@ -232,8 +234,12 @@ public void close() throws IOException { closer.close(); } - @Override public boolean isSampled(Span span) { - boolean result = super.isSampled(span); + @Override @Deprecated public boolean isSampled(Span span) { + return isSampled(span.traceId, span.debug); + } + + @Override public boolean isSampled(long traceId, @Nullable Boolean debug) { + boolean result = super.isSampled(traceId, debug); if (result) spanCount.incrementAndGet(); return result; } diff --git a/zipkin/src/main/java/zipkin/collector/Collector.java b/zipkin/src/main/java/zipkin/collector/Collector.java index 3bc170213f4..0909d430194 100644 --- a/zipkin/src/main/java/zipkin/collector/Collector.java +++ b/zipkin/src/main/java/zipkin/collector/Collector.java @@ -34,7 +34,7 @@ * before storage is attempted. This ensures that calling threads are disconnected from storage * threads. */ -public final class Collector { +public class Collector { // not final for mocking /** Needed to scope this to the correct logging category */ public static Builder builder(Class loggingClass) { @@ -44,8 +44,8 @@ public static Builder builder(Class loggingClass) { public static final class Builder { final Logger logger; StorageComponent storage = null; - CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE; - CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + CollectorSampler sampler = null; + CollectorMetrics metrics = null; Builder(Logger logger) { this.logger = logger; @@ -140,7 +140,7 @@ public void accept(List spans, Callback callback) { List sample(List input) { List sampled = new ArrayList<>(input.size()); for (Span s : input) { - if (sampler.isSampled(s)) sampled.add(s); + if (sampler.isSampled(s.traceId, s.debug)) sampled.add(s); } int dropped = input.size() - sampled.size(); if (dropped > 0) metrics.incrementSpansDropped(dropped); @@ -185,24 +185,31 @@ RuntimeException errorStoringSpans(List spans, Throwable e) { } RuntimeException doError(String message, Throwable e) { - if (e instanceof RuntimeException && e.getMessage() != null && e.getMessage() - .startsWith("Malformed")) { - logger.log(WARNING, e.getMessage(), e); + String exceptionMessage = e.getMessage() != null ? e.getMessage() : ""; + if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) { + warn(exceptionMessage, e); return (RuntimeException) e; } else { - message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), - e.getMessage() == null ? "" : e.getMessage()); - logger.log(WARNING, message, e); + message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage); + warn(message, e); return new RuntimeException(message, e); } } - static StringBuilder appendSpanIds(List spans, StringBuilder message) { + void warn(String message, Throwable e) { + logger.log(WARNING, message, e); + } + + StringBuilder appendSpanIds(List spans, StringBuilder message) { message.append("["); for (Iterator iterator = spans.iterator(); iterator.hasNext(); ) { - message.append(iterator.next().idString()); + message.append(idString(iterator.next())); if (iterator.hasNext()) message.append(", "); } return message.append("]"); } + + String idString(Span span) { + return span.idString(); + } } diff --git a/zipkin/src/main/java/zipkin/collector/CollectorComponent.java b/zipkin/src/main/java/zipkin/collector/CollectorComponent.java index 651576d7fce..161995ee11d 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorComponent.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorComponent.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,7 +15,6 @@ import java.util.List; import zipkin.Component; -import zipkin.Span; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; @@ -49,8 +48,8 @@ interface Builder { Builder metrics(CollectorMetrics metrics); /** - * {@link CollectorSampler#isSampled(Span) samples spans} to reduce load on the storage system. - * Defaults to always sample. + * {@link CollectorSampler#isSampled(long, Boolean) samples spans} to reduce load on the storage + * system. Defaults to always sample. */ Builder sampler(CollectorSampler sampler); diff --git a/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java b/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java index 69bd790ac7f..f31599a3459 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java @@ -14,16 +14,16 @@ package zipkin.collector; import java.util.List; -import zipkin.storage.AsyncSpanConsumer; -import zipkin.storage.Callback; import zipkin.Codec; import zipkin.Span; +import zipkin.storage.AsyncSpanConsumer; +import zipkin.storage.Callback; /** * Instrumented applications report spans over a transport such as Kafka. Zipkin collectors receive - * these messages, {@link Codec#readSpans(byte[]) decoding them into spans}, {@link - * CollectorSampler#isSampled(Span) apply sampling}, and {@link AsyncSpanConsumer#accept(List, - * Callback) queue them for storage}. + * these messages, {@link Codec#readSpans(byte[]) decoding them into spans}, + * {@link CollectorSampler# isSampled(long, Boolean) apply sampling}, and + * {@link AsyncSpanConsumer#accept(List, Callback) queues them for storage}. * *

Callbacks on this type are invoked by zipkin collectors to improve the visibility of the * system. A typical implementation will report metrics to a telemetry system for analysis and @@ -48,7 +48,7 @@ * messages sent from instrumentation. *

  • Stored spans <= {@link #incrementSpans(int) Accepted spans} - {@link * #incrementSpansDropped(int) Dropped spans}. Alert when this drops below the - * {@link CollectorSampler#isSampled(Span) collection-tier sample rate}. + * {@link CollectorSampler#isSampled(long, Boolean) collection-tier sample rate}. *
  • * * diff --git a/zipkin/src/main/java/zipkin/collector/CollectorSampler.java b/zipkin/src/main/java/zipkin/collector/CollectorSampler.java index c226d961b88..bf9e83928f1 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorSampler.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorSampler.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ package zipkin.collector; import zipkin.Span; +import zipkin.internal.Nullable; import static zipkin.internal.Util.checkArgument; @@ -50,21 +51,31 @@ public static CollectorSampler create(float rate) { protected abstract long boundary(); /** - * Returns true if the span should be recorded to storage. + * Returns true if spans with this trace ID should be recorded to storage. * *

    Zipkin v1 allows storage-layer sampling, which can help prevent spikes in traffic from - * overloading the system. {@link Span#debug Debug} spans are always stored. + * overloading the system. Debug spans are always stored. + * + *

    This uses only the lower 64 bits of the trace ID as instrumentation still send mixed trace + * ID width. + * + * @param traceId the lower 64 bits of the span's trace ID + * @param debug when true, always passes sampling */ - public boolean isSampled(Span span) { - if (span.debug != null && span.debug) { - return true; - } + public boolean isSampled(long traceId, @Nullable Boolean debug) { + if (Boolean.TRUE.equals(debug)) return true; + // The absolute value of Long.MIN_VALUE is larger than a long, so Math.abs returns identity. // This converts to MAX_VALUE to avoid always dropping when traceId == Long.MIN_VALUE - long t = span.traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(span.traceId); + long t = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId); return t <= boundary(); } + /** @deprecated use {@link #isSampled(long, Boolean)} as that works with multiple span formats */ + @Deprecated public boolean isSampled(Span span) { + return isSampled(span.traceId, span.debug); + } + @Override public String toString() { return "CollectorSampler(" + boundary() + ")"; diff --git a/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java b/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java index f8d36f5ecec..dfb084c1fbf 100644 --- a/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java +++ b/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,10 +28,6 @@ // @FunctionalInterface public interface AsyncSpanConsumer { - /** - * Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport. - * - * @param spans may be subject to a {@link CollectorSampler#isSampled(Span) sampling policy}. - */ + /** Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport. */ void accept(List spans, Callback callback); } diff --git a/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java b/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java index 7f3420cd6f2..536e7b0a72d 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,11 +17,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import zipkin.Span; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.data.Percentage.withPercentage; import static zipkin.TestObjects.LOTS_OF_SPANS; -import static zipkin.TestObjects.span; public class CollectorSamplerTest { @@ -36,15 +36,15 @@ public class CollectorSamplerTest { public void mostNegativeNumberDefence() { CollectorSampler sampler = CollectorSampler.create(0.1f); - assertThat(sampler.isSampled(span(Long.MIN_VALUE))) - .isEqualTo(sampler.isSampled(span(Long.MAX_VALUE))); + assertThat(sampler.isSampled(Long.MIN_VALUE, null)) + .isEqualTo(sampler.isSampled(Long.MAX_VALUE, null)); } @Test public void debugWins() { CollectorSampler sampler = CollectorSampler.create(0.0f); - assertThat(sampler.isSampled(span(Long.MIN_VALUE).toBuilder().debug(true).build())) + assertThat(sampler.isSampled(Long.MIN_VALUE, true)) .isTrue(); } @@ -53,9 +53,7 @@ public void retain10Percent() { float sampleRate = 0.1f; CollectorSampler sampler = CollectorSampler.create(sampleRate); - long passCount = Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count(); - - assertThat(passCount) + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null)).count()) .isCloseTo((long) (LOTS_OF_SPANS.length * sampleRate), withPercentage(3)); } @@ -67,15 +65,15 @@ public void idempotent() { CollectorSampler sampler1 = CollectorSampler.create(0.1f); CollectorSampler sampler2 = CollectorSampler.create(0.1f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler1::isSampled).toArray()) - .containsExactly(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler2::isSampled).toArray()); + assertThat(lotsOfSpans().filter(s -> sampler1.isSampled(s.traceId, null)).toArray()) + .containsExactly(lotsOfSpans().filter(s -> sampler2.isSampled(s.traceId, null)).toArray()); } @Test public void zeroMeansDropAllTraces() { CollectorSampler sampler = CollectorSampler.create(0.0f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).findAny()) + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null))) .isEmpty(); } @@ -83,8 +81,8 @@ public void zeroMeansDropAllTraces() { public void oneMeansKeepAllTraces() { CollectorSampler sampler = CollectorSampler.create(1.0f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count()) - .isEqualTo(LOTS_OF_SPANS.length); + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null))) + .hasSize(LOTS_OF_SPANS.length); } @Test @@ -102,4 +100,8 @@ public void rateCantBeOverOne() { CollectorSampler.create(1.1f); } + + static Stream lotsOfSpans() { + return Stream.of(LOTS_OF_SPANS).parallel(); + } } diff --git a/zipkin/src/test/java/zipkin/collector/CollectorTest.java b/zipkin/src/test/java/zipkin/collector/CollectorTest.java index 6ec099a582f..dee0875fd9f 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorTest.java @@ -13,127 +13,130 @@ */ package zipkin.collector; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; +import org.junit.Before; import org.junit.Test; import zipkin.Span; import zipkin.internal.ApplyTimestampAndDuration; import zipkin.storage.Callback; -import zipkin.storage.InMemoryStorage; +import zipkin.storage.StorageComponent; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static zipkin.TestObjects.LOTS_OF_SPANS; -import static zipkin.TestObjects.span; import static zipkin.storage.Callback.NOOP; public class CollectorTest { - List messages = new ArrayList<>(); - - Collector collector = new Collector.Builder(new Logger("", null) { - @Override - public void log(Level level, String msg, Throwable thrown) { - assertThat(level).isEqualTo(Level.WARNING); - messages.add(msg); - } - }).storage(new InMemoryStorage()).build(); - + StorageComponent storage = mock(StorageComponent.class); + Collector collector; Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); + @Before public void setup() throws Exception { + collector = spy(Collector.builder(Collector.class) + .storage(storage).build()); + when(collector.idString(span1)).thenReturn("1"); // to make expectations easier to read + doAnswer(invocation -> null).when(collector).warn(any(String.class), any(Throwable.class)); + } + @Test public void acceptSpansCallback_toStringIncludesSpanIds() { + when(collector.idString(span2)).thenReturn("2"); + assertThat(collector.acceptSpansCallback(asList(span1, span2))) - .hasToString("AcceptSpans([" + span1.idString() + ", " + span2.idString() + "])"); + .hasToString("AcceptSpans([1, 2])"); } @Test public void acceptSpansCallback_onErrorWithNullMessage() { Callback callback = collector.acceptSpansCallback(asList(span1)); - callback.onError(new RuntimeException()); - assertThat(messages) - .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); + RuntimeException exception = new RuntimeException(); + callback.onError(exception); + + verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception); } @Test public void acceptSpansCallback_onErrorWithMessage() { Callback callback = collector.acceptSpansCallback(asList(span1)); - callback.onError(new IllegalArgumentException("no beer")); + RuntimeException exception = new IllegalArgumentException("no beer"); + callback.onError(exception); - assertThat(messages) - .containsExactly( - "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); + verify(collector) + .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception); } @Test public void errorAcceptingSpans_onErrorWithNullMessage() { String message = - collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); + collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); + assertThat(message) + .isEqualTo("Cannot store spans [1] due to RuntimeException()"); } @Test public void errorAcceptingSpans_onErrorWithMessage() { - String message = - collector.errorStoringSpans(asList(span1), new IllegalArgumentException("no beer")) - .getMessage(); + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorStoringSpans(asList(span1), exception).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly( - "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); + assertThat(message) + .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)"); } @Test public void errorDecoding_onErrorWithNullMessage() { String message = collector.errorReading(new RuntimeException()).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot decode spans due to RuntimeException()"); + assertThat(message) + .isEqualTo("Cannot decode spans due to RuntimeException()"); } @Test public void errorDecoding_onErrorWithMessage() { - String message = - collector.errorReading(new IllegalArgumentException("no beer")).getMessage(); + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorReading(exception).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot decode spans due to IllegalArgumentException(no beer)"); + assertThat(message) + .isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)"); } @Test public void errorDecoding_doesntWrapMalformedException() { - String message = - collector.errorReading(new IllegalArgumentException("Malformed reading spans")).getMessage(); + RuntimeException exception = new IllegalArgumentException("Malformed reading spans"); - assertThat(messages) - .containsExactly(message) - .containsExactly("Malformed reading spans"); + String message = collector.errorReading(exception).getMessage(); + + assertThat(message) + .isEqualTo("Malformed reading spans"); } - @Test - public void unsampledSpansArentStored() { - collector = Collector.builder(Collector.class) - .sampler(CollectorSampler.create(0f)) - .storage(new InMemoryStorage()).build(); + @Test public void unsampledSpansArentStored() { + when(storage.asyncSpanConsumer()).thenThrow(new AssertionError()); - collector.accept(asList(span(Long.MIN_VALUE)), NOOP); + collector = Collector.builder(Collector.class) + .sampler(CollectorSampler.create(0.0f)) + .storage(storage).build(); - assertThat(collector.storage.spanStore().getServiceNames()).isEmpty(); + collector.accept(asList(span1), NOOP); } - @Test - public void debugFlagWins() { - collector.accept(asList(span(Long.MIN_VALUE).toBuilder().debug(true).build()), NOOP); + @Test public void doesntCallDeprecatedSampleMethod() { + CollectorSampler sampler = mock(CollectorSampler.class); + when(sampler.isSampled(span1)).thenThrow(new AssertionError()); + + collector = Collector.builder(Collector.class) + .sampler(sampler) + .storage(storage).build(); + + collector.accept(asList(span1), NOOP); - assertThat(collector.storage.spanStore().getServiceNames()).containsExactly("service"); + verify(sampler).isSampled(span1.traceId, span1.debug); } }