Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with… #5740

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
});
}

private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException {
private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
assert eventLoop.inEventLoop();
PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -73,15 +74,18 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -114,14 +118,12 @@ public abstract class AbstractTestIPC {
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

protected static final Configuration CONF = HBaseConfiguration.create();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you get rid of this static CONF instance entirely. All of its accesses in this class are using it with a copy constructor, but TestNettyTlsIPC is using/modifying it directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not get rid of it, we need to change some configurations in setUp and tearDown. And in TestNettyTlsIPC, we need to change the configurations in setUpBeforeClass, so it has to be static...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copying in the tests method is because we need to customize some client configurations, so we want to affect other test methods.

static {
// Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
}

protected abstract RpcServer createRpcServer(Server server, String name,
protected RpcServer createRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException;
RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
}

private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
Expand All @@ -133,6 +135,14 @@ private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface>
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();

@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;

@Before
public void setUpBeforeTest() {
CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
}

/**
* Ensure we do not HAVE TO HAVE a codec.
*/
Expand Down Expand Up @@ -348,9 +358,43 @@ public void testTimeout() throws IOException {
}
}

protected abstract RpcServer createTestFailingRpcServer(final String name,
private static class FailingSimpleRpcServer extends SimpleRpcServer {

FailingSimpleRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

protected RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException;
Configuration conf, RpcScheduler scheduler) throws IOException {
if (rpcServerImpl.equals(NettyRpcServer.class)) {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
} else {
return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
}
}

/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
Expand Down Expand Up @@ -570,19 +614,33 @@ public void testTracingErrorIpc() throws IOException {

protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);

private IOException doBadPreableHeaderCall(BlockingInterface stub) {
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
return ProtobufUtil.handleRemoteException(se);
}

@Test
public void testBadPreambleHeader() throws IOException, ServiceException {
public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe, instanceOf(BadAuthException.class));
assertThat(ioe.getMessage(), containsString("authName=unknown"));
BadAuthException error = null;
// for SimpleRpcServer, it is possible that we get a broken pipe before getting the
// BadAuthException, so we add some retries here, see HBASE-28417
for (int i = 0; i < 10; i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, same request as I received from @NihalJain -- can this use Waiter instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the things are a bit different. It is not something that after some time, things will go right. We just need to try several times, and we should get at least one time, things go as expected...

IOException ioe = doBadPreableHeaderCall(stub);
if (ioe instanceof BadAuthException) {
error = (BadAuthException) ioe;
break;
}
Thread.sleep(100);
}
assertNotNull("Can not get expected BadAuthException", error);
assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockingIPC.class);

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
@Parameters(name = "{index}: rpcServerImpl={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { SimpleRpcServer.class },
new Object[] { NettyRpcServer.class });
}

@Override
Expand Down Expand Up @@ -73,41 +72,6 @@ protected boolean isTcpNoDelay() {
};
}

private static class TestFailingRpcServer extends SimpleRpcServer {

TestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new BlockingRpcClient(conf) {
Expand All @@ -124,7 +88,6 @@ protected byte[] getConnectionHeaderPreamble() {
}
};
}

};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
Expand All @@ -51,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyIPC.class);

@Parameters(name = "{index}: EventLoop={0}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
params.add(new Object[] { "nio" });
params.add(new Object[] { "perClientNio" });
private static List<String> getEventLoopTypes() {
List<String> types = new ArrayList<>();
types.add("nio");
types.add("perClientNio");
if (JVM.isLinux() && JVM.isAmd64()) {
params.add(new Object[] { "epoll" });
types.add("epoll");
}
return types;
}

@Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
public static List<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (String eventLoopType : getEventLoopTypes()) {
params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
params.add(new Object[] { NettyRpcServer.class, eventLoopType });
}
return params;
}

@Parameter
@Parameter(1)
public String eventLoopType;

private static NioEventLoopGroup NIO;
Expand Down Expand Up @@ -103,13 +109,6 @@ private void setConf(Configuration conf) {
}
}

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
}

@Override
protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
Expand Down Expand Up @@ -141,13 +140,6 @@ protected boolean isTcpNoDelay() {
};
}

@Override
protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new NettyRpcClient(conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,41 @@ public class TestNettyTlsIPC extends AbstractTestIPC {

private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;

@Parameterized.Parameter(0)
@Parameterized.Parameter(1)
public X509KeyType caKeyType;

@Parameterized.Parameter(1)
@Parameterized.Parameter(2)
public X509KeyType certKeyType;

@Parameterized.Parameter(2)
@Parameterized.Parameter(3)
public char[] keyPassword;

@Parameterized.Parameter(3)
@Parameterized.Parameter(4)
public boolean acceptPlainText;

@Parameterized.Parameter(4)
@Parameterized.Parameter(5)
public boolean clientTlsEnabled;

private X509TestContext x509TestContext;

// only netty rpc server supports TLS, so here we will only test NettyRpcServer
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
+ " clientTlsEnabled={4}")
name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, keyPassword={3},"
+ " acceptPlainText={4}, clientTlsEnabled={5}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (char[] keyPassword : new char[][] { "".toCharArray(), "pa$$w0rd".toCharArray() }) {
// do not accept plain text
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword,
false, true });
// support plain text and client enables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
params.add(
new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
params.add(new Object[] { NettyRpcServer.class, caKeyType, certKeyType, keyPassword, true,
false });
}
}
}
Expand Down