Skip to content

Commit

Permalink
Changes the secondary sampling tests to use http abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Sep 10, 2019
1 parent e2bd688 commit 4a132a1
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.features.propagation;
package brave.http.features.secondary_sampling;

import brave.propagation.ExtraFieldPlugin;
import brave.propagation.ExtraFieldPropagation.FieldUpdater;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.features.propagation;
package brave.http.features.secondary_sampling;

import brave.propagation.ExtraFieldPlugin;
import brave.propagation.TraceContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.features.propagation;
package brave.http.features.secondary_sampling;

import brave.handler.MutableSpan;
import brave.propagation.ExtraFieldPlugin;
Expand All @@ -21,6 +21,11 @@
import java.util.Collections;
import java.util.List;

/**
* Secondary sampling is top-level feature of Brave at the moment, but it can be built as an {@link
* ExtraFieldPlugin}. Those wanting to implement secondary sampling would install this plugin as a
* base.
*/
final class SecondarySamplingPlugin extends ExtraFieldPlugin {
private static final SecondarySamplingPlugin DEFAULT = new SecondarySamplingPlugin();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.features.propagation;
package brave.http.features.secondary_sampling;

import brave.Tracing;
import brave.TracingCustomizer;
import brave.features.propagation.CustomerSupportSamplingPlugin.SetMultimap;
import brave.http.features.secondary_sampling.CustomerSupportSamplingPlugin.SetMultimap;
import brave.propagation.B3SinglePropagation;
import brave.propagation.ExtraFieldCustomizer;
import brave.propagation.ExtraFieldPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.features.propagation;
package brave.http.features.secondary_sampling;

import brave.Span;
import brave.Span.Kind;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.http.HttpClientHandler;
import brave.http.HttpClientRequest;
import brave.http.HttpClientResponse;
import brave.http.HttpServerHandler;
import brave.http.HttpServerRequest;
import brave.http.HttpServerResponse;
import brave.http.HttpTracing;
import brave.propagation.CurrentTraceContext;
import brave.propagation.CurrentTraceContext.Scope;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -77,44 +81,126 @@ TracedNode create(String serviceName, BiPredicate<String, String> routeFunction)
final String localServiceName;
final BiPredicate<String, String> routeFunction;
final List<TracedNode> downstream = new ArrayList<>();
final Tracing tracing;
final Extractor<Map<String, String>> extractor;
final Injector<Map<String, String>> injector;
final CurrentTraceContext current;
final HttpClientHandler<HttpClientRequest, HttpClientResponse> clientHandler;
final HttpServerHandler<HttpServerRequest, HttpServerResponse> serverHandler;

TracedNode(String localServiceName, Function<String, Tracing> tracingFunction,
BiPredicate<String, String> routeFunction) {
this.localServiceName = localServiceName;
this.routeFunction = routeFunction;
this.tracing = tracingFunction.apply(localServiceName);
this.extractor = tracing.propagation().extractor(Map::get);
this.injector = tracing.propagation().injector(Map::put);
Tracing tracing = tracingFunction.apply(localServiceName);
this.current = tracing.currentTraceContext();
HttpTracing httpTracing = HttpTracing.create(tracingFunction.apply(localServiceName));
this.serverHandler = HttpServerHandler.create(httpTracing);
this.clientHandler = HttpClientHandler.create(httpTracing);
}

TracedNode addDownStream(TracedNode downstream) {
this.downstream.add(downstream);
return this;
}

void execute(String endpoint, Map<String, String> incoming) {
Tracer tracer = tracing.tracer();
TraceContextOrSamplingFlags extracted = extractor.extract(incoming);
Span server = extracted.context() != null
? tracer.joinSpan(extracted.context())
: tracer.nextSpan(extracted);
server.kind(Kind.SERVER).name(endpoint).start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(server)) {
void execute(String path, Map<String, String> headers) {
ServerRequest serverRequest = new ServerRequest(path, headers);
Span span = serverHandler.handleReceive(serverRequest);
try (Scope ws = current.newScope(span.context())) {
for (TracedNode down : downstream) {
if (routeFunction.test(endpoint, down.localServiceName)) callDownstream(down, endpoint);
if (routeFunction.test(serverRequest.path, down.localServiceName)) {
callDownstream(down, serverRequest.path);
}
}
}
server.finish();
serverHandler.handleSend(new ServerResponse(), null, span);
}

void callDownstream(TracedNode down, String endpoint) {
Span client = tracing.tracer().nextSpan().name(endpoint).kind(Kind.CLIENT).start();
Map<String, String> request = new LinkedHashMap<>();
injector.inject(client.context(), request);
down.execute(endpoint, request);
client.finish();
ClientRequest clientRequest = new ClientRequest(endpoint);
Span span = clientHandler.handleSend(clientRequest);
down.execute(endpoint, clientRequest.headers);
clientHandler.handleReceive(new ClientResponse(), null, span);
}

static final class ServerRequest extends HttpServerRequest {
final String path;
final Map<String, String> headers;

ServerRequest(String path, Map<String, String> headers) {
this.path = path;
this.headers = headers;
}

@Override public Object unwrap() {
return this;
}

@Override public String method() {
return "GET";
}

@Override public String path() {
return path;
}

@Override public String url() {
return null;
}

@Override public String header(String name) {
return headers.get(name);
}
}

static final class ServerResponse extends HttpServerResponse {
@Override public Object unwrap() {
return this;
}

@Override public int statusCode() {
return 200;
}
}

static final class ClientRequest extends HttpClientRequest {
final String path;
final Map<String, String> headers = new LinkedHashMap<>();

ClientRequest(String path) {
this.path = path;
}

@Override public Object unwrap() {
return this;
}

@Override public String method() {
return "GET";
}

@Override public String path() {
return path;
}

@Override public String url() {
return null;
}

@Override public String header(String name) {
return headers.get(name);
}

@Override public void header(String name, String value) {
headers.put(name, value);
}
}

static final class ClientResponse extends HttpClientResponse {
@Override public Object unwrap() {
return this;
}

@Override public int statusCode() {
return 200;
}
}
}

0 comments on commit 4a132a1

Please sign in to comment.