diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index b05c7a0dde368..1f6373aff6a8c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -630,6 +630,22 @@ public void onFailure(Exception e) { public void testGetFollowStats() throws Exception { RestHighLevelClient client = highLevelClient(); + { + // Create leader index: + CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader"); + createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true)); + CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + } + { + // Follow index, so that we can query for follow stats: + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower"); + PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); + assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); + assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); + assertThat(putFollowResponse.isIndexFollowingStarted(), is(true)); + } + // tag::ccr-get-follow-stats-request FollowStatsRequest request = new FollowStatsRequest("follower"); // <1> @@ -671,6 +687,12 @@ public void onFailure(Exception e) { // end::ccr-get-follow-stats-execute-async assertTrue(latch.await(30L, TimeUnit.SECONDS)); + + { + PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); + AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT); + assertThat(pauseFollowResponse.isAcknowledged(), is(true)); + } } static Map toMap(Response response) throws IOException { diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml index aa63c804aba21..5b3e6c18ef29b 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml @@ -43,6 +43,12 @@ - is_true: follow_index_shards_acked - is_true: index_following_started + - do: + ccr.follow_stats: + index: _all + - length: { indices: 1 } + - match: { indices.0.index: "bar" } + # we can not reliably wait for replication to occur so we test the endpoint without indexing any documents - do: ccr.follow_stats: @@ -77,3 +83,7 @@ index: bar - is_true: acknowledged + - do: + catch: missing + ccr.follow_stats: + index: unknown diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java index 8ab66aec8e80b..dc684fbc904ce 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -13,6 +14,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -65,6 +67,15 @@ protected void doExecute( listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } + + if (Strings.isAllOrWildcard(request.indices()) == false) { + final ClusterState state = clusterService.state(); + Set shardFollowTaskFollowerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices()); + if (shardFollowTaskFollowerIndices.isEmpty()) { + String resources = String.join(",", request.indices()); + throw new ResourceNotFoundException("No shard follow tasks for follower indices [{}]", resources); + } + } super.doExecute(task, request, listener); } @@ -80,21 +91,7 @@ protected FollowStatsAction.StatsResponses newResponse( @Override protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer operation) { final ClusterState state = clusterService.state(); - final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - if (persistentTasksMetaData == null) { - return; - } - - final Set requestedFollowerIndices = request.indices() != null ? - new HashSet<>(Arrays.asList(request.indices())) : Collections.emptySet(); - final Set followerIndices = persistentTasksMetaData.tasks().stream() - .filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME)) - .map(persistentTask -> { - ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); - return shardFollowTask.getFollowShardId().getIndexName(); - }) - .filter(followerIndex -> requestedFollowerIndices.isEmpty() || requestedFollowerIndices.contains(followerIndex)) - .collect(Collectors.toSet()); + final Set followerIndices = findFollowerIndicesFromShardFollowTasks(state, request.indices()); for (final Task task : taskManager.getTasks().values()) { if (task instanceof ShardFollowNodeTask) { @@ -114,4 +111,22 @@ protected void taskOperation( listener.onResponse(new FollowStatsAction.StatsResponse(task.getStatus())); } + static Set findFollowerIndicesFromShardFollowTasks(ClusterState state, String[] indices) { + final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasksMetaData == null) { + return Collections.emptySet(); + } + + final Set requestedFollowerIndices = indices != null ? + new HashSet<>(Arrays.asList(indices)) : Collections.emptySet(); + return persistentTasksMetaData.tasks().stream() + .filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME)) + .map(persistentTask -> { + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + return shardFollowTask.getFollowShardId().getIndexName(); + }) + .filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex)) + .collect(Collectors.toSet()); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java index 409746f9d851b..1f1c6cd5c64e3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowStatsIT.java @@ -6,9 +6,12 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -116,4 +119,106 @@ public void testFollowStatsApiFollowerIndexFiltering() throws Exception { }); } + public void testFollowStatsApiResourceNotFound() throws Exception { + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(0)); + + statsRequest.setIndices(new String[] {"follower1"}); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet()); + assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower1]")); + + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest.setIndices(new String[] {"follower2"}); + e = expectThrows(ResourceNotFoundException.class, + () -> client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet()); + assertThat(e.getMessage(), equalTo("No shard follow tasks for follower indices [follower2]")); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + } + + public void testFollowStatsApiIncludeShardFollowStatsWithRemovedFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower1"}); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + assertAcked(client().admin().indices().delete(new DeleteIndexRequest("follower1")).actionGet()); + + statsRequest = new FollowStatsAction.StatsRequest(); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower1"}); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + } + + public void testFollowStatsApiIncludeShardFollowStatsWithClosedFollowerIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest(); + FollowStatsAction.StatsResponses response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower1"}); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + assertAcked(client().admin().indices().close(new CloseIndexRequest("follower1")).actionGet()); + + statsRequest = new FollowStatsAction.StatsRequest(); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + statsRequest = new FollowStatsAction.StatsRequest(); + statsRequest.setIndices(new String[] {"follower1"}); + response = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(response.getStatsResponses().size(), equalTo(1)); + assertThat(response.getStatsResponses().get(0).status().followerIndex(), equalTo("follower1")); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java new file mode 100644 index 0000000000000..bc8c58f1de7de --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TransportFollowStatsActionTests extends ESTestCase { + + public void testFindFollowerIndicesFromShardFollowTasks() { + PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder() + .addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null) + .addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build()) + .build(); + Set result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null); + assertThat(result.size(), equalTo(2)); + assertThat(result.contains("abc"), is(true)); + assertThat(result.contains("def"), is(true)); + + result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"}); + assertThat(result.size(), equalTo(1)); + assertThat(result.contains("def"), is(true)); + + result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"}); + assertThat(result.size(), equalTo(0)); + } + + private static ShardFollowTask createShardFollowTask(String followerIndex) { + return new ShardFollowTask( + null, + new ShardId(followerIndex, "", 0), + new ShardId("leader_index", "", 0), + 1024, + TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE, + 1, + 1024, + TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE, + 1, + 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + Collections.emptyMap() + ); + } + +}