-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Adrian Cole
committed
Aug 30, 2017
1 parent
fc98b91
commit 413cc03
Showing
10 changed files
with
710 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
133 changes: 133 additions & 0 deletions
133
zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
/** | ||
* Copyright 2015-2017 The OpenZipkin Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package zipkin.junit.v2; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import okhttp3.HttpUrl; | ||
import okhttp3.OkHttpClient; | ||
import okhttp3.Request; | ||
import okhttp3.Response; | ||
import okhttp3.ResponseBody; | ||
import okhttp3.internal.http.HttpHeaders; | ||
import okio.BufferedSource; | ||
import okio.GzipSource; | ||
import okio.Okio; | ||
import zipkin.internal.v2.Call; | ||
import zipkin.internal.v2.Callback; | ||
|
||
import static zipkin.internal.Util.propagateIfFatal; | ||
|
||
public final class HttpV2Call<V> extends Call<V> { | ||
|
||
public interface BodyConverter<V> { | ||
V convert(BufferedSource content) throws IOException; | ||
} | ||
|
||
public static class Factory implements Closeable { | ||
final OkHttpClient ok; | ||
public final HttpUrl baseUrl; | ||
|
||
public Factory(OkHttpClient ok, HttpUrl baseUrl) { | ||
this.ok = ok; | ||
this.baseUrl = baseUrl; | ||
} | ||
|
||
public <V> HttpV2Call<V> newCall(Request request, BodyConverter<V> bodyConverter) { | ||
return new HttpV2Call<>(this, request, bodyConverter); | ||
} | ||
|
||
@Override public void close() { | ||
ok.dispatcher().executorService().shutdownNow(); | ||
} | ||
} | ||
|
||
final okhttp3.Call call; | ||
final BodyConverter<V> bodyConverter; | ||
|
||
HttpV2Call(Factory factory, Request request, BodyConverter<V> bodyConverter) { | ||
this(factory.ok.newCall(request), bodyConverter); | ||
} | ||
|
||
HttpV2Call(okhttp3.Call call, BodyConverter<V> bodyConverter) { | ||
this.call = call; | ||
this.bodyConverter = bodyConverter; | ||
} | ||
|
||
@Override public V execute() throws IOException { | ||
return parseResponse(call.execute(), bodyConverter); | ||
} | ||
|
||
@Override public void enqueue(Callback<V> delegate) { | ||
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate)); | ||
} | ||
|
||
@Override public void cancel() { | ||
call.cancel(); | ||
} | ||
|
||
@Override public boolean isCanceled() { | ||
return call.isCanceled(); | ||
} | ||
|
||
@Override public HttpV2Call<V> clone() { | ||
return new HttpV2Call<>(call.clone(), bodyConverter); | ||
} | ||
|
||
static class CallbackAdapter<V> implements okhttp3.Callback { | ||
final BodyConverter<V> bodyConverter; | ||
final Callback<V> delegate; | ||
|
||
CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) { | ||
this.bodyConverter = bodyConverter; | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override public void onFailure(okhttp3.Call call, IOException e) { | ||
delegate.onError(e); | ||
} | ||
|
||
/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ | ||
@Override public void onResponse(okhttp3.Call call, Response response) { | ||
try { | ||
delegate.onSuccess(parseResponse(response, bodyConverter)); | ||
} catch (Throwable e) { | ||
propagateIfFatal(e); | ||
delegate.onError(e); | ||
} | ||
} | ||
} | ||
|
||
static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException { | ||
if (!HttpHeaders.hasBody(response)) { | ||
if (response.isSuccessful()) { | ||
return null; | ||
} else { | ||
throw new IllegalStateException("response failed: " + response); | ||
} | ||
} | ||
try (ResponseBody responseBody = response.body()) { | ||
BufferedSource content = responseBody.source(); | ||
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) { | ||
content = Okio.buffer(new GzipSource(responseBody.source())); | ||
} | ||
if (response.isSuccessful()) { | ||
return bodyConverter.convert(content); | ||
} else { | ||
throw new IllegalStateException( | ||
"response for " + response.request().tag() + " failed: " + content.readUtf8()); | ||
} | ||
} | ||
} | ||
} |
49 changes: 49 additions & 0 deletions
49
zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/** | ||
* Copyright 2015-2016 The OpenZipkin Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package zipkin.junit.v2; | ||
|
||
import java.util.List; | ||
import okhttp3.HttpUrl; | ||
import okhttp3.MediaType; | ||
import okhttp3.OkHttpClient; | ||
import okhttp3.Request; | ||
import okhttp3.RequestBody; | ||
import okio.Buffer; | ||
import zipkin.internal.v2.Span; | ||
import zipkin.internal.v2.codec.Encoder; | ||
import zipkin.internal.v2.storage.SpanConsumer; | ||
|
||
/** Implements the span consumer interface by forwarding requests over http. */ | ||
final class HttpV2SpanConsumer implements SpanConsumer { | ||
final HttpV2Call.Factory factory; | ||
|
||
HttpV2SpanConsumer(OkHttpClient client, HttpUrl baseUrl) { | ||
this.factory = new HttpV2Call.Factory(client, baseUrl); | ||
} | ||
|
||
@Override public zipkin.internal.v2.Call<Void> accept(List<Span> spans) { | ||
Buffer json = new Buffer(); | ||
json.writeByte('['); | ||
for (int i = 0, length = spans.size(); i < length; ) { | ||
json.write(Encoder.JSON.encode(spans.get(i))); | ||
if (++i < length) json.writeByte(','); | ||
} | ||
json.writeByte(']'); | ||
return factory.newCall(new Request.Builder() | ||
.url(factory.baseUrl.resolve("/api/v2/spans")) | ||
.post(RequestBody.create(MediaType.parse("application/json"), json.readByteArray())).build(), | ||
b -> null /* void */ | ||
); | ||
} | ||
} |
Oops, something went wrong.