-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for InfluxDB (open-telemetry#10850)
- Loading branch information
1 parent
f7c88d1
commit fd5d7e1
Showing
13 changed files
with
981 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
plugins { | ||
id("otel.javaagent-instrumentation") | ||
} | ||
|
||
muzzle { | ||
pass { | ||
group.set("org.influxdb") | ||
module.set("influxdb-java") | ||
versions.set("[2.4,)") | ||
assertInverse.set(true) | ||
} | ||
} | ||
|
||
dependencies { | ||
compileOnly("org.influxdb:influxdb-java:2.4") | ||
|
||
compileOnly("com.google.auto.value:auto-value-annotations") | ||
annotationProcessor("com.google.auto.value:auto-value") | ||
|
||
// we use methods that weren't present before 2.14 in tests | ||
testLibrary("org.influxdb:influxdb-java:2.14") | ||
} | ||
|
||
testing { | ||
suites { | ||
val test24 by registering(JvmTestSuite::class) { | ||
dependencies { | ||
implementation(project()) | ||
implementation("org.influxdb:influxdb-java:2.4") | ||
implementation("org.testcontainers:testcontainers") | ||
} | ||
} | ||
} | ||
} | ||
|
||
tasks { | ||
test { | ||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) | ||
} | ||
|
||
if (!(findProperty("testLatestDeps") as Boolean)) { | ||
check { | ||
dependsOn(testing.suites) | ||
} | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
...va/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; | ||
|
||
import io.opentelemetry.api.internal.StringUtils; | ||
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter; | ||
import javax.annotation.Nullable; | ||
|
||
final class InfluxDbAttributesGetter implements DbClientAttributesGetter<InfluxDbRequest> { | ||
|
||
@Nullable | ||
@Override | ||
public String getStatement(InfluxDbRequest request) { | ||
return request.getSqlStatementInfo().getFullStatement(); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getOperation(InfluxDbRequest request) { | ||
if (request.getSqlStatementInfo() != null) { | ||
String operation = request.getSqlStatementInfo().getOperation(); | ||
return StringUtils.isNullOrEmpty(operation) ? request.getSql() : operation; | ||
} | ||
return null; | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getSystem(InfluxDbRequest request) { | ||
return "influxdb"; | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getUser(InfluxDbRequest request) { | ||
return null; | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getName(InfluxDbRequest request) { | ||
String dbName = request.getDbName(); | ||
return StringUtils.isNullOrEmpty(dbName) ? null : dbName; | ||
} | ||
} |
18 changes: 18 additions & 0 deletions
18
...main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; | ||
|
||
final class InfluxDbConstants { | ||
|
||
private InfluxDbConstants() {} | ||
|
||
public static final String CREATE_DATABASE_STATEMENT_NEW = "CREATE DATABASE \"%s\""; | ||
|
||
/** In influxDB 0.x version, it uses below statement format to create a database. */ | ||
public static final String CREATE_DATABASE_STATEMENT_OLD = "CREATE DATABASE IF NOT EXISTS \"%s\""; | ||
|
||
public static final String DELETE_DATABASE_STATEMENT = "DROP DATABASE \"%s\""; | ||
} |
211 changes: 211 additions & 0 deletions
211
...io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbImplInstrumentation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; | ||
|
||
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; | ||
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_NEW; | ||
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_OLD; | ||
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.DELETE_DATABASE_STATEMENT; | ||
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbSingletons.instrumenter; | ||
import static net.bytebuddy.matcher.ElementMatchers.isEnum; | ||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
import static net.bytebuddy.matcher.ElementMatchers.named; | ||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; | ||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.context.Scope; | ||
import io.opentelemetry.javaagent.bootstrap.CallDepth; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
import net.bytebuddy.asm.Advice; | ||
import net.bytebuddy.description.type.TypeDescription; | ||
import net.bytebuddy.implementation.bytecode.assign.Assigner; | ||
import net.bytebuddy.matcher.ElementMatcher; | ||
import okhttp3.HttpUrl; | ||
import org.influxdb.dto.BatchPoints; | ||
import org.influxdb.dto.Query; | ||
import org.influxdb.impl.InfluxDBImpl; | ||
import retrofit2.Retrofit; | ||
|
||
public class InfluxDbImplInstrumentation implements TypeInstrumentation { | ||
|
||
@Override | ||
public ElementMatcher<TypeDescription> typeMatcher() { | ||
return named("org.influxdb.impl.InfluxDBImpl"); | ||
} | ||
|
||
@Override | ||
public void transform(TypeTransformer transformer) { | ||
transformer.applyAdviceToMethod( | ||
isMethod().and(named("query")).and(takesArgument(0, named("org.influxdb.dto.Query"))), | ||
this.getClass().getName() + "$InfluxDbQueryAdvice"); | ||
|
||
transformer.applyAdviceToMethod( | ||
isMethod() | ||
.and(named("write")) | ||
.and( | ||
takesArguments(1) | ||
.and(takesArgument(0, named("org.influxdb.dto.BatchPoints"))) | ||
.or(takesArguments(2).and(takesArgument(0, int.class))) | ||
.or( | ||
takesArguments(4) | ||
.and(takesArgument(0, String.class)) | ||
.and(takesArgument(1, String.class)) | ||
.and(takesArgument(2, isEnum()))) | ||
.or( | ||
takesArguments(5) | ||
.and(takesArgument(0, String.class)) | ||
.and(takesArgument(1, String.class)) | ||
.and(takesArgument(2, isEnum())) | ||
.and(takesArgument(3, named("java.util.concurrent.TimeUnit"))))), | ||
this.getClass().getName() + "$InfluxDbModifyAdvice"); | ||
transformer.applyAdviceToMethod( | ||
isMethod().and(namedOneOf("createDatabase", "deleteDatabase")), | ||
this.getClass().getName() + "$InfluxDbModifyAdvice"); | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class InfluxDbQueryAdvice { | ||
|
||
@Advice.OnMethodEnter(suppress = Throwable.class) | ||
public static void onEnter( | ||
@Advice.Argument(0) Query query, | ||
@Advice.AllArguments(readOnly = false, typing = Assigner.Typing.DYNAMIC) Object[] arguments, | ||
@Advice.FieldValue(value = "retrofit") Retrofit retrofit, | ||
@Advice.Local("otelCallDepth") CallDepth callDepth, | ||
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, | ||
@Advice.Local("otelContext") Context context, | ||
@Advice.Local("otelScope") Scope scope) { | ||
callDepth = CallDepth.forClass(InfluxDBImpl.class); | ||
if (callDepth.getAndIncrement() > 0) { | ||
return; | ||
} | ||
|
||
if (query == null) { | ||
return; | ||
} | ||
Context parentContext = currentContext(); | ||
|
||
HttpUrl httpUrl = retrofit.baseUrl(); | ||
influxDbRequest = | ||
InfluxDbRequest.create( | ||
httpUrl.host(), httpUrl.port(), query.getDatabase(), query.getCommand()); | ||
|
||
if (!instrumenter().shouldStart(parentContext, influxDbRequest)) { | ||
return; | ||
} | ||
|
||
// wrap callbacks so they'd run in the context of the parent span | ||
Object[] newArguments = new Object[arguments.length]; | ||
boolean hasChangedArgument = false; | ||
for (int i = 0; i < arguments.length; i++) { | ||
newArguments[i] = InfluxDbObjetWrapper.wrap(arguments[i], parentContext); | ||
hasChangedArgument |= newArguments[i] != arguments[i]; | ||
} | ||
if (hasChangedArgument) { | ||
arguments = newArguments; | ||
} | ||
|
||
context = instrumenter().start(parentContext, influxDbRequest); | ||
scope = context.makeCurrent(); | ||
} | ||
|
||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
public static void onExit( | ||
@Advice.Thrown Throwable throwable, | ||
@Advice.Local("otelCallDepth") CallDepth callDepth, | ||
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, | ||
@Advice.Local("otelContext") Context context, | ||
@Advice.Local("otelScope") Scope scope) { | ||
|
||
if (callDepth.decrementAndGet() > 0) { | ||
return; | ||
} | ||
|
||
if (scope == null) { | ||
return; | ||
} | ||
|
||
scope.close(); | ||
|
||
instrumenter().end(context, influxDbRequest, null, throwable); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class InfluxDbModifyAdvice { | ||
|
||
@Advice.OnMethodEnter(suppress = Throwable.class) | ||
public static void onEnter( | ||
@Advice.This InfluxDBImpl influxDbImpl, | ||
@Advice.Origin("#m") String methodName, | ||
@Advice.Argument(0) Object arg0, | ||
@Advice.FieldValue(value = "retrofit") Retrofit retrofit, | ||
@Advice.Local("otelCallDepth") CallDepth callDepth, | ||
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, | ||
@Advice.Local("otelContext") Context context, | ||
@Advice.Local("otelScope") Scope scope) { | ||
callDepth = CallDepth.forClass(InfluxDBImpl.class); | ||
if (callDepth.getAndIncrement() > 0) { | ||
return; | ||
} | ||
|
||
if (arg0 == null) { | ||
return; | ||
} | ||
|
||
Context parentContext = currentContext(); | ||
|
||
HttpUrl httpUrl = retrofit.baseUrl(); | ||
String database = | ||
(arg0 instanceof BatchPoints) | ||
? ((BatchPoints) arg0).getDatabase() | ||
// write data by UDP protocol, in this way, can't get database name. | ||
: arg0 instanceof Integer ? "" : String.valueOf(arg0); | ||
|
||
String sql = methodName; | ||
if ("createDatabase".equals(methodName)) { | ||
sql = | ||
influxDbImpl.version().startsWith("0.") | ||
? String.format(CREATE_DATABASE_STATEMENT_OLD, database) | ||
: String.format(CREATE_DATABASE_STATEMENT_NEW, database); | ||
} else if ("deleteDatabase".equals(methodName)) { | ||
sql = String.format(DELETE_DATABASE_STATEMENT, database); | ||
} | ||
|
||
influxDbRequest = InfluxDbRequest.create(httpUrl.host(), httpUrl.port(), database, sql); | ||
|
||
if (!instrumenter().shouldStart(parentContext, influxDbRequest)) { | ||
return; | ||
} | ||
|
||
context = instrumenter().start(parentContext, influxDbRequest); | ||
scope = context.makeCurrent(); | ||
} | ||
|
||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
public static void onExit( | ||
@Advice.Thrown Throwable throwable, | ||
@Advice.Local("otelCallDepth") CallDepth callDepth, | ||
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest, | ||
@Advice.Local("otelContext") Context context, | ||
@Advice.Local("otelScope") Scope scope) { | ||
|
||
if (callDepth.decrementAndGet() > 0) { | ||
return; | ||
} | ||
|
||
if (scope == null) { | ||
return; | ||
} | ||
scope.close(); | ||
|
||
instrumenter().end(context, influxDbRequest, null, throwable); | ||
} | ||
} | ||
} |
26 changes: 26 additions & 0 deletions
26
.../opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbInstrumentationModule.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; | ||
|
||
import static java.util.Collections.singletonList; | ||
|
||
import com.google.auto.service.AutoService; | ||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
import java.util.List; | ||
|
||
@AutoService(InstrumentationModule.class) | ||
public class InfluxDbInstrumentationModule extends InstrumentationModule { | ||
|
||
public InfluxDbInstrumentationModule() { | ||
super("influxdb", "influxdb-2.4"); | ||
} | ||
|
||
@Override | ||
public List<TypeInstrumentation> typeInstrumentations() { | ||
return singletonList(new InfluxDbImplInstrumentation()); | ||
} | ||
} |
21 changes: 21 additions & 0 deletions
21
...pentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbNetworkAttributesGetter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4; | ||
|
||
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; | ||
|
||
final class InfluxDbNetworkAttributesGetter implements ServerAttributesGetter<InfluxDbRequest> { | ||
|
||
@Override | ||
public String getServerAddress(InfluxDbRequest request) { | ||
return request.getHost(); | ||
} | ||
|
||
@Override | ||
public Integer getServerPort(InfluxDbRequest request) { | ||
return request.getPort(); | ||
} | ||
} |
Oops, something went wrong.