Skip to content

Commit

Permalink
refactored instrumentation
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Wert <[email protected]>
  • Loading branch information
AlexanderWert committed Jul 4, 2023
1 parent 73cab12 commit d81f7dc
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 42 deletions.
2 changes: 1 addition & 1 deletion java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ dependencies {
// the Java API client coexists with a 7.x HLRC work fine
val elasticsearchVersion = "7.17.7"
val jacksonVersion = "2.13.3"
val openTelemetryVersion = "1.26.0"
val openTelemetryVersion = "1.27.0"

// Apache 2.0
// https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public interface Endpoint<RequestT, ResponseT, ErrorT> {
*/
String requestUrl(RequestT request);

/**
* Get the route for a request (i.e. URL pattern).
*/
String route(RequestT request);

/**
* Get the path parameters for a request.
*/
Expand Down Expand Up @@ -116,7 +111,6 @@ default BinaryEndpoint<RequestT> withBinaryResponse() {
this.id(),
this::method,
this::requestUrl,
this::route,
this::pathParameters,
this::queryParameters,
this::headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT,
Map<String, String>> pathParameters,
Function<RequestT,
Expand All @@ -37,14 +36,13 @@ public BinaryEndpoint(
Function<RequestT, Object> body,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, route, pathParameters, queryParameters, headers, body);
super(id, method, requestUrl, pathParameters, queryParameters, headers, body);
}

public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT,
Map<String, String>> pathParameters,
Function<RequestT,
Expand All @@ -53,7 +51,7 @@ public BinaryEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
super(id, method, requestUrl, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public BooleanEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT,
Map<String, String>> pathParameters,
Function<RequestT,
Expand All @@ -37,7 +36,7 @@ public BooleanEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
super(id, method, requestUrl, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ public String requestUrl(Req request) {
return endpoint.requestUrl(request);
}

@Override
public String route(Req request) {
return endpoint.route(request);
}

@Override
public Map<String, String> pathParameters(Req request) {
return endpoint.pathParameters(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ static <T, U> Function<T, U> returnSelf() {
protected final String id;
protected final Function<RequestT, String> method;
protected final Function<RequestT, String> requestUrl;
protected final Function<RequestT, String> route;
protected final Function<RequestT, Map<String, String>> pathParameters;
protected final Function<RequestT, Map<String, String>> queryParameters;
protected final Function<RequestT, Map<String, String>> headers;
Expand All @@ -74,7 +73,6 @@ public EndpointBase(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT, Map<String, String>> pathParameters,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
Expand All @@ -83,7 +81,6 @@ public EndpointBase(
this.id = id;
this.method = method;
this.requestUrl = requestUrl;
this.route = route;
this.pathParameters = pathParameters;
this.queryParameters = queryParameters;
this.headers = headers;
Expand All @@ -105,11 +102,6 @@ public String requestUrl(RequestT request) {
return this.requestUrl.apply(request);
}

@Override
public String route(RequestT request) {
return this.route.apply(request);
}

@Override
public Map<String, String> pathParameters(RequestT request) {
return this.pathParameters.apply(request);
Expand Down Expand Up @@ -149,7 +141,6 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
id,
method,
requestUrl,
route,
pathParameters,
queryParameters,
headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,20 @@ public SimpleEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT, Map<String, String>> pathParameters,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
Function<RequestT, Object> body,
JsonpDeserializer<ResponseT> responseParser
) {
super(id, method, requestUrl, route, pathParameters, queryParameters, headers, body);
super(id, method, requestUrl, pathParameters, queryParameters, headers, body);
this.responseParser = responseParser;
}

public SimpleEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT, Map<String, String>> pathParameters,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
Expand All @@ -62,7 +60,6 @@ public SimpleEndpoint(
id,
method,
requestUrl,
route,
pathParameters,
queryParameters,
headers,
Expand All @@ -88,7 +85,6 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
id,
method,
requestUrl,
route,
pathParameters,
queryParameters,
headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public SimpleJsonEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, String> route,
Function<RequestT,
Map<String, String>> pathParameters,
Function<RequestT,
Expand All @@ -42,6 +41,6 @@ public SimpleJsonEndpoint(
boolean hasRequestBody,
JsonpDeserializer<ResponseT> responseParser
) {
super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody, responseParser);
super(id, method, requestUrl, pathParameters, queryParameters, headers, hasRequestBody, responseParser);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.rest_client;

import co.elastic.clients.transport.Endpoint;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class Instrumentation {

private static final Set<String> SEARCH_ENDPOINTS = new HashSet<>(Arrays.asList(
"render_search_template",
"terms_enum",
"msearch_template",
"eql.search",
"msearch",
"search_template",
"async_search.submit",
"search"));


// these reflect the config options in the OTel Java agent
private static final boolean INSTRUMENTATION_ENABLED =
Boolean.parseBoolean(
ConfigUtil.getConfigOption("otel.instrumentation.elasticsearch.enabled", "true"));

private static final boolean CAPTURE_SEARCH_BODY =
Boolean.parseBoolean(
ConfigUtil.getConfigOption("otel.instrumentation.elasticsearch.capture-search-query", "false"));

private static final Log logger = LogFactory.getLog(Instrumentation.class);

private final Tracer tracer;

protected Instrumentation(@Nullable OpenTelemetry openTelemetry) {
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}

tracer = openTelemetry.getTracer("elasticsearch-api");
}

protected <RequestT, ResponseT, ErrorT> Span createSpanForRequest(RequestT request,
Endpoint<RequestT, ResponseT, ErrorT> endpoint) {
if (!INSTRUMENTATION_ENABLED) {
return Span.getInvalid();
}

Span span = tracer.spanBuilder(endpoint.id()).setSpanKind(SpanKind.CLIENT).startSpan();
if (isInvalidSpan(span)) {
span.setAttribute(OTelAttributes.DB_SYSTEM, "elasticsearch");
span.setAttribute(OTelAttributes.DB_OPERATION, endpoint.id());
span.setAttribute(OTelAttributes.HTTP_REQUEST_METHOD, endpoint.method(request));

for (Map.Entry<String, String> pathParamEntry : endpoint.pathParameters(request).entrySet()) {
String attributeKey = OTelAttributes.PATH_PART_PREFIX + pathParamEntry.getKey();
span.setAttribute(AttributeKey.stringKey(attributeKey), pathParamEntry.getValue());
}
}

return span;
}

protected void captureResponseInformation(@Nullable Span span, Response response) {
if (isInvalidSpan(span)) {
return;
}

HttpHost host = response.getHost();
String uri = response.getRequestLine().getUri();
uri = uri.startsWith("/") ? uri : "/" + uri;
String fullUrl = host.toURI() + uri;

span.setAttribute(OTelAttributes.URL_FULL, fullUrl);
span.setAttribute(OTelAttributes.SERVER_PORT, host.getPort());

InetAddress hostAddress = response.getHost().getAddress();
if (hostAddress != null) {
span.setAttribute(OTelAttributes.SERVER_ADDRESS, hostAddress.getHostAddress());
}
}

protected <RequestT> void captureBody(@Nullable Span span, Endpoint<RequestT, ?, ?> endpoint,
HttpEntity httpEntity) {
try {
if (shouldCaptureBody(span, endpoint, httpEntity)) {

String body = new BufferedReader(
new InputStreamReader(httpEntity.getContent(), StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining());

span.setAttribute(OTelAttributes.DB_STATEMENT, body);
}
} catch (IOException e) {
logger.debug("Failed reading HTTP body content.", e);
}
}

private <RequestT> boolean shouldCaptureBody(@Nullable Span span, Endpoint<RequestT, ?, ?> endpoint, HttpEntity httpEntity) {
return !isInvalidSpan(span)
&& CAPTURE_SEARCH_BODY
&& SEARCH_ENDPOINTS.contains(endpoint.id())
&& httpEntity != null
&& httpEntity.isRepeatable();
}

private boolean isInvalidSpan(@Nullable Span span) {
return !INSTRUMENTATION_ENABLED || span == null || !span.isRecording();
}

private static final class OTelAttributes {
private static final AttributeKey<String> DB_SYSTEM = SemanticAttributes.DB_SYSTEM;
private static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
private static final AttributeKey<String> DB_STATEMENT = SemanticAttributes.DB_STATEMENT;
private static final AttributeKey<String> HTTP_REQUEST_METHOD = AttributeKey.stringKey("http.request.method");
private static final AttributeKey<String> URL_FULL = AttributeKey.stringKey("url.full");
private static final AttributeKey<String> SERVER_ADDRESS = AttributeKey.stringKey("server.address");
private static final AttributeKey<Long> SERVER_PORT = AttributeKey.longKey("server.port");

private static final String PATH_PART_PREFIX = "db.elasticsearch.path_parts.";
}

private static final class ConfigUtil {
private static String getConfigOption(String key, String defaultValue) {
String normalizedKey = normalizePropertyKey(key);
String systemProperty =
System.getProperties().entrySet().stream()
.filter(entry -> normalizedKey.equals(normalizePropertyKey(entry.getKey().toString())))
.map(entry -> entry.getValue().toString())
.findFirst()
.orElse(null);
if (systemProperty != null) {
return systemProperty;
}
return System.getenv().entrySet().stream()
.filter(entry -> normalizedKey.equals(normalizeEnvironmentVariableKey(entry.getKey())))
.map(Map.Entry::getValue)
.findFirst()
.orElse(defaultValue);
}

/**
* Normalize an environment variable key by converting to lower case and replacing "_" with ".".
*/
private static String normalizeEnvironmentVariableKey(String key) {
return key.toLowerCase(Locale.ROOT).replace("_", ".");
}

/**
* Normalize a property key by converting to lower case and replacing "-" with ".".
*/
private static String normalizePropertyKey(String key) {
return key.toLowerCase(Locale.ROOT).replace("-", ".");
}
}
}
Loading

0 comments on commit d81f7dc

Please sign in to comment.