Skip to content

Commit

Permalink
Refactor instrumentation to be pluggable, defaulting to OTel
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez committed Aug 21, 2023
1 parent 2e94c7e commit 9bf3327
Show file tree
Hide file tree
Showing 12 changed files with 628 additions and 701 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -231,12 +231,8 @@ private String queryString(Request request, TransportOptions options) {
.entrySet()
.stream()
.map(e -> {
try {
return URLEncoder.encode(e.getKey(), "UTF-8") + "=" +
URLEncoder.encode(e.getValue(), "UTF-8");
} catch(UnsupportedEncodingException ex) {
throw new RuntimeException(ex);
}
return URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "=" +
URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8);
})
.collect(Collectors.joining("&"));
}
Expand Down
6 changes: 3 additions & 3 deletions java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ dependencies {
// the Java API client coexists with a 7.x HLRC work fine
val elasticsearchVersion = "7.17.7"
val jacksonVersion = "2.13.3"
val openTelemetryVersion = "1.27.0"
val openTelemetryVersion = "1.29.0"

// Apache 2.0
// https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
Expand All @@ -206,8 +206,8 @@ dependencies {
// Apache 2.0
// https://github.com/open-telemetry/opentelemetry-java
implementation("io.opentelemetry", "opentelemetry-api", openTelemetryVersion)
implementation("io.opentelemetry", "opentelemetry-semconv", "$openTelemetryVersion-alpha")

// Use it once it's stable (see Instrumentation.java). Limited to tests for now.
testImplementation("io.opentelemetry", "opentelemetry-semconv", "$openTelemetryVersion-alpha")

// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// https://github.com/eclipse-ee4j/jsonb-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.http.HeaderMap;
import co.elastic.clients.transport.http.TransportHttpClient;
import co.elastic.clients.transport.instrumentation.Instrumentation;
import co.elastic.clients.transport.instrumentation.NoopInstrumentation;
import co.elastic.clients.transport.instrumentation.OpenTelemetryForElasticsearch;
import co.elastic.clients.util.LanguageRuntimeVersions;
import co.elastic.clients.util.ApiTypeHelper;
import co.elastic.clients.util.BinaryData;
Expand Down Expand Up @@ -74,6 +77,7 @@ public abstract class ElasticsearchTransportBase implements ElasticsearchTranspo
}

private final TransportHttpClient httpClient;
private final Instrumentation instrumentation;

@Override
public void close() throws IOException {
Expand All @@ -84,9 +88,27 @@ public void close() throws IOException {
protected final TransportOptions transportOptions;

public ElasticsearchTransportBase(TransportHttpClient httpClient, TransportOptions options, JsonpMapper jsonpMapper) {
this(httpClient, options, jsonpMapper, null);
}

public ElasticsearchTransportBase(
TransportHttpClient httpClient,
TransportOptions options,
JsonpMapper jsonpMapper,
@Nullable Instrumentation instrumentation
) {
this.mapper = jsonpMapper;
this.httpClient = httpClient;
this.transportOptions = httpClient.createOptions(options);

// If no instrumentation is provided, fallback to OpenTelemetry and ultimately noop
if (instrumentation == null) {
instrumentation = OpenTelemetryForElasticsearch.getDefault();
}
if (instrumentation == null) {
instrumentation = NoopInstrumentation.INSTANCE;
}
this.instrumentation = instrumentation;
}

@Override
Expand All @@ -105,10 +127,25 @@ public final <RequestT, ResponseT, ErrorT> ResponseT performRequest(
Endpoint<RequestT, ResponseT, ErrorT> endpoint,
@Nullable TransportOptions options
) throws IOException {
TransportOptions opts = options == null ? transportOptions : options;
TransportHttpClient.Request req = prepareTransportRequest(request, endpoint);
TransportHttpClient.Response resp = httpClient.performRequest(endpoint.id(), null, req, opts);
return getApiResponse(resp, endpoint);
try (Instrumentation.Context ctx = instrumentation.newContext(request, endpoint)) {
try (Instrumentation.ThreadScope ts = ctx.makeCurrent()) {

TransportOptions opts = options == null ? transportOptions : options;
TransportHttpClient.Request req = prepareTransportRequest(request, endpoint);
ctx.beforeSendingHttpRequest(req, options);

TransportHttpClient.Response resp = httpClient.performRequest(endpoint.id(), null, req, opts);
ctx.afterReceivingHttpResponse(resp);

ResponseT apiResponse = getApiResponse(resp, endpoint);
ctx.afterDecodingApiResponse(apiResponse);

return apiResponse;
} catch (Throwable throwable){
ctx.recordException(throwable);
throw throwable;
}
}
}

@Override
Expand All @@ -117,12 +154,17 @@ public final <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performR
Endpoint<RequestT, ResponseT, ErrorT> endpoint,
@Nullable TransportOptions options
) {
Instrumentation.Context ctx = instrumentation.newContext(request, endpoint);

TransportOptions opts = options == null ? transportOptions : options;
TransportHttpClient.Request clientReq;
try {
try (Instrumentation.ThreadScope ss = ctx.makeCurrent()) {
clientReq = prepareTransportRequest(request, endpoint);
ctx.beforeSendingHttpRequest(clientReq, options);
} catch (Exception e) {
// Terminate early
ctx.recordException(e);
ctx.close();
CompletableFuture<ResponseT> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
Expand All @@ -148,17 +190,27 @@ public boolean cancel(boolean mayInterruptIfRunning) {
};

clientFuture.handle((clientResp, thr) -> {
if (thr != null) {
future.completeExceptionally(thr);
} else {
try (ApiTypeHelper.DisabledChecksHandle h =
ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) {

ResponseT response = getApiResponse(clientResp, endpoint);
future.complete(response);
try (Instrumentation.ThreadScope ts = ctx.makeCurrent()) {
if (thr != null) {
// Exception executing the http request
ctx.recordException(thr);
ctx.close();
future.completeExceptionally(thr);

} catch (Throwable e) {
future.completeExceptionally(e);
} else {
try (ApiTypeHelper.DisabledChecksHandle h =
ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) {
ctx.afterReceivingHttpResponse(clientResp);
ResponseT response = getApiResponse(clientResp, endpoint);
ctx.afterDecodingApiResponse(response);
future.complete(response);

} catch (Throwable e) {
ctx.recordException(e);
future.completeExceptionally(e);
} finally {
ctx.close();
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.instrumentation;

import co.elastic.clients.transport.Endpoint;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.http.TransportHttpClient;

public interface Instrumentation {

<TRequest> Context newContext(TRequest request, Endpoint<TRequest, ?, ?> endpoint);

interface Context extends AutoCloseable {
ThreadScope makeCurrent();

void beforeSendingHttpRequest(TransportHttpClient.Request httpRequest, TransportOptions options);
void afterReceivingHttpResponse(TransportHttpClient.Response httpResponse);
<TResponse> void afterDecodingApiResponse(TResponse apiResponse);
void recordException(Throwable thr);

@Override
void close();
}

interface ThreadScope extends AutoCloseable {
@Override
void close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.instrumentation;

import co.elastic.clients.transport.Endpoint;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.http.TransportHttpClient;

/**
* A no-operation instrumentation. Used when no instrumentation has been set. It can also be used to
* bypass OpenTelemetry automatic discovery.
*/
public class NoopInstrumentation implements Instrumentation {

public static NoopInstrumentation INSTANCE = new NoopInstrumentation();

private NoopInstrumentation() {}

@Override
public <TRequest> Context newContext(TRequest request, Endpoint<TRequest, ?, ?> endpoint) {
return CONTEXT;
}

private static final NoopContext CONTEXT = new NoopContext();
private static final NoopScope SCOPE = new NoopScope();

private static class NoopContext implements Instrumentation.Context {
@Override
public ThreadScope makeCurrent() {
return SCOPE;
}

@Override
public void beforeSendingHttpRequest(TransportHttpClient.Request httpRequest, TransportOptions options) {}

@Override
public void afterReceivingHttpResponse(TransportHttpClient.Response httpResponse) {}

@Override
public <TResponse> void afterDecodingApiResponse(TResponse apiResponse) {}

@Override
public void recordException(Throwable thr) {}

@Override
public void close() {}
}

private static class NoopScope implements Instrumentation.ThreadScope {
@Override
public void close() {}
}
}
Loading

0 comments on commit 9bf3327

Please sign in to comment.