diff --git a/build.gradle b/build.gradle index c82f41e..e18e33c 100644 --- a/build.gradle +++ b/build.gradle @@ -1,8 +1,12 @@ +plugins { + id "net.ltgt.errorprone" version "0.0.14" +} + allprojects { version = '0.0.1-SNAPSHOT' } -apply plugin: 'java' +apply plugin: 'java-library' apply plugin: 'groovy' repositories { @@ -12,13 +16,13 @@ repositories { } dependencies { - compile group: 'io.zipkin.zipkin2', name: 'zipkin', version: '2.8.3' - compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.6.0' - compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' + implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' - compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.0' + implementation group: 'io.zipkin.brave', name: 'brave-bom', version: '5.1.2' + implementation group: 'io.zipkin.zipkin2', name: 'zipkin' + implementation group: 'io.zipkin.reporter2', name: 'zipkin-reporter' - testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6' - testCompile group: 'org.spockframework', name: 'spock-core', version: '1.1-groovy-2.4' - testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.15' + testImplementation group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6' + testImplementation group: 'org.spockframework', name: 'spock-core', version: '1.1-groovy-2.4' + testImplementation group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.15' } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 16d2805..2d80b69 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.7-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-bin.zip zipStoreBase=GRADLE_USER_HOME 
zipStorePath=wrapper/dists diff --git a/settings.gradle b/settings.gradle index 603b68f..6874314 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,3 +1,5 @@ rootProject.name = 'zipkin-datadog-reporter' include ':zipkin-datadog-autoconfigure' + +enableFeaturePreview('IMPROVED_POM_SUPPORT') diff --git a/src/main/java/zipkin2/reporter/datadog/DDApi.java b/src/main/java/zipkin2/reporter/datadog/DDApi.java index 2f0b9b6..c507f19 100644 --- a/src/main/java/zipkin2/reporter/datadog/DDApi.java +++ b/src/main/java/zipkin2/reporter/datadog/DDApi.java @@ -4,29 +4,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.text.MessageFormat; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINER; +import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.WARNING; /** The API pointing to a DD agent */ class DDApi { - public static final String DEFAULT_HOSTNAME = "localhost"; - public static final int DEFAULT_PORT = 8126; + static final String DEFAULT_HOSTNAME = "localhost"; + static final int DEFAULT_PORT = 8126; - private static final Logger log = LoggerFactory.getLogger(DDApi.class); + private static final Logger log = Logger.getLogger(DDApi.class.getName()); - public static final String JAVA_VERSION = System.getProperty("java.version", "unknown"); - public static final String JAVA_VM_NAME = System.getProperty("java.vm.name", 
"unknown"); + static final String JAVA_VERSION = System.getProperty("java.version", "unknown"); + static final String JAVA_VM_NAME = System.getProperty("java.vm.name", "unknown"); private static final String DATADOG_META_LANG = "Datadog-Meta-Lang"; private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version"; private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter"; @@ -38,14 +42,12 @@ class DDApi { private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); private final String tracesEndpoint; - private final List responseListeners = new ArrayList<>(); - private AtomicInteger traceCount; private volatile long nextAllowedLogTime = 0; private static final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); - public DDApi(final String host, final int port) { + DDApi(final String host, final int port) { this(host, port, traceEndpointAvailable("http://" + host + ":" + port + TRACES_ENDPOINT_V4)); } @@ -53,37 +55,27 @@ public DDApi(final String host, final int port) { if (v4EndpointsAvailable) { this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V4; } else { - log.debug("API v0.4 endpoints not available. Downgrading to v0.3"); + log.log(FINE, "API v0.4 endpoints not available. Downgrading to v0.3"); this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V3; } } - public void addResponseListener(final ResponseListener listener) { - if (!responseListeners.contains(listener)) { - responseListeners.add(listener); - } - } - - public void addTraceCounter(final AtomicInteger traceCount) { - this.traceCount = traceCount; - } - /** * Send traces to the DD agent * * @param traces the traces to be sent * @return the staus code returned */ - public boolean sendTraces(final List> traces) { - final int totalSize = traceCount == null ? 
traces.size() : traceCount.getAndSet(0); + void sendTraces(final List> traces) { + final int totalSize = traces.size(); try { final HttpURLConnection httpCon = getHttpURLConnection(tracesEndpoint); httpCon.setRequestProperty(X_DATADOG_TRACE_COUNT, String.valueOf(totalSize)); - final OutputStream out = httpCon.getOutputStream(); - objectMapper.writeValue(out, traces); - out.flush(); - out.close(); + try (final OutputStream out = httpCon.getOutputStream()) { + objectMapper.writeValue(out, traces); + out.flush(); + } String responseString = null; { @@ -101,47 +93,53 @@ public boolean sendTraces(final List> traces) { responseString = sb.toString(); } + skipAllContent(httpCon); + final int responseCode = httpCon.getResponseCode(); if (responseCode != 200) { - if (log.isDebugEnabled()) { - log.debug( - "Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ", - traces.size(), - totalSize, - responseCode, - httpCon.getResponseMessage()); + if (log.isLoggable(FINER)) { + log.log( + FINER, + MessageFormat.format( + "Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ", + traces.size(), + totalSize, + responseCode, + httpCon.getResponseMessage())); } else if (nextAllowedLogTime < System.currentTimeMillis()) { nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG; - log.warn( - "Error while sending {} of {} traces to the DD agent. Status: {} (going silent for {} seconds)", - traces.size(), - totalSize, - responseCode, - httpCon.getResponseMessage(), - TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)); + log.log( + WARNING, + MessageFormat.format( + "Error while sending {} of {} traces to the DD agent. 
Status: {} (going silent for {} seconds)", + traces.size(), + totalSize, + responseCode, + httpCon.getResponseMessage(), + TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG))); } - return false; + return; } - log.debug("Succesfully sent {} of {} traces to the DD agent.", traces.size(), totalSize); + log.log( + FINEST, + MessageFormat.format( + "Succesfully sent {} of {} traces to the DD agent.", traces.size(), totalSize)); try { if (null != responseString && !"".equals(responseString.trim()) && !"OK".equalsIgnoreCase(responseString.trim())) { final JsonNode response = objectMapper.readTree(responseString); - for (final ResponseListener listener : responseListeners) { - listener.onResponse(tracesEndpoint, response); - } } } catch (final IOException e) { - log.debug("failed to parse DD agent response: " + responseString, e); + log.log(FINE, "failed to parse DD agent response: " + responseString, e); } - return true; } catch (final IOException e) { - if (log.isDebugEnabled()) { - log.debug( + if (log.isLoggable(FINE)) { + log.log( + FINE, "Error while sending " + traces.size() + " of " @@ -150,15 +148,16 @@ public boolean sendTraces(final List> traces) { e); } else if (nextAllowedLogTime < System.currentTimeMillis()) { nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG; - log.warn( - "Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)", - traces.size(), - totalSize, - e.getClass().getName(), - e.getMessage(), - TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)); + log.log( + WARNING, + MessageFormat.format( + "Error while sending {} of {} traces to the DD agent. 
{}: {} (going silent for {} minutes)", + traces.size(), + totalSize, + e.getClass().getName(), + e.getMessage(), + TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG))); } - return false; } } @@ -166,10 +165,6 @@ private static boolean traceEndpointAvailable(final String endpoint) { return endpointAvailable(endpoint, Collections.emptyList(), true); } - private static boolean serviceEndpointAvailable(final String endpoint) { - return endpointAvailable(endpoint, Collections.emptyMap(), true); - } - private static boolean endpointAvailable( final String endpoint, final Object data, final boolean retry) { try { @@ -179,10 +174,10 @@ private static boolean endpointAvailable( httpCon.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(1)); httpCon.setReadTimeout((int) TimeUnit.SECONDS.toMillis(1)); - final OutputStream out = httpCon.getOutputStream(); - objectMapper.writeValue(out, data); - out.flush(); - out.close(); + try (final OutputStream out = httpCon.getOutputStream()) { + objectMapper.writeValue(out, data); + out.flush(); + } return httpCon.getResponseCode() == 200; } catch (final IOException e) { if (retry) { @@ -208,13 +203,32 @@ private static HttpURLConnection getHttpURLConnection(final String endpoint) thr return httpCon; } + /* Ensure we read the full response. 
Borrowed from https://github.com/openzipkin/zipkin-reporter-java/blob/2eb169e/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java#L231-L252 */ + private void skipAllContent(HttpURLConnection connection) throws IOException { + InputStream in = connection.getInputStream(); + IOException thrown = skipAndSuppress(in); + if (thrown == null) return; + InputStream err = connection.getErrorStream(); + if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped + throw thrown; + } + + private IOException skipAndSuppress(InputStream in) { + try { + while (in.read() != -1) ; // skip + return null; + } catch (IOException e) { + return e; + } finally { + try { + in.close(); + } catch (IOException suppressed) { + } + } + } + @Override public String toString() { return "DDApi { tracesEndpoint=" + tracesEndpoint + " }"; } - - public static interface ResponseListener { - /** Invoked after the api receives a response from the core agent. */ - void onResponse(String endpoint, JsonNode responseJson); - } } diff --git a/src/main/java/zipkin2/reporter/datadog/DDMappingSpan.java b/src/main/java/zipkin2/reporter/datadog/DDMappingSpan.java index 87bef0f..c8df916 100644 --- a/src/main/java/zipkin2/reporter/datadog/DDMappingSpan.java +++ b/src/main/java/zipkin2/reporter/datadog/DDMappingSpan.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import java.util.Map; +import java.util.concurrent.TimeUnit; import zipkin2.Span; public class DDMappingSpan { @@ -16,12 +17,12 @@ public class DDMappingSpan { @JsonGetter("start") public long getStartTime() { - return delegateSpan.timestampAsLong() * 1000L; + return TimeUnit.MICROSECONDS.toNanos(delegateSpan.timestampAsLong()); } @JsonGetter("duration") public long getDurationNano() { - return delegateSpan.durationAsLong(); + return TimeUnit.MICROSECONDS.toNanos(delegateSpan.durationAsLong()); } 
@JsonGetter("service") @@ -29,11 +30,19 @@ public String getServiceName() { return delegateSpan.localServiceName(); } + /** + * This method only returns the lower 64 bits of the trace id, so that is all that will be sent to Datadog. + * @return the lower 64 bits of the trace id + */ @JsonGetter("trace_id") public long getTraceId() { return lowerHexToUnsignedLong(delegateSpan.traceId()); } + /** + * This method only returns the lower 64 bits of the span id, so that is all that will be sent to Datadog. + * @return the lower 64 bits of the span id + */ @JsonGetter("span_id") public long getSpanId() { return lowerHexToUnsignedLong(delegateSpan.id()); diff --git a/src/main/java/zipkin2/reporter/datadog/DatadogReporter.java b/src/main/java/zipkin2/reporter/datadog/DatadogReporter.java index 81c0537..3f3e509 100644 --- a/src/main/java/zipkin2/reporter/datadog/DatadogReporter.java +++ b/src/main/java/zipkin2/reporter/datadog/DatadogReporter.java @@ -20,6 +20,11 @@ * reported to the agent after a span is reported that appears to be the root, or after a * TIMEOUT_DELAY period. A span is assumed to be a root if it has a Span.Kind of either SERVER or * CONSUMER. + * + *

This implementation groups spans into traces using an unbounded ConcurrentHashMap. + * "Incomplete" traces are flushed after 30 seconds, but only 1 second if a root span is reported. + * This means that spikes of traffic might cause unbounded growth of the contained + * ConcurrentHashMap. */ public class DatadogReporter implements Reporter, Flushable, Closeable { public static final long TIMEOUT_DELAY = TimeUnit.SECONDS.toNanos(30); @@ -82,6 +87,11 @@ public void report(Span span) { trace = previousTrace != null ? previousTrace : trace; // Handles race condition trace.spans.add(new DDMappingSpan(span)); + + /* If the span kind is server or consumer, we assume it is the root of the trace. + * That implies all span children have likely already been reported and can be + * flushed in the next cycle, though in some async cases, this might not be the case. + */ if (span.kind() == Span.Kind.SERVER || span.kind() == Span.Kind.CONSUMER || span.parentId() == null) { @@ -118,7 +128,6 @@ private void sendTraces() { trace = reportingTraces.poll(); } ddApi.sendTraces(traces); - System.out.println("SEND TRACES:" + traces); } private void flushPeriodically() { diff --git a/src/test/groovy/DatadogReporterTest.groovy b/src/test/groovy/DatadogReporterTest.groovy index 6961380..839478e 100644 --- a/src/test/groovy/DatadogReporterTest.groovy +++ b/src/test/groovy/DatadogReporterTest.groovy @@ -9,72 +9,106 @@ import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.atomic.AtomicLong class DatadogReporterTest extends Specification { - AtomicLong timestamp = new AtomicLong(); - BlockingQueue> reported = new LinkedBlockingQueue<>(); + AtomicLong timestamp = new AtomicLong() + BlockingQueue> reported = new LinkedBlockingQueue<>() DatadogReporter reporter = new DatadogReporter(reported) { @Override protected long nanoTime() { - return timestamp.get(); + return timestamp.get() } - }; + } + + def cleanup() { + reporter.close() + } def "test reporting"() { when: - 
timestamp.set(1); + timestamp.set(1) def client = newSpan("a", "b", "c", Span.Kind.CLIENT, "get /backend") - reporter.report(client); - timestamp.set(2); + reporter.report(client) + timestamp.set(2) def serverF = newSpan("f", null, "f", Span.Kind.SERVER, "get /other") - reporter.report(serverF); + reporter.report(serverF) - reporter.flush(); + reporter.flush() then: reported.each { it.delegateSpan }.asList() == [] when: - timestamp.set(3 + DatadogReporter.COMPLETION_DELAY); - reporter.flush(); + timestamp.set(3 + DatadogReporter.COMPLETION_DELAY) + reporter.flush() then: reported.collect { it.delegateSpan }.asList() == [[serverF]] when: - timestamp.set(4); + timestamp.set(4) def childB = newSpan("a", "a", "b", null, "callBackend") - reporter.report(childB); - timestamp.set(5); + reporter.report(childB) + timestamp.set(5) def serverA = newSpan("a", null, "a", Span.Kind.SERVER, "get /frontend") - reporter.report(serverA); + reporter.report(serverA) - timestamp.set(6 + DatadogReporter.COMPLETION_DELAY); - reporter.flush(); + timestamp.set(6 + DatadogReporter.COMPLETION_DELAY) + reporter.flush() then: reported.collect { it.delegateSpan }.asList() == [[serverF], [client, childB, serverA]] when: - timestamp.set(7); + timestamp.set(7) def childE = newSpan("a", "a", "e", null, "lateCall") - reporter.report(childE); + reporter.report(childE) - timestamp.set(8 + DatadogReporter.COMPLETION_DELAY); - reporter.flush(); + timestamp.set(8 + DatadogReporter.COMPLETION_DELAY) + reporter.flush() then: reported.collect { it.delegateSpan }.asList() == [[serverF], [client, childB, serverA]] when: - timestamp.set(8 + DatadogReporter.TIMEOUT_DELAY); - reporter.flush(); + timestamp.set(8 + DatadogReporter.TIMEOUT_DELAY) + reporter.flush() then: reported.collect { it.delegateSpan }.asList() == [[serverF], [client, childB, serverA], [childE]] } + def "test with multiple reporters"() { + setup: + def otherReported = new LinkedBlockingQueue>() + def otherReporter = new 
DatadogReporter(otherReported) { + @Override + protected long nanoTime() { + return timestamp.get() + } + } + + def span = newSpan("a", "b", "c", Span.Kind.SERVER, "get /d") + timestamp.set(0) + + when: + reporter.report(span) + otherReporter.report(span) + + timestamp.set(1 + DatadogReporter.COMPLETION_DELAY) + + reporter.flush() + otherReporter.flush() + + then: + reported.collect { it.delegateSpan }.asList() == [[span]] + otherReported.collect { it.delegateSpan }.asList() == [[span]] + + cleanup: + otherReporter.close() + } + static Span newSpan(String traceId, String parentId, String id, Span.Kind kind, String spanName) { - Span.Builder result = Span.newBuilder().traceId(traceId).parentId(parentId).id(id).kind(kind).name(spanName).localEndpoint(Endpoint.newBuilder().serviceName("my-app").build()); - return result.build(); + Span.Builder result = Span.newBuilder().traceId(traceId).parentId(parentId).id(id).kind(kind).name(spanName).localEndpoint(Endpoint.newBuilder().serviceName("my-app").build()) + return result.build() } } diff --git a/zipkin-datadog-autoconfigure/build.gradle b/zipkin-datadog-autoconfigure/build.gradle index 5da5c22..09502c9 100644 --- a/zipkin-datadog-autoconfigure/build.gradle +++ b/zipkin-datadog-autoconfigure/build.gradle @@ -1,4 +1,4 @@ -apply plugin: 'java' +apply plugin: 'java-library' repositories { mavenLocal() @@ -7,8 +7,11 @@ repositories { } dependencies { - compile project.parent - compile group: 'org.springframework.boot', name: 'spring-boot', version: '2.0.1.RELEASE' - compile group: 'org.springframework.boot', name: 'spring-boot-autoconfigure', version: '2.0.1.RELEASE' - compile group: 'org.springframework.cloud', name: 'spring-cloud-sleuth-zipkin', version: '1.3.3.RELEASE' + implementation project.parent + + implementation group: 'org.springframework.boot', name: 'spring-boot-dependencies', version: '2.0.1.RELEASE' + implementation group: 'org.springframework.boot', name: 'spring-boot' + implementation group: 
'org.springframework.boot', name: 'spring-boot-autoconfigure' + + implementation group: 'org.springframework.cloud', name: 'spring-cloud-sleuth-zipkin', version: '1.3.3.RELEASE' }