Skip to content

Commit

Permalink
Sets timestamp and duration authoritatively (#14)
Browse files Browse the repository at this point in the history
Fixes #10
  • Loading branch information
adriancole authored Aug 10, 2016
1 parent 1512500 commit 59cf265
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 44 deletions.
50 changes: 27 additions & 23 deletions core/src/main/java/zipkin/finagle/MutableSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package zipkin.finagle;

import com.twitter.finagle.service.TimeoutFilter;
import com.twitter.finagle.thrift.thrift.Constants;
import com.twitter.finagle.tracing.TraceId;
import com.twitter.util.Time;
import java.net.InetSocketAddress;
Expand All @@ -25,18 +24,29 @@
import zipkin.Endpoint;
import zipkin.Span;

import static com.twitter.finagle.thrift.thrift.Constants.CLIENT_RECV;
import static com.twitter.finagle.thrift.thrift.Constants.SERVER_SEND;

final class MutableSpan {
private final TraceId traceId;
private final Span.Builder span;
private final Time started;
private final List<Annotation> annotations = new ArrayList<>();
private final List<BinaryAnnotation> binaryAnnotations = new ArrayList<>();
private boolean isComplete = false;
private String name = "Unknown";
private String service = "Unknown";
private Endpoint endpoint = Endpoints.UNKNOWN;

MutableSpan(TraceId traceId, Time started) {
this.traceId = traceId;
this.span = Span.builder();
span.id(traceId.spanId().toLong());
if (traceId._parentId().isDefined()) {
span.parentId(traceId.parentId().toLong());
}
span.traceId(traceId.traceId().toLong());
if (traceId.flags().isDebug()) {
span.debug(true);
}
span.name("unknown");
this.started = started;
}

Expand All @@ -45,7 +55,7 @@ Time started() { // not synchronized as the field is immutable and final
}

synchronized MutableSpan setName(String n) {
name = n;
span.name(n);
return this;
}

Expand All @@ -55,13 +65,20 @@ synchronized MutableSpan setServiceName(String n) {
}

synchronized MutableSpan addAnnotation(Time timestamp, String value) {
if (!isComplete && (
value.equals(Constants.CLIENT_RECV) ||
value.equals(Constants.SERVER_SEND) ||
value.equals(TimeoutFilter.TimeoutAnnotation())
)) {
if (annotations.isEmpty()) {
span.timestamp(timestamp.inMicroseconds());
}

if (!isComplete &&
value.equals(CLIENT_RECV) ||
value.equals(SERVER_SEND) ||
value.equals(TimeoutFilter.TimeoutAnnotation())) {
if (!annotations.isEmpty()) {
span.duration(timestamp.inMicroseconds() - annotations.get(0).timestamp);
}
isComplete = true;
}

annotations.add(Annotation.create(timestamp.inMicroseconds(), value, endpoint));
return this;
}
Expand Down Expand Up @@ -93,18 +110,6 @@ synchronized MutableSpan setAddress(String key, InetSocketAddress ia) {
}

synchronized Span toSpan() {
Span.Builder span = Span.builder();

span.id(traceId.spanId().toLong());
if (traceId._parentId().isDefined()) {
span.parentId(traceId.parentId().toLong());
}
span.traceId(traceId.traceId().toLong());
span.name(name);
if (traceId.flags().isDebug()) {
span.debug(true);
}

// fill in the host/service data for all the annotations
for (Annotation ann : annotations) {
Endpoint ep = Endpoints.boundEndpoint(ann.endpoint);
Expand All @@ -117,7 +122,6 @@ synchronized Span toSpan() {
span.addBinaryAnnotation(
ann.toBuilder().endpoint(ep.toBuilder().serviceName(service).build()).build());
}

return span.build();
}

Expand Down
52 changes: 31 additions & 21 deletions core/src/test/java/zipkin/finagle/SpanRecorderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import zipkin.Span;
import zipkin.storage.InMemoryStorage;

import static com.twitter.util.Time.fromMilliseconds;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -45,18 +46,21 @@ public class SpanRecorderTest {
MockTimer timer = new MockTimer();

InMemoryStatsReceiver stats = new InMemoryStatsReceiver();
InMemoryStorage mem = new InMemoryStorage();
BlockingQueue<List<Span>> spansSent = new LinkedBlockingDeque<>();
SpanRecorder recorder;

@Before
public void setRecorder() {
// Recorder schedules a flusher thread on instantiation. Do this in a Before block so
// that we can control time.
recorder = new SpanRecorder(mem.asyncSpanConsumer(), stats, timer);
recorder = new SpanRecorder((spans, callback) -> {
spansSent.add(spans);
callback.onSuccess(null);
}, stats, timer);
}

/** This is replaying actual events that happened with Finagle's tracer */
@Test public void examplerootAndChild() {
@Test public void examplerootAndChild() throws InterruptedException {

// Initiating a server-span based on an incoming request
advanceAndRecord(0, root, new Annotation.Rpc("GET"));
Expand All @@ -83,17 +87,12 @@ public void setRecorder() {
// Finishing the server span
advanceAndRecord(40, root, new Annotation.ServerSend());

List<Span> trace = mem.spanStore().getTrace(root.traceId().toLong());
assertThat(trace.get(0).annotations).extracting(a -> a.value).containsExactly(
"sr", "ss"
);
assertThat(trace.get(1).annotations).extracting(a -> a.value).containsExactly(
assertThat(spansSent.take().get(0).annotations).extracting(a -> a.value).containsExactly(
"cs", "ws", "wr", "cr"
);
}

private InetSocketAddress socketAddr(String host, int port) {
return new InetSocketAddress(host, port);
assertThat(spansSent.take().get(0).annotations).extracting(a -> a.value).containsExactly(
"sr", "ss"
);
}

@Test public void incrementsCounterWhenUnexpected_binaryAnnotation() throws Exception {
Expand All @@ -107,6 +106,11 @@ private InetSocketAddress socketAddr(String host, int port) {
);
}

/** Better to drop instead of crash on expected new Annotation types */
class FancyAnnotation implements Annotation {

}

@Test public void incrementsCounterWhenUnexpected_annotation() throws Exception {
recorder.record(
new Record(root, fromMilliseconds(TODAY), new FancyAnnotation(), empty())
Expand All @@ -121,30 +125,36 @@ private InetSocketAddress socketAddr(String host, int port) {
advanceAndRecord(0, root, new Annotation.ClientSend());
advanceAndRecord(1, root, new Annotation.ClientRecv());

Span span = mem.spanStore().getTrace(root.traceId().toLong()).get(0);
Span span = spansSent.take().get(0);
assertThat(span.annotations).extracting(a -> a.value).containsExactly(
"cs", "cr"
);
assertThat(span.timestamp).isEqualTo(TODAY * 1000);
assertThat(span.duration).isEqualTo(1000);
}

@Test public void reportsSpanOn_Timeout() throws Exception {
advanceAndRecord(0, root, new Annotation.ClientSend());
advanceAndRecord(1, root, new Annotation.Message("finagle.timeout"));

Span span = mem.spanStore().getTrace(root.traceId().toLong()).get(0);
Span span = spansSent.take().get(0);
assertThat(span.annotations).extracting(a -> a.value).containsExactly(
"cs", "finagle.timeout"
);
assertThat(span.timestamp).isEqualTo(TODAY * 1000);
assertThat(span.duration).isEqualTo(1000);
}

@Test public void reportsSpanOn_ServerSend() throws Exception {
advanceAndRecord(0, root, new Annotation.ServerRecv());
advanceAndRecord(1, root, new Annotation.ServerSend());

Span span = mem.spanStore().getTrace(root.traceId().toLong()).get(0);
Span span = spansSent.take().get(0);
assertThat(span.annotations).extracting(a -> a.value).containsExactly(
"sr", "ss"
);
assertThat(span.timestamp).isEqualTo(TODAY * 1000);
assertThat(span.duration).isEqualTo(1000);
}

/** ServiceName can be set late, but it should be consistent across annotations. */
Expand All @@ -154,7 +164,7 @@ private InetSocketAddress socketAddr(String host, int port) {
advanceAndRecord(0, root, new Annotation.ServiceName("frontend"));
advanceAndRecord(15, root, new Annotation.ServerSend());

Span span = mem.spanStore().getTrace(root.traceId().toLong()).get(0);
Span span = spansSent.take().get(0);
assertThat(span.annotations).extracting(a -> a.endpoint.serviceName).containsExactly(
"frontend", "frontend"
);
Expand All @@ -169,21 +179,21 @@ private InetSocketAddress socketAddr(String host, int port) {
time.advance(recorder.ttl.plus(Duration.fromMilliseconds(1))); // advance timer
timer.tick(); // invokes a flush

Span span = mem.spanStore().getTrace(root.traceId().toLong()).get(0);
Span span = spansSent.take().get(0);
assertThat(span.idString()).isEqualTo(root.toString());
assertThat(span.name).isEqualTo("get");
assertThat(span.annotations).extracting(a -> a.value).containsExactly(
"sr", "finagle.flush"
);
assertThat(span.duration).isNull();
}

private void advanceAndRecord(int millis, TraceId traceId, Annotation annotation) {
time.advance(Duration.fromMilliseconds(millis));
recorder.record(new Record(traceId, Time.now(), annotation, empty()));
}

/** Better to drop instead of crash on expected new Annotation types */
class FancyAnnotation implements Annotation {

private InetSocketAddress socketAddr(String host, int port) {
return new InetSocketAddress(host, port);
}
}

0 comments on commit 59cf265

Please sign in to comment.