Skip to content

Commit

Permalink
Add support for OpenTelemetry in JDBC connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 21, 2023
1 parent 20e3927 commit 7b269fa
Show file tree
Hide file tree
Showing 20 changed files with 218 additions and 31 deletions.
11 changes: 11 additions & 0 deletions plugin/trino-base-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-jdbc</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
Expand Down Expand Up @@ -129,6 +134,12 @@
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.jdbc;

import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.credential.CredentialPropertiesProvider;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.credential.DefaultCredentialPropertiesProvider;
Expand All @@ -34,6 +35,7 @@ public class DriverConnectionFactory
private final String connectionUrl;
private final Properties connectionProperties;
private final CredentialPropertiesProvider<String, String> credentialPropertiesProvider;
private final TracingDataSource dataSource;

public DriverConnectionFactory(Driver driver, BaseJdbcConfig config, CredentialProvider credentialProvider)
{
Expand All @@ -45,24 +47,35 @@ public DriverConnectionFactory(Driver driver, BaseJdbcConfig config, CredentialP

public DriverConnectionFactory(Driver driver, String connectionUrl, Properties connectionProperties, CredentialProvider credentialProvider)
{
this(driver, connectionUrl, connectionProperties, new DefaultCredentialPropertiesProvider(credentialProvider));
this(driver, connectionUrl, connectionProperties, new DefaultCredentialPropertiesProvider(credentialProvider), OpenTelemetry.noop());
}

public DriverConnectionFactory(Driver driver, String connectionUrl, Properties connectionProperties, CredentialPropertiesProvider<String, String> credentialPropertiesProvider)
public DriverConnectionFactory(Driver driver, String connectionUrl, Properties connectionProperties, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
{
this(driver, connectionUrl, connectionProperties, new DefaultCredentialPropertiesProvider(credentialProvider), openTelemetry);
}

public DriverConnectionFactory(
Driver driver,
String connectionUrl,
Properties connectionProperties,
CredentialPropertiesProvider<String, String> credentialPropertiesProvider,
OpenTelemetry openTelemetry)
{
this.driver = requireNonNull(driver, "driver is null");
this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null");
this.connectionProperties = new Properties();
this.connectionProperties.putAll(requireNonNull(connectionProperties, "connectionProperties is null"));
this.credentialPropertiesProvider = requireNonNull(credentialPropertiesProvider, "credentialPropertiesProvider is null");
this.dataSource = new TracingDataSource(requireNonNull(openTelemetry, "openTelemetry is null"), driver, connectionUrl);
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
Properties properties = getCredentialProperties(session.getIdentity());
Connection connection = driver.connect(connectionUrl, properties);
Connection connection = dataSource.getConnection(properties);
checkState(connection != null, "Driver returned null connection, make sure the connection URL '%s' is valid for the driver %s", connectionUrl, driver);
return connection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.base.CatalogName;
import io.trino.spi.NodeManager;
import io.trino.spi.VersionEmbedder;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
binder -> binder.bind(TypeManager.class).toInstance(context.getTypeManager()),
binder -> binder.bind(NodeManager.class).toInstance(context.getNodeManager()),
binder -> binder.bind(VersionEmbedder.class).toInstance(context.getVersionEmbedder()),
binder -> binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()),
binder -> binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)),
new JdbcModule(),
module);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.jdbc.datasource.OpenTelemetryDataSource;

import javax.sql.DataSource;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Logger;

import static java.util.Objects.requireNonNull;

public class TracingDataSource
{
private final OpenTelemetry openTelemetry;
private final Driver driver;
private final String connectionUrl;

public TracingDataSource(OpenTelemetry openTelemetry, Driver driver, String connectionUrl)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.driver = requireNonNull(driver, "driver is null");
this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null");
}

public Connection getConnection(Properties properties)
throws SQLException
{
DataSource dataSource = new JdbcDataSource(driver, connectionUrl, properties);
try (OpenTelemetryDataSource openTelemetryDataSource = new OpenTelemetryDataSource(dataSource, openTelemetry)) {
return openTelemetryDataSource.getConnection();
}
catch (Exception e) {
throw new SQLException(e);
}
}

