Skip to content

Commit

Permalink
HBASE-28558 Fix constructors for sub classes of Connection (#5861)
Browse files Browse the repository at this point in the history
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: GeorryHuang <[email protected]>
(cherry picked from commit e9ced39)
  • Loading branch information
Apache9 committed May 1, 2024
1 parent 49aad64 commit 72b5c8f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
Expand Down Expand Up @@ -74,6 +78,8 @@
@InterfaceAudience.Public
public class ConnectionFactory {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class);

public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
"hbase.client.async.connection.impl";

Expand Down Expand Up @@ -399,15 +405,35 @@ public static Connection createConnection(URI connectionUri, Configuration conf,
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
ConnectionRegistry registry = createConnectionRegistry(connectionUri, conf, user);
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
constructor.setAccessible(true);
ConnectionRegistry registry = connectionUri != null
? ConnectionRegistryFactory.create(connectionUri, conf, user)
: ConnectionRegistryFactory.create(conf, user);
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, registry, connectionAttributes));
} catch (NoSuchMethodException e) {
LOG.debug("Constructor with connection registry not found for class {},"
+ " fallback to use old constructor", clazz.getName(), e);
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
Throwables.throwIfUnchecked(e);
throw new IOException(e);
}

try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, Map.class);
constructor.setAccessible(true);
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, connectionAttributes));
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
Throwables.throwIfUnchecked(e);
throw new IOException(e);
}
}, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
Expand Down Expand Up @@ -1606,7 +1607,9 @@ private static class ConfigurationCaptorConnection implements Connection {
private final Connection delegate;

public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
// here we do not use this registry, so close it...
registry.close();
Configuration confForDelegate = new Configuration(conf);
confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
delegate = createConnection(confForDelegate, es, user, connectionAttributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -123,7 +124,7 @@ public static class MRSplitsConnection implements Connection {
static final AtomicInteger creations = new AtomicInteger(0);

MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
this.configuration = conf;
creations.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
Expand Down Expand Up @@ -212,7 +213,7 @@ private static class ConnectionForMergeTesting implements Connection {
}

ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> connectionAttributes) throws IOException {
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
}

@Override
Expand Down

0 comments on commit 72b5c8f

Please sign in to comment.