From 721fe2b1124eb2aec1df8643d48580827c43679b Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 1 May 2024 20:41:42 +0800 Subject: [PATCH] HBASE-28558 Fix constructors for sub classes of Connection (#5861) Signed-off-by: Guanghao Zhang Signed-off-by: GeorryHuang --- .../hbase/client/ConnectionFactory.java | 49 +++++++++++++++---- .../mapreduce/TestHFileOutputFormat2.java | 11 +++-- .../TestMultiTableInputFormatBase.java | 3 +- .../mapreduce/TestTableInputFormatBase.java | 3 +- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index f4ef4496dfcf..b9b156bf36d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; /** * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of @@ -74,6 +78,8 @@ @InterfaceAudience.Public public class ConnectionFactory { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class); + public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl"; @@ -386,16 +392,39 @@ public static Connection createConnection(URI connectionUri, Configuration conf, Class clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, ConnectionOverAsyncConnection.class, Connection.class); if (clazz != ConnectionOverAsyncConnection.class) { - try { - // Default HCM#HCI is not accessible; make it so before invoking. - Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, - ExecutorService.class, User.class, Map.class); - constructor.setAccessible(true); - return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor - .newInstance(conf, pool, user, connectionAttributes)); - } catch (Exception e) { - throw new IOException(e); - } + return TraceUtil.trace(() -> { + try { + // Default HCM#HCI is not accessible; make it so before invoking. + Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, + ExecutorService.class, User.class, ConnectionRegistry.class, Map.class); + constructor.setAccessible(true); + ConnectionRegistry registry = connectionUri != null + ? ConnectionRegistryFactory.create(connectionUri, conf, user) + : ConnectionRegistryFactory.create(conf, user); + return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor + .newInstance(conf, pool, user, registry, connectionAttributes)); + } catch (NoSuchMethodException e) { + LOG.debug("Constructor with connection registry not found for class {}," + + " fallback to use old constructor", clazz.getName(), e); + } catch (Exception e) { + Throwables.throwIfInstanceOf(e, IOException.class); + Throwables.throwIfUnchecked(e); + throw new IOException(e); + } + + try { + // Default HCM#HCI is not accessible; make it so before invoking. + Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, + ExecutorService.class, User.class, Map.class); + constructor.setAccessible(true); + return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor + .newInstance(conf, pool, user, connectionAttributes)); + } catch (Exception e) { + Throwables.throwIfInstanceOf(e, IOException.class); + Throwables.throwIfUnchecked(e); + throw new IOException(e); + } + }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection")); } else { return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) .toConnection(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index fc7f66129d35..e67ee3dbb736 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -79,6 +78,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistry; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Hbck; import org.apache.hadoop.hbase.client.Put; @@ -1668,9 +1668,14 @@ private static class ConfigurationCaptorConnection implements Connection { private final Connection delegate; public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user, - Map connectionAttributes) throws IOException { + ConnectionRegistry registry, Map connectionAttributes) throws IOException { + // here we do not use this registry, so close it... + registry.close(); + // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection + // implementation in below method delegate = - FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection(); + FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes)) + .toConnection(); final String uuid = conf.get(UUID_KEY); if (uuid != null) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java index 0c879bd5ace3..7c136fa2a19f 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionRegistry; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionLocator; @@ -124,7 +125,7 @@ public static class MRSplitsConnection implements Connection { static final AtomicInteger creations = new AtomicInteger(0); MRSplitsConnection(Configuration conf, ExecutorService pool, User user, - Map connectionAttributes) throws IOException { + ConnectionRegistry registry, Map connectionAttributes) throws IOException { this.configuration = conf; creations.incrementAndGet(); } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java index f41282b8f4f8..7b2170d19520 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionRegistry; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -213,7 +214,7 @@ private static class ConnectionForMergeTesting implements Connection { } ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, - Map connectionAttributes) throws IOException { + ConnectionRegistry registry, Map connectionAttributes) throws IOException { } @Override