Skip to content

Commit

Permalink
HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails wit…
Browse files Browse the repository at this point in the history
…h broken pipe instead of bad auth (#5740)

Also change the IPC related tests to test different combinations of rpc server&client, for example, NettyRpcClient and SimpleRpcServer

Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
(cherry picked from commit 2306820)
  • Loading branch information
Apache9 committed Mar 6, 2024
1 parent 780ff56 commit bc0c12d
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand All @@ -53,6 +52,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -68,18 +68,22 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
Expand All @@ -101,20 +105,26 @@ public abstract class AbstractTestIPC {
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);

protected static final Configuration CONF = HBaseConfiguration.create();
static {
// Set the default to be the old SimpleRpcServer. Subclasses test it and netty.
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
}

protected abstract RpcServer createRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException;
private RpcServer createRpcServer(Server server, String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
}

protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);

@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();

@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;

@Before
public void setUpBeforeTest() {
CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
}

/**
* Ensure we do not HAVE TO HAVE a codec.
*/
Expand Down Expand Up @@ -326,15 +336,81 @@ public void testTimeout() throws IOException {
}
}

protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
private static class FailingSimpleRpcServer extends SimpleRpcServer {

FailingSimpleRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

private static class FailingNettyRpcServer extends NettyRpcServer {

FailingNettyRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

static final class FailingConnection extends NettyServerRpcConnection {
private FailingConnection(FailingNettyRpcServer rpcServer, Channel channel) {
super(rpcServer, channel);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
@Override
protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
return new FailingConnection(FailingNettyRpcServer.this, channel);
}
};
}
}

private RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException;
Configuration conf, RpcScheduler scheduler) throws IOException {
if (rpcServerImpl.equals(NettyRpcServer.class)) {
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
} else {
return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
}
}

/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration conf = new Configuration(CONF);
RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));

Expand Down Expand Up @@ -543,19 +619,33 @@ public void testTracingErrorIpc() throws IOException {

protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);

private IOException doBadPreableHeaderCall(BlockingInterface stub) {
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
return ProtobufUtil.handleRemoteException(se);
}

@Test
public void testBadPreambleHeader() throws IOException, ServiceException {
public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Collections.emptyList(),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe, instanceOf(BadAuthException.class));
assertThat(ioe.getMessage(), containsString("authName=unknown"));
BadAuthException error = null;
// for SimpleRpcServer, it is possible that we get a broken pipe before getting the
// BadAuthException, so we add some retries here, see HBASE-28417
for (int i = 0; i < 10; i++) {
IOException ioe = doBadPreableHeaderCall(stub);
if (ioe instanceof BadAuthException) {
error = (BadAuthException) ioe;
break;
}
Thread.sleep(100);
}
assertNotNull("Can not get expected BadAuthException", error);
assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,31 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestBlockingIPC extends AbstractTestIPC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockingIPC.class);

@Override
protected RpcServer createRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
@Parameters(name = "{index}: rpcServerImpl={0}")
public static List<Object[]> data() {
return Arrays.asList(new Object[] { SimpleRpcServer.class },
new Object[] { NettyRpcServer.class });
}

@Override
Expand Down Expand Up @@ -73,41 +72,6 @@ boolean isTcpNoDelay() {
};
}

private static class TestFailingRpcServer extends SimpleRpcServer {

TestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}

final class FailingConnection extends SimpleServerRpcConnection {
private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer, channel, lastContact);
}

@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}

@Override
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new FailingConnection(this, channel, time);
}
}

@Override
protected RpcServer createTestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
}

@Override
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
return new BlockingRpcClient(conf) {
Expand All @@ -124,7 +88,6 @@ protected byte[] getConnectionHeaderPreamble() {
}
};
}

};
}
}
Loading

0 comments on commit bc0c12d

Please sign in to comment.