diff --git a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java index 1afae69214..34be3e626c 100644 --- a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java +++ b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java @@ -24,7 +24,6 @@ import io.netty.channel.nio.NioEventLoopGroup; import java.util.ArrayList; import java.util.Objects; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,12 +52,12 @@ import org.slf4j.Logger; public class ReplicaSessionTest { - private String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; private final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSessionTest.class); private ClusterManager manager; @BeforeEach public void before() throws Exception { + final String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); } @@ -71,16 +70,16 @@ public void after() throws Exception { @Test public void testConnect() throws Exception { // test1: connect to an invalid address. - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:12345"); - ReplicaSession rs = manager.getReplicaSession(addr); + ReplicaSession rs = + manager.getReplicaSession( + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:12345"))); - ArrayList> callbacks = new ArrayList>(); + ArrayList> callbacks = new ArrayList<>(); for (int i = 0; i < 100; ++i) { final client_operator op = new rrdb_put_operator(new gpid(-1, -1), "", null, 0); final FutureTask cb = - new FutureTask( + new FutureTask<>( () -> { assertEquals(error_code.error_types.ERR_SESSION_RESET, op.rpc_error.errno); return null; @@ -98,30 +97,33 @@ public void testConnect() throws Exception { } } - final ReplicaSession cp_rs = rs; - Toollet.waitCondition(() -> ReplicaSession.ConnState.DISCONNECTED == cp_rs.getState(), 5); + ReplicaSession finalRs = rs; + assertTrue( + Toollet.waitCondition( + () -> ReplicaSession.ConnState.DISCONNECTED == finalRs.getState(), 5)); // test2: connect to a valid address, and then close the server. - addr.fromString("127.0.0.1:34801"); callbacks.clear(); + final rpc_address addr = rpc_address.fromIpPort("127.0.0.1:34801"); + assertNotNull(addr); + rs = manager.getReplicaSession(addr); rs.setMessageResponseFilter((err, header) -> true); for (int i = 0; i < 20; ++i) { // Send query request to replica server, expect it to be timeout. - final int index = i; - update_request req = + final update_request req = new update_request(new blob("hello".getBytes()), new blob("world".getBytes()), 0); - final client_operator op = new Toollet.test_operator(new gpid(-1, -1), req); - final rpc_address cp_addr = addr; + + final int index = i; final FutureTask cb = - new FutureTask( + new FutureTask<>( () -> { assertEquals(error_code.error_types.ERR_TIMEOUT, op.rpc_error.errno); - // for the last request, we kill the server + // Kill the server at the last request. if (index == 19) { - Toollet.closeServer(cp_addr); + Toollet.closeServer(addr); } return null; }); @@ -132,18 +134,20 @@ public void testConnect() throws Exception { for (int i = 0; i < 80; ++i) { // Re-send query request to replica server, but the timeout is longer. - update_request req = + final update_request req = new update_request(new blob("hello".getBytes()), new blob("world".getBytes()), 0); final client_operator op = new Toollet.test_operator(new gpid(-1, -1), req); + final FutureTask cb = - new FutureTask( + new FutureTask<>( () -> { assertEquals(error_code.error_types.ERR_SESSION_RESET, op.rpc_error.errno); return null; }); callbacks.add(cb); - // The request has longer timeout, so it should be responsed later than the server been + + // The request has longer timeout, so response should be received after the server was // killed. rs.asyncSend(op, cb, 2000, false); } @@ -177,26 +181,23 @@ public void recv_data(TProtocol iprot) throws TException { } } - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:34801"); - ReplicaSession rs = manager.getReplicaSession(addr); + final ReplicaSession rs = + manager.getReplicaSession( + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801"))); for (int pid = 0; pid < 16; pid++) { - // find a valid partition held on 127.0.0.1:34801 + // Find a valid partition held on 127.0.0.1:34801. blob req = new blob(PegasusClient.generateKey("a".getBytes(), "".getBytes())); final client_operator op = new test_operator(new gpid(1, pid), req); - FutureTask cb = - new FutureTask( - new Callable() { - @Override - public Void call() throws Exception { - if (op.rpc_error.errno != error_code.error_types.ERR_OBJECT_NOT_FOUND - && op.rpc_error.errno != error_code.error_types.ERR_INVALID_STATE - && op.rpc_error.errno != error_code.error_types.ERR_SESSION_RESET) { - assertEquals(error_code.error_types.ERR_INVALID_DATA, op.rpc_error.errno); - } - return null; + final FutureTask cb = + new FutureTask<>( + () -> { + if (op.rpc_error.errno != error_code.error_types.ERR_OBJECT_NOT_FOUND + && op.rpc_error.errno != error_code.error_types.ERR_INVALID_STATE + && op.rpc_error.errno != error_code.error_types.ERR_SESSION_RESET) { + assertEquals(error_code.error_types.ERR_INVALID_DATA, op.rpc_error.errno); } + return null; }); rs.asyncSend(op, cb, 2000, false); Tools.waitUninterruptable(cb, Integer.MAX_VALUE); @@ -205,11 +206,11 @@ public Void call() throws Exception { @Test public void testTryNotifyWithSequenceID() throws Exception { - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:34801"); - ReplicaSession rs = manager.getReplicaSession(addr); + final ReplicaSession rs = + manager.getReplicaSession( + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801"))); - // no pending RequestEntry, ensure no NPE thrown + // There's no pending RequestEntry, ensure no NPE thrown. assertTrue(rs.pendingResponse.isEmpty()); try { rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false); @@ -229,66 +230,72 @@ public void testTryNotifyWithSequenceID() throws Exception { rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false); assertTrue(passed.get()); - // simulate the entry has been removed, ensure no NPE thrown + // Simulate the entry has been removed, ensure no NPE thrown. rs.getAndRemoveEntry(entry.sequenceId); rs.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, true); - // ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception - ReplicaSession mockRs = Mockito.spy(rs); + // Ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception. + final ReplicaSession mockRs = Mockito.spy(rs); mockRs.pendingSend.offer(entry); mockRs.fields.state = ReplicaSession.ConnState.CONNECTED; Mockito.doThrow(new Exception()) .when(mockRs) .tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, false); mockRs.markSessionDisconnect(); - assertEquals(mockRs.getState(), ReplicaSession.ConnState.DISCONNECTED); + assertEquals(ReplicaSession.ConnState.DISCONNECTED, mockRs.getState()); } @Test public void testCloseSession() throws InterruptedException { - rpc_address addr = new rpc_address(); - addr.fromString("127.0.0.1:34801"); - ReplicaSession rs = manager.getReplicaSession(addr); + final ReplicaSession rs = + manager.getReplicaSession( + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801"))); rs.tryConnect().awaitUninterruptibly(); Thread.sleep(200); - assertEquals(rs.getState(), ReplicaSession.ConnState.CONNECTED); + assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState()); rs.closeSession(); Thread.sleep(100); - assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED); + assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState()); rs.fields.state = ReplicaSession.ConnState.CONNECTING; rs.closeSession(); Thread.sleep(100); - assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED); + assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState()); } @Test public void testSessionConnectTimeout() throws InterruptedException { - rpc_address addr = new rpc_address(); - addr.fromString( - "www.baidu.com:34801"); // this website normally ignores incorrect request without replying + final long start = System.currentTimeMillis(); - long start = System.currentTimeMillis(); - EventLoopGroup rpcGroup = new NioEventLoopGroup(4); - EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4); - ReplicaSession rs = + final EventLoopGroup rpcGroup = new NioEventLoopGroup(4); + final EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4); + final ReplicaSession rs = new ReplicaSession( - addr, rpcGroup, timeoutTaskGroup, 1000, 30, (ReplicaSessionInterceptorManager) null); + // This website normally ignores incorrect request without replying. + rpc_address.fromIpPort("www.baidu.com:34801"), + rpcGroup, + timeoutTaskGroup, + 1000, + 30, + (ReplicaSessionInterceptorManager) null); rs.tryConnect().awaitUninterruptibly(); - long end = System.currentTimeMillis(); + + final long end = System.currentTimeMillis(); + assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec Thread.sleep(100); - assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED); + assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState()); } @Test public void testSessionTryConnectWhenConnected() throws InterruptedException { - ReplicaSession rs = + final ReplicaSession rs = manager.getReplicaSession( Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801"))); rs.tryConnect().awaitUninterruptibly(); Thread.sleep(100); + assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState()); assertNull(rs.tryConnect()); // do not connect again }