Skip to content

Commit

Permalink
netty: Fix NPE in NettyClientTransport
Browse files Browse the repository at this point in the history
Fixes NPE when keepalive is enabled.

* Move creation of keepAliveManager to the bottom of start()
* Enable keepAlive in NettyClientTransportTest
* Add test cases checking if keepalive is enabled/disabled, specifically.

Fixes #2726 (Backports #2729)
  • Loading branch information
lukaszx0 authored and ejona86 committed Feb 17, 2017
1 parent 4b819e4 commit c8d21d5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
16 changes: 11 additions & 5 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));

if (enableKeepAlive) {
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
keepAliveTimeoutNanos);
}

handler = newHandler();
HandlerSettings.setAutoWindow(handler);

Expand Down Expand Up @@ -234,6 +229,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
}
});

if (enableKeepAlive) {
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
keepAliveTimeoutNanos);
}

return null;
}

Expand Down Expand Up @@ -276,6 +277,11 @@ Channel channel() {
return channel;
}

@VisibleForTesting
KeepAliveManager keepAliveManager() {
return keepAliveManager;
}

/**
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
* from the pipeline when the channel is closed. Since handlers are removed, you may get an
Expand Down
42 changes: 35 additions & 7 deletions netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -181,7 +182,7 @@ address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
public void overrideDefaultUserAgent() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(),
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent");
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
transport.start(clientTransportListener);

new Rpc(transport, new Metadata()).halfClose().waitForResponse();
Expand All @@ -198,7 +199,7 @@ public void maxMessageSizeShouldBeEnforced() throws Throwable {
startServer();
// Allow the response payloads of up to 1 byte.
NettyClientTransport transport = newTransport(newNegotiator(),
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null);
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
transport.start(clientTransportListener);

try {
Expand Down Expand Up @@ -280,7 +281,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
startServer();

NettyClientTransport transport =
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null);
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
transport.start(clientTransportListener);

try {
Expand Down Expand Up @@ -346,6 +347,30 @@ public void clientStreamGetsAttributes() throws Exception {
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}

@Test
public void keepAliveEnabled() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
transport.start(clientTransportListener);
Rpc rpc = new Rpc(transport).halfClose();
rpc.waitForResponse();

assertNotNull(transport.keepAliveManager());
}

@Test
public void keepAliveDisabled() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
transport.start(clientTransportListener);
Rpc rpc = new Rpc(transport).halfClose();
rpc.waitForResponse();

assertNull(transport.keepAliveManager());
}

private Throwable getRootCause(Throwable t) {
if (t.getCause() == null) {
return t;
Expand All @@ -361,15 +386,18 @@ private ProtocolNegotiator newNegotiator() throws IOException {
}

private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
return newTransport(negotiator,
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */);
return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
null /* user agent */, true /* keep alive */);
}

private NettyClientTransport newTransport(
ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) {
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
if (enableKeepAlive) {
transport.enableKeepAlive(true, 1000, 1000);
}
transports.add(transport);
return transport;
}
Expand Down

0 comments on commit c8d21d5

Please sign in to comment.