From d3586f55ef6f85a7c564d0175200ce03ced91f48 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 21 Aug 2017 13:16:21 +0800 Subject: [PATCH] Switches CollectorSampler to use (trace ID, debug) vs Span v1 object Currently, transports can decode zipkin2 format, but it has to be converted to v1 objects prior to storage. Part of the reason is CollectorSampler (used for load shedding to protect storage), accepts an object type as opposed to the parameters trace ID and debug. This deprecates the former in favor of the latter, as trace ID and debug exist in both formats. We are switching code to permit use of zipkin2 format in transports. --- .../zookeeper/ZooKeeperCollectorSampler.java | 20 ++- .../main/java/zipkin/collector/Collector.java | 31 +++-- .../zipkin/collector/CollectorComponent.java | 7 +- .../zipkin/collector/CollectorMetrics.java | 12 +- .../zipkin/collector/CollectorSampler.java | 27 ++-- .../zipkin/storage/AsyncSpanConsumer.java | 8 +- .../collector/CollectorSamplerTest.java | 28 ++-- .../java/zipkin/collector/CollectorTest.java | 121 +++++++++--------- 8 files changed, 139 insertions(+), 115 deletions(-) diff --git a/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java b/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java index 62a390d880b..2f8d8ecff73 100644 --- a/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java +++ b/zipkin-zookeeper/src/main/java/zipkin/collector/zookeeper/ZooKeeperCollectorSampler.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import zipkin.Span; import zipkin.collector.CollectorSampler; +import zipkin.internal.Nullable; import static com.google.common.base.Preconditions.checkState; import static zipkin.internal.Util.UTF_8; @@ -43,8 +44,9 @@ * zipkin storage layer. It works by coordinating a sample rate based on multiple instances vs a * target storage rate in spans/minute. * - *

