From 413cc038e9e65bfeeb02226640fe02eb6acf24ca Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 30 Aug 2017 15:58:39 +0800 Subject: [PATCH] WIP v2 http query api --- .../java/zipkin/junit/ZipkinDispatcher.java | 151 +++++++++++---- .../test/java/zipkin/junit/v2/HttpV2Call.java | 133 +++++++++++++ .../zipkin/junit/v2/HttpV2SpanConsumer.java | 49 +++++ .../java/zipkin/junit/v2/HttpV2SpanStore.java | 82 ++++++++ .../java/zipkin/junit/v2/HttpV2Storage.java | 63 +++++++ .../java/zipkin/junit/v2/ITHttpV2Storage.java | 50 +++++ .../zipkin/server/EnableZipkinServer.java | 11 +- .../java/zipkin/server/ZipkinQueryApiV2.java | 176 ++++++++++++++++++ .../zipkin/internal/v2/codec/Decoder.java | 7 + .../internal/v2/codec/Span2JsonAdapters.java | 24 +++ 10 files changed, 710 insertions(+), 36 deletions(-) create mode 100644 zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java create mode 100644 zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java create mode 100644 zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanStore.java create mode 100644 zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Storage.java create mode 100644 zipkin-junit/src/test/java/zipkin/junit/v2/ITHttpV2Storage.java create mode 100644 zipkin-server/src/main/java/zipkin/server/ZipkinQueryApiV2.java diff --git a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java index a69f165251e..2adc3c04050 100644 --- a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java +++ b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java @@ -13,6 +13,7 @@ */ package zipkin.junit; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; import javax.annotation.Nullable; @@ -30,23 +31,28 @@ import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; import zipkin.internal.V2JsonSpanDecoder; +import zipkin.internal.V2StorageComponent; +import zipkin.internal.v2.codec.Encoder; +import zipkin.internal.v2.internal.Platform; import zipkin.storage.Callback; import zipkin.storage.QueryRequest; import zipkin.storage.SpanStore; -import zipkin.storage.StorageComponent; import static zipkin.internal.Util.lowerHexToUnsignedLong; final class ZipkinDispatcher extends Dispatcher { + static final long DEFAULT_LOOKBACK = 86400000L; // 1 day in millis static final SpanDecoder JSON2_DECODER = new V2JsonSpanDecoder(); private final SpanStore store; + private final zipkin.internal.v2.storage.SpanStore store2; private final Collector consumer; private final CollectorMetrics metrics; private final MockWebServer server; - ZipkinDispatcher(StorageComponent storage, CollectorMetrics metrics, MockWebServer server) { + ZipkinDispatcher(V2StorageComponent storage, CollectorMetrics metrics, MockWebServer server) { this.store = storage.spanStore(); + this.store2 = storage.v2SpanStore(); this.consumer = Collector.builder(getClass()).storage(storage).metrics(metrics).build(); this.metrics = metrics; this.server = server; @@ -58,27 +64,14 @@ public MockResponse dispatch(RecordedRequest request) { if (request.getMethod().equals("GET")) { if (url.encodedPath().equals("/health")) { return new MockResponse().setBody("OK\n"); - } else if (url.encodedPath().equals("/api/v1/services")) { - return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames())); - } else if (url.encodedPath().equals("/api/v1/spans")) { - String serviceName = url.queryParameter("serviceName"); - return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName))); - } else if (url.encodedPath().equals("/api/v1/dependencies")) { - Long endTs = maybeLong(url.queryParameter("endTs")); - Long lookback = maybeLong(url.queryParameter("lookback")); - List result = store.getDependencies(endTs, lookback); - return jsonResponse(Codec.JSON.writeDependencyLinks(result)); - } else if (url.encodedPath().equals("/api/v1/traces")) { - QueryRequest queryRequest = toQueryRequest(url); - return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest))); - } else if (url.encodedPath().startsWith("/api/v1/trace/")) { - String traceIdHex = url.encodedPath().replace("/api/v1/trace/", ""); - long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L; - long traceIdLow = lowerHexToUnsignedLong(traceIdHex); - List trace = url.queryParameterNames().contains("raw") - ? store.getRawTrace(traceIdHigh, traceIdLow) - : store.getTrace(traceIdHigh, traceIdLow); - if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace)); + } else if (url.encodedPath().startsWith("/api/v1/")) { + return queryV1(url); + } else if (url.encodedPath().startsWith("/api/v2/")) { + try { + return queryV2(url); + } catch (IOException e) { + throw Platform.get().uncheckedIOException(e); + } } } else if (request.getMethod().equals("POST")) { if (url.encodedPath().equals("/api/v1/spans")) { @@ -96,6 +89,81 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setResponseCode(404); } + MockResponse queryV1(HttpUrl url) { + if (url.encodedPath().equals("/api/v1/services")) { + return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames())); + } else if (url.encodedPath().equals("/api/v1/spans")) { + String serviceName = url.queryParameter("serviceName"); + return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName))); + } else if (url.encodedPath().equals("/api/v1/dependencies")) { + Long endTs = maybeLong(url.queryParameter("endTs")); + Long lookback = maybeLong(url.queryParameter("lookback")); + List result = store.getDependencies(endTs, lookback); + return jsonResponse(Codec.JSON.writeDependencyLinks(result)); + } else if (url.encodedPath().equals("/api/v1/traces")) { + QueryRequest queryRequest = toQueryRequest(url); + return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest))); + } else if (url.encodedPath().startsWith("/api/v1/trace/")) { + String traceIdHex = url.encodedPath().replace("/api/v1/trace/", ""); + long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L; + long traceIdLow = lowerHexToUnsignedLong(traceIdHex); + List trace = url.queryParameterNames().contains("raw") + ? store.getRawTrace(traceIdHigh, traceIdLow) + : store.getTrace(traceIdHigh, traceIdLow); + if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace)); + } + return new MockResponse().setResponseCode(404); + } + + MockResponse queryV2(HttpUrl url) throws IOException { + if (url.encodedPath().equals("/api/v2/services")) { + return jsonResponse(Codec.JSON.writeStrings(store2.getServiceNames().execute())); + } else if (url.encodedPath().equals("/api/v2/spans")) { + String serviceName = url.queryParameter("serviceName"); + return jsonResponse(Codec.JSON.writeStrings(store2.getSpanNames(serviceName).execute())); + } else if (url.encodedPath().equals("/api/v2/dependencies")) { + Long endTs = maybeLong(url.queryParameter("endTs")); + Long lookback = maybeLong(url.queryParameter("lookback")); + List result = store2.getDependencies( + endTs != null ? endTs : System.currentTimeMillis(), + lookback != null ? lookback : DEFAULT_LOOKBACK + ).execute(); + return jsonResponse(Codec.JSON.writeDependencyLinks(result)); + } else if (url.encodedPath().equals("/api/v2/traces")) { + List> traces = store2.getTraces(toQueryRequest2(url)).execute(); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + bout.write('['); + for (int i = 0, iLength = traces.size(); i < iLength; ) { + List trace = traces.get(i); + writeTrace(bout, trace); + if (++i < iLength) bout.write(','); + } + bout.write(']'); + return jsonResponse(bout.toByteArray()); + } else if (url.encodedPath().startsWith("/api/v2/trace/")) { + String traceIdHex = url.encodedPath().replace("/api/v2/trace/", ""); + long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L; + long traceIdLow = lowerHexToUnsignedLong(traceIdHex); + List trace = store2.getTrace(traceIdHigh, traceIdLow).execute(); + if (!trace.isEmpty()) { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + writeTrace(bout, trace); + return jsonResponse(bout.toByteArray()); + } + } + return new MockResponse().setResponseCode(404); + } + + static void writeTrace(ByteArrayOutputStream bout, List trace) + throws IOException { + bout.write('['); + for (int i = 0, length = trace.size(); i < length; ) { + bout.write(Encoder.JSON.encode(trace.get(i))); + if (++i < length) bout.write(','); + } + bout.write(']'); + } + MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) { metrics.incrementMessages(); byte[] body = request.getBody().readByteArray(); @@ -128,26 +196,41 @@ MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) { static QueryRequest toQueryRequest(HttpUrl url) { return QueryRequest.builder().serviceName(url.queryParameter("serviceName")) - .spanName(url.queryParameter("spanName")) - .parseAnnotationQuery(url.queryParameter("annotationQuery")) - .minDuration(maybeLong(url.queryParameter("minDuration"))) - .maxDuration(maybeLong(url.queryParameter("maxDuration"))) - .endTs(maybeLong(url.queryParameter("endTs"))) - .lookback(maybeLong(url.queryParameter("lookback"))) - .limit(maybeInteger(url.queryParameter("limit"))).build(); + .spanName(url.queryParameter("spanName")) + .parseAnnotationQuery(url.queryParameter("annotationQuery")) + .minDuration(maybeLong(url.queryParameter("minDuration"))) + .maxDuration(maybeLong(url.queryParameter("maxDuration"))) + .endTs(maybeLong(url.queryParameter("endTs"))) + .lookback(maybeLong(url.queryParameter("lookback"))) + .limit(maybeInteger(url.queryParameter("limit"))).build(); + } + + static zipkin.internal.v2.storage.QueryRequest toQueryRequest2(HttpUrl url) { + Long endTs = maybeLong(url.queryParameter("endTs")); + Long lookback = maybeLong(url.queryParameter("lookback")); + Integer limit = maybeInteger(url.queryParameter("limit")); + return zipkin.internal.v2.storage.QueryRequest.newBuilder() + .serviceName(url.queryParameter("serviceName")) + .spanName(url.queryParameter("spanName")) + .parseAnnotationQuery(url.queryParameter("annotationQuery")) + .minDuration(maybeLong(url.queryParameter("minDuration"))) + .maxDuration(maybeLong(url.queryParameter("maxDuration"))) + .endTs(endTs != null ? endTs : System.currentTimeMillis()) + .lookback(lookback != null ? lookback : DEFAULT_LOOKBACK) + .limit(limit != null ? limit : 10).build(); } - static Long maybeLong(@Nullable String input) { + static @Nullable Long maybeLong(@Nullable String input) { return input != null ? Long.valueOf(input) : null; } - static Integer maybeInteger(@Nullable String input) { + static @Nullable Integer maybeInteger(@Nullable String input) { return input != null ? Integer.valueOf(input) : null; } static MockResponse jsonResponse(byte[] content) { return new MockResponse() - .addHeader("Content-Type", "application/json") - .setBody(new Buffer().write(content)); + .addHeader("Content-Type", "application/json") + .setBody(new Buffer().write(content)); } } diff --git a/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java new file mode 100644 index 00000000000..aa243b54d00 --- /dev/null +++ b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java @@ -0,0 +1,133 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.junit.v2; + +import java.io.Closeable; +import java.io.IOException; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okhttp3.internal.http.HttpHeaders; +import okio.BufferedSource; +import okio.GzipSource; +import okio.Okio; +import zipkin.internal.v2.Call; +import zipkin.internal.v2.Callback; + +import static zipkin.internal.Util.propagateIfFatal; + +public final class HttpV2Call extends Call { + + public interface BodyConverter { + V convert(BufferedSource content) throws IOException; + } + + public static class Factory implements Closeable { + final OkHttpClient ok; + public final HttpUrl baseUrl; + + public Factory(OkHttpClient ok, HttpUrl baseUrl) { + this.ok = ok; + this.baseUrl = baseUrl; + } + + public HttpV2Call newCall(Request request, BodyConverter bodyConverter) { + return new HttpV2Call<>(this, request, bodyConverter); + } + + @Override public void close() { + ok.dispatcher().executorService().shutdownNow(); + } + } + + final okhttp3.Call call; + final BodyConverter bodyConverter; + + HttpV2Call(Factory factory, Request request, BodyConverter bodyConverter) { + this(factory.ok.newCall(request), bodyConverter); + } + + HttpV2Call(okhttp3.Call call, BodyConverter bodyConverter) { + this.call = call; + this.bodyConverter = bodyConverter; + } + + @Override public V execute() throws IOException { + return parseResponse(call.execute(), bodyConverter); + } + + @Override public void enqueue(Callback delegate) { + call.enqueue(new CallbackAdapter<>(bodyConverter, delegate)); + } + + @Override public void cancel() { + call.cancel(); + } + + @Override public boolean isCanceled() { + return call.isCanceled(); + } + + @Override public HttpV2Call clone() { + return new HttpV2Call<>(call.clone(), bodyConverter); + } + + static class CallbackAdapter implements okhttp3.Callback { + final BodyConverter bodyConverter; + final Callback delegate; + + CallbackAdapter(BodyConverter bodyConverter, Callback delegate) { + this.bodyConverter = bodyConverter; + this.delegate = delegate; + } + + @Override public void onFailure(okhttp3.Call call, IOException e) { + delegate.onError(e); + } + + /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ + @Override public void onResponse(okhttp3.Call call, Response response) { + try { + delegate.onSuccess(parseResponse(response, bodyConverter)); + } catch (Throwable e) { + propagateIfFatal(e); + delegate.onError(e); + } + } + } + + static V parseResponse(Response response, BodyConverter bodyConverter) throws IOException { + if (!HttpHeaders.hasBody(response)) { + if (response.isSuccessful()) { + return null; + } else { + throw new IllegalStateException("response failed: " + response); + } + } + try (ResponseBody responseBody = response.body()) { + BufferedSource content = responseBody.source(); + if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) { + content = Okio.buffer(new GzipSource(responseBody.source())); + } + if (response.isSuccessful()) { + return bodyConverter.convert(content); + } else { + throw new IllegalStateException( + "response for " + response.request().tag() + " failed: " + content.readUtf8()); + } + } + } +} diff --git a/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java new file mode 100644 index 00000000000..2b51ee20ad0 --- /dev/null +++ b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java @@ -0,0 +1,49 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.junit.v2; + +import java.util.List; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okio.Buffer; +import zipkin.internal.v2.Span; +import zipkin.internal.v2.codec.Encoder; +import zipkin.internal.v2.storage.SpanConsumer; + +/** Implements the span consumer interface by forwarding requests over http. */ +final class HttpV2SpanConsumer implements SpanConsumer { + final HttpV2Call.Factory factory; + + HttpV2SpanConsumer(OkHttpClient client, HttpUrl baseUrl) { + this.factory = new HttpV2Call.Factory(client, baseUrl); + } + + @Override public zipkin.internal.v2.Call accept(List spans) { + Buffer json = new Buffer(); + json.writeByte('['); + for (int i = 0, length = spans.size(); i < length; ) { + json.write(Encoder.JSON.encode(spans.get(i))); + if (++i < length) json.writeByte(','); + } + json.writeByte(']'); + return factory.newCall(new Request.Builder() + .url(factory.baseUrl.resolve("/api/v2/spans")) + .post(RequestBody.create(MediaType.parse("application/json"), json.readByteArray())).build(), + b -> null /* void */ + ); + } +} diff --git a/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanStore.java b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanStore.java new file mode 100644 index 00000000000..b56b1cfdc70 --- /dev/null +++ b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanStore.java @@ -0,0 +1,82 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.junit.v2; + +import java.util.List; +import javax.annotation.Nullable; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import zipkin.Codec; +import zipkin.DependencyLink; +import zipkin.internal.Util; +import zipkin.internal.v2.Call; +import zipkin.internal.v2.Span; +import zipkin.internal.v2.codec.Decoder; +import zipkin.internal.v2.storage.QueryRequest; +import zipkin.internal.v2.storage.SpanStore; + +/** Implements the span store interface by forwarding requests over http. */ +final class HttpV2SpanStore implements SpanStore { + final HttpV2Call.Factory factory; + + HttpV2SpanStore(OkHttpClient client, HttpUrl baseUrl) { + this.factory = new HttpV2Call.Factory(client, baseUrl); + } + + @Override public Call>> getTraces(QueryRequest request) { + HttpUrl.Builder url = factory.baseUrl.newBuilder("/api/v2/traces"); + maybeAddQueryParam(url, "serviceName", request.serviceName()); + maybeAddQueryParam(url, "spanName", request.spanName()); + maybeAddQueryParam(url, "annotationQuery", request.annotationQueryString()); + maybeAddQueryParam(url, "minDuration", request.minDuration()); + maybeAddQueryParam(url, "maxDuration", request.maxDuration()); + maybeAddQueryParam(url, "endTs", request.endTs()); + maybeAddQueryParam(url, "lookback", request.lookback()); + maybeAddQueryParam(url, "limit", request.limit()); + return factory.newCall(new Request.Builder().url(url.build()).build(), + content -> Decoder.JSON.decodeNestedList(content.readByteArray())); + } + + @Override public Call> getTrace(long traceIdHigh, long traceIdLow) { + String traceIdHex = Util.toLowerHex(traceIdHigh, traceIdLow); + return factory.newCall(new Request.Builder() + .url(factory.baseUrl.resolve("/api/v2/trace/" + traceIdHex)) + .build(), content -> Decoder.JSON.decodeList(content.readByteArray())); + } + + @Override + public Call> getServiceNames() { + return factory.newCall(new Request.Builder() + .url(factory.baseUrl.resolve("/api/v2/services")) + .build(), content -> Codec.JSON.readStrings(content.readByteArray())); + } + + @Override + public Call> getSpanNames(String serviceName) { + return factory.newCall(new Request.Builder() + .url(factory.baseUrl.resolve("/api/v2/spans?serviceName=" + serviceName)) + .build(), content -> Codec.JSON.readStrings(content.readByteArray())); + } + + @Override public Call> getDependencies(long endTs, long lookback) { + return factory.newCall(new Request.Builder() + .url(factory.baseUrl.resolve("/api/v2/dependencies?endTs=" + endTs + "&lookback=" + lookback)) + .build(), content -> Codec.JSON.readDependencyLinks(content.readByteArray())); + } + + void maybeAddQueryParam(HttpUrl.Builder builder, String name, @Nullable Object value) { + if (value != null) builder.addQueryParameter(name, value.toString()); + } +} diff --git a/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Storage.java b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Storage.java new file mode 100644 index 00000000000..0cf2b038e97 --- /dev/null +++ b/zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Storage.java @@ -0,0 +1,63 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.junit.v2; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import zipkin.internal.V2StorageComponent; +import zipkin.internal.v2.storage.SpanConsumer; +import zipkin.internal.v2.storage.SpanStore; + +/** + * Test storage component that forwards requests to an HTTP endpoint. + * + *

Note: this inherits the {@link Builder#strictTraceId(boolean)} from the backend. + */ +final class HttpV2Storage extends V2StorageComponent { + private final OkHttpClient client; + private final HttpUrl baseUrl; + private final HttpV2SpanStore spanStore; + private final HttpV2SpanConsumer spanConsumer; + + /** + * @param baseUrl Ex "http://localhost:9411" + */ + HttpV2Storage(String baseUrl) { + this.client = new OkHttpClient(); + this.baseUrl = HttpUrl.parse(baseUrl); + this.spanStore = new HttpV2SpanStore(this.client, this.baseUrl); + this.spanConsumer = new HttpV2SpanConsumer(this.client, this.baseUrl); + } + + @Override public SpanStore v2SpanStore() { + return spanStore; + } + + @Override public SpanConsumer v2SpanConsumer() { + return spanConsumer; + } + + @Override public CheckResult check() { + try { + spanStore.getServiceNames(); + } catch (RuntimeException e) { + return CheckResult.failed(e); + } + return CheckResult.OK; + } + + @Override public void close() { + client.connectionPool().evictAll(); + } +} diff --git a/zipkin-junit/src/test/java/zipkin/junit/v2/ITHttpV2Storage.java b/zipkin-junit/src/test/java/zipkin/junit/v2/ITHttpV2Storage.java new file mode 100644 index 00000000000..7acb4209f0f --- /dev/null +++ b/zipkin-junit/src/test/java/zipkin/junit/v2/ITHttpV2Storage.java @@ -0,0 +1,50 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.junit.v2; + +import java.io.IOException; +import org.junit.Rule; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import zipkin.junit.ZipkinRule; + +@RunWith(Enclosed.class) +public class ITHttpV2Storage { + + public static class DependenciesTest extends zipkin.storage.DependenciesTest { + @Rule public ZipkinRule server = new ZipkinRule(); + HttpV2Storage storage = new HttpV2Storage(server.httpUrl()); + + @Override protected HttpV2Storage storage() { + return storage; + } + + @Override public void clear() { + // no need.. the test rule does this + } + } + + public static class SpanStoreTest extends zipkin.storage.SpanStoreTest { + @Rule public ZipkinRule server = new ZipkinRule(); + HttpV2Storage storage = new HttpV2Storage(server.httpUrl()); + + @Override protected HttpV2Storage storage() { + return storage; + } + + @Override public void clear() throws IOException { + // no need.. the test rule does this + } + } +} diff --git a/zipkin-server/src/main/java/zipkin/server/EnableZipkinServer.java b/zipkin-server/src/main/java/zipkin/server/EnableZipkinServer.java index 57e78222908..6f0d2f54d9a 100644 --- a/zipkin-server/src/main/java/zipkin/server/EnableZipkinServer.java +++ b/zipkin-server/src/main/java/zipkin/server/EnableZipkinServer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,12 +19,19 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.context.annotation.Import; +import zipkin.autoconfigure.ui.ZipkinUiAutoConfiguration; import zipkin.server.brave.BraveConfiguration; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented -@Import({ZipkinServerConfiguration.class, BraveConfiguration.class, ZipkinQueryApiV1.class, ZipkinHttpCollector.class}) +@Import({ + ZipkinServerConfiguration.class, + BraveConfiguration.class, + ZipkinQueryApiV1.class, + ZipkinQueryApiV2.class, + ZipkinHttpCollector.class +}) public @interface EnableZipkinServer { } diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinQueryApiV2.java b/zipkin-server/src/main/java/zipkin/server/ZipkinQueryApiV2.java new file mode 100644 index 00000000000..c183b548900 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinQueryApiV2.java @@ -0,0 +1,176 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.server; + +import com.squareup.moshi.JsonWriter; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import okio.Buffer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.CacheControl; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.CrossOrigin; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.WebRequest; +import zipkin.Codec; +import zipkin.DependencyLink; +import zipkin.internal.V2StorageComponent; +import zipkin.internal.v2.Call; +import zipkin.internal.v2.Span; +import zipkin.internal.v2.codec.Encoder; +import zipkin.internal.v2.storage.QueryRequest; + +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; +import static zipkin.internal.Util.lowerHexToUnsignedLong; + +@RestController +@RequestMapping("/api/v2") +@CrossOrigin("${zipkin.query.allowed-origins:*}") +@ConditionalOnProperty(name = "zipkin.query.enabled", matchIfMissing = true) +@ConditionalOnBean(V2StorageComponent.class) +public class ZipkinQueryApiV2 { + + @Autowired + @Value("${zipkin.query.lookback:86400000}") + long defaultLookback = 86400000; // 1 day in millis + + /** The Cache-Control max-age (seconds) for /api/v1/services and /api/v1/spans */ + @Value("${zipkin.query.names-max-age:300}") + int namesMaxAge = 300; // 5 minutes + volatile int serviceCount; // used as a threshold to start returning cache-control headers + + private final V2StorageComponent storage; + + @Autowired ZipkinQueryApiV2(V2StorageComponent storage) { + this.storage = storage; // don't cache spanStore here as it can cause the app to crash! + } + + @RequestMapping(value = "/dependencies", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) + public byte[] getDependencies( + @RequestParam(value = "endTs", required = true) long endTs, + @Nullable @RequestParam(value = "lookback", required = false) Long lookback + ) throws IOException { + Call> call = storage.v2SpanStore() + .getDependencies(endTs, lookback != null ? lookback : defaultLookback); + return Codec.JSON.writeDependencyLinks(call.execute()); + } + + @RequestMapping(value = "/services", method = RequestMethod.GET) + public ResponseEntity> getServiceNames() throws IOException { + List serviceNames = storage.v2SpanStore().getServiceNames().execute(); + serviceCount = serviceNames.size(); + return maybeCacheNames(serviceNames); + } + + @RequestMapping(value = "/spans", method = RequestMethod.GET) + public ResponseEntity> getSpanNames( + @RequestParam(value = "serviceName", required = true) String serviceName + ) throws IOException { + return maybeCacheNames(storage.v2SpanStore().getSpanNames(serviceName).execute()); + } + + @RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) + public String getTraces( + @Nullable @RequestParam(value = "serviceName", required = false) String serviceName, + @Nullable @RequestParam(value = "spanName", required = false) String spanName, + @Nullable @RequestParam(value = "annotationQuery", required = false) String annotationQuery, + @Nullable @RequestParam(value = "minDuration", required = false) Long minDuration, + @Nullable @RequestParam(value = "maxDuration", required = false) Long maxDuration, + @Nullable @RequestParam(value = "endTs", required = false) Long endTs, + @Nullable @RequestParam(value = "lookback", required = false) Long lookback, + @RequestParam(value = "limit", defaultValue = "10") int limit + ) throws IOException { + QueryRequest queryRequest = QueryRequest.newBuilder() + .serviceName(serviceName) + .spanName(spanName) + .parseAnnotationQuery(annotationQuery) + .minDuration(minDuration) + .maxDuration(maxDuration) + .endTs(endTs != null ? endTs : System.currentTimeMillis()) + .lookback(lookback != null ? lookback : defaultLookback) + .limit(limit).build(); + + List> traces = storage.v2SpanStore().getTraces(queryRequest).execute(); + Buffer buffer = new Buffer(); + JsonWriter writer = JsonWriter.of(buffer); + writer.beginArray(); + for (int i = 0, iLength = traces.size(); i < iLength; i++) { + writer.beginArray(); + List trace = traces.get(i); + for (int j = 0, jLength = trace.size(); j < jLength; j++) { + buffer.write(Encoder.JSON.encode(trace.get(j))); + if (j < jLength) buffer.writeByte(','); + } + writer.endArray(); + } + writer.endArray(); + return buffer.readUtf8(); + } + + @RequestMapping(value = "/trace/{traceIdHex}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) + public String getTrace(@PathVariable String traceIdHex, WebRequest request) throws IOException { + long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L; + long traceIdLow = lowerHexToUnsignedLong(traceIdHex); + List trace = storage.v2SpanStore().getTrace(traceIdHigh, traceIdLow).execute(); + if (trace.isEmpty()) { + throw new TraceNotFoundException(traceIdHex, traceIdHigh, traceIdLow); + } + Buffer buffer = new Buffer(); + JsonWriter writer = JsonWriter.of(buffer); + writer.beginArray(); + for (int i = 0, length = trace.size(); i < length; i++) { + buffer.write(Encoder.JSON.encode(trace.get(i))); + if (i < length) buffer.writeByte(','); + } + writer.endArray(); + return buffer.readUtf8(); + } + + @ExceptionHandler(TraceNotFoundException.class) + @ResponseStatus(HttpStatus.NOT_FOUND) + public void notFound() { + } + + static class TraceNotFoundException extends RuntimeException { + TraceNotFoundException(String traceIdHex, long traceIdHigh, long traceId) { + super(String.format("Cannot find trace for id=%s, parsed value=%s", traceIdHex, + traceIdHigh != 0 ? traceIdHigh + "," + traceId : traceId)); + } + } + + /** + * We cache names if there are more than 3 services. This helps people getting started: if we + * cache empty results, users have more questions. We assume caching becomes a concern when zipkin + * is in active use, and active use usually implies more than 3 services. + */ + ResponseEntity> maybeCacheNames(List names) { + ResponseEntity.BodyBuilder response = ResponseEntity.ok(); + if (serviceCount > 3) { + response.cacheControl(CacheControl.maxAge(namesMaxAge, TimeUnit.SECONDS).mustRevalidate()); + } + return response.body(names); + } +} diff --git a/zipkin/src/main/java/zipkin/internal/v2/codec/Decoder.java b/zipkin/src/main/java/zipkin/internal/v2/codec/Decoder.java index 45e64b3adcc..8e7586bb37a 100644 --- a/zipkin/src/main/java/zipkin/internal/v2/codec/Decoder.java +++ b/zipkin/src/main/java/zipkin/internal/v2/codec/Decoder.java @@ -29,10 +29,17 @@ public interface Decoder { @Override public List decodeList(byte[] span) { return JsonCodec.readList(new Span2JsonAdapters.Span2Reader(), span); } + + @Override public List> decodeNestedList(byte[] span) { + return JsonCodec.readList(new Span2JsonAdapters.Span2ListReader(), span); + } }; Encoding encoding(); /** throws {@linkplain IllegalArgumentException} if the spans couldn't be decoded */ List decodeList(byte[] span); + + /** throws {@linkplain IllegalArgumentException} if the spans couldn't be decoded */ + List> decodeNestedList(byte[] span); } diff --git a/zipkin/src/main/java/zipkin/internal/v2/codec/Span2JsonAdapters.java b/zipkin/src/main/java/zipkin/internal/v2/codec/Span2JsonAdapters.java index 9779e9813f2..97d397440b1 100644 --- a/zipkin/src/main/java/zipkin/internal/v2/codec/Span2JsonAdapters.java +++ b/zipkin/src/main/java/zipkin/internal/v2/codec/Span2JsonAdapters.java @@ -17,7 +17,10 @@ import com.google.gson.stream.JsonToken; import com.google.gson.stream.MalformedJsonException; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import zipkin.Annotation; import zipkin.Endpoint; @@ -342,4 +345,25 @@ static final class Span2Reader implements JsonReaderAdapter { b.writeAscii(",\"value\":\"").writeJsonEscaped(value.value).writeAscii("\"}"); } }; + + static final class Span2ListReader implements JsonReaderAdapter> { + Span2Reader spanReader; + + @Override public List fromJson(JsonReader reader) throws IOException { + reader.beginArray(); + if (!reader.hasNext()) { + reader.endArray(); + return Collections.emptyList(); + } + List result = new LinkedList<>(); // because we don't know how long it will be + if (spanReader == null) spanReader = new Span2Reader(); + while (reader.hasNext()) result.add(spanReader.fromJson(reader)); + reader.endArray(); + return result; + } + + @Override public String toString() { + return "List"; + } + } }