Skip to content

Commit

Permalink
HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails wit…
Browse files Browse the repository at this point in the history
…h broken pipe instead of bad auth (#5740)

Also change the IPC related tests to test different combinations of rpc server&client, for example, NettyRpcClient and SimpleRpcServer

Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
(cherry picked from commit 2306820)
  • Loading branch information
Apache9 committed Mar 6, 2024
1 parent cb49c62 commit bf9a8af
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
});
}

private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException {
private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
assert eventLoop.inEventLoop();
PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -72,6 +73,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -80,8 +82,10 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,14 +118,12 @@ public abstract class AbstractTestIPC {
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

protected static final Configuration CONF = HBaseConfiguration.create();
static {
// Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
}

protected abstract RpcServer createRpcServer(Server server, String name,
protected RpcServer createRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException;
RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
}

private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
Expand All @@ -133,6 +135,14 @@ private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface>
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();

@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;

@Before
public void setUpBeforeTest() {
CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
}

/**
* Ensure we do not HAVE TO HAVE a codec.
*/
Expand Down Expand Up @@ -348,9 +358,43 @@ public void testTimeout() throws IOException {
}
}

protected abstract RpcServer createTestFailingRpcServer(final String name,
private static class FailingSimpleRpcServer extends SimpleRpcServer {

FailingSimpleRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

protected RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException;
Configuration conf, RpcScheduler scheduler) throws IOException {
if (rpcServerImpl.equals(NettyRpcServer.class)) {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
} else {
return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
}
}

/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
Expand Down Expand Up @@ -570,19 +614,33 @@ public void testTracingErrorIpc() throws IOException {

protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);

private IOException doBadPreableHeaderCall(BlockingInterface stub) {
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
return ProtobufUtil.handleRemoteException(se);
}

@Test
public void testBadPreambleHeader() throws IOException, ServiceException {
public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe, instanceOf(BadAuthException.class));
assertThat(ioe.getMessage(), containsString("authName=unknown"));
BadAuthException error = null;
// for SimpleRpcServer, it is possible that we get a broken pipe before getting the
// BadAuthException, so we add some retries here, see HBASE-28417
for (int i = 0; i < 10; i++) {
IOException ioe = doBadPreableHeaderCall(stub);
if (ioe instanceof BadAuthException) {
error = (BadAuthException) ioe;
break;
}
Thread.sleep(100);
}
assertNotNull("Can not get expected BadAuthException", error);
assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockingIPC.class);

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
@Parameters(name = "{index}: rpcServerImpl={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { SimpleRpcServer.class },
new Object[] { NettyRpcServer.class });
}

@Override
Expand Down Expand Up @@ -73,41 +72,6 @@ protected boolean isTcpNoDelay() {
};
}

private static class TestFailingRpcServer extends SimpleRpcServer {

TestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new BlockingRpcClient(conf) {
Expand All @@ -124,7 +88,6 @@ protected byte[] getConnectionHeaderPreamble() {
}
};
}

};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
Expand All @@ -51,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyIPC.class);

@Parameters(name = "{index}: EventLoop={0}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
params.add(new Object[] { "nio" });
params.add(new Object[] { "perClientNio" });
private static List<String> getEventLoopTypes() {
List<String> types = new ArrayList<>();
types.add("nio");
types.add("perClientNio");
if (JVM.isLinux() && JVM.isAmd64()) {
params.add(new Object[] { "epoll" });
types.add("epoll");
}
return types;
}

@Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
public static List<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (String eventLoopType : getEventLoopTypes()) {
params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
params.add(new Object[] { NettyRpcServer.class, eventLoopType });
}
return params;
}

@Parameter
@Parameter(1)
public String eventLoopType;

private static NioEventLoopGroup NIO;
Expand Down Expand Up @@ -103,13 +109,6 @@ private void setConf(Configuration conf) {
}
}

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
}

@Override
protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
Expand Down Expand Up @@ -141,13 +140,6 @@ protected boolean isTcpNoDelay() {
};
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new NettyRpcClient(conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,41 @@ public class TestNettyTlsIPC extends AbstractTestIPC {

private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;

@Parameterized.Parameter(0)
@Parameterized.Parameter(1)
public X509KeyType caKeyType;

@Parameterized.Parameter(1)
@Parameterized.Parameter(2)
public X509KeyType certKeyType;

@Parameterized.Parameter(2)
@Parameterized.Parameter(3)
public char[] keyPassword;

@Parameterized.Parameter(3)
@Parameterized.Parameter(4)
public boolean acceptPlainText;

@Parameterized.Parameter(4)
@Parameterized.Parameter(5)
public boolean clientTlsEnabled;

private X509TestContext x509TestContext;

// only netty rpc server supports TLS, so here we will only test NettyRpcServer
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
+ " clientTlsEnabled={4}")
name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3},"
+ " acceptPlainText={4}, clientTlsEnabled={5}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) {
// do not accept plain text
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword,
false, true });
// support plain text and client enables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
params.add(
new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true,
false });
}
}
}
Expand Down

0 comments on commit bf9a8af

Please sign in to comment.