private static class JdbcDataSource
implements DataSource
{
private final Driver driver;
private final String connectionUrl;
private final Properties properties;

public JdbcDataSource(Driver driver, String connectionUrl, Properties properties)
{
this.driver = requireNonNull(driver, "driver is null");
this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null");
this.properties = requireNonNull(properties, "properties is null");
}

@Override
public Connection getConnection()
throws SQLException
{
return driver.connect(connectionUrl, properties);
}

@Override
public Connection getConnection(String username, String password)
{
throw new UnsupportedOperationException();
}

@Override
public PrintWriter getLogWriter()
{
throw new UnsupportedOperationException();
}

@Override
public void setLogWriter(PrintWriter out)
{
throw new UnsupportedOperationException();
}

@Override
public void setLoginTimeout(int seconds)
{
throw new UnsupportedOperationException();
}

@Override
public int getLoginTimeout()
{
throw new UnsupportedOperationException();
}

@Override
public Logger getParentLogger()
{
throw new UnsupportedOperationException();
}

@Override
public <T> T unwrap(Class<T> iface)
{
throw new UnsupportedOperationException();
}

@Override
public boolean isWrapperFor(Class<?> iface)
{
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.jdbc.credential.EmptyCredentialProvider;
import io.trino.testing.QueryRunner;
Expand Down Expand Up @@ -46,7 +47,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
String connectionUrl = createH2ConnectionUrl();
DriverConnectionFactory delegate = new DriverConnectionFactory(new Driver(), connectionUrl, new Properties(), new EmptyCredentialProvider());
DriverConnectionFactory delegate = new DriverConnectionFactory(new Driver(), connectionUrl, new Properties(), new EmptyCredentialProvider(), OpenTelemetry.noop());
this.connectionFactory = new ConnectionCountingConnectionFactory(delegate);
return createH2QueryRunner(
ImmutableList.of(NATION, REGION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.jdbc;

import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.credential.EmptyCredentialProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
Expand Down Expand Up @@ -45,7 +46,7 @@ public TestingDatabase()
String connectionUrl = "jdbc:h2:mem:" + databaseName + ";NON_KEYWORDS=KEY,VALUE"; // key and value are reserved keywords in H2 2.x
jdbcClient = new TestingH2JdbcClient(
new BaseJdbcConfig(),
new DriverConnectionFactory(new Driver(), connectionUrl, new Properties(), new EmptyCredentialProvider()));
new DriverConnectionFactory(new Driver(), connectionUrl, new Properties(), new EmptyCredentialProvider(), OpenTelemetry.noop()));

connection = DriverManager.getConnection(connectionUrl);
connection.createStatement().execute("CREATE SCHEMA example");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.ConfigBinder;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
Expand Down Expand Up @@ -54,11 +55,11 @@ public void configure(Binder binder)
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
{
Properties properties = new Properties();
// The connector expects byte array for FixedString and String types
properties.setProperty(USE_BINARY_STRING.getKey(), "true");
return new ClickHouseConnectionFactory(new DriverConnectionFactory(new ClickHouseDriver(), config.getConnectionUrl(), properties, credentialProvider));
return new ClickHouseConnectionFactory(new DriverConnectionFactory(new ClickHouseDriver(), config.getConnectionUrl(), properties, credentialProvider, openTelemetry));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
Expand Down Expand Up @@ -45,13 +46,14 @@ public void configure(Binder binder)
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
{
Properties connectionProperties = new Properties();
return new DriverConnectionFactory(
new Driver(),
config.getConnectionUrl(),
connectionProperties,
credentialProvider);
credentialProvider,
openTelemetry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
Expand All @@ -41,10 +42,10 @@ public void setup(Binder binder)
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory getConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
public static ConnectionFactory getConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
throws SQLException
{
Properties connectionProperties = new Properties();
return new DriverConnectionFactory(DriverManager.getDriver(config.getConnectionUrl()), config.getConnectionUrl(), connectionProperties, credentialProvider);
return new DriverConnectionFactory(DriverManager.getDriver(config.getConnectionUrl()), config.getConnectionUrl(), connectionProperties, credentialProvider, openTelemetry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
Expand All @@ -28,6 +29,8 @@
import io.trino.plugin.jdbc.credential.CredentialProvider;
import org.apache.ignite.IgniteJdbcThinDriver;

import java.util.Properties;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.jdbc.JdbcModule.bindTablePropertiesProvider;
Expand All @@ -48,11 +51,13 @@ public void configure(Binder binder)
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
{
return new DriverConnectionFactory(
new IgniteJdbcThinDriver(),
config,
credentialProvider);
config.getConnectionUrl(),
new Properties(),
credentialProvider,
openTelemetry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
Expand Down Expand Up @@ -49,9 +50,9 @@ public void configure(Binder binder)
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider)
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
{
return new DriverConnectionFactory(new Driver(), config.getConnectionUrl(), getConnectionProperties(), credentialProvider);
return new DriverConnectionFactory(new Driver(), config.getConnectionUrl(), getConnectionProperties(), credentialProvider, openTelemetry);
}

private static Properties getConnectionProperties()
Expand Down
Loading

0 comments on commit 7b269fa

Please sign in to comment.