Skip to content

Commit

Permalink
WIP v2 http query api
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Aug 30, 2017
1 parent b001c64 commit 9fdf25e
Show file tree
Hide file tree
Showing 11 changed files with 737 additions and 27 deletions.
133 changes: 108 additions & 25 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin.junit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand All @@ -30,23 +31,28 @@
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.V2JsonSpanDecoder;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.internal.Platform;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static zipkin.internal.Util.lowerHexToUnsignedLong;

final class ZipkinDispatcher extends Dispatcher {
static final long DEFAULT_LOOKBACK = 86400000L; // 1 day in millis
static final SpanDecoder JSON2_DECODER = new V2JsonSpanDecoder();

private final SpanStore store;
private final zipkin.internal.v2.storage.SpanStore store2;
private final Collector consumer;
private final CollectorMetrics metrics;
private final MockWebServer server;

ZipkinDispatcher(StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
ZipkinDispatcher(V2StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
this.store = storage.spanStore();
this.store2 = storage.v2SpanStore();
this.consumer = Collector.builder(getClass()).storage(storage).metrics(metrics).build();
this.metrics = metrics;
this.server = server;
Expand All @@ -58,27 +64,14 @@ public MockResponse dispatch(RecordedRequest request) {
if (request.getMethod().equals("GET")) {
if (url.encodedPath().equals("/health")) {
return new MockResponse().setBody("OK\n");
} else if (url.encodedPath().equals("/api/v1/services")) {
return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames()));
} else if (url.encodedPath().equals("/api/v1/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName)));
} else if (url.encodedPath().equals("/api/v1/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store.getDependencies(endTs, lookback);
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v1/traces")) {
QueryRequest queryRequest = toQueryRequest(url);
return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest)));
} else if (url.encodedPath().startsWith("/api/v1/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v1/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<Span> trace = url.queryParameterNames().contains("raw")
? store.getRawTrace(traceIdHigh, traceIdLow)
: store.getTrace(traceIdHigh, traceIdLow);
if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace));
} else if (url.encodedPath().startsWith("/api/v1/")) {
return queryV1(url);
} else if (url.encodedPath().startsWith("/api/v2/")) {
try {
return queryV2(url);
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
Expand All @@ -96,6 +89,81 @@ public MockResponse dispatch(RecordedRequest request) {
return new MockResponse().setResponseCode(404);
}

MockResponse queryV1(HttpUrl url) {
if (url.encodedPath().equals("/api/v1/services")) {
return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames()));
} else if (url.encodedPath().equals("/api/v1/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName)));
} else if (url.encodedPath().equals("/api/v1/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store.getDependencies(endTs, lookback);
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v1/traces")) {
QueryRequest queryRequest = toQueryRequest(url);
return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest)));
} else if (url.encodedPath().startsWith("/api/v1/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v1/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<Span> trace = url.queryParameterNames().contains("raw")
? store.getRawTrace(traceIdHigh, traceIdLow)
: store.getTrace(traceIdHigh, traceIdLow);
if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace));
}
return new MockResponse().setResponseCode(404);
}

