diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 63adf7b90212..b7856c089133 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -291,7 +291,7 @@ public void operationComplete(Future future) throws Exception { }); } - private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException { + private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) { assert eventLoop.inEventLoop(); PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this, RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 0ba7097fd535..e9d0e8de30b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -56,6 +56,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -80,8 +82,10 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.hamcrest.Matcher; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,14 +118,12 @@ public abstract class AbstractTestIPC { private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); protected static final Configuration CONF = HBaseConfiguration.create(); - static { - // Set the default to be the old SimpleRpcServer. Subclasses test it and netty. - CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); - } - protected abstract RpcServer createRpcServer(Server server, String name, + protected RpcServer createRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) throws IOException; + RpcScheduler scheduler) throws IOException { + return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); + } private RpcServer createRpcServer(String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { @@ -133,6 +135,14 @@ private RpcServer createRpcServer(String name, List @Rule public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + @Parameter(0) + public Class rpcServerImpl; + + @Before + public void setUpBeforeTest() { + CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class); + } + /** * Ensure we do not HAVE TO HAVE a codec. */ @@ -348,9 +358,43 @@ public void testTimeout() throws IOException { } } - protected abstract RpcServer createTestFailingRpcServer(final String name, + private static class FailingSimpleRpcServer extends SimpleRpcServer { + + FailingSimpleRpcServer(Server server, String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + super(server, name, services, bindAddress, conf, scheduler, true); + } + + final class FailingConnection extends SimpleServerRpcConnection { + private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel, + long lastContact) { + super(rpcServer, channel, lastContact); + } + + @Override + public void processRequest(ByteBuff buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { + return new FailingConnection(this, channel, time); + } + } + + protected RpcServer createTestFailingRpcServer(final String name, final List services, final InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException; + Configuration conf, RpcScheduler scheduler) throws IOException { + if (rpcServerImpl.equals(NettyRpcServer.class)) { + return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler); + } else { + return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler); + } + } /** Tests that the connection closing is handled by the client with outstanding RPC calls */ @Test @@ -570,19 +614,33 @@ public void testTracingErrorIpc() throws IOException { protected abstract AbstractRpcClient createBadAuthRpcClient(Configuration conf); + private IOException doBadPreableHeaderCall(BlockingInterface stub) { + ServiceException se = assertThrows(ServiceException.class, + () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build())); + return ProtobufUtil.handleRemoteException(se); + } + @Test - public void testBadPreambleHeader() throws IOException, ServiceException { + public void testBadPreambleHeader() throws Exception { Configuration clientConf = new Configuration(CONF); RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient client = createBadAuthRpcClient(clientConf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - ServiceException se = assertThrows(ServiceException.class, - () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build())); - IOException ioe = ProtobufUtil.handleRemoteException(se); - assertThat(ioe, instanceOf(BadAuthException.class)); - assertThat(ioe.getMessage(), containsString("authName=unknown")); + BadAuthException error = null; + // for SimpleRpcServer, it is possible that we get a broken pipe before getting the + // BadAuthException, so we add some retries here, see HBASE-28417 + for (int i = 0; i < 10; i++) { + IOException ioe = doBadPreableHeaderCall(stub); + if (ioe instanceof BadAuthException) { + error = (BadAuthException) ioe; + break; + } + Thread.sleep(100); + } + assertNotNull("Can not get expected BadAuthException", error); + assertThat(error.getMessage(), containsString("authName=unknown")); } finally { rpcServer.stop(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java index e60cc879fd4f..24177f28c40c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -18,20 +18,20 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) @Category({ RPCTests.class, MediumTests.class }) public class TestBlockingIPC extends AbstractTestIPC { @@ -39,11 +39,10 @@ public class TestBlockingIPC extends AbstractTestIPC { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBlockingIPC.class); - @Override - protected RpcServer createRpcServer(Server server, String name, - List services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); + @Parameters(name = "{index}: rpcServerImpl={0}") + public static List data() { + return Arrays.asList(new Object[] { SimpleRpcServer.class }, + new Object[] { NettyRpcServer.class }); } @Override @@ -73,41 +72,6 @@ protected boolean isTcpNoDelay() { }; } - private static class TestFailingRpcServer extends SimpleRpcServer { - - TestFailingRpcServer(Server server, String name, - List services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - super(server, name, services, bindAddress, conf, scheduler, true); - } - - final class FailingConnection extends SimpleServerRpcConnection { - private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel, - long lastContact) { - super(rpcServer, channel, lastContact); - } - - @Override - public void processRequest(ByteBuff buf) throws IOException, InterruptedException { - // this will throw exception after the connection header is read, and an RPC is sent - // from client - throw new DoNotRetryIOException("Failing for test"); - } - } - - @Override - protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { - return new FailingConnection(this, channel, time); - } - } - - @Override - protected RpcServer createTestFailingRpcServer(String name, - List services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler); - } - @Override protected AbstractRpcClient createBadAuthRpcClient(Configuration conf) { return new BlockingRpcClient(conf) { @@ -124,7 +88,6 @@ protected byte[] getConnectionHeaderPreamble() { } }; } - }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index a1b60e2cfa45..f2366a20fd2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -18,13 +18,10 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; @@ -51,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestNettyIPC.class); - @Parameters(name = "{index}: EventLoop={0}") - public static Collection parameters() { - List params = new ArrayList<>(); - params.add(new Object[] { "nio" }); - params.add(new Object[] { "perClientNio" }); + private static List getEventLoopTypes() { + List types = new ArrayList<>(); + types.add("nio"); + types.add("perClientNio"); if (JVM.isLinux() && JVM.isAmd64()) { - params.add(new Object[] { "epoll" }); + types.add("epoll"); + } + return types; + } + + @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}") + public static List parameters() { + List params = new ArrayList<>(); + for (String eventLoopType : getEventLoopTypes()) { + params.add(new Object[] { SimpleRpcServer.class, eventLoopType }); + params.add(new Object[] { NettyRpcServer.class, eventLoopType }); } return params; } - @Parameter + @Parameter(1) public String eventLoopType; private static NioEventLoopGroup NIO; @@ -103,13 +109,6 @@ private void setConf(Configuration conf) { } } - @Override - protected RpcServer createRpcServer(Server server, String name, - List services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true); - } - @Override protected NettyRpcClient createRpcClientNoCodec(Configuration conf) { setConf(conf); @@ -141,13 +140,6 @@ protected boolean isTcpNoDelay() { }; } - @Override - protected RpcServer createTestFailingRpcServer(String name, - List services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler); - } - @Override protected AbstractRpcClient createBadAuthRpcClient(Configuration conf) { return new NettyRpcClient(conf) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java index d54878ff1e8a..fac615456dab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java @@ -67,37 +67,41 @@ public class TestNettyTlsIPC extends AbstractTestIPC { private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG; - @Parameterized.Parameter(0) + @Parameterized.Parameter(1) public X509KeyType caKeyType; - @Parameterized.Parameter(1) + @Parameterized.Parameter(2) public X509KeyType certKeyType; - @Parameterized.Parameter(2) + @Parameterized.Parameter(3) public char[] keyPassword; - @Parameterized.Parameter(3) + @Parameterized.Parameter(4) public boolean acceptPlainText; - @Parameterized.Parameter(4) + @Parameterized.Parameter(5) public boolean clientTlsEnabled; private X509TestContext x509TestContext; + // only netty rpc server supports TLS, so here we will only test NettyRpcServer @Parameterized.Parameters( - name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3}," - + " clientTlsEnabled={4}") + name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3}," + + " acceptPlainText={4}, clientTlsEnabled={5}") public static List data() { List params = new ArrayList<>(); for (X509KeyType caKeyType : X509KeyType.values()) { for (X509KeyType certKeyType : X509KeyType.values()) { for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) { // do not accept plain text - params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true }); + params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, + false, true }); // support plain text and client enables tls - params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true }); + params.add( + new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true }); // support plain text and client disables tls - params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false }); + params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, + false }); } } }