This assumes that each instance is storing every span it {@link #isSampled(Span) samples}, and - * that the store rate is a useful metric (ex spans have relatively the same size and depth. + *

This assumes that each instance is storing every span it {@link #isSampled(long, Boolean) + * samples}, and that the store rate is a useful metric (ex spans have relatively the same size and + * depth. * *

If the storage layer is capable of 10k spans/minute, you'd set the target rate in ZooKeeper to * 10000. With this in mind, 10 balanced collectors writing 10k spans/minute would eventually see a @@ -52,8 +54,8 @@ * *

Implementation notes

* - *

This object spawns a single scheduling thread that reports its rate of {@link #isSampled(Span) - * spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}. + *

This object spawns a single scheduling thread that reports its rate of {@link #isSampled(long, + * Boolean) spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}. * *

When a leader, this object summarizes recent sample rates and compares them against a target. * @@ -232,8 +234,12 @@ public void close() throws IOException { closer.close(); } - @Override public boolean isSampled(Span span) { - boolean result = super.isSampled(span); + @Override @Deprecated public boolean isSampled(Span span) { + return isSampled(span.traceId, span.debug); + } + + @Override public boolean isSampled(long traceId, @Nullable Boolean debug) { + boolean result = super.isSampled(traceId, debug); if (result) spanCount.incrementAndGet(); return result; } diff --git a/zipkin/src/main/java/zipkin/collector/Collector.java b/zipkin/src/main/java/zipkin/collector/Collector.java index 3bc170213f4..0909d430194 100644 --- a/zipkin/src/main/java/zipkin/collector/Collector.java +++ b/zipkin/src/main/java/zipkin/collector/Collector.java @@ -34,7 +34,7 @@ * before storage is attempted. This ensures that calling threads are disconnected from storage * threads. */ -public final class Collector { +public class Collector { // not final for mocking /** Needed to scope this to the correct logging category */ public static Builder builder(Class loggingClass) { @@ -44,8 +44,8 @@ public static Builder builder(Class loggingClass) { public static final class Builder { final Logger logger; StorageComponent storage = null; - CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE; - CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + CollectorSampler sampler = null; + CollectorMetrics metrics = null; Builder(Logger logger) { this.logger = logger; @@ -140,7 +140,7 @@ public void accept(List spans, Callback callback) { List sample(List input) { List sampled = new ArrayList<>(input.size()); for (Span s : input) { - if (sampler.isSampled(s)) sampled.add(s); + if (sampler.isSampled(s.traceId, s.debug)) sampled.add(s); } int dropped = input.size() - sampled.size(); if (dropped > 0) metrics.incrementSpansDropped(dropped); @@ -185,24 +185,31 @@ RuntimeException errorStoringSpans(List spans, Throwable e) { } RuntimeException doError(String message, Throwable e) { - if (e instanceof RuntimeException && e.getMessage() != null && e.getMessage() - .startsWith("Malformed")) { - logger.log(WARNING, e.getMessage(), e); + String exceptionMessage = e.getMessage() != null ? e.getMessage() : ""; + if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) { + warn(exceptionMessage, e); return (RuntimeException) e; } else { - message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), - e.getMessage() == null ? "" : e.getMessage()); - logger.log(WARNING, message, e); + message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage); + warn(message, e); return new RuntimeException(message, e); } } - static StringBuilder appendSpanIds(List spans, StringBuilder message) { + void warn(String message, Throwable e) { + logger.log(WARNING, message, e); + } + + StringBuilder appendSpanIds(List spans, StringBuilder message) { message.append("["); for (Iterator iterator = spans.iterator(); iterator.hasNext(); ) { - message.append(iterator.next().idString()); + message.append(idString(iterator.next())); if (iterator.hasNext()) message.append(", "); } return message.append("]"); } + + String idString(Span span) { + return span.idString(); + } } diff --git a/zipkin/src/main/java/zipkin/collector/CollectorComponent.java b/zipkin/src/main/java/zipkin/collector/CollectorComponent.java index 651576d7fce..161995ee11d 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorComponent.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorComponent.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,7 +15,6 @@ import java.util.List; import zipkin.Component; -import zipkin.Span; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; @@ -49,8 +48,8 @@ interface Builder { Builder metrics(CollectorMetrics metrics); /** - * {@link CollectorSampler#isSampled(Span) samples spans} to reduce load on the storage system. - * Defaults to always sample. + * {@link CollectorSampler#isSampled(long, Boolean) samples spans} to reduce load on the storage + * system. Defaults to always sample. */ Builder sampler(CollectorSampler sampler); diff --git a/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java b/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java index 69bd790ac7f..f31599a3459 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorMetrics.java @@ -14,16 +14,16 @@ package zipkin.collector; import java.util.List; -import zipkin.storage.AsyncSpanConsumer; -import zipkin.storage.Callback; import zipkin.Codec; import zipkin.Span; +import zipkin.storage.AsyncSpanConsumer; +import zipkin.storage.Callback; /** * Instrumented applications report spans over a transport such as Kafka. Zipkin collectors receive - * these messages, {@link Codec#readSpans(byte[]) decoding them into spans}, {@link - * CollectorSampler#isSampled(Span) apply sampling}, and {@link AsyncSpanConsumer#accept(List, - * Callback) queue them for storage}. + * these messages, {@link Codec#readSpans(byte[]) decoding them into spans}, + * {@link CollectorSampler# isSampled(long, Boolean) apply sampling}, and + * {@link AsyncSpanConsumer#accept(List, Callback) queues them for storage}. * *

Callbacks on this type are invoked by zipkin collectors to improve the visibility of the * system. A typical implementation will report metrics to a telemetry system for analysis and @@ -48,7 +48,7 @@ * messages sent from instrumentation. *

  • Stored spans <= {@link #incrementSpans(int) Accepted spans} - {@link * #incrementSpansDropped(int) Dropped spans}. Alert when this drops below the - * {@link CollectorSampler#isSampled(Span) collection-tier sample rate}. + * {@link CollectorSampler#isSampled(long, Boolean) collection-tier sample rate}. *
  • * * diff --git a/zipkin/src/main/java/zipkin/collector/CollectorSampler.java b/zipkin/src/main/java/zipkin/collector/CollectorSampler.java index c226d961b88..bf9e83928f1 100644 --- a/zipkin/src/main/java/zipkin/collector/CollectorSampler.java +++ b/zipkin/src/main/java/zipkin/collector/CollectorSampler.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ package zipkin.collector; import zipkin.Span; +import zipkin.internal.Nullable; import static zipkin.internal.Util.checkArgument; @@ -50,21 +51,31 @@ public static CollectorSampler create(float rate) { protected abstract long boundary(); /** - * Returns true if the span should be recorded to storage. + * Returns true if spans with this trace ID should be recorded to storage. * *

    Zipkin v1 allows storage-layer sampling, which can help prevent spikes in traffic from - * overloading the system. {@link Span#debug Debug} spans are always stored. + * overloading the system. Debug spans are always stored. + * + *

    This uses only the lower 64 bits of the trace ID as instrumentation still send mixed trace + * ID width. + * + * @param traceId the lower 64 bits of the span's trace ID + * @param debug when true, always passes sampling */ - public boolean isSampled(Span span) { - if (span.debug != null && span.debug) { - return true; - } + public boolean isSampled(long traceId, @Nullable Boolean debug) { + if (Boolean.TRUE.equals(debug)) return true; + // The absolute value of Long.MIN_VALUE is larger than a long, so Math.abs returns identity. // This converts to MAX_VALUE to avoid always dropping when traceId == Long.MIN_VALUE - long t = span.traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(span.traceId); + long t = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId); return t <= boundary(); } + /** @deprecated use {@link #isSampled(long, Boolean)} as that works with multiple span formats */ + @Deprecated public boolean isSampled(Span span) { + return isSampled(span.traceId, span.debug); + } + @Override public String toString() { return "CollectorSampler(" + boundary() + ")"; diff --git a/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java b/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java index f8d36f5ecec..dfb084c1fbf 100644 --- a/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java +++ b/zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,10 +28,6 @@ // @FunctionalInterface public interface AsyncSpanConsumer { - /** - * Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport. - * - * @param spans may be subject to a {@link CollectorSampler#isSampled(Span) sampling policy}. - */ + /** Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport. */ void accept(List spans, Callback callback); } diff --git a/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java b/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java index 7f3420cd6f2..536e7b0a72d 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -17,11 +17,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import zipkin.Span; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.data.Percentage.withPercentage; import static zipkin.TestObjects.LOTS_OF_SPANS; -import static zipkin.TestObjects.span; public class CollectorSamplerTest { @@ -36,15 +36,15 @@ public class CollectorSamplerTest { public void mostNegativeNumberDefence() { CollectorSampler sampler = CollectorSampler.create(0.1f); - assertThat(sampler.isSampled(span(Long.MIN_VALUE))) - .isEqualTo(sampler.isSampled(span(Long.MAX_VALUE))); + assertThat(sampler.isSampled(Long.MIN_VALUE, null)) + .isEqualTo(sampler.isSampled(Long.MAX_VALUE, null)); } @Test public void debugWins() { CollectorSampler sampler = CollectorSampler.create(0.0f); - assertThat(sampler.isSampled(span(Long.MIN_VALUE).toBuilder().debug(true).build())) + assertThat(sampler.isSampled(Long.MIN_VALUE, true)) .isTrue(); } @@ -53,9 +53,7 @@ public void retain10Percent() { float sampleRate = 0.1f; CollectorSampler sampler = CollectorSampler.create(sampleRate); - long passCount = Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count(); - - assertThat(passCount) + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null)).count()) .isCloseTo((long) (LOTS_OF_SPANS.length * sampleRate), withPercentage(3)); } @@ -67,15 +65,15 @@ public void idempotent() { CollectorSampler sampler1 = CollectorSampler.create(0.1f); CollectorSampler sampler2 = CollectorSampler.create(0.1f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler1::isSampled).toArray()) - .containsExactly(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler2::isSampled).toArray()); + assertThat(lotsOfSpans().filter(s -> sampler1.isSampled(s.traceId, null)).toArray()) + .containsExactly(lotsOfSpans().filter(s -> sampler2.isSampled(s.traceId, null)).toArray()); } @Test public void zeroMeansDropAllTraces() { CollectorSampler sampler = CollectorSampler.create(0.0f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).findAny()) + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null))) .isEmpty(); } @@ -83,8 +81,8 @@ public void zeroMeansDropAllTraces() { public void oneMeansKeepAllTraces() { CollectorSampler sampler = CollectorSampler.create(1.0f); - assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count()) - .isEqualTo(LOTS_OF_SPANS.length); + assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null))) + .hasSize(LOTS_OF_SPANS.length); } @Test @@ -102,4 +100,8 @@ public void rateCantBeOverOne() { CollectorSampler.create(1.1f); } + + static Stream lotsOfSpans() { + return Stream.of(LOTS_OF_SPANS).parallel(); + } } diff --git a/zipkin/src/test/java/zipkin/collector/CollectorTest.java b/zipkin/src/test/java/zipkin/collector/CollectorTest.java index 6ec099a582f..dee0875fd9f 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorTest.java @@ -13,127 +13,130 @@ */ package zipkin.collector; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; +import org.junit.Before; import org.junit.Test; import zipkin.Span; import zipkin.internal.ApplyTimestampAndDuration; import zipkin.storage.Callback; -import zipkin.storage.InMemoryStorage; +import zipkin.storage.StorageComponent; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static zipkin.TestObjects.LOTS_OF_SPANS; -import static zipkin.TestObjects.span; import static zipkin.storage.Callback.NOOP; public class CollectorTest { - List messages = new ArrayList<>(); - - Collector collector = new Collector.Builder(new Logger("", null) { - @Override - public void log(Level level, String msg, Throwable thrown) { - assertThat(level).isEqualTo(Level.WARNING); - messages.add(msg); - } - }).storage(new InMemoryStorage()).build(); - + StorageComponent storage = mock(StorageComponent.class); + Collector collector; Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); + @Before public void setup() throws Exception { + collector = spy(Collector.builder(Collector.class) + .storage(storage).build()); + when(collector.idString(span1)).thenReturn("1"); // to make expectations easier to read + doAnswer(invocation -> null).when(collector).warn(any(String.class), any(Throwable.class)); + } + @Test public void acceptSpansCallback_toStringIncludesSpanIds() { + when(collector.idString(span2)).thenReturn("2"); + assertThat(collector.acceptSpansCallback(asList(span1, span2))) - .hasToString("AcceptSpans([" + span1.idString() + ", " + span2.idString() + "])"); + .hasToString("AcceptSpans([1, 2])"); } @Test public void acceptSpansCallback_onErrorWithNullMessage() { Callback callback = collector.acceptSpansCallback(asList(span1)); - callback.onError(new RuntimeException()); - assertThat(messages) - .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); + RuntimeException exception = new RuntimeException(); + callback.onError(exception); + + verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception); } @Test public void acceptSpansCallback_onErrorWithMessage() { Callback callback = collector.acceptSpansCallback(asList(span1)); - callback.onError(new IllegalArgumentException("no beer")); + RuntimeException exception = new IllegalArgumentException("no beer"); + callback.onError(exception); - assertThat(messages) - .containsExactly( - "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); + verify(collector) + .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception); } @Test public void errorAcceptingSpans_onErrorWithNullMessage() { String message = - collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); + collector.errorStoringSpans(asList(span1), new RuntimeException()).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); + assertThat(message) + .isEqualTo("Cannot store spans [1] due to RuntimeException()"); } @Test public void errorAcceptingSpans_onErrorWithMessage() { - String message = - collector.errorStoringSpans(asList(span1), new IllegalArgumentException("no beer")) - .getMessage(); + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorStoringSpans(asList(span1), exception).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly( - "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); + assertThat(message) + .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)"); } @Test public void errorDecoding_onErrorWithNullMessage() { String message = collector.errorReading(new RuntimeException()).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot decode spans due to RuntimeException()"); + assertThat(message) + .isEqualTo("Cannot decode spans due to RuntimeException()"); } @Test public void errorDecoding_onErrorWithMessage() { - String message = - collector.errorReading(new IllegalArgumentException("no beer")).getMessage(); + RuntimeException exception = new IllegalArgumentException("no beer"); + String message = collector.errorReading(exception).getMessage(); - assertThat(messages) - .containsExactly(message) - .containsExactly("Cannot decode spans due to IllegalArgumentException(no beer)"); + assertThat(message) + .isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)"); } @Test public void errorDecoding_doesntWrapMalformedException() { - String message = - collector.errorReading(new IllegalArgumentException("Malformed reading spans")).getMessage(); + RuntimeException exception = new IllegalArgumentException("Malformed reading spans"); - assertThat(messages) - .containsExactly(message) - .containsExactly("Malformed reading spans"); + String message = collector.errorReading(exception).getMessage(); + + assertThat(message) + .isEqualTo("Malformed reading spans"); } - @Test - public void unsampledSpansArentStored() { - collector = Collector.builder(Collector.class) - .sampler(CollectorSampler.create(0f)) - .storage(new InMemoryStorage()).build(); + @Test public void unsampledSpansArentStored() { + when(storage.asyncSpanConsumer()).thenThrow(new AssertionError()); - collector.accept(asList(span(Long.MIN_VALUE)), NOOP); + collector = Collector.builder(Collector.class) + .sampler(CollectorSampler.create(0.0f)) + .storage(storage).build(); - assertThat(collector.storage.spanStore().getServiceNames()).isEmpty(); + collector.accept(asList(span1), NOOP); } - @Test - public void debugFlagWins() { - collector.accept(asList(span(Long.MIN_VALUE).toBuilder().debug(true).build()), NOOP); + @Test public void doesntCallDeprecatedSampleMethod() { + CollectorSampler sampler = mock(CollectorSampler.class); + when(sampler.isSampled(span1)).thenThrow(new AssertionError()); + + collector = Collector.builder(Collector.class) + .sampler(sampler) + .storage(storage).build(); + + collector.accept(asList(span1), NOOP); - assertThat(collector.storage.spanStore().getServiceNames()).containsExactly("service"); + verify(sampler).isSampled(span1.traceId, span1.debug); } }