MockResponse queryV2(HttpUrl url) throws IOException {
if (url.encodedPath().equals("/api/v2/services")) {
return jsonResponse(Codec.JSON.writeStrings(store2.getServiceNames().execute()));
} else if (url.encodedPath().equals("/api/v2/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store2.getSpanNames(serviceName).execute()));
} else if (url.encodedPath().equals("/api/v2/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store2.getDependencies(
endTs != null ? endTs : System.currentTimeMillis(),
lookback != null ? lookback : DEFAULT_LOOKBACK
).execute();
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v2/traces")) {
List<List<zipkin.internal.v2.Span>> traces = store2.getTraces(toQueryRequest2(url)).execute();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
bout.write('[');
for (int i = 0, length = traces.size(); i < length; ) {
List<zipkin.internal.v2.Span> trace = traces.get(i);
writeTrace(bout, trace);
if (++i < length) bout.write(',');
}
bout.write(']');
return jsonResponse(bout.toByteArray());
} else if (url.encodedPath().startsWith("/api/v2/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v2/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<zipkin.internal.v2.Span> trace = store2.getTrace(traceIdHigh, traceIdLow).execute();
if (!trace.isEmpty()) {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writeTrace(bout, trace);
return jsonResponse(bout.toByteArray());
}
}
return new MockResponse().setResponseCode(404);
}

static void writeTrace(ByteArrayOutputStream bout, List<zipkin.internal.v2.Span> trace)
throws IOException {
bout.write('[');
for (int i = 0, length = trace.size(); i < length; ) {
bout.write(Encoder.JSON.encode(trace.get(i)));
if (++i < length) bout.write(',');
}
bout.write(']');
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
Expand Down Expand Up @@ -137,11 +205,26 @@ static QueryRequest toQueryRequest(HttpUrl url) {
.limit(maybeInteger(url.queryParameter("limit"))).build();
}

static Long maybeLong(@Nullable String input) {
static zipkin.internal.v2.storage.QueryRequest toQueryRequest2(HttpUrl url) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
Integer limit = maybeInteger(url.queryParameter("limit"));
return zipkin.internal.v2.storage.QueryRequest.newBuilder()
.serviceName(url.queryParameter("serviceName"))
.spanName(url.queryParameter("spanName"))
.parseAnnotationQuery(url.queryParameter("annotationQuery"))
.minDuration(maybeLong(url.queryParameter("minDuration")))
.maxDuration(maybeLong(url.queryParameter("maxDuration")))
.endTs(endTs != null ? endTs : System.currentTimeMillis())
.lookback(lookback != null ? lookback : DEFAULT_LOOKBACK)
.limit(limit != null ? limit : 10).build();
}

static @Nullable Long maybeLong(@Nullable String input) {
return input != null ? Long.valueOf(input) : null;
}

static Integer maybeInteger(@Nullable String input) {
static @Nullable Integer maybeInteger(@Nullable String input) {
return input != null ? Integer.valueOf(input) : null;
}

Expand Down
27 changes: 27 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

final class HttpException extends RuntimeException {
final int code;

HttpException(String message, int code) {
super(message);
this.code = code;
}

int code() {
return code;
}
}
134 changes: 134 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

import java.io.Closeable;
import java.io.IOException;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.internal.http.HttpHeaders;
import okio.BufferedSource;
import okio.GzipSource;
import okio.Okio;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Callback;

import static zipkin.internal.Util.propagateIfFatal;

public final class HttpV2Call<V> extends Call<V> {

public interface BodyConverter<V> {
V convert(BufferedSource content) throws IOException;
}

public static class Factory implements Closeable {
final OkHttpClient ok;
public final HttpUrl baseUrl;

public Factory(OkHttpClient ok, HttpUrl baseUrl) {
this.ok = ok;
this.baseUrl = baseUrl;
}

public <V> HttpV2Call<V> newCall(Request request, BodyConverter<V> bodyConverter) {
return new HttpV2Call<>(this, request, bodyConverter);
}

@Override public void close() {
ok.dispatcher().executorService().shutdownNow();
}
}

final okhttp3.Call call;
final BodyConverter<V> bodyConverter;

HttpV2Call(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this(factory.ok.newCall(request), bodyConverter);
}

HttpV2Call(okhttp3.Call call, BodyConverter<V> bodyConverter) {
this.call = call;
this.bodyConverter = bodyConverter;
}

@Override public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
}

@Override public void enqueue(Callback<V> delegate) {
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate));
}

@Override public void cancel() {
call.cancel();
}

@Override public boolean isCanceled() {
return call.isCanceled();
}

@Override public HttpV2Call<V> clone() {
return new HttpV2Call<>(call.clone(), bodyConverter);
}

static class CallbackAdapter<V> implements okhttp3.Callback {
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
return null;
} else {
throw new HttpException("response failed: " + response, response.code());
}
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
if (response.isSuccessful()) {
return bodyConverter.convert(content);
} else {
String tag = response.request().tag().toString();
throw new HttpException("response for " + tag + " failed: " + content.readUtf8(),
response.code());
}
}
}
}
49 changes: 49 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

import java.util.List;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.storage.SpanConsumer;

/** Implements the span consumer interface by forwarding requests over http. */
final class HttpV2SpanConsumer implements SpanConsumer {
final HttpV2Call.Factory factory;

HttpV2SpanConsumer(OkHttpClient client, HttpUrl baseUrl) {
this.factory = new HttpV2Call.Factory(client, baseUrl);
}

@Override public zipkin.internal.v2.Call<Void> accept(List<Span> spans) {
Buffer json = new Buffer();
json.writeByte('[');
for (int i = 0, length = spans.size(); i < length; ) {
json.write(Encoder.JSON.encode(spans.get(i)));
if (++i < length) json.writeByte(',');
}
json.writeByte(']');
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/spans"))
.post(RequestBody.create(MediaType.parse("application/json"), json.readByteArray())).build(),
b -> null /* void */
);
}
}
Loading

0 comments on commit 9fdf25e

Please sign in to comment.