From 9727c6ea978072ef558f13227bbb8c01e808be4a Mon Sep 17 00:00:00 2001 From: Steve Rao Date: Wed, 8 May 2024 11:08:52 +0800 Subject: [PATCH] Add support for InfluxDB (#10850) --- docs/supported-libraries.md | 1 + .../influxdb-2.4/javaagent/build.gradle.kts | 46 +++ .../v2_4/InfluxDbAttributesGetter.java | 48 +++ .../influxdb/v2_4/InfluxDbConstants.java | 18 + .../v2_4/InfluxDbImplInstrumentation.java | 211 +++++++++++ .../v2_4/InfluxDbInstrumentationModule.java | 26 ++ .../v2_4/InfluxDbNetworkAttributesGetter.java | 21 ++ .../influxdb/v2_4/InfluxDbObjetWrapper.java | 44 +++ .../influxdb/v2_4/InfluxDbRequest.java | 32 ++ .../influxdb/v2_4/InfluxDbSingletons.java | 38 ++ .../influxdb/v2_4/InfluxDbClientTest.java | 334 ++++++++++++++++++ .../influxdb/v2_4/InfluxDbClient24Test.java | 161 +++++++++ settings.gradle.kts | 1 + 13 files changed, 981 insertions(+) create mode 100644 instrumentation/influxdb-2.4/javaagent/build.gradle.kts create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbConstants.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbImplInstrumentation.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbInstrumentationModule.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbNetworkAttributesGetter.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbObjetWrapper.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbRequest.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbSingletons.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java create mode 100644 instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index fb7eceebb6e0..a484c3f771b6 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -73,6 +73,7 @@ These are the supported libraries and frameworks: | [HikariCP](https://github.com/brettwooldridge/HikariCP) | 3.0+ | [opentelemetry-hikaricp-3.0](../instrumentation/hikaricp-3.0/library) | [Database Pool Metrics] | | [HttpURLConnection](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/net/HttpURLConnection.html) | Java 8+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] | | [Hystrix](https://github.com/Netflix/Hystrix) | 1.4+ | N/A | none | +| [InfluxDB Client](https://github.com/influxdata/influxdb-java) | 2.4+ | N/A | [Database Client Spans] | | [Java Executors](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html) | Java 8+ | N/A | Context propagation | | [Java Http Client](https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/package-summary.html) | Java 11+ | [opentelemetry-java-http-client](../instrumentation/java-http-client/library) | [HTTP Client Spans], [HTTP Client Metrics] | | [java.util.logging](https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html) | Java 8+ | N/A | none | diff --git a/instrumentation/influxdb-2.4/javaagent/build.gradle.kts b/instrumentation/influxdb-2.4/javaagent/build.gradle.kts new file mode 100644 index 000000000000..d6a74c455b43 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/build.gradle.kts @@ -0,0 +1,46 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.influxdb") + module.set("influxdb-java") + versions.set("[2.4,)") + assertInverse.set(true) + } +} + +dependencies { + compileOnly("org.influxdb:influxdb-java:2.4") + + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + // we use methods that weren't present before 2.14 in tests + testLibrary("org.influxdb:influxdb-java:2.14") +} + +testing { + suites { + val test24 by registering(JvmTestSuite::class) { + dependencies { + implementation(project()) + implementation("org.influxdb:influxdb-java:2.4") + implementation("org.testcontainers:testcontainers") + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + } + + if (!(findProperty("testLatestDeps") as Boolean)) { + check { + dependsOn(testing.suites) + } + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java new file mode 100644 index 000000000000..0f8be4f6af6e --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import io.opentelemetry.api.internal.StringUtils; +import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter; +import javax.annotation.Nullable; + +final class InfluxDbAttributesGetter implements DbClientAttributesGetter { + + @Nullable + @Override + public String getStatement(InfluxDbRequest request) { + return request.getSqlStatementInfo().getFullStatement(); + } + + @Nullable + @Override + public String getOperation(InfluxDbRequest request) { + if (request.getSqlStatementInfo() != null) { + String operation = request.getSqlStatementInfo().getOperation(); + return StringUtils.isNullOrEmpty(operation) ? request.getSql() : operation; + } + return null; + } + + @Nullable + @Override + public String getSystem(InfluxDbRequest request) { + return "influxdb"; + } + + @Nullable + @Override + public String getUser(InfluxDbRequest request) { + return null; + } + + @Nullable + @Override + public String getName(InfluxDbRequest request) { + String dbName = request.getDbName(); + return StringUtils.isNullOrEmpty(dbName) ? null : dbName; + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbConstants.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbConstants.java new file mode 100644 index 000000000000..06cbb2832da5 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbConstants.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +final class InfluxDbConstants { + + private InfluxDbConstants() {} + + public static final String CREATE_DATABASE_STATEMENT_NEW = "CREATE DATABASE \"%s\""; + + /** In influxDB 0.x version, it uses below statement format to create a database. */ + public static final String CREATE_DATABASE_STATEMENT_OLD = "CREATE DATABASE IF NOT EXISTS \"%s\""; + + public static final String DELETE_DATABASE_STATEMENT = "DROP DATABASE \"%s\""; +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbImplInstrumentation.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbImplInstrumentation.java new file mode 100644 index 000000000000..2633e1d82433 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbImplInstrumentation.java @@ -0,0 +1,211 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_NEW; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_OLD; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.DELETE_DATABASE_STATEMENT; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbSingletons.instrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isEnum; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.bootstrap.CallDepth; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.implementation.bytecode.assign.Assigner; +import net.bytebuddy.matcher.ElementMatcher; +import okhttp3.HttpUrl; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Query; +import org.influxdb.impl.InfluxDBImpl; +import retrofit2.Retrofit; + +public class InfluxDbImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.influxdb.impl.InfluxDBImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("query")).and(takesArgument(0, named("org.influxdb.dto.Query"))), + this.getClass().getName() + "$InfluxDbQueryAdvice"); + + transformer.applyAdviceToMethod( + isMethod() + .and(named("write")) + .and( + takesArguments(1) + .and(takesArgument(0, named("org.influxdb.dto.BatchPoints"))) + .or(takesArguments(2).and(takesArgument(0, int.class))) + .or( + takesArguments(4) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, String.class)) + .and(takesArgument(2, isEnum()))) + .or( + takesArguments(5) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, String.class)) + .and(takesArgument(2, isEnum())) + .and(takesArgument(3, named("java.util.concurrent.TimeUnit"))))), + this.getClass().getName() + "$InfluxDbModifyAdvice"); + transformer.applyAdviceToMethod( + isMethod().and(namedOneOf("createDatabase", "deleteDatabase")), + this.getClass().getName() + "$InfluxDbModifyAdvice"); + } + + @SuppressWarnings("unused") + public static class InfluxDbQueryAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Query query, + @Advice.AllArguments(readOnly = false, typing = Assigner.Typing.DYNAMIC) Object[] arguments, + @Advice.FieldValue(value = "retrofit") Retrofit retrofit, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + callDepth = CallDepth.forClass(InfluxDBImpl.class); + if (callDepth.getAndIncrement() > 0) { + return; + } + + if (query == null) { + return; + } + Context parentContext = currentContext(); + + HttpUrl httpUrl = retrofit.baseUrl(); + influxDbRequest = + InfluxDbRequest.create( + httpUrl.host(), httpUrl.port(), query.getDatabase(), query.getCommand()); + + if (!instrumenter().shouldStart(parentContext, influxDbRequest)) { + return; + } + + // wrap callbacks so they'd run in the context of the parent span + Object[] newArguments = new Object[arguments.length]; + boolean hasChangedArgument = false; + for (int i = 0; i < arguments.length; i++) { + newArguments[i] = InfluxDbObjetWrapper.wrap(arguments[i], parentContext); + hasChangedArgument |= newArguments[i] != arguments[i]; + } + if (hasChangedArgument) { + arguments = newArguments; + } + + context = instrumenter().start(parentContext, influxDbRequest); + scope = context.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + + if (callDepth.decrementAndGet() > 0) { + return; + } + + if (scope == null) { + return; + } + + scope.close(); + + instrumenter().end(context, influxDbRequest, null, throwable); + } + } + + @SuppressWarnings("unused") + public static class InfluxDbModifyAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This InfluxDBImpl influxDbImpl, + @Advice.Origin("#m") String methodName, + @Advice.Argument(0) Object arg0, + @Advice.FieldValue(value = "retrofit") Retrofit retrofit, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + callDepth = CallDepth.forClass(InfluxDBImpl.class); + if (callDepth.getAndIncrement() > 0) { + return; + } + + if (arg0 == null) { + return; + } + + Context parentContext = currentContext(); + + HttpUrl httpUrl = retrofit.baseUrl(); + String database = + (arg0 instanceof BatchPoints) + ? ((BatchPoints) arg0).getDatabase() + // write data by UDP protocol, in this way, can't get database name. + : arg0 instanceof Integer ? "" : String.valueOf(arg0); + + String sql = methodName; + if ("createDatabase".equals(methodName)) { + sql = + influxDbImpl.version().startsWith("0.") + ? String.format(CREATE_DATABASE_STATEMENT_OLD, database) + : String.format(CREATE_DATABASE_STATEMENT_NEW, database); + } else if ("deleteDatabase".equals(methodName)) { + sql = String.format(DELETE_DATABASE_STATEMENT, database); + } + + influxDbRequest = InfluxDbRequest.create(httpUrl.host(), httpUrl.port(), database, sql); + + if (!instrumenter().shouldStart(parentContext, influxDbRequest)) { + return; + } + + context = instrumenter().start(parentContext, influxDbRequest); + scope = context.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + + if (callDepth.decrementAndGet() > 0) { + return; + } + + if (scope == null) { + return; + } + scope.close(); + + instrumenter().end(context, influxDbRequest, null, throwable); + } + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbInstrumentationModule.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbInstrumentationModule.java new file mode 100644 index 000000000000..3aba153fc408 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbInstrumentationModule.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import static java.util.Collections.singletonList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class InfluxDbInstrumentationModule extends InstrumentationModule { + + public InfluxDbInstrumentationModule() { + super("influxdb", "influxdb-2.4"); + } + + @Override + public List typeInstrumentations() { + return singletonList(new InfluxDbImplInstrumentation()); + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbNetworkAttributesGetter.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbNetworkAttributesGetter.java new file mode 100644 index 000000000000..0338653cd1f9 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbNetworkAttributesGetter.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; + +final class InfluxDbNetworkAttributesGetter implements ServerAttributesGetter { + + @Override + public String getServerAddress(InfluxDbRequest request) { + return request.getHost(); + } + + @Override + public Integer getServerPort(InfluxDbRequest request) { + return request.getPort(); + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbObjetWrapper.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbObjetWrapper.java new file mode 100644 index 000000000000..7be6efe42aed --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbObjetWrapper.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public final class InfluxDbObjetWrapper { + + @SuppressWarnings("unchecked") + public static Object wrap(Object object, Context parentContext) { + if (object instanceof Consumer) { + return (Consumer) + o -> { + try (Scope ignore = parentContext.makeCurrent()) { + ((Consumer) object).accept(o); + } + }; + } else if (object instanceof BiConsumer) { + return (BiConsumer) + (o1, o2) -> { + try (Scope ignore = parentContext.makeCurrent()) { + ((BiConsumer) object).accept(o1, o2); + } + }; + } else if (object instanceof Runnable) { + return (Runnable) + () -> { + try (Scope ignore = parentContext.makeCurrent()) { + ((Runnable) object).run(); + } + }; + } + + return object; + } + + private InfluxDbObjetWrapper() {} +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbRequest.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbRequest.java new file mode 100644 index 000000000000..265b911d7102 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlStatementInfo; +import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlStatementSanitizer; +import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig; + +@AutoValue +public abstract class InfluxDbRequest { + + private static final SqlStatementSanitizer sanitizer = + SqlStatementSanitizer.create(CommonConfig.get().isStatementSanitizationEnabled()); + + public static InfluxDbRequest create(String host, Integer port, String dbName, String sql) { + return new AutoValue_InfluxDbRequest(host, port, dbName, sql, sanitizer.sanitize(sql)); + } + + public abstract String getHost(); + + public abstract Integer getPort(); + + public abstract String getDbName(); + + public abstract String getSql(); + + public abstract SqlStatementInfo getSqlStatementInfo(); +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbSingletons.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbSingletons.java new file mode 100644 index 000000000000..3f928f2f125e --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbSingletons.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor; + +public final class InfluxDbSingletons { + + private static final Instrumenter INSTRUMENTER; + + static { + InfluxDbAttributesGetter dbAttributesGetter = new InfluxDbAttributesGetter(); + + INSTRUMENTER = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + "io.opentelemetry.influxdb-2.4", + DbClientSpanNameExtractor.create(dbAttributesGetter)) + .addAttributesExtractor(DbClientAttributesExtractor.create(dbAttributesGetter)) + .addAttributesExtractor( + ServerAttributesExtractor.create(new InfluxDbNetworkAttributesGetter())) + .buildInstrumenter(SpanKindExtractor.alwaysClient()); + } + + public static Instrumenter instrumenter() { + return INSTRUMENTER; + } + + private InfluxDbSingletons() {} +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java b/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java new file mode 100644 index 000000000000..d239606bfd37 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java @@ -0,0 +1,334 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_NEW; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.DELETE_DATABASE_STATEMENT; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.DbIncubatingAttributes; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.GenericContainer; + +// ignore using deprecated createDatabase and deleteDatabase methods warning. +@SuppressWarnings("deprecation") +@TestInstance(Lifecycle.PER_CLASS) +class InfluxDbClientTest { + + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final GenericContainer influxDbServer = + new GenericContainer<>("influxdb:1.8.10-alpine").withExposedPorts(8086); + + private static InfluxDB influxDb; + + private static final String databaseName = "mydb"; + + private static String host; + + private static int port; + + @BeforeAll + void setup() { + influxDbServer.start(); + port = influxDbServer.getMappedPort(8086); + host = influxDbServer.getHost(); + String serverUrl = "http://" + host + ":" + port + "/"; + String username = "root"; + String password = "root"; + influxDb = InfluxDBFactory.connect(serverUrl, username, password); + influxDb.createDatabase(databaseName); + } + + @AfterAll + void cleanup() { + influxDb.deleteDatabase(databaseName); + influxDb.close(); + influxDbServer.stop(); + } + + @Test + void testQueryAndModifyWithOneArgument() { + String dbName = databaseName + "2"; + influxDb.createDatabase(dbName); + BatchPoints batchPoints = + BatchPoints.database(dbName).tag("async", "true").retentionPolicy("autogen").build(); + Point point1 = + Point.measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); + Point point2 = + Point.measurement("disk") + .tag("atag", "test") + .addField("used", 80L) + .addField("free", 1L) + .build(); + batchPoints.point(point1); + batchPoints.point(point2); + influxDb.write(batchPoints); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = influxDb.query(query); + assertThat(result.getResults().get(0).getSeries().get(0).getTags()).isNotEmpty(); + influxDb.deleteDatabase(dbName); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CREATE DATABASE " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + String.format(CREATE_DATABASE_STATEMENT_NEW, dbName), + "CREATE DATABASE", + dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("write " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying(attributeAssertions("write", "write", dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions("SELECT * FROM cpu GROUP BY *", "SELECT", dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DROP DATABASE " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + String.format(DELETE_DATABASE_STATEMENT, dbName), + "DROP DATABASE", + dbName)))); + } + + @Test + void testQueryWithTwoArguments() { + Query query = new Query("SELECT * FROM cpu_load where test1 = 'influxDb'", databaseName); + influxDb.query(query, TimeUnit.MILLISECONDS); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT * FROM cpu_load where test1 = ?", + "SELECT", + databaseName)))); + } + + @Test + void testQueryWithThreeArguments() throws InterruptedException { + Query query = + new Query( + "SELECT * FROM cpu_load where time >= '2022-01-01T08:00:00Z' AND time <= '2022-01-01T20:00:00Z'", + databaseName); + BlockingQueue queue = new LinkedBlockingQueue<>(); + + influxDb.query(query, 2, result -> queue.add(result)); + queue.poll(20, TimeUnit.SECONDS); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT * FROM cpu_load where time >= ? AND time <= ?", + "SELECT", + databaseName)))); + } + + @Test + void testQueryWithThreeArgumentsCallback() throws InterruptedException { + Query query = new Query("SELECT * FROM cpu_load", databaseName); + BlockingQueue queue = new LinkedBlockingQueue<>(); + + influxDb.query(query, 2, result -> queue.add(result)); + queue.poll(20, TimeUnit.SECONDS); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT * FROM cpu_load", "SELECT", databaseName)))); + } + + @Test + void testQueryWithFiveArguments() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + Query query = + new Query( + "SELECT MEAN(water_level) FROM h2o_feet where time = '2022-01-01T08:00:00Z'; SELECT water_level FROM h2o_feet LIMIT 2", + databaseName); + testing.runWithSpan( + "parent", + () -> { + influxDb.query( + query, + 10, + (cancellable, queryResult) -> countDownLatch.countDown(), + () -> testing.runWithSpan("child", () -> {}), + throwable -> {}); + }); + assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT MEAN(water_level) FROM h2o_feet where time = ?; SELECT water_level FROM h2o_feet LIMIT ?", + "SELECT", + databaseName)), + span -> + span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)))); + } + + @Test + void testQueryFailedWithFiveArguments() throws InterruptedException { + CountDownLatch countDownLatchFailure = new CountDownLatch(1); + Query query = new Query("SELECT MEAN(water_level) FROM;", databaseName); + testing.runWithSpan( + "parent", + () -> { + influxDb.query( + query, + 10, + (cancellable, queryResult) -> {}, + () -> {}, + throwable -> { + testing.runWithSpan("child", () -> {}); + countDownLatchFailure.countDown(); + }); + }); + assertThat(countDownLatchFailure.await(10, TimeUnit.SECONDS)).isTrue(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT MEAN(water_level) FROM;", "SELECT", databaseName)), + span -> + span.hasName("child").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(0)))); + } + + @Test + void testWriteWithFourArguments() { + String measurement = "cpu_load"; + List records = new ArrayList<>(); + records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); + influxDb.write(databaseName, "autogen", InfluxDB.ConsistencyLevel.ONE, records); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("write " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions("write", "write", databaseName)))); + } + + @Test + void testWriteWithFiveArguments() { + String measurement = "cpu_load"; + List records = new ArrayList<>(); + records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); + influxDb.write( + databaseName, "autogen", InfluxDB.ConsistencyLevel.ONE, TimeUnit.SECONDS, records); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("write " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions("write", "write", databaseName)))); + } + + @Test + void testWriteWithUdp() { + List lineProtocols = new ArrayList<>(); + for (int i = 0; i < 2000; i++) { + Point point = Point.measurement("udp_single_poit").addField("v", i).build(); + lineProtocols.add(point.lineProtocol()); + } + influxDb.write(port, lineProtocols); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("write") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying(attributeAssertions("write", "write", null)))); + } + + private static List attributeAssertions( + String statement, String operation, String databaseName) { + return asList( + equalTo(DbIncubatingAttributes.DB_SYSTEM, "influxdb"), + equalTo(DbIncubatingAttributes.DB_NAME, databaseName), + equalTo(ServerAttributes.SERVER_ADDRESS, host), + equalTo(ServerAttributes.SERVER_PORT, port), + equalTo(DbIncubatingAttributes.DB_STATEMENT, statement), + equalTo(DbIncubatingAttributes.DB_OPERATION, operation)); + } +} diff --git a/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java b/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java new file mode 100644 index 000000000000..faed6afea662 --- /dev/null +++ b/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java @@ -0,0 +1,161 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; + +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_NEW; +import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.DELETE_DATABASE_STATEMENT; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.DbIncubatingAttributes; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.GenericContainer; + +@TestInstance(Lifecycle.PER_CLASS) +class InfluxDbClient24Test { + + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final GenericContainer influxDbServer = + new GenericContainer<>("influxdb:1.8.10-alpine").withExposedPorts(8086); + + private static InfluxDB influxDb; + + private static final String databaseName = "mydb"; + + private static String host; + + private static int port; + + @BeforeAll + void setup() { + influxDbServer.start(); + port = influxDbServer.getMappedPort(8086); + host = influxDbServer.getHost(); + String serverUrl = "http://" + host + ":" + port + "/"; + String username = "root"; + String password = "root"; + influxDb = InfluxDBFactory.connect(serverUrl, username, password); + influxDb.createDatabase(databaseName); + } + + @AfterAll + void cleanup() { + influxDb.deleteDatabase(databaseName); + influxDbServer.stop(); + } + + @Test + void testQueryAndModifyWithOneArgument() { + String dbName = databaseName + "2"; + influxDb.createDatabase(dbName); + BatchPoints batchPoints = + BatchPoints.database(dbName).tag("async", "true").retentionPolicy("autogen").build(); + Point point1 = + Point.measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L) + .addField("usertime", 9L) + .addField("system", 1L) + .build(); + Point point2 = + Point.measurement("disk") + .tag("atag", "test") + .addField("used", 80L) + .addField("free", 1L) + .build(); + batchPoints.point(point1); + batchPoints.point(point2); + influxDb.write(batchPoints); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + QueryResult result = influxDb.query(query); + assertThat(result.getResults().get(0).getSeries().get(0).getTags()).isNotEmpty(); + influxDb.deleteDatabase(dbName); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("CREATE DATABASE " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + String.format(CREATE_DATABASE_STATEMENT_NEW, dbName), + "CREATE DATABASE", + dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("write " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying(attributeAssertions("write", "write", dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions("SELECT * FROM cpu GROUP BY *", "SELECT", dbName))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("DROP DATABASE " + dbName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + String.format(DELETE_DATABASE_STATEMENT, dbName), + "DROP DATABASE", + dbName)))); + } + + @Test + void testQueryWithTwoArguments() { + Query query = new Query("SELECT * FROM cpu_load where test1 = 'influxDb'", databaseName); + influxDb.query(query, TimeUnit.MILLISECONDS); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("SELECT " + databaseName) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfying( + attributeAssertions( + "SELECT * FROM cpu_load where test1 = ?", + "SELECT", + databaseName)))); + } + + private static List attributeAssertions( + String statement, String operation, String databaseName) { + return asList( + equalTo(DbIncubatingAttributes.DB_SYSTEM, "influxdb"), + equalTo(DbIncubatingAttributes.DB_NAME, databaseName), + equalTo(ServerAttributes.SERVER_ADDRESS, host), + equalTo(ServerAttributes.SERVER_PORT, port), + equalTo(DbIncubatingAttributes.DB_STATEMENT, statement), + equalTo(DbIncubatingAttributes.DB_OPERATION, operation)); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 9b0efb97dd77..f2b5debbd36f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -261,6 +261,7 @@ include(":instrumentation:hikaricp-3.0:library") include(":instrumentation:hikaricp-3.0:testing") include(":instrumentation:http-url-connection:javaagent") include(":instrumentation:hystrix-1.4:javaagent") +include(":instrumentation:influxdb-2.4:javaagent") include(":instrumentation:internal:internal-application-logger:bootstrap") include(":instrumentation:internal:internal-application-logger:javaagent") include(":instrumentation:internal:internal-class-loader:javaagent")