diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 548d16dcf7208..c85071cdb3e5e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr; -import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -222,9 +221,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { // block the recovery from completing; this ensures the background sync is still running final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.addSendBehavior( (connection, requestId, action, request, options) -> { if (ClearCcrRestoreSessionAction.NAME.equals(action) @@ -246,9 +245,9 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); latch.countDown(); } finally { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.clearAllRules(); } } @@ -403,9 +402,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); try { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.addSendBehavior( (connection, requestId, action, request, options) -> { if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) @@ -454,9 +453,9 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); } } finally { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.clearAllRules(); } } @@ -486,9 +485,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); try { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.addSendBehavior( (connection, requestId, action, request, options) -> { if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) @@ -524,9 +523,9 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { getLeaderCluster().getClusterName(), new Index(leaderIndex, leaderUUID)))); } finally { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.clearAllRules(); } } @@ -614,7 +613,6 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39509") public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception { final String leaderIndex = "leader"; final String followerIndex = "follower"; @@ -765,35 +763,36 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep final CountDownLatch latch = new CountDownLatch(1); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { - final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); - senderTransportService.addSendBehavior( + try { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); + senderTransportService.addSendBehavior( (connection, requestId, action, request, options) -> { if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) - || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { senderTransportService.clearAllRules(); final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; final String primaryShardNodeId = - getLeaderCluster() - .clusterService() - .state() - .routingTable() - .index(leaderIndex) - .shard(renewRequest.getShardId().id()) - .primaryShard() - .currentNodeId(); + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(renewRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); final String primaryShardNodeName = - getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); final IndexShard primary = - getLeaderCluster() - .getInstance(IndicesService.class, primaryShardNodeName) - .getShardOrNull(renewRequest.getShardId()); + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(renewRequest.getShardId()); final CountDownLatch innerLatch = new CountDownLatch(1); // this forces the background renewal from following to face a retention lease not found exception primary.removeRetentionLease( - getRetentionLeaseId(followerIndex, leaderIndex), - ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); + getRetentionLeaseId(followerIndex, leaderIndex), + ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); try { innerLatch.await(); @@ -806,11 +805,18 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep } connection.sendRequest(requestId, action, request, options); }); - } + } - latch.await(); + latch.await(); - assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } finally { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); + senderTransportService.clearAllRules(); + } + } } /** @@ -857,9 +863,9 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); try { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.addSendBehavior( (connection, requestId, action, request, options) -> { if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) @@ -913,9 +919,9 @@ public void onResponseReceived( assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); } } finally { - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) { final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName()); senderTransportService.clearAllRules(); } }