Skip to content

Commit

Permalink
HBASE-28436 Use connection url to specify the connection registry inf…
Browse files Browse the repository at this point in the history
…ormation (apache#5770)

Signed-off-by: Istvan Toth <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
Reviewed-by: Bryan Beaudreault <[email protected]>
(cherry picked from commit e3761ba)
  • Loading branch information
Apache9 committed Apr 23, 2024
1 parent a36da5e commit 0f6a7a0
Show file tree
Hide file tree
Showing 22 changed files with 843 additions and 79 deletions.
5 changes: 5 additions & 0 deletions hbase-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {

try {
if (registry == null) {
this.registry = ConnectionRegistryFactory.getRegistry(conf, user);
this.registry = ConnectionRegistryFactory.create(conf, user);
} else {
this.registry = registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,76 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;

import java.io.IOException;
import java.net.URI;
import java.util.ServiceLoader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
* Factory class to get the instance of configured connection registry.
* The entry point for creating a {@link ConnectionRegistry}.
*/
@InterfaceAudience.Private
final class ConnectionRegistryFactory {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);

private static final ImmutableMap<String, ConnectionRegistryURIFactory> CREATORS;
static {
ImmutableMap.Builder<String, ConnectionRegistryURIFactory> builder = ImmutableMap.builder();
for (ConnectionRegistryURIFactory factory : ServiceLoader
.load(ConnectionRegistryURIFactory.class)) {
builder.put(factory.getScheme().toLowerCase(), factory);
}
// throw IllegalArgumentException if there are duplicated keys
CREATORS = builder.buildOrThrow();
}

private ConnectionRegistryFactory() {
}

/** Returns The connection registry implementation to use. */
static ConnectionRegistry getRegistry(Configuration conf, User user) {
/**
* Returns the connection registry implementation to use, for the given connection url
* {@code uri}.
* <p/>
* We use {@link ServiceLoader} to load different implementations, and use the scheme of the given
* {@code uri} to select. And if there is no protocol specified, or we can not find a
* {@link ConnectionRegistryURIFactory} implementation for the given scheme, we will fallback to
* use the old way to create the {@link ConnectionRegistry}. Notice that, if fallback happens, the
* specified connection url {@code uri} will not take effect, we will load all the related
* configurations from the given Configuration instance {@code conf}
*/
static ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
if (StringUtils.isBlank(uri.getScheme())) {
LOG.warn("No scheme specified for {}, fallback to use old way", uri);
return create(conf, user);
}
ConnectionRegistryURIFactory creator = CREATORS.get(uri.getScheme().toLowerCase());
if (creator == null) {
LOG.warn("No creator registered for {}, fallback to use old way", uri);
return create(conf, user);
}
return creator.create(uri, conf, user);
}

/**
* Returns the connection registry implementation to use.
* <p/>
* This is used when we do not have a connection url, we will use the old way to load the
* connection registry, by checking the
* {@literal HConstants#CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY} configuration.
*/
static ConnectionRegistry create(Configuration conf, User user) {
Class<? extends ConnectionRegistry> clazz =
conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf, user);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

/**
* For creating different {@link ConnectionRegistry} implementation.
*/
@InterfaceAudience.Private
public interface ConnectionRegistryURIFactory {

/**
* Instantiate the {@link ConnectionRegistry} using the given parameters.
*/
ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException;

/**
* The scheme for this implementation. Used to register this URI factory to the
* {@link ConnectionRegistryFactory}.
*/
String getScheme();
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public static void setupMasterlessConnection(Configuration conf) {
*/
static class MasterlessConnection extends ConnectionImplementation {
MasterlessConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, requestAttributes);
ConnectionRegistry registry, Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, registry, requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Connection registry creator implementation for creating {@link RpcConnectionRegistry}.
*/
@InterfaceAudience.Private
public class RpcConnectionRegistryCreator implements ConnectionRegistryURIFactory {

private static final Logger LOG = LoggerFactory.getLogger(RpcConnectionRegistryCreator.class);

@Override
public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
assert getScheme().equals(uri.getScheme());
LOG.debug("connect to hbase cluster with rpc bootstrap servers='{}'", uri.getAuthority());
Configuration c = new Configuration(conf);
c.set(RpcConnectionRegistry.BOOTSTRAP_NODES, uri.getAuthority());
return new RpcConnectionRegistry(c, user);
}

@Override
public String getScheme() {
return "hbase+rpc";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Connection registry creator implementation for creating {@link ZKConnectionRegistry}.
*/
@InterfaceAudience.Private
public class ZKConnectionRegistryCreator implements ConnectionRegistryURIFactory {

private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistryCreator.class);

@Override
public ConnectionRegistry create(URI uri, Configuration conf, User user) throws IOException {
assert getScheme().equals(uri.getScheme());
LOG.debug("connect to hbase cluster with zk quorum='{}' and parent='{}'", uri.getAuthority(),
uri.getPath());
Configuration c = new Configuration(conf);
c.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, uri.getAuthority());
c.set(HConstants.ZOOKEEPER_ZNODE_PARENT, uri.getPath());
return new ZKConnectionRegistry(c, user);
}

@Override
public String getScheme() {
return "hbase+zk";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.hbase.client.RpcConnectionRegistryCreator
org.apache.hadoop.hbase.client.ZKConnectionRegistryCreator
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ static class RegionServerStoppedOnScannerOpenConnection extends ConnectionImplem
final ClientService.BlockingInterface stub;

RegionServerStoppedOnScannerOpenConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, requestAttributes);
ConnectionRegistry registry, Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, registry, requestAttributes);
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
// exceptions for three times and then after that, we return no more to scan.
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
Expand Down Expand Up @@ -361,8 +361,8 @@ static class RpcTimeoutConnection extends ConnectionImplementation {
final ClientService.BlockingInterface stub;

RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, requestAttributes);
ConnectionRegistry registry, Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, registry, requestAttributes);
// Mock up my stub so an exists call -- which turns into a get -- throws an exception
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
try {
Expand Down Expand Up @@ -405,8 +405,8 @@ static class ManyServersManyRegionsConnection extends ConnectionImplementation {
private final Configuration conf;

ManyServersManyRegionsConnection(Configuration conf, ExecutorService pool, User user,
Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, requestAttributes);
ConnectionRegistry registry, Map<String, byte[]> requestAttributes) throws IOException {
super(conf, pool, user, registry, requestAttributes);
int serverCount = conf.getInt("hbase.test.servers", 10);
this.serversByClient = new HashMap<>(serverCount);
this.meta =
Expand Down
Loading

0 comments on commit 0f6a7a0

Please sign in to comment.