Skip to content

Commit

Permalink
Switches CollectorSampler to use (trace ID, debug) vs Span v1 object
Browse files Browse the repository at this point in the history
Currently, transports can decode zipkin2 format, but it has to be
converted to v1 objects prior to storage. Part of the reason is
CollectorSampler (used for load shedding to protect storage), accepts
an object type as opposed to the parameters trace ID and debug. This
deprecates the former in favor of the latter, as trace ID and debug
exist in both formats.
We are switching code to permit use of zipkin2 format in transports.
  • Loading branch information
Adrian Cole committed Aug 21, 2017
1 parent dc9de8b commit d3586f5
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;
import zipkin.Span;
import zipkin.collector.CollectorSampler;
import zipkin.internal.Nullable;

import static com.google.common.base.Preconditions.checkState;
import static zipkin.internal.Util.UTF_8;
Expand All @@ -43,17 +44,18 @@
* zipkin storage layer. It works by coordinating a sample rate based on multiple instances vs a
* target storage rate in spans/minute.
*
* <p>This assumes that each instance is storing every span it {@link #isSampled(Span) samples}, and
* that the store rate is a useful metric (ex spans have relatively the same size and depth.
* <p>This assumes that each instance is storing every span it {@link #isSampled(long, Boolean)
* samples}, and that the store rate is a useful metric (ex spans have relatively the same size and
* depth.
*
* <p>If the storage layer is capable of 10k spans/minute, you'd set the target rate in ZooKeeper to
* 10000. With this in mind, 10 balanced collectors writing 10k spans/minute would eventually see a
* sample rate of 0.10, slowing them down to match what the storage is capable of.
*
* <h3>Implementation notes</h3>
*
* <p>This object spawns a single scheduling thread that reports its rate of {@link #isSampled(Span)
* spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}.
* <p>This object spawns a single scheduling thread that reports its rate of {@link #isSampled(long,
* Boolean) spans sampled}, per the {@link Builder#updateFrequency(int) update frequency}.
*
* <p>When a leader, this object summarizes recent sample rates and compares them against a target.
*
Expand Down Expand Up @@ -232,8 +234,12 @@ public void close() throws IOException {
closer.close();
}

@Override public boolean isSampled(Span span) {
boolean result = super.isSampled(span);
@Override @Deprecated public boolean isSampled(Span span) {
return isSampled(span.traceId, span.debug);
}

@Override public boolean isSampled(long traceId, @Nullable Boolean debug) {
boolean result = super.isSampled(traceId, debug);
if (result) spanCount.incrementAndGet();
return result;
}
Expand Down
31 changes: 19 additions & 12 deletions zipkin/src/main/java/zipkin/collector/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* before storage is attempted. This ensures that calling threads are disconnected from storage
* threads.
*/
public final class Collector {
public class Collector { // not final for mocking

/** Needed to scope this to the correct logging category */
public static Builder builder(Class<?> loggingClass) {
Expand All @@ -44,8 +44,8 @@ public static Builder builder(Class<?> loggingClass) {
public static final class Builder {
final Logger logger;
StorageComponent storage = null;
CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE;
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
CollectorSampler sampler = null;
CollectorMetrics metrics = null;

Builder(Logger logger) {
this.logger = logger;
Expand Down Expand Up @@ -140,7 +140,7 @@ public void accept(List<Span> spans, Callback<Void> callback) {
List<Span> sample(List<Span> input) {
List<Span> sampled = new ArrayList<>(input.size());
for (Span s : input) {
if (sampler.isSampled(s)) sampled.add(s);
if (sampler.isSampled(s.traceId, s.debug)) sampled.add(s);
}
int dropped = input.size() - sampled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
Expand Down Expand Up @@ -185,24 +185,31 @@ RuntimeException errorStoringSpans(List<Span> spans, Throwable e) {
}

RuntimeException doError(String message, Throwable e) {
if (e instanceof RuntimeException && e.getMessage() != null && e.getMessage()
.startsWith("Malformed")) {
logger.log(WARNING, e.getMessage(), e);
String exceptionMessage = e.getMessage() != null ? e.getMessage() : "";
if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) {
warn(exceptionMessage, e);
return (RuntimeException) e;
} else {
message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(),
e.getMessage() == null ? "" : e.getMessage());
logger.log(WARNING, message, e);
message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage);
warn(message, e);
return new RuntimeException(message, e);
}
}

static StringBuilder appendSpanIds(List<Span> spans, StringBuilder message) {
void warn(String message, Throwable e) {
logger.log(WARNING, message, e);
}

StringBuilder appendSpanIds(List<Span> spans, StringBuilder message) {
message.append("[");
for (Iterator<Span> iterator = spans.iterator(); iterator.hasNext(); ) {
message.append(iterator.next().idString());
message.append(idString(iterator.next()));
if (iterator.hasNext()) message.append(", ");
}
return message.append("]");
}

String idString(Span span) {
return span.idString();
}
}
7 changes: 3 additions & 4 deletions zipkin/src/main/java/zipkin/collector/CollectorComponent.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -15,7 +15,6 @@

import java.util.List;
import zipkin.Component;
import zipkin.Span;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.storage.StorageComponent;
Expand Down Expand Up @@ -49,8 +48,8 @@ interface Builder {
Builder metrics(CollectorMetrics metrics);

/**
* {@link CollectorSampler#isSampled(Span) samples spans} to reduce load on the storage system.
* Defaults to always sample.
* {@link CollectorSampler#isSampled(long, Boolean) samples spans} to reduce load on the storage
* system. Defaults to always sample.
*/
Builder sampler(CollectorSampler sampler);

Expand Down
12 changes: 6 additions & 6 deletions zipkin/src/main/java/zipkin/collector/CollectorMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
package zipkin.collector;

import java.util.List;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.Codec;
import zipkin.Span;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

/**
* Instrumented applications report spans over a transport such as Kafka. Zipkin collectors receive
* these messages, {@link Codec#readSpans(byte[]) decoding them into spans}, {@link
* CollectorSampler#isSampled(Span) apply sampling}, and {@link AsyncSpanConsumer#accept(List,
* Callback) queue them for storage}.
* these messages, {@link Codec#readSpans(byte[]) decoding them into spans},
* {@link CollectorSampler# isSampled(long, Boolean) apply sampling}, and
* {@link AsyncSpanConsumer#accept(List, Callback) queues them for storage}.
*
* <p>Callbacks on this type are invoked by zipkin collectors to improve the visibility of the
* system. A typical implementation will report metrics to a telemetry system for analysis and
Expand All @@ -48,7 +48,7 @@
* messages sent from instrumentation.</li>
* <li>Stored spans <= {@link #incrementSpans(int) Accepted spans} - {@link
* #incrementSpansDropped(int) Dropped spans}. Alert when this drops below the
* {@link CollectorSampler#isSampled(Span) collection-tier sample rate}.
* {@link CollectorSampler#isSampled(long, Boolean) collection-tier sample rate}.
* </li>
* </ul>
* </pre>
Expand Down
27 changes: 19 additions & 8 deletions zipkin/src/main/java/zipkin/collector/CollectorSampler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,6 +14,7 @@
package zipkin.collector;

import zipkin.Span;
import zipkin.internal.Nullable;

import static zipkin.internal.Util.checkArgument;

Expand Down Expand Up @@ -50,21 +51,31 @@ public static CollectorSampler create(float rate) {
protected abstract long boundary();

/**
* Returns true if the span should be recorded to storage.
* Returns true if spans with this trace ID should be recorded to storage.
*
* <p>Zipkin v1 allows storage-layer sampling, which can help prevent spikes in traffic from
* overloading the system. {@link Span#debug Debug} spans are always stored.
* overloading the system. Debug spans are always stored.
*
* <p>This uses only the lower 64 bits of the trace ID as instrumentation still send mixed trace
* ID width.
*
* @param traceId the lower 64 bits of the span's trace ID
* @param debug when true, always passes sampling
*/
public boolean isSampled(Span span) {
if (span.debug != null && span.debug) {
return true;
}
public boolean isSampled(long traceId, @Nullable Boolean debug) {
if (Boolean.TRUE.equals(debug)) return true;

// The absolute value of Long.MIN_VALUE is larger than a long, so Math.abs returns identity.
// This converts to MAX_VALUE to avoid always dropping when traceId == Long.MIN_VALUE
long t = span.traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(span.traceId);
long t = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
return t <= boundary();
}

/** @deprecated use {@link #isSampled(long, Boolean)} as that works with multiple span formats */
@Deprecated public boolean isSampled(Span span) {
return isSampled(span.traceId, span.debug);
}

@Override
public String toString() {
return "CollectorSampler(" + boundary() + ")";
Expand Down
8 changes: 2 additions & 6 deletions zipkin/src/main/java/zipkin/storage/AsyncSpanConsumer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -28,10 +28,6 @@
// @FunctionalInterface
public interface AsyncSpanConsumer {

/**
* Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport.
*
* @param spans may be subject to a {@link CollectorSampler#isSampled(Span) sampling policy}.
*/
/** Stores a list of spans {@link Codec#readSpans(byte[]) read} from a transport. */
void accept(List<Span> spans, Callback<Void> callback);
}
28 changes: 15 additions & 13 deletions zipkin/src/test/java/zipkin/collector/CollectorSamplerTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -17,11 +17,11 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import zipkin.Span;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.Percentage.withPercentage;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.span;

public class CollectorSamplerTest {

Expand All @@ -36,15 +36,15 @@ public class CollectorSamplerTest {
public void mostNegativeNumberDefence() {
CollectorSampler sampler = CollectorSampler.create(0.1f);

assertThat(sampler.isSampled(span(Long.MIN_VALUE)))
.isEqualTo(sampler.isSampled(span(Long.MAX_VALUE)));
assertThat(sampler.isSampled(Long.MIN_VALUE, null))
.isEqualTo(sampler.isSampled(Long.MAX_VALUE, null));
}

@Test
public void debugWins() {
CollectorSampler sampler = CollectorSampler.create(0.0f);

assertThat(sampler.isSampled(span(Long.MIN_VALUE).toBuilder().debug(true).build()))
assertThat(sampler.isSampled(Long.MIN_VALUE, true))
.isTrue();
}

Expand All @@ -53,9 +53,7 @@ public void retain10Percent() {
float sampleRate = 0.1f;
CollectorSampler sampler = CollectorSampler.create(sampleRate);

long passCount = Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count();

assertThat(passCount)
assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null)).count())
.isCloseTo((long) (LOTS_OF_SPANS.length * sampleRate), withPercentage(3));
}

Expand All @@ -67,24 +65,24 @@ public void idempotent() {
CollectorSampler sampler1 = CollectorSampler.create(0.1f);
CollectorSampler sampler2 = CollectorSampler.create(0.1f);

assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler1::isSampled).toArray())
.containsExactly(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler2::isSampled).toArray());
assertThat(lotsOfSpans().filter(s -> sampler1.isSampled(s.traceId, null)).toArray())
.containsExactly(lotsOfSpans().filter(s -> sampler2.isSampled(s.traceId, null)).toArray());
}

@Test
public void zeroMeansDropAllTraces() {
CollectorSampler sampler = CollectorSampler.create(0.0f);

assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).findAny())
assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null)))
.isEmpty();
}

@Test
public void oneMeansKeepAllTraces() {
CollectorSampler sampler = CollectorSampler.create(1.0f);

assertThat(Stream.of(LOTS_OF_SPANS).parallel().filter(sampler::isSampled).count())
.isEqualTo(LOTS_OF_SPANS.length);
assertThat(lotsOfSpans().filter(s -> sampler.isSampled(s.traceId, null)))
.hasSize(LOTS_OF_SPANS.length);
}

@Test
Expand All @@ -102,4 +100,8 @@ public void rateCantBeOverOne() {

CollectorSampler.create(1.1f);
}

static Stream<Span> lotsOfSpans() {
return Stream.of(LOTS_OF_SPANS).parallel();
}
}
Loading

0 comments on commit d3586f5

Please sign in to comment.