diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index d4ee67f0511c..ea99023e959c 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -170,6 +170,11 @@ mockito-core test + + org.mockito + mockito-inline + test + org.hamcrest hamcrest-library diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 716fb4863fe8..f4ef4496dfcf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.Map; @@ -89,41 +90,55 @@ protected ConnectionFactory() { * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection();
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* * @return Connection object for conf */ public static Connection createConnection() throws IOException { - Configuration conf = HBaseConfiguration.create(); - return createConnection(conf, null, AuthUtil.loginClient(conf)); + return createConnection(HBaseConfiguration.create()); + } + + /** + * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all + * housekeeping for a connection to the cluster. All tables and interfaces created from returned + * connection share zookeeper connection, meta cache, and connections to region servers and + * masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri) throws IOException { + return createConnection(connectionUri, HBaseConfiguration.create()); } /** * Create a new Connection instance using the passed conf instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters.
+ * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
* The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* @@ -137,20 +152,41 @@ public static Connection createConnection(Configuration conf) throws IOException /** * Create a new Connection instance using the passed conf instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters.
+ * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
* The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("mytable"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri, Configuration conf) + throws IOException { + return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf)); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* @@ -166,20 +202,42 @@ public static Connection createConnection(Configuration conf, ExecutorService po /** * Create a new Connection instance using the passed conf instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters.
+ * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
* The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param pool the thread pool to use for batch operations + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool) throws IOException { + return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf)); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* @@ -194,20 +252,42 @@ public static Connection createConnection(Configuration conf, User user) throws /** * Create a new Connection instance using the passed conf instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters.
+ * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
* The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri, Configuration conf, User user) + throws IOException { + return createConnection(connectionUri, conf, null, user); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* @@ -224,20 +304,43 @@ public static Connection createConnection(Configuration conf, ExecutorService po /** * Create a new Connection instance using the passed conf instance. Connection * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces - * created from returned connection share zookeeper connection, meta cache, and connections to - * region servers and masters.
+ * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
* The caller is responsible for calling {@link Connection#close()} on the returned connection * instance. Typical usage: * *
-   * Connection connection = ConnectionFactory.createConnection(conf);
-   * Table table = connection.getTable(TableName.valueOf("table1"));
-   * try {
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool, User user) throws IOException { + return createConnection(connectionUri, conf, pool, user, Collections.emptyMap()); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
    *   table.get(...);
    *   ...
-   * } finally {
-   *   table.close();
-   *   connection.close();
    * }
    * 
* @@ -249,6 +352,37 @@ public static Connection createConnection(Configuration conf, ExecutorService po */ public static Connection createConnection(Configuration conf, ExecutorService pool, final User user, Map connectionAttributes) throws IOException { + return createConnection(null, conf, pool, user, connectionAttributes); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection(if used), meta cache, and + * connections to region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try (Connection connection = ConnectionFactory.createConnection(conf);
+   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
+   *   table.get(...);
+   *   ...
+   * }
+   * 
+ * + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return Connection object for conf + */ + public static Connection createConnection(URI connectionUri, Configuration conf, + ExecutorService pool, final User user, Map connectionAttributes) + throws IOException { Class clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, ConnectionOverAsyncConnection.class, Connection.class); if (clazz != ConnectionOverAsyncConnection.class) { @@ -263,7 +397,7 @@ public static Connection createConnection(Configuration conf, ExecutorService po throw new IOException(e); } } else { - return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)) + return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) .toConnection(); } } @@ -277,6 +411,16 @@ public static CompletableFuture createAsyncConnection() { return createAsyncConnection(HBaseConfiguration.create()); } + /** + * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration. + * @param connectionUri the connection uri for the hbase cluster + * @see #createAsyncConnection(URI, Configuration) + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture createAsyncConnection(URI connectionUri) { + return createAsyncConnection(connectionUri, HBaseConfiguration.create()); + } + /** * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a * User object created by {@link UserProvider}. The given {@code conf} will also be used to @@ -287,6 +431,21 @@ public static CompletableFuture createAsyncConnection() { * @see UserProvider */ public static CompletableFuture createAsyncConnection(Configuration conf) { + return createAsyncConnection(null, conf); + } + + /** + * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri}, + * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will + * also be used to initialize the {@link UserProvider}. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @return AsyncConnection object wrapped by CompletableFuture + * @see #createAsyncConnection(Configuration, User) + * @see UserProvider + */ + public static CompletableFuture createAsyncConnection(URI connectionUri, + Configuration conf) { User user; try { user = AuthUtil.loginClient(conf); @@ -295,7 +454,7 @@ public static CompletableFuture createAsyncConnection(Configura future.completeExceptionally(e); return future; } - return createAsyncConnection(conf, user); + return createAsyncConnection(connectionUri, conf, user); } /** @@ -315,7 +474,28 @@ public static CompletableFuture createAsyncConnection(Configura */ public static CompletableFuture createAsyncConnection(Configuration conf, final User user) { - return createAsyncConnection(conf, user, null); + return createAsyncConnection(null, conf, user); + } + + /** + * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and + * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. + * All tables and interfaces created from returned connection share zookeeper connection(if used), + * meta cache, and connections to region servers and masters. + *

+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + *

+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the asynchronous connection is for + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture createAsyncConnection(URI connectionUri, + Configuration conf, final User user) { + return createAsyncConnection(connectionUri, conf, user, null); } /** @@ -336,9 +516,38 @@ public static CompletableFuture createAsyncConnection(Configura */ public static CompletableFuture createAsyncConnection(Configuration conf, final User user, Map connectionAttributes) { + return createAsyncConnection(null, conf, user, connectionAttributes); + } + + /** + * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and + * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. + * All tables and interfaces created from returned connection share zookeeper connection(if used), + * meta cache, and connections to region servers and masters. + *

+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + *

+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param connectionUri the connection uri for the hbase cluster + * @param conf configuration + * @param user the user the asynchronous connection is for + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture createAsyncConnection(URI connectionUri, + Configuration conf, final User user, Map connectionAttributes) { return TraceUtil.tracedFuture(() -> { + ConnectionRegistry registry; + try { + registry = connectionUri != null + ? ConnectionRegistryFactory.create(connectionUri, conf, user) + : ConnectionRegistryFactory.create(conf, user); + } catch (Exception e) { + return FutureUtils.failedFuture(e); + } CompletableFuture future = new CompletableFuture<>(); - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user); addListener(registry.getClusterId(), (clusterId, error) -> { if (error != null) { registry.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index 415d46397b8f..5eef2c5f93e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -17,27 +17,77 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - +import java.io.IOException; +import java.net.URI; +import java.util.ServiceLoader; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; /** - * Factory class to get the instance of configured connection registry. + * The entry point for creating a {@link ConnectionRegistry}. */ @InterfaceAudience.Private final class ConnectionRegistryFactory { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class); + + private static final ImmutableMap CREATORS; + static { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (ConnectionRegistryURIFactory factory : ServiceLoader + .load(ConnectionRegistryURIFactory.class)) { + builder.put(factory.getScheme().toLowerCase(), factory); + } + // throw IllegalArgumentException if there are duplicated keys + CREATORS = builder.buildOrThrow(); + } + private ConnectionRegistryFactory() { } - /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf, User user) { + /** + * Returns the connection registry implementation to use, for the given connection url + * {@code uri}. + *

+ * We use {@link ServiceLoader} to load different implementations, and use the scheme of the given + * {@code uri} to select. And if there is no protocol specified, or we can not find a + * {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to + * use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the + * specified connection url {@code uri} will not take effect, we will load all the related + * configurations from the given Configuration instance {@code conf} + */ + static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + if (StringUtils.isBlank(uri.getScheme())) { + LOG.warn("No scheme specified for {}, fallback to use old way", uri); + return create(conf, user); + } + ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase()); + if (creator == null) { + LOG.warn("No creator registered for {}, fallback to use old way", uri); + return create(conf, user); + } + return creator.create(uri, conf, user); + } + + /** + * Returns the connection registry implementation to use. + *

+ * This is used when we do not have a connection url, we will use the old way to load the + * connection registry, by checking the + * {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration. + */ + static ConnectionRegistry create(Configuration conf, User user) { Class clazz = - conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, - ConnectionRegistry.class); + conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + RpcConnectionRegistry.class, ConnectionRegistry.class); return ReflectionUtils.newInstance(clazz, conf, user); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java new file mode 100644 index 000000000000..ab2037a1c138 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * For creating different {@link ConnectionRegistry} implementation. + */ +@InterfaceAudience.Private +public interface ConnectionRegistryURIFactory { + + /** + * Instantiate the {@link ConnectionRegistry} using the given parameters. + */ + ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException; + + /** + * The scheme for this implementation. Used to register this URI factory to the + * {@link ConnectionRegistryFactory}. + */ + String getScheme(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java new file mode 100644 index 000000000000..cb2338b1429d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryCreator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Connection registry creator implementation for creating {@link RpcConnectionRegistry}. + */ +@InterfaceAudience.Private +public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory { + + private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class); + + @Override + public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + assert getScheme().equals(uri.getScheme()); + LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority()); + Configuration c = new Configuration(conf); + c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority()); + return new RpcConnectionRegistry(c, user); + } + + @Override + public String getScheme() { + return "hbase+rpc"; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java new file mode 100644 index 000000000000..8aa51e04fe4d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistryCreator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Connection registry creator implementation for creating {@link ZKConnectionRegistry}. + */ +@InterfaceAudience.Private +public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class); + + @Override + public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException { + assert getScheme().equals(uri.getScheme()); + LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(), + uri.getPath()); + Configuration c = new Configuration(conf); + c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority()); + c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath()); + return new ZKConnectionRegistry(c, user); + } + + @Override + public String getScheme() { + return "hbase+zk"; + } +} diff --git a/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory new file mode 100644 index 000000000000..b25a569776f1 --- /dev/null +++ b/hbase-client/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.ConnectionRegistryURIFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator +org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java new file mode 100644 index 000000000000..4dabd894b5b4 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryCreatorUriParsing.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; + +import java.net.URI; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; + +/** + * Make sure we can successfully parse the URI component + */ +@Category({ ClientTests.class, SmallTests.class }) +public class TestConnectionRegistryCreatorUriParsing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionRegistryCreatorUriParsing.class); + + private Configuration conf; + + private User user; + + private MockedConstruction mockedRpcRegistry; + + private MockedConstruction mockedZkRegistry; + + private MockedStatic mockedReflectionUtils; + + private List args; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + user = mock(User.class); + args = null; + mockedRpcRegistry = mockConstruction(RpcConnectionRegistry.class, (mock, context) -> { + args = context.arguments(); + }); + mockedZkRegistry = mockConstruction(ZKConnectionRegistry.class, (mock, context) -> { + args = context.arguments(); + }); + mockedReflectionUtils = mockStatic(ReflectionUtils.class); + } + + @After + public void tearDown() { + mockedRpcRegistry.closeOnDemand(); + mockedZkRegistry.closeOnDemand(); + mockedReflectionUtils.closeOnDemand(); + } + + @Test + public void testParseRpcSingle() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123"), conf, user); + assertEquals(1, mockedRpcRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123", conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES)); + } + + @Test + public void testParseRpcMultiple() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+rpc://server1:123,server2:456,server3:789"), + conf, user); + assertEquals(1, mockedRpcRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123,server2:456,server3:789", + conf.get(RpcConnectionRegistry.BOOTSTRAP_NODES)); + } + + @Test + public void testParseZkSingle() throws Exception { + ConnectionRegistryFactory.create(new URI("hbase+zk://server1:123/root"), conf, user); + assertEquals(1, mockedZkRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123", conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM)); + assertEquals("/root", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + } + + @Test + public void testParseZkMultiple() throws Exception { + ConnectionRegistryFactory + .create(new URI("hbase+zk://server1:123,server2:456,server3:789/root/path"), conf, user); + assertEquals(1, mockedZkRegistry.constructed().size()); + assertSame(user, args.get(1)); + Configuration conf = (Configuration) args.get(0); + assertEquals("server1:123,server2:456,server3:789", + conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM)); + assertEquals("/root/path", conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + } + + @Test + public void testFallbackNoScheme() throws Exception { + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class, + ConnectionRegistry.class); + ConnectionRegistryFactory.create(new URI("server1:2181/path"), conf, user); + ArgumentCaptor> clazzCaptor = ArgumentCaptor.forClass(Class.class); + ArgumentCaptor argsCaptor = ArgumentCaptor.forClass(Object[].class); + mockedReflectionUtils + .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture())); + assertEquals(ZKConnectionRegistry.class, clazzCaptor.getValue()); + assertSame(conf, argsCaptor.getValue()[0]); + assertSame(user, argsCaptor.getValue()[1]); + } + + @Test + public void testFallbackNoCreator() throws Exception { + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, + ConnectionRegistry.class); + ConnectionRegistryFactory.create(new URI("hbase+tls://server1:123/path"), conf, user); + ArgumentCaptor> clazzCaptor = ArgumentCaptor.forClass(Class.class); + ArgumentCaptor argsCaptor = ArgumentCaptor.forClass(Object[].class); + mockedReflectionUtils + .verify(() -> ReflectionUtils.newInstance(clazzCaptor.capture(), argsCaptor.capture())); + assertEquals(RpcConnectionRegistry.class, clazzCaptor.getValue()); + assertSame(conf, argsCaptor.getValue()[0]); + assertSame(user, argsCaptor.getValue()[1]); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 7225f92b7ff9..ed90863763a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -64,7 +64,7 @@ private static AsyncClusterConnection createAsyncClusterConnection(Configuration */ public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, SocketAddress localAddress, User user) throws IOException { - return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user), + return createAsyncClusterConnection(conf, ConnectionRegistryFactory.create(conf, user), localAddress, user); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java index 0ff105743e0c..031dff736c84 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java @@ -60,7 +60,7 @@ protected static void startClusterAndCreateTable() throws Exception { UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.waitTableAvailable(TABLE_NAME); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) { + ConnectionRegistryFactory.create(UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } UTIL.getAdmin().balancerSwitch(false, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java index da400f29c0c6..bb0eb31d2549 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java @@ -56,7 +56,7 @@ public static void setUpBeforeClass() throws Exception { TestAsyncAdminBase.setUpBeforeClass(); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) { + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 90d2cb51e8cf..e14cd32a3889 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -107,8 +107,7 @@ protected void before() throws Throwable { testUtil = miniClusterRule.getTestingUtility(); HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3); testUtil.waitUntilNoRegionsInTransition(); - registry = - ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent()); + registry = ConnectionRegistryFactory.create(testUtil.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry); admin.balancerSwitch(false).get(); locator = new AsyncMetaRegionLocator(registry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index a6d0ab81f912..6a5230b3a128 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -128,7 +128,7 @@ public void setUpBeforeTest() throws InterruptedException, ExecutionException, I // Enable meta replica LoadBalance mode for this connection. c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString()); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); conn = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent()); locator = new AsyncNonMetaRegionLocator(conn); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index 50c9ab9f5657..439d527effca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -125,7 +125,7 @@ public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().balancerSwitch(false, true); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index bacd7bb32d70..2291c28a7c85 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -100,7 +100,7 @@ public static void setUp() throws Exception { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = CONN.getLocator(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 3c8327145f32..baa4ee74ade9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -73,7 +73,7 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java index 0de59a4c32bf..2803db20e710 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java @@ -95,8 +95,7 @@ public static void setUp() throws Exception { FailPrimaryMetaScanCp.class.getName()); UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) { + try (ConnectionRegistry registry = ConnectionRegistryFactory.create(conf, User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java new file mode 100644 index 000000000000..5746ffa67f6c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBasicReadWriteWithDifferentConnectionRegistries.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic read write operation with different {@link ConnectionRegistry} implementations. + */ +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestBasicReadWriteWithDifferentConnectionRegistries { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBasicReadWriteWithDifferentConnectionRegistries.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + public enum RegistryImpl { + ZK, + RPC, + ZK_URI, + RPC_URI + } + + @Parameter + public RegistryImpl impl; + + @Rule + public final TableNameTestRule name = new TableNameTestRule(); + + private byte[] FAMILY = Bytes.toBytes("family"); + + private Connection conn; + + @Parameters(name = "{index}: impl={0}") + public static List data() { + List data = new ArrayList(); + for (RegistryImpl impl : RegistryImpl.values()) { + data.add(new Object[] { impl }); + } + return data; + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + switch (impl) { + case ZK: { + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ZKConnectionRegistry.class, ConnectionRegistry.class); + String quorum = UTIL.getZkCluster().getAddress().toString(); + String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT); + conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path); + LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path); + conn = ConnectionFactory.createConnection(conf); + break; + } + case RPC: { + Configuration conf = HBaseConfiguration.create(); + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + RpcConnectionRegistry.class, ConnectionRegistry.class); + String bootstrapServers = + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString(); + conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers); + LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers); + conn = ConnectionFactory.createConnection(conf); + break; + } + case ZK_URI: { + String quorum = UTIL.getZkCluster().getAddress().toString(); + String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT); + URI connectionUri = new URI("hbase+zk://" + quorum + path); + LOG.info("connect to cluster through connection url: {}", connectionUri); + conn = ConnectionFactory.createConnection(connectionUri); + break; + } + case RPC_URI: { + URI connectionUri = new URI("hbase+rpc://" + + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString()); + LOG.info("connect to cluster through connection url: {}", connectionUri); + conn = ConnectionFactory.createConnection(connectionUri); + break; + } + default: + throw new IllegalArgumentException("Unknown impl: " + impl); + } + try (Admin admin = conn.getAdmin()) { + admin.createTable(TableDescriptorBuilder.newBuilder(name.getTableName()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); + } + } + + @After + public void tearDown() throws Exception { + TableName tableName = name.getTableName(); + try (Admin admin = conn.getAdmin()) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + conn.close(); + } + + @Test + public void testReadWrite() throws Exception { + byte[] row = Bytes.toBytes("row"); + byte[] qualifier = Bytes.toBytes("qualifier"); + byte[] value = Bytes.toBytes("value"); + try (Table table = conn.getTable(name.getTableName())) { + Put put = new Put(row).addColumn(FAMILY, qualifier, value); + table.put(put); + Result result = table.get(new Get(row)); + assertArrayEquals(value, result.getValue(FAMILY, qualifier)); + table.delete(new Delete(row)); + assertFalse(table.exists(new Get(row))); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java index 5c78e53f7e60..12f278ebbfd7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java @@ -77,8 +77,7 @@ public static void setUp() throws Exception { () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= numOfMetaReplica); - registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + registry = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java index beb054eaf366..29223dea5dbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java @@ -64,8 +64,7 @@ public class TestMetaRegionLocationCache { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - REGISTRY = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); + REGISTRY = ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY); TEST_UTIL.getAdmin().balancerSwitch(false, true); }