Skip to content

Commit

Permalink
refactor ReplicaSessionTest
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Oct 22, 2024
1 parent ffdb756 commit e8907a2
Showing 1 changed file with 67 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -53,12 +52,12 @@
import org.slf4j.Logger;

public class ReplicaSessionTest {
private String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
private final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSessionTest.class);
private ClusterManager manager;

@BeforeEach
public void before() throws Exception {
final String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603";
manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build());
}

Expand All @@ -71,16 +70,16 @@ public void after() throws Exception {
@Test
public void testConnect() throws Exception {
// test1: connect to an invalid address.
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:12345");
ReplicaSession rs = manager.getReplicaSession(addr);
ReplicaSession rs =
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:12345")));

ArrayList<FutureTask<Void>> callbacks = new ArrayList<FutureTask<Void>>();
ArrayList<FutureTask<Void>> callbacks = new ArrayList<>();

for (int i = 0; i < 100; ++i) {
final client_operator op = new rrdb_put_operator(new gpid(-1, -1), "", null, 0);
final FutureTask<Void> cb =
new FutureTask<Void>(
new FutureTask<>(
() -> {
assertEquals(error_code.error_types.ERR_SESSION_RESET, op.rpc_error.errno);
return null;
Expand All @@ -98,30 +97,33 @@ public void testConnect() throws Exception {
}
}

final ReplicaSession cp_rs = rs;
Toollet.waitCondition(() -> ReplicaSession.ConnState.DISCONNECTED == cp_rs.getState(), 5);
ReplicaSession finalRs = rs;
assertTrue(
Toollet.waitCondition(
() -> ReplicaSession.ConnState.DISCONNECTED == finalRs.getState(), 5));

// test2: connect to a valid address, and then close the server.
addr.fromString("127.0.0.1:34801");
callbacks.clear();

final rpc_address addr = rpc_address.fromIpPort("127.0.0.1:34801");
assertNotNull(addr);

rs = manager.getReplicaSession(addr);
rs.setMessageResponseFilter((err, header) -> true);
for (int i = 0; i < 20; ++i) {
// Send query request to replica server, expect it to be timeout.
final int index = i;
update_request req =
final update_request req =
new update_request(new blob("hello".getBytes()), new blob("world".getBytes()), 0);

final client_operator op = new Toollet.test_operator(new gpid(-1, -1), req);
final rpc_address cp_addr = addr;

final int index = i;
final FutureTask<Void> cb =
new FutureTask<Void>(
new FutureTask<>(
() -> {
assertEquals(error_code.error_types.ERR_TIMEOUT, op.rpc_error.errno);
// for the last request, we kill the server
// Kill the server at the last request.
if (index == 19) {
Toollet.closeServer(cp_addr);
Toollet.closeServer(addr);
}
return null;
});
Expand All @@ -132,18 +134,20 @@ public void testConnect() throws Exception {

for (int i = 0; i < 80; ++i) {
// Re-send query request to replica server, but the timeout is longer.
update_request req =
final update_request req =
new update_request(new blob("hello".getBytes()), new blob("world".getBytes()), 0);
final client_operator op = new Toollet.test_operator(new gpid(-1, -1), req);

final FutureTask<Void> cb =
new FutureTask<Void>(
new FutureTask<>(
() -> {
assertEquals(error_code.error_types.ERR_SESSION_RESET, op.rpc_error.errno);
return null;
});

callbacks.add(cb);
// The request has longer timeout, so it should be responsed later than the server been

// The request has longer timeout, so response should be received after the server was
// killed.
rs.asyncSend(op, cb, 2000, false);
}
Expand Down Expand Up @@ -177,26 +181,23 @@ public void recv_data(TProtocol iprot) throws TException {
}
}

rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34801");
ReplicaSession rs = manager.getReplicaSession(addr);
final ReplicaSession rs =
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801")));

for (int pid = 0; pid < 16; pid++) {
// find a valid partition held on 127.0.0.1:34801
// Find a valid partition held on 127.0.0.1:34801.
blob req = new blob(PegasusClient.generateKey("a".getBytes(), "".getBytes()));
final client_operator op = new test_operator(new gpid(1, pid), req);
FutureTask<Void> cb =
new FutureTask<Void>(
new Callable<Void>() {
@Override
public Void call() throws Exception {
if (op.rpc_error.errno != error_code.error_types.ERR_OBJECT_NOT_FOUND
&& op.rpc_error.errno != error_code.error_types.ERR_INVALID_STATE
&& op.rpc_error.errno != error_code.error_types.ERR_SESSION_RESET) {
assertEquals(error_code.error_types.ERR_INVALID_DATA, op.rpc_error.errno);
}
return null;
final FutureTask<Void> cb =
new FutureTask<>(
() -> {
if (op.rpc_error.errno != error_code.error_types.ERR_OBJECT_NOT_FOUND
&& op.rpc_error.errno != error_code.error_types.ERR_INVALID_STATE
&& op.rpc_error.errno != error_code.error_types.ERR_SESSION_RESET) {
assertEquals(error_code.error_types.ERR_INVALID_DATA, op.rpc_error.errno);
}
return null;
});
rs.asyncSend(op, cb, 2000, false);
Tools.waitUninterruptable(cb, Integer.MAX_VALUE);
Expand All @@ -205,11 +206,11 @@ public Void call() throws Exception {

@Test
public void testTryNotifyWithSequenceID() throws Exception {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34801");
ReplicaSession rs = manager.getReplicaSession(addr);
final ReplicaSession rs =
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801")));

// no pending RequestEntry, ensure no NPE thrown
// There's no pending RequestEntry, ensure no NPE thrown.
assertTrue(rs.pendingResponse.isEmpty());
try {
rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false);
Expand All @@ -229,66 +230,72 @@ public void testTryNotifyWithSequenceID() throws Exception {
rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false);
assertTrue(passed.get());

// simulate the entry has been removed, ensure no NPE thrown
// Simulate the entry has been removed, ensure no NPE thrown.
rs.getAndRemoveEntry(entry.sequenceId);
rs.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, true);

// ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception
ReplicaSession mockRs = Mockito.spy(rs);
// Ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception.
final ReplicaSession mockRs = Mockito.spy(rs);
mockRs.pendingSend.offer(entry);
mockRs.fields.state = ReplicaSession.ConnState.CONNECTED;
Mockito.doThrow(new Exception())
.when(mockRs)
.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, false);
mockRs.markSessionDisconnect();
assertEquals(mockRs.getState(), ReplicaSession.ConnState.DISCONNECTED);
assertEquals(ReplicaSession.ConnState.DISCONNECTED, mockRs.getState());
}

@Test
public void testCloseSession() throws InterruptedException {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34801");
ReplicaSession rs = manager.getReplicaSession(addr);
final ReplicaSession rs =
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801")));
rs.tryConnect().awaitUninterruptibly();
Thread.sleep(200);
assertEquals(rs.getState(), ReplicaSession.ConnState.CONNECTED);
assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState());

rs.closeSession();
Thread.sleep(100);
assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED);
assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState());

rs.fields.state = ReplicaSession.ConnState.CONNECTING;
rs.closeSession();
Thread.sleep(100);
assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED);
assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState());
}

@Test
public void testSessionConnectTimeout() throws InterruptedException {
rpc_address addr = new rpc_address();
addr.fromString(
"www.baidu.com:34801"); // this website normally ignores incorrect request without replying
final long start = System.currentTimeMillis();

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4);
ReplicaSession rs =
final EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
final EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4);
final ReplicaSession rs =
new ReplicaSession(
addr, rpcGroup, timeoutTaskGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
// This website normally ignores incorrect request without replying.
rpc_address.fromIpPort("www.baidu.com:34801"),
rpcGroup,
timeoutTaskGroup,
1000,
30,
(ReplicaSessionInterceptorManager) null);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();

final long end = System.currentTimeMillis();

assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Thread.sleep(100);
assertEquals(rs.getState(), ReplicaSession.ConnState.DISCONNECTED);
assertEquals(ReplicaSession.ConnState.DISCONNECTED, rs.getState());
}

@Test
public void testSessionTryConnectWhenConnected() throws InterruptedException {
ReplicaSession rs =
final ReplicaSession rs =
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801")));
rs.tryConnect().awaitUninterruptibly();
Thread.sleep(100);

assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState());
assertNull(rs.tryConnect()); // do not connect again
}
Expand Down

0 comments on commit e8907a2

Please sign in to comment.