Skip to content

Commit

Permalink
Reset mock transport service in CcrRetentionLeaseIT (elastic#42600)
Browse files Browse the repository at this point in the history
testRetentionLeaseIsAddedIfItDisappearsWhileFollowing does not reset the
mock transport service after test. Surviving transport interceptors from
that test can sneaky remove retention leases and make other tests fail.

Closes elastic#39331
Closes elastic#39509
Closes elastic#41428
Closes elastic#41679
Closes elastic#41737
Closes elastic#41756
  • Loading branch information
dnhatn committed May 28, 2019
1 parent ab832c4 commit 2077f9f
Showing 1 changed file with 46 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.ccr;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
Expand Down Expand Up @@ -44,7 +43,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
Expand Down Expand Up @@ -88,7 +86,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

@TestLogging(value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug")
public class CcrRetentionLeaseIT extends CcrIntegTestCase {

public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin {
Expand Down Expand Up @@ -224,9 +221,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {

// block the recovery from completing; this ensures the background sync is still running
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (ClearCcrRestoreSessionAction.NAME.equals(action)
Expand All @@ -248,9 +245,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
latch.countDown();
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
}
Expand Down Expand Up @@ -405,9 +402,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {

final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)
Expand Down Expand Up @@ -456,9 +453,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
}
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
}
Expand Down Expand Up @@ -488,9 +485,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {

final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)
Expand Down Expand Up @@ -526,9 +523,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID))));
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
}
Expand Down Expand Up @@ -766,35 +763,36 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
final CountDownLatch latch = new CountDownLatch(1);

final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.addSendBehavior(
try {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
senderTransportService.clearAllRules();
final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request;
final String primaryShardNodeId =
getLeaderCluster()
.clusterService()
.state()
.routingTable()
.index(leaderIndex)
.shard(renewRequest.getShardId().id())
.primaryShard()
.currentNodeId();
getLeaderCluster()
.clusterService()
.state()
.routingTable()
.index(leaderIndex)
.shard(renewRequest.getShardId().id())
.primaryShard()
.currentNodeId();
final String primaryShardNodeName =
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary =
getLeaderCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(renewRequest.getShardId());
getLeaderCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(renewRequest.getShardId());
final CountDownLatch innerLatch = new CountDownLatch(1);
// this forces the background renewal from following to face a retention lease not found exception
primary.removeRetentionLease(
getRetentionLeaseId(followerIndex, leaderIndex),
ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));
getRetentionLeaseId(followerIndex, leaderIndex),
ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));

try {
innerLatch.await();
Expand All @@ -807,11 +805,18 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
}
connection.sendRequest(requestId, action, request, options);
});
}
}

latch.await();
latch.await();

assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
} finally {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
}
}

/**
Expand Down Expand Up @@ -858,9 +863,9 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();

try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
Expand Down Expand Up @@ -914,9 +919,9 @@ public void onResponseReceived(
assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
}
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
}
Expand Down

0 comments on commit 2077f9f

Please sign in to comment.