Skip to content

Commit

Permalink
HBASE-28558 Fix constructors for sub classes of Connection (apache#5861)
Browse files Browse the repository at this point in the history
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: GeorryHuang <[email protected]>
  • Loading branch information
Apache9 authored and vinayakphegde committed May 21, 2024
1 parent 7cf4da6 commit 721fe2b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
Expand Down Expand Up @@ -74,6 +78,8 @@
@InterfaceAudience.Public
public class ConnectionFactory {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class);

public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
"hbase.client.async.connection.impl";

Expand Down Expand Up @@ -386,16 +392,39 @@ public static Connection createConnection(URI connectionUri, Configuration conf,
Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionOverAsyncConnection.class, Connection.class);
if (clazz != ConnectionOverAsyncConnection.class) {
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, Map.class);
constructor.setAccessible(true);
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, connectionAttributes));
} catch (Exception e) {
throw new IOException(e);
}
return TraceUtil.trace(() -> {
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
constructor.setAccessible(true);
ConnectionRegistry registry = connectionUri != null
? ConnectionRegistryFactory.create(connectionUri, conf, user)
: ConnectionRegistryFactory.create(conf, user);
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, registry, connectionAttributes));
} catch (NoSuchMethodException e) {
LOG.debug("Constructor with connection registry not found for class {},"
+ " fallback to use old constructor", clazz.getName(), e);
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
Throwables.throwIfUnchecked(e);
throw new IOException(e);
}

try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, Map.class);
constructor.setAccessible(true);
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, connectionAttributes));
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
Throwables.throwIfUnchecked(e);
throw new IOException(e);
}
}, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
} else {
return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
.toConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -1668,9 +1668,14 @@ private static class ConfigurationCaptorConnection implements Connection {
private final Connection delegate;

public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
// here we do not use this registry, so close it...
registry.close();
// here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection
// implementation in below method
delegate =
FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection();
FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
.toConnection();

final String uuid = conf.get(UUID_KEY);
if (uuid != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
Expand Down Expand Up @@ -124,7 +125,7 @@ public static class MRSplitsConnection implements Connection {
static final AtomicInteger creations = new AtomicInteger(0);

MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
this.configuration = conf;
creations.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
Expand Down Expand Up @@ -213,7 +214,7 @@ private static class ConnectionForMergeTesting implements Connection {
}

ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
}

@Override
Expand Down

0 comments on commit 721fe2b

Please sign in to comment.