Skip to content

Commit

Permalink
HBASE-28377 Fallback to simple is broken for blocking rpc client (apa…
Browse files Browse the repository at this point in the history
…che#5690)

Signed-off-by: Bryan Beaudreault <[email protected]>
(cherry picked from commit 7bc07a6)
  • Loading branch information
Apache9 committed Feb 19, 2024
1 parent 02ea688 commit ea1c057
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public Boolean run() throws IOException {
// fall back to simple auth because server told us so.
// do not change authMethod and useSasl here, we should start from secure when
// reconnecting because regionserver may change its sasl config after restart.
saslRpcClient = null;
}
}
createStreams(inStream, outStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
Expand Down Expand Up @@ -107,12 +108,9 @@ public boolean saslConnect(InputStream inS, OutputStream outS) throws IOExceptio
int len = inStream.readInt();
if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
if (!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE auth, "
+ "but this client is configured to only allow secure connections.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Server asks us to fall back to simple auth.");
throw new FallbackDisallowedException();
}
LOG.debug("Server asks us to fall back to simple auth.");
dispose();
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
Expand All @@ -44,12 +49,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
Expand Down Expand Up @@ -95,6 +101,7 @@ protected static void initKDCAndConf() throws Exception {
// set a smaller timeout and retry to speed up tests
TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 100);
}

protected static void stopKDC() throws InterruptedException {
Expand Down Expand Up @@ -237,7 +244,7 @@ public String getTokenKind() {
}

@Test
public void testRpcFallbackToSimpleAuth() throws Exception {
public void testRpcServerFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
Expand All @@ -252,6 +259,59 @@ public void testRpcFallbackToSimpleAuth() throws Exception {
callRpcService(User.create(clientUgi));
}

@Test
public void testRpcServerDisallowFallbackToSimpleAuth() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });

// check that the client user is insecure
assertNotSame(ugi, clientUgi);
assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());

clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
IOException error =
assertThrows(IOException.class, () -> callRpcService(User.create(clientUgi)));
// server just closes the connection, so we could get broken pipe, or EOF, or connection closed
if (error.getMessage() == null || !error.getMessage().contains("Broken pipe")) {
assertThat(error,
either(instanceOf(EOFException.class)).or(instanceOf(ConnectionClosedException.class)));
}
}

@Test
public void testRpcClientFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
callRpcService(User.create(serverUgi), User.create(ugi));
}

@Test
public void testRpcClientDisallowFallbackToSimpleAuth() throws Exception {
String serverUsername = "testuser";
UserGroupInformation serverUgi =
UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
// check that the server user is insecure
assertNotSame(ugi, serverUgi);
assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
assertEquals(serverUsername, serverUgi.getUserName());

serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false);
assertThrows(FallbackDisallowedException.class,
() -> callRpcService(User.create(serverUgi), User.create(ugi)));
}

private void setRpcProtection(String clientProtection, String serverProtection) {
clientConf.set("hbase.rpc.protection", clientProtection);
serverConf.set("hbase.rpc.protection", serverProtection);
Expand All @@ -263,25 +323,25 @@ private void setRpcProtection(String clientProtection, String serverProtection)
@Test
public void testSaslWithCommonQop() throws Exception {
setRpcProtection("privacy,authentication", "authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("integrity,authentication", "integrity,authentication");
callRpcService(User.create(ugi));
callRpcService();

setRpcProtection("privacy,authentication", "privacy,authentication");
callRpcService(User.create(ugi));
callRpcService();
}

@Test
public void testSaslNoCommonQop() throws Exception {
setRpcProtection("integrity", "privacy");
SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
SaslException se = assertThrows(SaslException.class, () -> callRpcService());
assertEquals("No common protection layer between client and server", se.getMessage());
}

Expand All @@ -292,7 +352,7 @@ public void testSaslNoCommonQop() throws Exception {
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
callRpcService();
}

/**
Expand All @@ -303,11 +363,11 @@ public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");

setCryptoAES("false", "true");
callRpcService(User.create(ugi));
callRpcService();

setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
callRpcService();
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
Expand All @@ -323,18 +383,20 @@ private void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
private void callRpcService(User serverUser, User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);

InetSocketAddress isa = new InetSocketAddress(HOST, 0);

RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
RpcServer rpcServer = serverUser.getUGI()
.doAs((PrivilegedExceptionAction<
RpcServer>) () -> RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists.newArrayList(
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1)));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
Expand Down Expand Up @@ -364,6 +426,14 @@ public void uncaughtException(Thread th, Throwable ex) {
}
}

private void callRpcService(User clientUser) throws Exception {
callRpcService(User.create(ugi), clientUser);
}

private void callRpcService() throws Exception {
callRpcService(User.create(ugi));
}

public static class TestThread extends Thread {
private final BlockingInterface stub;

Expand Down

0 comments on commit ea1c057

Please sign in to comment.