Skip to content

Commit

Permalink
fixes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerbenson committed Jun 22, 2018
1 parent eac05fe commit 93db6cd
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 115 deletions.
20 changes: 12 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
plugins {
id "net.ltgt.errorprone" version "0.0.14"
}

allprojects {
version = '0.0.1-SNAPSHOT'
}

apply plugin: 'java'
apply plugin: 'java-library'
apply plugin: 'groovy'

repositories {
Expand All @@ -12,13 +16,13 @@ repositories {
}

dependencies {
compile group: 'io.zipkin.zipkin2', name: 'zipkin', version: '2.8.3'
compile group: 'io.zipkin.reporter2', name: 'zipkin-reporter', version: '2.6.0'
compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'

compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.0'
implementation group: 'io.zipkin.brave', name: 'brave-bom', version: '5.1.2'
implementation group: 'io.zipkin.zipkin2', name: 'zipkin'
implementation group: 'io.zipkin.reporter2', name: 'zipkin-reporter'

testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.1-groovy-2.4'
testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.15'
testImplementation group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
testImplementation group: 'org.spockframework', name: 'spock-core', version: '1.1-groovy-2.4'
testImplementation group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.15'
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
rootProject.name = 'zipkin-datadog-reporter'

include ':zipkin-datadog-autoconfigure'

enableFeaturePreview('IMPROVED_POM_SUPPORT')
160 changes: 87 additions & 73 deletions src/main/java/zipkin2/reporter/datadog/DDApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,33 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
import static java.util.logging.Level.WARNING;

/** The API pointing to a DD agent */
class DDApi {
public static final String DEFAULT_HOSTNAME = "localhost";
public static final int DEFAULT_PORT = 8126;
static final String DEFAULT_HOSTNAME = "localhost";
static final int DEFAULT_PORT = 8126;

private static final Logger log = LoggerFactory.getLogger(DDApi.class);
private static final Logger log = Logger.getLogger(DDApi.class.getName());

public static final String JAVA_VERSION = System.getProperty("java.version", "unknown");
public static final String JAVA_VM_NAME = System.getProperty("java.vm.name", "unknown");
static final String JAVA_VERSION = System.getProperty("java.version", "unknown");
static final String JAVA_VM_NAME = System.getProperty("java.vm.name", "unknown");
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
Expand All @@ -38,52 +42,40 @@ class DDApi {
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);

private final String tracesEndpoint;
private final List<ResponseListener> responseListeners = new ArrayList<>();

private AtomicInteger traceCount;
private volatile long nextAllowedLogTime = 0;

private static final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());

public DDApi(final String host, final int port) {
DDApi(final String host, final int port) {
this(host, port, traceEndpointAvailable("http://" + host + ":" + port + TRACES_ENDPOINT_V4));
}

DDApi(final String host, final int port, final boolean v4EndpointsAvailable) {
if (v4EndpointsAvailable) {
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V4;
} else {
log.debug("API v0.4 endpoints not available. Downgrading to v0.3");
log.log(FINE, "API v0.4 endpoints not available. Downgrading to v0.3");
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V3;
}
}

public void addResponseListener(final ResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
}

public void addTraceCounter(final AtomicInteger traceCount) {
this.traceCount = traceCount;
}

/**
* Send traces to the DD agent
*
* @param traces the traces to be sent
* @return the staus code returned
*/
public boolean sendTraces(final List<List<DDMappingSpan>> traces) {
final int totalSize = traceCount == null ? traces.size() : traceCount.getAndSet(0);
void sendTraces(final List<List<DDMappingSpan>> traces) {
final int totalSize = traces.size();
try {
final HttpURLConnection httpCon = getHttpURLConnection(tracesEndpoint);
httpCon.setRequestProperty(X_DATADOG_TRACE_COUNT, String.valueOf(totalSize));

final OutputStream out = httpCon.getOutputStream();
objectMapper.writeValue(out, traces);
out.flush();
out.close();
try (final OutputStream out = httpCon.getOutputStream()) {
objectMapper.writeValue(out, traces);
out.flush();
}

String responseString = null;
{
Expand All @@ -101,47 +93,53 @@ public boolean sendTraces(final List<List<DDMappingSpan>> traces) {
responseString = sb.toString();
}

skipAllContent(httpCon);

final int responseCode = httpCon.getResponseCode();
if (responseCode != 200) {
if (log.isDebugEnabled()) {
log.debug(
"Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ",
traces.size(),
totalSize,
responseCode,
httpCon.getResponseMessage());
if (log.isLoggable(FINER)) {
log.log(
FINER,
MessageFormat.format(
"Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ",
traces.size(),
totalSize,
responseCode,
httpCon.getResponseMessage()));
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG;
log.warn(
"Error while sending {} of {} traces to the DD agent. Status: {} (going silent for {} seconds)",
traces.size(),
totalSize,
responseCode,
httpCon.getResponseMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));
log.log(
WARNING,
MessageFormat.format(
"Error while sending {} of {} traces to the DD agent. Status: {} (going silent for {} seconds)",
traces.size(),
totalSize,
responseCode,
httpCon.getResponseMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)));
}
return false;
return;
}

log.debug("Succesfully sent {} of {} traces to the DD agent.", traces.size(), totalSize);
log.log(
FINEST,
MessageFormat.format(
"Succesfully sent {} of {} traces to the DD agent.", traces.size(), totalSize));

try {
if (null != responseString
&& !"".equals(responseString.trim())
&& !"OK".equalsIgnoreCase(responseString.trim())) {
final JsonNode response = objectMapper.readTree(responseString);
for (final ResponseListener listener : responseListeners) {
listener.onResponse(tracesEndpoint, response);
}
}
} catch (final IOException e) {
log.debug("failed to parse DD agent response: " + responseString, e);
log.log(FINE, "failed to parse DD agent response: " + responseString, e);
}
return true;

} catch (final IOException e) {
if (log.isDebugEnabled()) {
log.debug(
if (log.isLoggable(FINE)) {
log.log(
FINE,
"Error while sending "
+ traces.size()
+ " of "
Expand All @@ -150,26 +148,23 @@ public boolean sendTraces(final List<List<DDMappingSpan>> traces) {
e);
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG;
log.warn(
"Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)",
traces.size(),
totalSize,
e.getClass().getName(),
e.getMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));
log.log(
WARNING,
MessageFormat.format(
"Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)",
traces.size(),
totalSize,
e.getClass().getName(),
e.getMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)));
}
return false;
}
}

private static boolean traceEndpointAvailable(final String endpoint) {
return endpointAvailable(endpoint, Collections.emptyList(), true);
}

private static boolean serviceEndpointAvailable(final String endpoint) {
return endpointAvailable(endpoint, Collections.emptyMap(), true);
}

private static boolean endpointAvailable(
final String endpoint, final Object data, final boolean retry) {
try {
Expand All @@ -179,10 +174,10 @@ private static boolean endpointAvailable(
httpCon.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(1));
httpCon.setReadTimeout((int) TimeUnit.SECONDS.toMillis(1));

final OutputStream out = httpCon.getOutputStream();
objectMapper.writeValue(out, data);
out.flush();
out.close();
try (final OutputStream out = httpCon.getOutputStream()) {
objectMapper.writeValue(out, data);
out.flush();
}
return httpCon.getResponseCode() == 200;
} catch (final IOException e) {
if (retry) {
Expand All @@ -208,13 +203,32 @@ private static HttpURLConnection getHttpURLConnection(final String endpoint) thr
return httpCon;
}

/* Ensure we read the full response. Borrowed from https://github.com/openzipkin/zipkin-reporter-java/blob/2eb169e/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java#L231-L252 */
private void skipAllContent(HttpURLConnection connection) throws IOException {
InputStream in = connection.getInputStream();
IOException thrown = skipAndSuppress(in);
if (thrown == null) return;
InputStream err = connection.getErrorStream();
if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped
throw thrown;
}

private IOException skipAndSuppress(InputStream in) {
try {
while (in.read() != -1) ; // skip
return null;
} catch (IOException e) {
return e;
} finally {
try {
in.close();
} catch (IOException suppressed) {
}
}
}

@Override
public String toString() {
return "DDApi { tracesEndpoint=" + tracesEndpoint + " }";
}

public static interface ResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
}
13 changes: 11 additions & 2 deletions src/main/java/zipkin2/reporter/datadog/DDMappingSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import zipkin2.Span;

public class DDMappingSpan {
Expand All @@ -16,24 +17,32 @@ public class DDMappingSpan {

@JsonGetter("start")
public long getStartTime() {
return delegateSpan.timestampAsLong() * 1000L;
return TimeUnit.MICROSECONDS.toNanos(delegateSpan.timestampAsLong());
}

@JsonGetter("duration")
public long getDurationNano() {
return delegateSpan.durationAsLong();
return TimeUnit.MICROSECONDS.toNanos(delegateSpan.durationAsLong());
}

@JsonGetter("service")
public String getServiceName() {
return delegateSpan.localServiceName();
}

/**
* This method only returns the lower 64 bits of the trace id, so that is all that will be sent to Datadog.
* @return
*/
@JsonGetter("trace_id")
public long getTraceId() {
return lowerHexToUnsignedLong(delegateSpan.traceId());
}

/**
* This method only returns the lower 64 bits of the span id, so that is all that will be sent to Datadog.
* @return
*/
@JsonGetter("span_id")
public long getSpanId() {
return lowerHexToUnsignedLong(delegateSpan.id());
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/zipkin2/reporter/datadog/DatadogReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* reported to the agent after a span is reported that appears to be the root, or after a
* TIMEOUT_DELAY period. A span is assumed to be a root if it has a Span.Kind of either SERVER or
* CONSUMER.
*
* <p>This implementation groups spans into traces using an unbounded ConcurrentHashMap.
* "Incomplete" traces are flushed after 30 seconds, but only 1 second if a root span is reported.
* This means that spikes of traffic might cause unbounded growth of the contained
* ConcurrentHashMap.
*/
public class DatadogReporter implements Reporter<Span>, Flushable, Closeable {
public static final long TIMEOUT_DELAY = TimeUnit.SECONDS.toNanos(30);
Expand Down Expand Up @@ -82,6 +87,11 @@ public void report(Span span) {
trace = previousTrace != null ? previousTrace : trace; // Handles race condition

trace.spans.add(new DDMappingSpan(span));

/* If the span kind is server or consumer, we assume it is the root of the trace.
* That implies all span children have likely already been reported and can be
* flushed in the next cycle, though in some async cases, this might not be the case.
*/
if (span.kind() == Span.Kind.SERVER
|| span.kind() == Span.Kind.CONSUMER
|| span.parentId() == null) {
Expand Down Expand Up @@ -118,7 +128,6 @@ private void sendTraces() {
trace = reportingTraces.poll();
}
ddApi.sendTraces(traces);
System.out.println("SEND TRACES:" + traces);
}

private void flushPeriodically() {
Expand Down
Loading

0 comments on commit 93db6cd

Please sign in to comment.