diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy index 11f879a93bc71..99fc6c06f1b76 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy @@ -104,7 +104,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { * format of the response is incompatible i.e. it is not a JSON object. */ static shouldAddShardFailureCheck(String path) { - return path.startsWith('_cat') == false && path.startsWith('_xpack/ml/datafeeds/') == false + return path.startsWith('_cat') == false && path.startsWith('_xpack/ml/datafeeds/') == false } /** @@ -293,7 +293,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { } void emitDo(String method, String pathAndQuery, String body, - String catchPart, List warnings, boolean inSetup) { + String catchPart, List warnings, boolean inSetup, boolean skipShardFailures) { def (String path, String query) = pathAndQuery.tokenize('?') if (path == null) { path = '' // Catch requests to the root... @@ -345,7 +345,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { * section so we have to skip it there. 
We also omit the assertion * from APIs that don't return a JSON object */ - if (false == inSetup && shouldAddShardFailureCheck(path)) { + if (false == inSetup && skipShardFailures == false && shouldAddShardFailureCheck(path)) { current.println(" - is_false: _shards.failures") } } @@ -393,7 +393,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { pathAndQuery = pathAndQuery.substring(1) } emitDo(method, pathAndQuery, body, catchPart, snippet.warnings, - inSetup) + inSetup, snippet.skipShardsFailures) } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy index fbc231aa764dc..83a6a05ec5df7 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy @@ -45,7 +45,7 @@ public class SnippetsTask extends DefaultTask { private static final String WARNING = /warning:(.+)/ private static final String CAT = /(_cat)/ private static final String TEST_SYNTAX = - /(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING) ?/ + /(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING|(skip_shard_failures)) ?/ /** * Action to take on each snippet. 
Called with a single parameter, an @@ -233,6 +233,10 @@ public class SnippetsTask extends DefaultTask { snippet.warnings.add(it.group(7)) return } + if (it.group(8) != null) { + snippet.skipShardsFailures = true + return + } throw new InvalidUserDataException( "Invalid test marker: $line") } @@ -329,6 +333,7 @@ public class SnippetsTask extends DefaultTask { String setup = null boolean curl List warnings = new ArrayList() + boolean skipShardsFailures = false @Override public String toString() { @@ -359,6 +364,9 @@ public class SnippetsTask extends DefaultTask { for (String warning in warnings) { result += "[warning:$warning]" } + if (skipShardsFailures) { + result += '[skip_shard_failures]' + } } if (testResponse) { result += '// TESTRESPONSE' diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java index b6c6866966725..373b94124d43e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.PauseFollowRequest; @@ -36,6 +37,7 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import java.io.IOException; import java.util.Collections; @@ -233,6 +235,48 @@ public void unfollowAsync(UnfollowRequest request, ); } + /** + * Instructs an index acting as a leader index 
to forget the specified follower index. + * + * See the docs for more details + * on the intended usage of this API. + * + * @param request the request + * @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable. + * @return the response + * @throws IOException if an I/O exception occurs while executing this request + */ + public BroadcastResponse forgetFollower(final ForgetFollowerRequest request, final RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + CcrRequestConverters::forgetFollower, + options, + BroadcastResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Asynchronously instructs an index acting as a leader index to forget the specified follower index. + * + * See the docs for more details + * on the intended usage of this API. + * + * @param request the request + * @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable. + */ + public void forgetFollowerAsync( + final ForgetFollowerRequest request, + final RequestOptions options, + final ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity( + request, + CcrRequestConverters::forgetFollower, + options, + BroadcastResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Stores an auto follow pattern. 
* diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java index 744714af41a3a..940a1e3c5b31a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest; import org.elasticsearch.client.ccr.FollowInfoRequest; import org.elasticsearch.client.ccr.FollowStatsRequest; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest; @@ -80,6 +81,17 @@ static Request unfollow(UnfollowRequest unfollowRequest) { return new Request(HttpPost.METHOD_NAME, endpoint); } + static Request forgetFollower(final ForgetFollowerRequest forgetFollowerRequest) throws IOException { + final String endpoint = new RequestConverters.EndpointBuilder() + .addPathPart(forgetFollowerRequest.leaderIndex()) + .addPathPartAsIs("_ccr") + .addPathPartAsIs("forget_follower") + .build(); + final Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(forgetFollowerRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request putAutoFollowPattern(PutAutoFollowPatternRequest putAutoFollowPatternRequest) throws IOException { String endpoint = new RequestConverters.EndpointBuilder() .addPathPartAsIs("_ccr", "auto_follow") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java new file mode 100644 index 0000000000000..3d20a6d934d9d --- /dev/null +++ 
b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents a forget follower request. Note that this an expert API intended to be used only when unfollowing a follower index fails to + * remove the follower retention leases. Please be sure that you understand the purpose this API before using. + */ +public final class ForgetFollowerRequest implements ToXContentObject, Validatable { + + private final String followerCluster; + + private final String followerIndex; + + private final String followerIndexUUID; + + private final String leaderRemoteCluster; + + private final String leaderIndex; + + /** + * The name of the leader index. + * + * @return the name of the leader index + */ + public String leaderIndex() { + return leaderIndex; + } + + /** + * Construct a forget follower request. 
+ * + * @param followerCluster the name of the cluster containing the follower index to forget + * @param followerIndex the name of follower index + * @param followerIndexUUID the UUID of the follower index + * @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index + * @param leaderIndex the name of the leader index + */ + public ForgetFollowerRequest( + final String followerCluster, + final String followerIndex, + final String followerIndexUUID, + final String leaderRemoteCluster, + final String leaderIndex) { + this.followerCluster = Objects.requireNonNull(followerCluster); + this.followerIndex = Objects.requireNonNull(followerIndex); + this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID); + this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster); + this.leaderIndex = Objects.requireNonNull(leaderIndex); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("follower_cluster", followerCluster); + builder.field("follower_index", followerIndex); + builder.field("follower_index_uuid", followerIndexUUID); + builder.field("leader_remote_cluster", leaderRemoteCluster); + } + builder.endObject(); + return builder; + } + +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java new file mode 100644 index 0000000000000..3665ba5bf5009 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java @@ -0,0 +1,175 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.core; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Represents a response to a request that is broadcast to a collection of shards. + */ +public class BroadcastResponse { + + private final Shards shards; + + /** + * Represents the shard-level summary of the response execution. + * + * @return the shard-level response summary + */ + public Shards shards() { + return shards; + } + + BroadcastResponse(final Shards shards) { + this.shards = Objects.requireNonNull(shards); + } + + private static final ParseField SHARDS_FIELD = new ParseField("_shards"); + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "broadcast_response", + a -> new BroadcastResponse((Shards) a[0])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), Shards.SHARDS_PARSER, SHARDS_FIELD); + } + + /** + * Parses a broadcast response. 
+ * + * @param parser the parser + * @return a broadcast response parsed from the specified parser + * @throws IOException if an I/O exception occurs parsing the response + */ + public static BroadcastResponse fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Represents the results of a collection of shards on which a request was executed against. + */ + public static class Shards { + + private final int total; + + /** + * The total number of shards on which a request was executed against. + * + * @return the total number of shards + */ + public int total() { + return total; + } + + private final int successful; + + /** + * The number of successful shards on which a request was executed against. + * + * @return the number of successful shards + */ + public int successful() { + return successful; + } + + private final int skipped; + + /** + * The number of shards skipped by the request. + * + * @return the number of skipped shards + */ + public int skipped() { + return skipped; + } + + private final int failed; + + /** + * The number of shards on which a request failed to be executed against. + * + * @return the number of failed shards + */ + public int failed() { + return failed; + } + + private final Collection failures; + + /** + * The failures corresponding to the shards on which a request failed to be executed against. Note that the number of failures might + * not match {@link #failed()} as some responses group together shard failures. 
+ * + * @return the failures + */ + public Collection failures() { + return failures; + } + + Shards( + final int total, + final int successful, + final int skipped, + final int failed, + final Collection failures) { + this.total = total; + this.successful = successful; + this.skipped = skipped; + this.failed = failed; + this.failures = Collections.unmodifiableCollection(Objects.requireNonNull(failures)); + } + + private static final ParseField TOTAL_FIELD = new ParseField("total"); + private static final ParseField SUCCESSFUL_FIELD = new ParseField("successful"); + private static final ParseField SKIPPED_FIELD = new ParseField("skipped"); + private static final ParseField FAILED_FIELD = new ParseField("failed"); + private static final ParseField FAILURES_FIELD = new ParseField("failures"); + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser SHARDS_PARSER = new ConstructingObjectParser<>( + "shards", + a -> new Shards( + (int) a[0], // total + (int) a[1], // successful + a[2] == null ? 0 : (int) a[2], // skipped + (int) a[3], // failed + a[4] == null ? 
Collections.emptyList() : (Collection) a[4])); // failures + + static { + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), TOTAL_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), SKIPPED_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FAILED_FIELD); + SHARDS_PARSER.declareObjectArray( + ConstructingObjectParser.optionalConstructorArg(), + DefaultShardOperationFailedException.PARSER, FAILURES_FIELD); + } + + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index 2566bb4912105..6c6b1ebe6a79e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats; @@ -45,20 +46,25 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; 
+import org.elasticsearch.test.rest.yaml.ObjectPath; import org.junit.Before; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -192,6 +198,61 @@ public void testIndexFollowing() throws Exception { assertThat(unfollowResponse.isAcknowledged(), is(true)); } + public void testForgetFollower() throws IOException { + final CcrClient ccrClient = highLevelClient().ccr(); + + final CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader"); + final Map settings = new HashMap<>(3); + final int numberOfShards = randomIntBetween(1, 2); + settings.put("index.number_of_replicas", "0"); + settings.put("index.number_of_shards", Integer.toString(numberOfShards)); + settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString()); + createIndexRequest.settings(settings); + final CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + + final PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE); + final PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync); + assertTrue(putFollowResponse.isFollowIndexCreated()); + assertTrue(putFollowResponse.isFollowIndexShardsAcked()); + assertTrue(putFollowResponse.isIndexFollowingStarted()); + + final String clusterName = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value(); + + final Request statsRequest = new Request("GET", "/follower/_stats"); + final Response statsResponse = client().performRequest(statsRequest); + final ObjectPath statsObjectPath = 
ObjectPath.createFromResponse(statsResponse); + final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid"); + + final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); + AcknowledgedResponse pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync); + assertTrue(pauseFollowResponse.isAcknowledged()); + + final ForgetFollowerRequest forgetFollowerRequest = + new ForgetFollowerRequest(clusterName, "follower", followerIndexUUID, "local_cluster", "leader"); + final BroadcastResponse forgetFollowerResponse = + execute(forgetFollowerRequest, ccrClient::forgetFollower, ccrClient::forgetFollowerAsync); + assertThat(forgetFollowerResponse.shards().total(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.shards().successful(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.shards().skipped(), equalTo(0)); + assertThat(forgetFollowerResponse.shards().failed(), equalTo(0)); + assertThat(forgetFollowerResponse.shards().failures(), empty()); + + final Request retentionLeasesRequest = new Request("GET", "/leader/_stats"); + retentionLeasesRequest.addParameter("level", "shards"); + final Response retentionLeasesResponse = client().performRequest(retentionLeasesRequest); + final Map shardsStats = ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices.leader.shards"); + assertThat(shardsStats.keySet(), hasSize(numberOfShards)); + for (int i = 0; i < numberOfShards; i++) { + final List shardStats = (List) shardsStats.get(Integer.toString(i)); + assertThat(shardStats, hasSize(1)); + final Map shardStatsAsMap = (Map) shardStats.get(0); + final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); + final List leases = (List) retentionLeasesStats.get("leases"); + assertThat(leases, empty()); + } + } + public void testAutoFollowing() throws Exception { CcrClient ccrClient = highLevelClient().ccr(); PutAutoFollowPatternRequest 
putAutoFollowPatternRequest = diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java index 8470194b65449..e440421d46b40 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.ccr.FollowConfig; import org.elasticsearch.client.ccr.FollowInfoRequest; import org.elasticsearch.client.ccr.FollowStatsRequest; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest; @@ -39,9 +40,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Locale; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -91,6 +94,20 @@ public void testUnfollow() { assertThat(result.getEntity(), nullValue()); } + public void testForgetFollower() throws IOException { + final ForgetFollowerRequest request = new ForgetFollowerRequest( + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8)); + final Request convertedRequest = CcrRequestConverters.forgetFollower(request); + assertThat(convertedRequest.getMethod(), equalTo(HttpPost.METHOD_NAME)); + assertThat(convertedRequest.getEndpoint(), equalTo("/" + request.leaderIndex() + "/_ccr/forget_follower")); + assertThat(convertedRequest.getParameters().keySet(), empty()); + RequestConvertersTests.assertToXContentBody(request, convertedRequest.getEntity()); + } + public 
void testPutAutofollowPattern() throws Exception { PutAutoFollowPatternRequest putAutoFollowPatternRequest = new PutAutoFollowPatternRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), Arrays.asList(generateRandomStringArray(4, 4, false))); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java new file mode 100644 index 0000000000000..96438725d4ef0 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.client.core; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isIn; + +public class BroadcastResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + final String index = randomAlphaOfLength(8); + final String id = randomAlphaOfLength(8); + final int total = randomIntBetween(1, 16); + final int successful = total - scaledRandomIntBetween(0, total); + final int failed = scaledRandomIntBetween(0, total - successful); + final List failures = new ArrayList<>(); + final Set shardIds = new HashSet<>(); + for (int i = 0; i < failed; i++) { + final DefaultShardOperationFailedException failure = new DefaultShardOperationFailedException( + index, + randomValueOtherThanMany(shardIds::contains, () -> randomIntBetween(0, total - 1)), + new RetentionLeaseNotFoundException(id)); + failures.add(failure); + shardIds.add(failure.shardId()); + } + + final org.elasticsearch.action.support.broadcast.BroadcastResponse to = + new 
org.elasticsearch.action.support.broadcast.BroadcastResponse(total, successful, failed, failures); + + final XContentType xContentType = randomFrom(XContentType.values()); + final BytesReference bytes = toShuffledXContent(to, xContentType, ToXContent.EMPTY_PARAMS, randomBoolean()); + + final XContent xContent = XContentFactory.xContent(xContentType); + final XContentParser parser = xContent.createParser( + new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), + LoggingDeprecationHandler.INSTANCE, + bytes.streamInput()); + final BroadcastResponse from = BroadcastResponse.fromXContent(parser); + assertThat(from.shards().total(), equalTo(total)); + assertThat(from.shards().successful(), equalTo(successful)); + assertThat(from.shards().skipped(), equalTo(0)); + assertThat(from.shards().failed(), equalTo(failed)); + assertThat(from.shards().failures(), hasSize(failed == 0 ? failed : 1)); // failures are grouped + if (failed > 0) { + final DefaultShardOperationFailedException groupedFailure = from.shards().failures().iterator().next(); + assertThat(groupedFailure.index(), equalTo(index)); + assertThat(groupedFailure.shardId(), isIn(shardIds)); + assertThat(groupedFailure.reason(), containsString("reason=retention lease with ID [" + id + "] not found")); + } + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 23cdd39787d32..baf8132096cb8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import 
org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse.Pattern; @@ -51,15 +52,18 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.junit.Before; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -395,6 +399,101 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } + public void testForgetFollower() throws InterruptedException, IOException { + final RestHighLevelClient client = highLevelClient(); + final String leaderIndex = "leader"; + { + // create leader index + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(leaderIndex); + final Map settings = new HashMap<>(2); + final int numberOfShards = randomIntBetween(1, 2); + settings.put("index.number_of_shards", Integer.toString(numberOfShards)); + settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString()); + createIndexRequest.settings(settings); + final CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + } + final String followerIndex = "follower"; + + final PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", 
followerIndex, ActiveShardCount.ONE); + final PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); + assertTrue(putFollowResponse.isFollowIndexCreated()); + assertTrue((putFollowResponse.isFollowIndexShardsAcked())); + assertTrue(putFollowResponse.isIndexFollowingStarted()); + + final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); + AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT); + assertTrue(pauseFollowResponse.isAcknowledged()); + + final String followerCluster = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value(); + final Request statsRequest = new Request("GET", "/follower/_stats"); + final Response statsResponse = client().performRequest(statsRequest); + final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse); + final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid"); + + final String leaderCluster = "local"; + + // tag::ccr-forget-follower-request + final ForgetFollowerRequest request = new ForgetFollowerRequest( + followerCluster, // <1> + followerIndex, // <2> + followerIndexUUID, // <3> + leaderCluster, // <4> + leaderIndex); // <5> + // end::ccr-forget-follower-request + + // tag::ccr-forget-follower-execute + final BroadcastResponse response = client + .ccr() + .forgetFollower(request, RequestOptions.DEFAULT); + // end::ccr-forget-follower-execute + + // tag::ccr-forget-follower-response + final BroadcastResponse.Shards shards = response.shards(); // <1> + final int total = shards.total(); // <2> + final int successful = shards.successful(); // <3> + final int skipped = shards.skipped(); // <4> + final int failed = shards.failed(); // <5> + shards.failures().forEach(failure -> {}); // <6> + // end::ccr-forget-follower-response + + // tag::ccr-forget-follower-execute-listener + ActionListener listener = + new ActionListener() { + + @Override + 
public void onResponse(final BroadcastResponse response) { + final BroadcastResponse.Shards shards = // <1> + response.shards(); + final int total = shards.total(); + final int successful = shards.successful(); + final int skipped = shards.skipped(); + final int failed = shards.failed(); + shards.failures().forEach(failure -> {}); + } + + @Override + public void onFailure(final Exception e) { + // <2> + } + + }; + // end::ccr-forget-follower-execute-listener + + // replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::ccr-forget-follower-execute-async + client.ccr().forgetFollowerAsync( + request, + RequestOptions.DEFAULT, + listener); // <1> + // end::ccr-forget-follower-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + public void testPutAutoFollowPattern() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/ccr/forget_follower.asciidoc b/docs/java-rest/high-level/ccr/forget_follower.asciidoc new file mode 100644 index 0000000000000..bf1fde014b8e6 --- /dev/null +++ b/docs/java-rest/high-level/ccr/forget_follower.asciidoc @@ -0,0 +1,45 @@ +-- +:api: ccr-forget-follower +:request: ForgetFollowerRequest +:response: BroadcastResponse +-- + +[id="{upid}-{api}"] +=== Forget Follower API + +[id="{upid}-{api}-request"] +==== Request + +The Forget Follower API allows you to manually remove the follower retention +leases from the leader. Note that these retention leases are automatically +managed by the following index. This API exists only for cases when invoking +the unfollow API on the follower index is unable to remove the follower +retention leases. 
+ +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-request] +-------------------------------------------------- +<1> The name of the cluster containing the follower index. +<2> The name of the follower index. +<3> The UUID of the follower index (can be obtained from index stats). +<4> The alias of the remote cluster containing the leader index. +<5> The name of the leader index. + +[id="{upid}-{api}-response"] +==== Response + +The returned +{response}+ indicates if the response was successful. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- +<1> The high-level shards summary. +<2> The total number of shards the request was executed on. +<3> The total number of shards the request was successful on. +<4> The total number of shards the request was skipped on (should always be zero). +<5> The total number of shards the request failed on. +<6> The shard-level failures. 
+ +include::../execution.asciidoc[] diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 61e7bf313a758..a5428845a8273 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -501,6 +501,7 @@ The Java High Level REST Client supports the following CCR APIs: * <<{upid}-ccr-pause-follow>> * <<{upid}-ccr-resume-follow>> * <<{upid}-ccr-unfollow>> +* <<{upid}-ccr-forget-follower>> * <<{upid}-ccr-put-auto-follow-pattern>> * <<{upid}-ccr-delete-auto-follow-pattern>> * <<{upid}-ccr-get-auto-follow-pattern>> @@ -512,6 +513,7 @@ include::ccr/put_follow.asciidoc[] include::ccr/pause_follow.asciidoc[] include::ccr/resume_follow.asciidoc[] include::ccr/unfollow.asciidoc[] +include::ccr/forget_follower.asciidoc[] include::ccr/put_auto_follow_pattern.asciidoc[] include::ccr/delete_auto_follow_pattern.asciidoc[] include::ccr/get_auto_follow_pattern.asciidoc[] diff --git a/docs/reference/aggregations/bucket/daterange-aggregation.asciidoc b/docs/reference/aggregations/bucket/daterange-aggregation.asciidoc index 5cf9865501a00..6d5f0389cd770 100644 --- a/docs/reference/aggregations/bucket/daterange-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/daterange-aggregation.asciidoc @@ -209,7 +209,7 @@ POST /sales/_search?size=0 // CONSOLE // TEST[setup:sales] -<1> This date will be converted to `2016-02-15T00:00:00.000+01:00`. +<1> This date will be converted to `2016-02-01T00:00:00.000+01:00`. <2> `now/d` will be rounded to the beginning of the day in the CET time zone. 
==== Keyed Response diff --git a/docs/reference/analysis/tokenfilters/minhash-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/minhash-tokenfilter.asciidoc index eb6a4d820ef1b..21c7387e0f7f5 100644 --- a/docs/reference/analysis/tokenfilters/minhash-tokenfilter.asciidoc +++ b/docs/reference/analysis/tokenfilters/minhash-tokenfilter.asciidoc @@ -1,7 +1,7 @@ [[analysis-minhash-tokenfilter]] -=== Minhash Token Filter +=== MinHash Token Filter -A token filter of type `min_hash` hashes each token of the token stream and divides +The `min_hash` token filter hashes each token of the token stream and divides the resulting hashes into buckets, keeping the lowest-valued hashes per bucket. It then returns these hashes as tokens. @@ -20,3 +20,120 @@ The following are settings that can be set for a `min_hash` token filter. bucket to its circular right. Only takes effect if hash_set_size is equal to one. Defaults to `true` if bucket_count is greater than one, else `false`. |======================================================================= + +Some points to consider while setting up a `min_hash` filter: + +* `min_hash` filter input tokens should typically be k-words shingles produced +from <>. You should +choose `k` large enough so that the probability of any given shingle +occurring in a document is low. At the same time, as +internally each shingle is hashed into a 128-bit hash, you should choose +`k` small enough so that all possible +different k-words shingles can be hashed to a 128-bit hash with +minimal collision. 5-word shingles typically work well. + +* choosing the right settings for `hash_count`, `bucket_count` and +`hash_set_size` needs some experimentation. +** to improve the precision, you should increase `bucket_count` or +`hash_set_size`. Higher values of `bucket_count` or `hash_set_size` +will provide a higher guarantee that different tokens are +indexed to different buckets. +** to improve the recall, +you should increase the `hash_count` parameter. 
For example, +setting `hash_count=2` will make each token be hashed in +two different ways, thus increasing the number of potential +candidates for search. + +* the default settings make the `min_hash` filter produce for +each document 512 `min_hash` tokens, each of size 16 bytes. +Thus, each document's size will be increased by around 8Kb. + +* `min_hash` filter is used to hash for Jaccard similarity. This means +that it doesn't matter how many times a document contains a certain token, +only whether it contains it or not. + +==== Theory +MinHash token filter allows you to hash documents for similarity search. +Similarity search, or nearest neighbor search, is a complex problem. +A naive solution requires an exhaustive pairwise comparison between a query +document and every document in an index. This is a prohibitive operation +if the index is large. A number of approximate nearest neighbor search +solutions have been developed to make similarity search more practical and +computationally feasible. One of these solutions involves hashing of documents. + +Documents are hashed in a way that similar documents are more likely +to produce the same hash code and are put into the same hash bucket, +while dissimilar documents are more likely to be hashed into +different hash buckets. This type of hashing is known as +locality sensitive hashing (LSH). + +Depending on what constitutes the similarity between documents, +various LSH functions https://arxiv.org/abs/1408.2927[have been proposed]. +For https://en.wikipedia.org/wiki/Jaccard_index[Jaccard similarity], a popular +LSH function is https://en.wikipedia.org/wiki/MinHash[MinHash]. +A general idea of the way MinHash produces a signature for a document +is by applying a random permutation over the whole index vocabulary (random +numbering for the vocabulary), and recording the minimum value for this permutation +for the document (the minimum number for a vocabulary word that is present +in the document). 
The permutations are run several times; +combining the minimum values for all of them will constitute a +signature for the document. + +In practice, instead of random permutations, a number of hash functions +are chosen. A hash function calculates a hash code for each of a +document's tokens and chooses the minimum hash code among them. +The minimum hash codes from all hash functions are combined +to form a signature for the document. + + +==== Example of setting MinHash Token Filter in Elasticsearch +Here is an example of setting up a `min_hash` filter: + +[source,js] +-------------------------------------------------- +POST /index1 +{ + "settings": { + "analysis": { + "filter": { + "my_shingle_filter": { <1> + "type": "shingle", + "min_shingle_size": 5, + "max_shingle_size": 5, + "output_unigrams": false + }, + "my_minhash_filter": { + "type": "min_hash", + "hash_count": 1, <2> + "bucket_count": 512, <3> + "hash_set_size": 1, <4> + "with_rotation": true <5> + } + }, + "analyzer": { + "my_analyzer": { + "tokenizer": "standard", + "filter": [ + "my_shingle_filter", + "my_minhash_filter" + ] + } + } + } + }, + "mappings": { + "properties": { + "text": { + "type": "text", + "analyzer": "my_analyzer" + } + } + } +} +-------------------------------------------------- +// NOTCONSOLE +<1> setting a shingle filter with 5-word shingles +<2> setting min_hash filter to hash with 1 hash +<3> setting min_hash filter to hash tokens into 512 buckets +<4> setting min_hash filter to keep only a single smallest hash in each bucket +<5> setting min_hash filter to fill empty buckets with values from neighboring buckets diff --git a/docs/reference/ccr/apis/ccr-apis.asciidoc b/docs/reference/ccr/apis/ccr-apis.asciidoc index c7c5194790360..3a745f239867d 100644 --- a/docs/reference/ccr/apis/ccr-apis.asciidoc +++ b/docs/reference/ccr/apis/ccr-apis.asciidoc @@ -19,6 +19,7 @@ You can use the following APIs to perform {ccr} operations. 
* <> * <> * <> +* <> * <> * <> @@ -38,6 +39,7 @@ include::follow/put-follow.asciidoc[] include::follow/post-pause-follow.asciidoc[] include::follow/post-resume-follow.asciidoc[] include::follow/post-unfollow.asciidoc[] +include::follow/post-forget-follower.asciidoc[] include::follow/get-follow-stats.asciidoc[] include::follow/get-follow-info.asciidoc[] diff --git a/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc b/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc new file mode 100644 index 0000000000000..5d5fb6a218449 --- /dev/null +++ b/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc @@ -0,0 +1,152 @@ +[role="xpack"] +[testenv="platinum"] +[[ccr-post-forget-follower]] +=== Forget Follower API +++++ +Forget Follower +++++ + +Removes the follower retention leases from the leader. + +==== Description + +A following index takes out retention leases on its leader index. These +retention leases are used to increase the likelihood that the shards of the +leader index retain the history of operations that the shards of the following +index need to execute replication. When a follower index is converted to a +regular index via the <> (either via explicit +execution of this API, or implicitly via {ilm}), these retention leases are +removed. However, removing these retention leases can fail (e.g., if the remote +cluster containing the leader index is unavailable). While these retention +leases will eventually expire on their own, their extended existence can cause +the leader index to hold more history than necessary, and prevent {ilm} from +performing some operations on the leader index. This API exists to enable +manually removing these retention leases when the unfollow API was unable to do +so. + +NOTE: This API does not stop replication by a following index. If you use this +API targeting a follower index that is still actively following, the following +index will add back retention leases on the leader. 
The only purpose of this API +is to handle the case of failure to remove the following retention leases after +the <> is invoked. + +==== Request + +////////////////////////// + +[source,js] +-------------------------------------------------- +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 +{ + "remote_cluster" : "remote_cluster", + "leader_index" : "leader_index" +} +-------------------------------------------------- +// CONSOLE +// TESTSETUP +// TEST[setup:remote_cluster_and_leader_index] + +[source,js] +-------------------------------------------------- +POST /follower_index/_ccr/pause_follow +-------------------------------------------------- +// CONSOLE +// TEARDOWN + +////////////////////////// + +[source,js] +-------------------------------------------------- +POST //_ccr/forget_follower +{ + "follower_cluster" : "", + "follower_index" : "", + "follower_index_uuid" : "", + "leader_remote_cluster" : "" +} +-------------------------------------------------- +// CONSOLE +// TEST[s//leader_index/] +// TEST[s//follower_cluster/] +// TEST[s//follower_index/] +// TEST[s//follower_index_uuid/] +// TEST[s//leader_remote_cluster/] +// TEST[skip_shard_failures] + +[source,js] +-------------------------------------------------- +{ + "_shards" : { + "total" : 1, + "successful" : 1, + "failed" : 0, + "failures" : [ ] + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/] +// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/] +// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/] +// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/] + +==== Path Parameters + +`leader_index` (required):: + (string) the name of the leader index + +==== Request Body +`follower_cluster` (required):: + (string) the name of the cluster containing the follower index + +`follower_index` (required):: + (string) the name of the follower index + +`follower_index_uuid` 
(required):: + (string) the UUID of the follower index + +`leader_remote_cluster` (required):: + (string) the alias (from the perspective of the cluster containing the + follower index) of the <> containing + the leader index + +==== Authorization + +If the {es} {security-features} are enabled, you must have `manage_leader_index` +index privileges for the leader index. For more information, see +{stack-ov}/security-privileges.html[Security privileges]. + +==== Example + +This example removes the follower retention leases for `follower_index` from +`leader_index`. + +[source,js] +-------------------------------------------------- +POST /leader_index/_ccr/forget_follower +{ + "follower_cluster" : "", + "follower_index" : "follower_index", + "follower_index_uuid" : "", + "leader_remote_cluster" : "" +} +-------------------------------------------------- +// CONSOLE +// TEST[skip_shard_failures] + +The API returns the following result: + +[source,js] +-------------------------------------------------- +{ + "_shards" : { + "total" : 1, + "successful" : 1, + "failed" : 0, + "failures" : [ ] + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/] +// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/] +// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/] +// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/] diff --git a/docs/reference/migration/migrate_6_7.asciidoc b/docs/reference/migration/migrate_6_7.asciidoc index 13b7d4b2cbc91..6e25291bc03d4 100644 --- a/docs/reference/migration/migrate_6_7.asciidoc +++ b/docs/reference/migration/migrate_6_7.asciidoc @@ -43,6 +43,24 @@ will result in an error. 
Additionally, there are two minor breaking changes here - `plugin.mandatory` is no longer compatible with `ingest-geoip` nor `ingest-user-agent` +Elasticsearch 6.7.0 checks that there are no leftover geoip database files in +the plugin configuration directory because the new module does not use them. +Therefore, remove the `ingest-geoip` plugin prior to upgrading to 6.7.0 with +the `--purge` option to also delete the old database files: + +[source,sh] +------------------------------------------------------ +./bin/elasticsearch-plugin remove --purge ingest-geoip +------------------------------------------------------ + +Otherwise you will see the following error message upon startup (assuming +`/etc/elasticsearch/ingest-geoip` as the plugin configuration directory): + +[source,text] +--------------------------------------------------------------------------------------- +expected database [GeoLite2-ASN.mmdb] to not exist in [/etc/elasticsearch/ingest-geoip] +--------------------------------------------------------------------------------------- + [float] [[breaking_67_settings_changes]] diff --git a/docs/reference/release-notes/6.6.asciidoc b/docs/reference/release-notes/6.6.asciidoc index a0f3c14399a87..0135a51022179 100644 --- a/docs/reference/release-notes/6.6.asciidoc +++ b/docs/reference/release-notes/6.6.asciidoc @@ -1,3 +1,57 @@ +[[release-notes-6.6.2]] +== {es} version 6.6.2 + +coming[6.6.2] + +Also see <>. 
+ +[[breaking-6.6.2]] +[float] +=== Breaking changes + +Authentication:: +* Disable BWC mode in TokenService by default {pull}38881[#38881] + + + +[[enhancement-6.6.2]] +[float] +=== Enhancements + +SQL:: +* SQL: Enhance checks for inexact fields {pull}39427[#39427] (issue: {issue}38501[#38501]) +* SQL: add "validate.properties" property to JDBC's allowed list of settings {pull}39050[#39050] (issue: {issue}38068[#38068]) + + + +[[bug-6.6.2]] +[float] +=== Bug fixes + +Authentication:: +* Use consistent view of realms for authentication {pull}38815[#38815] (issue: {issue}30301[#30301]) + +Engine:: +* Bubble up exception when processing NoOp {pull}39338[#39338] (issue: {issue}38898[#38898]) +* Advance max_seq_no before add operation to Lucene {pull}38879[#38879] (issue: {issue}31629[#31629]) + +Features/Watcher:: +* Only flush Watcher's bulk processor if Watcher is enabled {pull}38803[#38803] (issue: {issue}38798[#38798]) + +Machine Learning:: +* [ML] Stop the ML memory tracker before closing node {pull}39111[#39111] (issue: {issue}37117[#37117]) + +SQL:: +* SQL: Fix merging of incompatible multi-fields {pull}39560[#39560] (issue: {issue}39547[#39547]) +* SQL: ignore UNSUPPORTED fields for JDBC and ODBC modes in 'SYS COLUMNS' {pull}39518[#39518] (issue: {issue}39471[#39471]) +* SQL: Use underlying exact field for LIKE/RLIKE {pull}39443[#39443] (issue: {issue}39442[#39442]) +* SQL: enforce JDBC driver - ES server version parity {pull}38972[#38972] (issue: {issue}38775[#38775]) +* SQL: fall back to using the field name for column label {pull}38842[#38842] (issue: {issue}38831[#38831]) +* SQL: normalized keywords shouldn't be allowed for groupings and sorting [ISSUE] {pull}35203[#35203] + +Search:: +* Fix Fuzziness#asDistance(String) {pull}39643[#39643] (issue: {issue}39614[#39614]) +* Fix simple query string serialization conditional {pull}38960[#38960] (issues: {issue}21504[#21504], {issue}38889[#38889]) [[release-notes-6.6.1]] == {es} version 6.6.1 diff --git 
a/docs/reference/release-notes/6.7.asciidoc b/docs/reference/release-notes/6.7.asciidoc index 21bd757348673..46d686d73818c 100644 --- a/docs/reference/release-notes/6.7.asciidoc +++ b/docs/reference/release-notes/6.7.asciidoc @@ -376,6 +376,7 @@ Audit:: * Fix NPE in Logfile Audit Filter {pull}38120[#38120] (issue: {issue}38097[#38097]) Authentication:: +* Fix security index auto-create and state recovery race {pull}39582[#39582] * Use consistent view of realms for authentication {pull}38815[#38815] (issue: {issue}30301[#30301]) * Enhance parsing of StatusCode in SAML Responses {pull}38628[#38628] * Limit token expiry to 1 hour maximum {pull}38244[#38244] @@ -403,6 +404,8 @@ CRUD:: * Fix Reindex from remote query logic {pull}36908[#36908] Distributed:: +* Use cause to determine if node with primary is closing {pull}39723[#39723] (issue: {issue}39584[#39584]) +* Don’t ack if unable to remove failing replica {pull}39584[#39584] (issue: {issue}39467[#39467]) * Ignore waitForActiveShards when syncing leases {pull}39224[#39224] (issue: {issue}39089[#39089]) * Fix synchronization in LocalCheckpointTracker#contains {pull}38755[#38755] (issues: {issue}33871[#33871], {issue}38633[#38633]) * TransportVerifyShardBeforeCloseAction should force a flush {pull}38401[#38401] (issues: {issue}33888[#33888], {issue}37961[#37961]) @@ -417,6 +420,7 @@ Engine:: * Advance max_seq_no before add operation to Lucene {pull}38879[#38879] (issue: {issue}31629[#31629]) Features/Features:: +* Check for .watches that wasn't upgraded properly {pull}39609[#39609] * Link to 7.0 documentation in deprecation checks {pull}39194[#39194] * Handle Null in FetchSourceContext#fetchSource {pull}36839[#36839] (issue: {issue}29293[#29293]) @@ -454,12 +458,16 @@ Features/Monitoring:: * Allow built-in monitoring_user role to call GET _xpack API {pull}38060[#38060] (issue: {issue}37970[#37970]) Features/Watcher:: +* Use any index specified by .watches for Watcher {pull}39541[#39541] (issue: {issue}39478[#39478]) * 
Resolve concurrency with watcher trigger service {pull}39092[#39092] (issue: {issue}39087[#39087]) * Only flush Watcher's bulk processor if Watcher is enabled {pull}38803[#38803] (issue: {issue}38798[#38798]) Geo:: * Geo: Do not normalize the longitude with value -180 for Lucene shapes {pull}37299[#37299] (issue: {issue}37297[#37297]) +Highlighting:: +* Bug fix for AnnotatedTextHighlighter {pull}39525[#39525] (issue: {issue}39395[#39395]) + Infra/Core:: * Correct name of basic_date_time_no_millis {pull}39367[#39367] * Fix DateFormatters.parseMillis when no timezone is given {pull}39100[#39100] (issue: {issue}39067[#39067]) @@ -514,6 +522,7 @@ Recovery:: * RecoveryMonitor#lastSeenAccessTime should be volatile {pull}36781[#36781] SQL:: +* SQL: Don't allow inexact fields for MIN/MAX {pull}39563[#39563] (issue: {issue}39427[#39427]) * SQL: Fix merging of incompatible multi-fields {pull}39560[#39560] (issue: {issue}39547[#39547]) * SQL: fix COUNT DISTINCT column name {pull}39537[#39537] (issue: {issue}39511[#39511]) * SQL: ignore UNSUPPORTED fields for JDBC and ODBC modes in 'SYS COLUMNS' {pull}39518[#39518] (issue: {issue}39471[#39471]) @@ -544,6 +553,7 @@ SQL:: * SQL: normalized keywords shouldn't be allowed for groupings and sorting [ISSUE] {pull}35203[#35203] Search:: +* Fix Fuzziness#asDistance(String) {pull}39643[#39643] (issue: {issue}39614[#39614]) * Fix simple query string serialization conditional {pull}38960[#38960] (issues: {issue}21504[#21504], {issue}38889[#38889]) * Ensure that maxConcurrentShardRequests is never defaulted to 0 {pull}38734[#38734] * Look up connection using the right cluster alias when releasing contexts {pull}38570[#38570] @@ -570,6 +580,7 @@ Task Management:: * Un-assign persistent tasks as nodes exit the cluster {pull}37656[#37656] ZenDiscovery:: +* Fixing the custom object serialization bug in diffable utils. 
{pull}39544[#39544] * Always return metadata version if metadata is requested {pull}37674[#37674] diff --git a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapper.java b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapper.java index a4a58d0c9946a..835003521f2d9 100644 --- a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapper.java +++ b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapper.java @@ -57,6 +57,7 @@ import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText.AnnotationToken; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.fetch.FetchSubPhase.HitContext; import java.io.IOException; import java.io.Reader; @@ -317,46 +318,13 @@ public AnnotationToken getAnnotation(int index) { // When asked to tokenize plain-text versions by the highlighter it tokenizes the // original markup form in order to inject annotations. 
public static final class AnnotatedHighlighterAnalyzer extends AnalyzerWrapper { - private Analyzer delegate; - private AnnotatedText[] annotations; - public AnnotatedHighlighterAnalyzer(Analyzer delegate){ + private final Analyzer delegate; + private final HitContext hitContext; + public AnnotatedHighlighterAnalyzer(Analyzer delegate, HitContext hitContext){ super(delegate.getReuseStrategy()); this.delegate = delegate; + this.hitContext = hitContext; } - - public void init(String[] markedUpFieldValues) { - this.annotations = new AnnotatedText[markedUpFieldValues.length]; - for (int i = 0; i < markedUpFieldValues.length; i++) { - annotations[i] = AnnotatedText.parse(markedUpFieldValues[i]); - } - } - - public String [] getPlainTextValuesForHighlighter(){ - String [] result = new String[annotations.length]; - for (int i = 0; i < annotations.length; i++) { - result[i] = annotations[i].textMinusMarkup; - } - return result; - } - - public AnnotationToken[] getIntersectingAnnotations(int start, int end) { - List intersectingAnnotations = new ArrayList<>(); - int fieldValueOffset =0; - for (AnnotatedText fieldValueAnnotations : this.annotations) { - //This is called from a highlighter where all of the field values are concatenated - // so each annotation offset will need to be adjusted so that it takes into account - // the previous values AND the MULTIVAL delimiter - for (AnnotationToken token : fieldValueAnnotations.annotations) { - if(token.intersects(start - fieldValueOffset , end - fieldValueOffset)) { - intersectingAnnotations.add(new AnnotationToken(token.offset + fieldValueOffset, - token.endOffset + fieldValueOffset, token.value)); - } - } - //add 1 for the fieldvalue separator character - fieldValueOffset +=fieldValueAnnotations.textMinusMarkup.length() +1; - } - return intersectingAnnotations.toArray(new AnnotationToken[intersectingAnnotations.size()]); - } @Override public Analyzer getWrappedAnalyzer(String fieldName) { @@ -370,7 +338,8 @@ protected 
TokenStreamComponents wrapComponents(String fieldName, TokenStreamComp return components; } AnnotationsInjector injector = new AnnotationsInjector(components.getTokenStream()); - return new AnnotatedHighlighterTokenStreamComponents(components.getTokenizer(), injector, this.annotations); + AnnotatedText[] annotations = (AnnotatedText[]) hitContext.cache().get(AnnotatedText.class.getName()); + return new AnnotatedHighlighterTokenStreamComponents(components.getTokenizer(), injector, annotations); } } private static final class AnnotatedHighlighterTokenStreamComponents extends TokenStreamComponents{ diff --git a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedPassageFormatter.java b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedPassageFormatter.java index ad1acc85031dd..7d360dd0b9bac 100644 --- a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedPassageFormatter.java +++ b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedPassageFormatter.java @@ -23,7 +23,7 @@ import org.apache.lucene.search.uhighlight.Passage; import org.apache.lucene.search.uhighlight.PassageFormatter; import org.apache.lucene.search.uhighlight.Snippet; -import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedHighlighterAnalyzer; +import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText; import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText.AnnotationToken; import java.io.UnsupportedEncodingException; @@ -42,11 +42,11 @@ public class AnnotatedPassageFormatter extends PassageFormatter { public static final String SEARCH_HIT_TYPE = "_hit_term"; private final Encoder encoder; - private AnnotatedHighlighterAnalyzer annotatedHighlighterAnalyzer; + AnnotatedText[] annotations; - public 
AnnotatedPassageFormatter(AnnotatedHighlighterAnalyzer annotatedHighlighterAnalyzer, Encoder encoder) { - this.annotatedHighlighterAnalyzer = annotatedHighlighterAnalyzer; + public AnnotatedPassageFormatter(AnnotatedText[] annotations, Encoder encoder) { this.encoder = encoder; + this.annotations = annotations; } static class MarkupPassage { @@ -158,7 +158,7 @@ public Snippet[] format(Passage[] passages, String content) { int pos; int j = 0; for (Passage passage : passages) { - AnnotationToken [] annotations = annotatedHighlighterAnalyzer.getIntersectingAnnotations(passage.getStartOffset(), + AnnotationToken [] annotations = getIntersectingAnnotations(passage.getStartOffset(), passage.getEndOffset()); MarkupPassage mergedMarkup = mergeAnnotations(annotations, passage); @@ -194,6 +194,27 @@ public Snippet[] format(Passage[] passages, String content) { } return snippets; } + + public AnnotationToken[] getIntersectingAnnotations(int start, int end) { + List intersectingAnnotations = new ArrayList<>(); + int fieldValueOffset =0; + for (AnnotatedText fieldValueAnnotations : this.annotations) { + //This is called from a highlighter where all of the field values are concatenated + // so each annotation offset will need to be adjusted so that it takes into account + // the previous values AND the MULTIVAL delimiter + for (int i = 0; i < fieldValueAnnotations.numAnnotations(); i++) { + AnnotationToken token = fieldValueAnnotations.getAnnotation(i); + if (token.intersects(start - fieldValueOffset, end - fieldValueOffset)) { + intersectingAnnotations + .add(new AnnotationToken(token.offset + fieldValueOffset, token.endOffset + + fieldValueOffset, token.value)); + } + } + //add 1 for the fieldvalue separator character + fieldValueOffset +=fieldValueAnnotations.textMinusMarkup.length() +1; + } + return intersectingAnnotations.toArray(new AnnotationToken[intersectingAnnotations.size()]); + } private void append(StringBuilder dest, String content, int start, int end) { 
dest.append(encoder.encodeText(content.substring(start, end))); diff --git a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedTextHighlighter.java b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedTextHighlighter.java index d93316c78921a..2ba7838b90950 100644 --- a/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedTextHighlighter.java +++ b/plugins/mapper-annotated-text/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/AnnotatedTextHighlighter.java @@ -25,24 +25,22 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedHighlighterAnalyzer; +import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText; import org.elasticsearch.search.fetch.FetchSubPhase.HitContext; import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight.Field; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; public class AnnotatedTextHighlighter extends UnifiedHighlighter { public static final String NAME = "annotated"; - - AnnotatedHighlighterAnalyzer annotatedHighlighterAnalyzer = null; @Override - protected Analyzer getAnalyzer(DocumentMapper docMapper, MappedFieldType type) { - annotatedHighlighterAnalyzer = new AnnotatedHighlighterAnalyzer(super.getAnalyzer(docMapper, type)); - return annotatedHighlighterAnalyzer; + protected Analyzer getAnalyzer(DocumentMapper docMapper, MappedFieldType type, HitContext hitContext) { + return new AnnotatedHighlighterAnalyzer(super.getAnalyzer(docMapper, type, hitContext), hitContext); } // Convert the marked-up values held on-disk to plain-text versions for highlighting @@ -51,14 
+49,26 @@ protected List loadFieldValues(MappedFieldType fieldType, Field field, S throws IOException { List fieldValues = super.loadFieldValues(fieldType, field, context, hitContext); String[] fieldValuesAsString = fieldValues.toArray(new String[fieldValues.size()]); - annotatedHighlighterAnalyzer.init(fieldValuesAsString); - return Arrays.asList((Object[]) annotatedHighlighterAnalyzer.getPlainTextValuesForHighlighter()); + + AnnotatedText[] annotations = new AnnotatedText[fieldValuesAsString.length]; + for (int i = 0; i < fieldValuesAsString.length; i++) { + annotations[i] = AnnotatedText.parse(fieldValuesAsString[i]); + } + // Store the annotations in the hitContext + hitContext.cache().put(AnnotatedText.class.getName(), annotations); + + ArrayList result = new ArrayList<>(annotations.length); + for (int i = 0; i < annotations.length; i++) { + result.add(annotations[i].textMinusMarkup); + } + return result; } @Override - protected PassageFormatter getPassageFormatter(SearchContextHighlight.Field field, Encoder encoder) { - return new AnnotatedPassageFormatter(annotatedHighlighterAnalyzer, encoder); - + protected PassageFormatter getPassageFormatter(HitContext hitContext,SearchContextHighlight.Field field, Encoder encoder) { + // Retrieve the annotations from the hitContext + AnnotatedText[] annotations = (AnnotatedText[]) hitContext.cache().get(AnnotatedText.class.getName()); + return new AnnotatedPassageFormatter(annotations, encoder); } } diff --git a/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/search/highlight/AnnotatedTextHighlighterTests.java b/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/search/highlight/AnnotatedTextHighlighterTests.java index 1710b46fab115..e462d25426531 100644 --- a/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/search/highlight/AnnotatedTextHighlighterTests.java +++ b/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/search/highlight/AnnotatedTextHighlighterTests.java @@ 
-40,36 +40,50 @@ import org.apache.lucene.search.highlight.DefaultEncoder; import org.apache.lucene.search.uhighlight.CustomSeparatorBreakIterator; import org.apache.lucene.search.uhighlight.CustomUnifiedHighlighter; -import org.apache.lucene.search.uhighlight.PassageFormatter; import org.apache.lucene.search.uhighlight.Snippet; import org.apache.lucene.search.uhighlight.SplittingBreakIterator; import org.apache.lucene.store.Directory; import org.elasticsearch.common.Strings; import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedHighlighterAnalyzer; +import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText; import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotationAnalyzerWrapper; +import org.elasticsearch.search.fetch.FetchSubPhase.HitContext; import org.elasticsearch.search.fetch.subphase.highlight.AnnotatedPassageFormatter; import org.elasticsearch.test.ESTestCase; import java.net.URLEncoder; import java.text.BreakIterator; +import java.util.ArrayList; import java.util.Locale; import static org.apache.lucene.search.uhighlight.CustomUnifiedHighlighter.MULTIVAL_SEP_CHAR; import static org.hamcrest.CoreMatchers.equalTo; public class AnnotatedTextHighlighterTests extends ESTestCase { + private void assertHighlightOneDoc(String fieldName, String []markedUpInputs, Query query, Locale locale, BreakIterator breakIterator, int noMatchSize, String[] expectedPassages) throws Exception { + // Annotated fields wrap the usual analyzer with one that injects extra tokens Analyzer wrapperAnalyzer = new AnnotationAnalyzerWrapper(new StandardAnalyzer()); - AnnotatedHighlighterAnalyzer hiliteAnalyzer = new AnnotatedHighlighterAnalyzer(wrapperAnalyzer); - hiliteAnalyzer.init(markedUpInputs); - PassageFormatter passageFormatter = new AnnotatedPassageFormatter(hiliteAnalyzer,new DefaultEncoder()); - String []plainTextForHighlighter = hiliteAnalyzer.getPlainTextValuesForHighlighter(); + 
HitContext mockHitContext = new HitContext(); + AnnotatedHighlighterAnalyzer hiliteAnalyzer = new AnnotatedHighlighterAnalyzer(wrapperAnalyzer, mockHitContext); + + AnnotatedText[] annotations = new AnnotatedText[markedUpInputs.length]; + for (int i = 0; i < markedUpInputs.length; i++) { + annotations[i] = AnnotatedText.parse(markedUpInputs[i]); + } + mockHitContext.cache().put(AnnotatedText.class.getName(), annotations); + AnnotatedPassageFormatter passageFormatter = new AnnotatedPassageFormatter(annotations,new DefaultEncoder()); + + ArrayList plainTextForHighlighter = new ArrayList<>(annotations.length); + for (int i = 0; i < annotations.length; i++) { + plainTextForHighlighter.add(annotations[i].textMinusMarkup); + } Directory dir = newDirectory(); IndexWriterConfig iwc = newIndexWriterConfig(wrapperAnalyzer); @@ -94,7 +108,7 @@ private void assertHighlightOneDoc(String fieldName, String []markedUpInputs, iw.close(); TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 1, Sort.INDEXORDER); assertThat(topDocs.totalHits, equalTo(1L)); - String rawValue = Strings.arrayToDelimitedString(plainTextForHighlighter, String.valueOf(MULTIVAL_SEP_CHAR)); + String rawValue = Strings.collectionToDelimitedString(plainTextForHighlighter, String.valueOf(MULTIVAL_SEP_CHAR)); CustomUnifiedHighlighter highlighter = new CustomUnifiedHighlighter(searcher, hiliteAnalyzer, null, passageFormatter, locale, diff --git a/plugins/mapper-annotated-text/src/test/resources/rest-api-spec/test/mapper_annotatedtext/10_basic.yml b/plugins/mapper-annotated-text/src/test/resources/rest-api-spec/test/mapper_annotatedtext/10_basic.yml index d55ee0ff15b9a..f24e2e3d0fc35 100644 --- a/plugins/mapper-annotated-text/src/test/resources/rest-api-spec/test/mapper_annotatedtext/10_basic.yml +++ b/plugins/mapper-annotated-text/src/test/resources/rest-api-spec/test/mapper_annotatedtext/10_basic.yml @@ -42,3 +42,80 @@ body: { "query" : {"term" : { "text" : "quick" } }, "highlight" : { "type" : "annotated", 
"require_field_match": false, "fields" : { "text" : {} } } } - match: {hits.hits.0.highlight.text.0: "The [quick](_hit_term=quick) brown fox is brown."} + +--- +"issue 39395 thread safety issue -requires multiple calls to reveal": + - skip: + version: " - 6.4.99" + reason: Annotated text type introduced in 6.5.0 + + - do: + indices.create: + index: annotated + body: + settings: + number_of_shards: "5" + number_of_replicas: "0" + mappings: + doc: + properties: + my_field: + type: annotated_text + + - do: + index: + index: annotated + type: doc + id: 1 + body: + "my_field" : "[A](~MARK0&~MARK0) [B](~MARK1)" + - do: + index: + index: annotated + type: doc + id: 2 + body: + "my_field" : "[A](~MARK0) [C](~MARK2)" + refresh: true + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + 
search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + + - do: + search: + request_cache: false + body: { "query" : {"match_phrase" : { "my_field" : {"query": "~MARK0", "analyzer": "whitespace"} } }, "highlight" : { "type" : "annotated", "fields" : { "my_field" : {} } } } + - match: {_shards.failed: 0} + diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 9ae7d065dd949..d67c181ae256f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.util.Objects; -import java.util.function.Consumer; public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { @@ -130,10 +129,8 @@ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy { } @Override - public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess, - final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, 
true, "mark copy as stale", null, listener); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index e9a6e7b48152d..3f09f00b9ac1e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java b/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java index d297df478a4b8..85d8a2c1a38db 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java +++ b/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java @@ -41,7 +41,7 @@ public class DefaultShardOperationFailedException extends ShardOperationFailedEx private static final String SHARD_ID = "shard"; private static final String REASON = "reason"; - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + public static 
final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "failures", true, arg -> new DefaultShardOperationFailedException((String) arg[0], (int) arg[1] ,(Throwable) arg[2])); static { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 0da39a593a2c1..d5fe63149db58 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -21,12 +21,14 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -34,7 +36,9 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.util.ArrayList; @@ -43,7 +47,6 @@ import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; public class ReplicationOperation< Request 
extends ReplicationRequest, @@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica for (String allocationId : replicationGroup.getUnavailableInSyncShards()) { pendingActions.incrementAndGet(); replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, - ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, - throwable -> decPendingAndFinishIfNeeded() - ); + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } } @@ -192,20 +192,33 @@ public void onFailure(Exception replicaException) { shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); } String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); + replicasProxy.failShardIfNeeded(shard, message, replicaException, + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } }); } - private void onPrimaryDemoted(Exception demotionFailure) { - String primaryFail = String.format(Locale.ROOT, - "primary shard [%s] was demoted while failing replica shard", - primary.routingEntry()); - // we are no longer the primary, fail ourselves and start over - primary.failShard(primaryFail, demotionFailure); - finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure)); + private void onNoLongerPrimary(Exception failure) { + final Throwable cause = ExceptionsHelper.unwrapCause(failure); + final boolean nodeIsClosing = cause instanceof NodeClosedException + || (cause instanceof TransportException && "TransportService is closed stopped can't send request".equals(cause.getMessage())); + final 
String message; + if (nodeIsClosing) { + message = String.format(Locale.ROOT, + "node with primary [%s] is shutting down while failing replica shard", primary.routingEntry()); + // We prefer not to fail the primary to avoid unnecessary warning log + // when the node with the primary shard is gracefully shutting down. + } else { + if (Assertions.ENABLED) { + if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) { + throw new AssertionError("unexpected failure", failure); + } + } + // we are no longer the primary, fail ourselves and start over + message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry()); + primary.failShard(message, failure); + } + finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure)); } /** @@ -365,31 +378,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo * of active shards. Whether a failure is needed is left up to the * implementation. * - * @param replica shard to fail - * @param message a (short) description of the reason - * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed - * @param onSuccess a callback to call when the shard has been successfully removed from the active set. - * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted - * by the master. 
- * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the + * @param replica shard to fail + * @param message a (short) description of the reason + * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener); /** * Marks shard copy as stale if needed, removing its allocation id from * the set of in-sync allocation ids. Whether marking as stale is needed * is left up to the implementation. * - * @param shardId shard id - * @param allocationId allocation id to remove from the set of in-sync allocation ids - * @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set. - * @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted - * by the master. - * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored. 
+ * @param shardId shard id + * @param allocationId allocation id to remove from the set of in-sync allocation ids + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 5b4004f6ed88e..b19d13ec3b3a3 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -84,7 +84,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -1177,47 +1176,21 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { // This does not need to fail the shard. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to fail. 
- onSuccess.run(); + listener.onResponse(null); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { // This does not need to make the shard stale. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to be marked as stale. - onSuccess.run(); - } - - protected final ActionListener createShardActionListener(final Runnable onSuccess, - final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ActionListener() { - @Override - public void onResponse(Void aVoid) { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + listener.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index f44694f55d960..4781682437545 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -376,20 +375,17 @@ class WriteActionReplicasProxy extends ReplicasProxy { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (TransportActions.isShardNotAvailableException(exception) == false) { logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); } - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSubPhase.java index 84154926bf665..8a8e4e8d77ff6 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSubPhase.java @@ -74,7 +74,6 @@ public Map cache() { } return 
cache; } - } /** diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/UnifiedHighlighter.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/UnifiedHighlighter.java index d957300f98dba..e4cbf28486e8e 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/UnifiedHighlighter.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/highlight/UnifiedHighlighter.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.search.fetch.FetchPhaseExecutionException; import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.FetchSubPhase.HitContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.index.IndexSettings; @@ -70,12 +71,13 @@ public HighlightField highlight(HighlighterContext highlighterContext) { int numberOfFragments; try { - final Analyzer analyzer = getAnalyzer(context.mapperService().documentMapper(hitContext.hit().getType()), fieldType); + final Analyzer analyzer = getAnalyzer(context.mapperService().documentMapper(hitContext.hit().getType()), fieldType, + hitContext); List fieldValues = loadFieldValues(fieldType, field, context, hitContext); if (fieldValues.size() == 0) { return null; } - final PassageFormatter passageFormatter = getPassageFormatter(field, encoder); + final PassageFormatter passageFormatter = getPassageFormatter(hitContext, field, encoder); final IndexSearcher searcher = new IndexSearcher(hitContext.reader()); final CustomUnifiedHighlighter highlighter; final String fieldValue = mergeFieldValues(fieldValues, MULTIVAL_SEP_CHAR); @@ -155,14 +157,14 @@ public HighlightField highlight(HighlighterContext highlighterContext) { return null; } - protected PassageFormatter getPassageFormatter(SearchContextHighlight.Field field, Encoder encoder) { + protected PassageFormatter getPassageFormatter(HitContext hitContext, 
SearchContextHighlight.Field field, Encoder encoder) { CustomPassageFormatter passageFormatter = new CustomPassageFormatter(field.fieldOptions().preTags()[0], field.fieldOptions().postTags()[0], encoder); return passageFormatter; } - protected Analyzer getAnalyzer(DocumentMapper docMapper, MappedFieldType type) { + protected Analyzer getAnalyzer(DocumentMapper docMapper, MappedFieldType type, HitContext hitContext) { return HighlightUtils.getAnalyzer(docMapper, type); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 116a3f45b0087..48c1356e8659a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -514,9 +514,6 @@ private void notifyFailedSnapshotShard(Snapshot snapshot, ShardId shardId, Strin void sendSnapshotShardUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode masterNode) { try { if (masterNode.getVersion().onOrAfter(Version.V_6_1_0)) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); - transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, INSTANCE_SAME); - } else { remoteFailedRequestDeduplicator.executeOnce( new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), new ActionListener() { @@ -557,6 +554,9 @@ public String executor() { } }) ); + } else { + transportService.sendRequest(masterNode, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, + new UpdateSnapshotStatusRequestV6(snapshot, shardId, status), INSTANCE_SAME); } } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); diff --git 
a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8fa10c4ee26d7..8adb9c2f26b1a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -21,14 +21,16 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -39,7 +41,10 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.SendRequestTransportException; +import org.elasticsearch.transport.TransportException; import java.util.ArrayList; import java.util.Collections; @@ -51,7 +56,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -115,10 +119,8 @@ public void testReplication() throws Exception { final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); - final TestReplicationOperation op = new TestReplicationOperation(request, - primary, listener, replicasProxy); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); op.execute(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet())); @@ -162,7 +164,7 @@ private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, Shar } } - public void testDemotedPrimary() throws Exception { + public void testNoLongerPrimary() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -198,26 +200,34 @@ public void testDemotedPrimary() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); + final Exception shardActionFailure; + if (randomBoolean()) { + shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); + } else if (randomBoolean()) { + shardActionFailure = new SendRequestTransportException( + new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), "internal:cluster/shard/failure", + new TransportException("TransportService is closed stopped can't send request")); + } else { + shardActionFailure = new 
ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); + } final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) { @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShardIfNeeded(replica, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.failShardIfNeeded(replica, message, exception, shardActionListener); } else { assertThat(replica, equalTo(failedReplica)); - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } else { - super.markShardCopyAsStaleIfNeeded(shardId, allocationId, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener); } } }; @@ -225,6 +235,7 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) { @Override public void failShard(String message, Exception exception) { + assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); assertTrue(primaryFailed.compareAndSet(false, true)); } }; @@ -233,7 +244,11 @@ public void failShard(String message, Exception exception) { assertThat("request was 
not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertTrue("listener is not marked as done", listener.isDone()); - assertTrue(primaryFailed.get()); + if (shardActionFailure instanceof ShardStateAction.NoLongerPrimaryShardException) { + assertTrue(primaryFailed.get()); + } else { + assertFalse(primaryFailed.get()); + } assertListenerThrows("should throw exception to trigger retry", listener, ReplicationOperation.RetryOnPrimaryException.class); } @@ -594,33 +609,23 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } if (opFailures.containsKey(replica)) { - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } else { fail("replica [" + replica + "] was failed"); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 427ff3d3c1e1f..a375b9c7f5ade 100644 --- 
a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -736,11 +736,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A replication action doesn't not fail the request assertEquals(0, shardFailedRequests.length); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 01a1d1b5a335b..fa77c39bf0617 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -310,11 +310,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A write replication action proxy should fail the shard assertEquals(1, 
shardFailedRequests.length); @@ -328,8 +326,6 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); assertTrue(success.get()); assertNull(failure.get()); - assertNull(ignoredFailure.get()); - } else if (randomBoolean()) { // simulate the primary has been demoted transport.handleRemoteError(shardFailedRequest.requestId, @@ -337,15 +333,12 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { "shard-failed-test")); assertFalse(success.get()); assertNotNull(failure.get()); - assertNull(ignoredFailure.get()); - } else { - // simulated an "ignored" exception + // simulated a node closing exception transport.handleRemoteError(shardFailedRequest.requestId, new NodeClosedException(state.nodes().getLocalNode())); assertFalse(success.get()); - assertNull(failure.get()); - assertNotNull(ignoredFailure.get()); + assertNotNull(failure.get()); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index ce135eab01d35..205535d34d95d 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -37,8 +37,12 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import 
org.elasticsearch.test.disruption.NetworkDisruption; @@ -52,6 +56,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -65,7 +70,10 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; @@ -432,4 +440,48 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } + public void testRestartNodeWhileIndexing() throws Exception { + startCluster(3); + String index = "restart_while_indexing"; + assertAcked(client().admin().indices().prepareCreate(index).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(1, 2)))); + AtomicBoolean stopped = new AtomicBoolean(); + Thread[] threads = new Thread[between(1, 4)]; + AtomicInteger docID = new AtomicInteger(); + Set ackedDocs = ConcurrentCollections.newConcurrentSet(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (stopped.get() == false && docID.get() < 5000) { + String id = Integer.toString(docID.incrementAndGet()); + try { + IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get(); + assertThat(response.getResult(), isOneOf(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED)); + logger.info("--> index id={} seq_no={}", response.getId(), 
response.getSeqNo()); + ackedDocs.add(response.getId()); + } catch (ElasticsearchException ignore) { + logger.info("--> fail to index id={}", id); + } + } + }); + threads[i].start(); + } + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100))); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback()); + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200))); + stopped.set(true); + for (Thread thread : threads) { + thread.join(); + } + ClusterState clusterState = internalCluster().clusterService().state(); + for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { + String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + IndexShard shard = indicesService.getShardOrNull(shardRouting.shardId()); + Set docs = IndexShardTestCase.getShardDocUIDs(shard); + assertThat("shard [" + shard.routingEntry() + "] docIds [" + docs + "] vs " + " acked docIds [" + ackedDocs + "]", + ackedDocs, everyItem(isIn(docs))); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index b0a50e789be30..6dd9550701fe3 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.search; import com.carrotsearch.hppc.IntArrayList; + import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; @@ -200,7 +201,8 @@ public void testClearIndexDelete() { } public void testCloseSearchContextOnRewriteException() { - createIndex("index"); + // if refresh happens while checking the exception, the subsequent reference count might not 
match, so we switch it off + createIndex("index", Settings.builder().put("index.refresh_interval", -1).build()); client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchService service = getInstanceFromNode(SearchService.class); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index ba0988206a17b..e6357c8a56dac 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -96,7 +96,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -721,15 +720,12 @@ public void onFailure(Exception e) { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. 
failure: " + message, exception); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale"); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 5a57e8f8e538d..3a6df333400e0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -36,6 +36,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.WarningsHandler; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.PathUtils; @@ -72,6 +73,7 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -449,6 +451,8 @@ private void wipeCluster() throws Exception { waitForPendingRollupTasks(); } + final Map>> inProgressSnapshots = wipeSnapshots(); + if (preserveIndicesUponCompletion() == false) { // wipe indices try { @@ -489,8 +493,6 @@ private void wipeCluster() throws Exception { } } - wipeSnapshots(); - // wipe cluster settings if (preserveClusterSettings() == false) { wipeClusterSettings(); @@ -499,14 +501,18 @@ private void wipeCluster() throws Exception { if (hasXPack && false == preserveILMPoliciesUponCompletion()) { deleteAllPolicies(); } + + assertTrue("Found in progress snapshots [" + inProgressSnapshots 
+ "].", inProgressSnapshots.isEmpty()); } /** * Wipe fs snapshots we created one by one and all repositories so that the next test can create the repositories fresh and they'll * start empty. There isn't an API to delete all snapshots. There is an API to delete all snapshot repositories but that leaves all of * the snapshots intact in the repository. + * @return Map of repository name to list of snapshots found in unfinished state */ - private void wipeSnapshots() throws IOException { + private Map>> wipeSnapshots() throws IOException { + final Map>> inProgressSnapshots = new HashMap<>(); for (Map.Entry repo : entityAsMap(adminClient.performRequest(new Request("GET", "/_snapshot/_all"))).entrySet()) { String repoName = repo.getKey(); Map repoSpec = (Map) repo.getValue(); @@ -519,6 +525,9 @@ private void wipeSnapshots() throws IOException { for (Object snapshot : snapshots) { Map snapshotInfo = (Map) snapshot; String name = (String) snapshotInfo.get("snapshot"); + if (SnapshotsInProgress.State.valueOf((String) snapshotInfo.get("state")).completed() == false) { + inProgressSnapshots.computeIfAbsent(repoName, key -> new ArrayList<>()).add(snapshotInfo); + } logger.debug("wiping snapshot [{}/{}]", repoName, name); adminClient().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + name)); } @@ -528,6 +537,7 @@ private void wipeSnapshots() throws IOException { adminClient().performRequest(new Request("DELETE", "_snapshot/" + repoName)); } } + return inProgressSnapshots; } /** diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml new file mode 100644 index 0000000000000..08475a0026aef --- /dev/null +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml @@ -0,0 +1,80 @@ +--- +"Test forget follower": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: 
{} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.remote_cluster.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.remote_cluster.seeds: $local_ip}} + + - do: + indices.create: + index: leader_index + body: + settings: + index: + number_of_shards: 1 + soft_deletes: + enabled: true + - is_true: acknowledged + + - do: + ccr.follow: + index: follower_index + wait_for_active_shards: 1 + body: + remote_cluster: remote_cluster + leader_index: leader_index + - is_true: follow_index_created + - is_true: follow_index_shards_acked + - is_true: index_following_started + + - do: + info: {} + + - set: {cluster_name: cluster_name} + + - do: + indices.stats: {index: follower_index} + + - set: {indices.follower_index.uuid: follower_index_uuid} + + - do: + ccr.forget_follower: + index: leader_index + body: + follower_cluster: $cluster_name + follower_index: follower_index + follower_index_uuid: $follower_index_uuid + leader_remote_cluster: remote_cluster + - match: { _shards.total: 1 } + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - is_false: _shards.failure + + - do: + ccr.pause_follow: + index: follower_index + - is_true: acknowledged + + - do: + indices.close: + index: follower_index + - is_true: acknowledged + + - do: + ccr.unfollow: + index: follower_index + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/qa/security/build.gradle b/x-pack/plugin/ccr/qa/security/build.gradle index af4238c20075e..e1a735e0b2668 100644 --- a/x-pack/plugin/ccr/qa/security/build.gradle +++ b/x-pack/plugin/ccr/qa/security/build.gradle @@ -22,7 +22,7 @@ leaderClusterTestCluster { setupCommand 'setupTestAdmin', 'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser" setupCommand 'setupCcrUser', - 'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "manage_ccr" + 
'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "ccruser" waitCondition = { node, ant -> File tmpFile = new File(node.cwd, 'wait.success') ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", diff --git a/x-pack/plugin/ccr/qa/security/follower-roles.yml b/x-pack/plugin/ccr/qa/security/follower-roles.yml index be3e6cf5e1755..4a91c072043bb 100644 --- a/x-pack/plugin/ccr/qa/security/follower-roles.yml +++ b/x-pack/plugin/ccr/qa/security/follower-roles.yml @@ -2,7 +2,7 @@ ccruser: cluster: - manage_ccr indices: - - names: [ 'allowed-index', 'logs-eu-*' ] + - names: [ 'allowed-index', 'forget-follower', 'logs-eu-*' ] privileges: - monitor - read diff --git a/x-pack/plugin/ccr/qa/security/leader-roles.yml b/x-pack/plugin/ccr/qa/security/leader-roles.yml index 99fa62cbe832b..944af38b92ce5 100644 --- a/x-pack/plugin/ccr/qa/security/leader-roles.yml +++ b/x-pack/plugin/ccr/qa/security/leader-roles.yml @@ -2,7 +2,8 @@ ccruser: cluster: - read_ccr indices: - - names: [ 'allowed-index', 'logs-eu-*' ] + - names: [ 'allowed-index', 'forget-leader', 'logs-eu-*' ] privileges: - monitor - read + - manage_leader_index diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 07f0802bf7078..034646e8250db 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; @@ -13,14 +14,19 @@ import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; public class FollowIndexSecurityIT extends ESCCRRestTestCase { @@ -176,4 +182,55 @@ public void testAutoFollowPatterns() throws Exception { pauseFollow(client(), allowedIndex); } + public void testForgetFollower() throws IOException { + final String forgetLeader = "forget-leader"; + final String forgetFollower = "forget-follower"; + if ("leader".equals(targetCluster)) { + logger.info("running against leader cluster"); + final Settings indexSettings = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.soft_deletes.enabled", true) + .build(); + createIndex(forgetLeader, indexSettings); + } else { + logger.info("running against follower cluster"); + followIndex(client(), "leader_cluster", forgetLeader, forgetFollower); + + final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats")); + final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." 
+ forgetFollower + ".uuid"); + + assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow"))); + + try (RestClient leaderClient = buildLeaderClient(restClientSettings())) { + final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower"); + final String requestBody = "{" + + "\"follower_cluster\":\"follow-cluster\"," + + "\"follower_index\":\"" + forgetFollower + "\"," + + "\"follower_index_uuid\":\"" + followerIndexUUID + "\"," + + "\"leader_remote_cluster\":\"leader_cluster\"" + + "}"; + request.setJsonEntity(requestBody); + final Response forgetFollowerResponse = leaderClient.performRequest(request); + assertOK(forgetFollowerResponse); + final Map shards = ObjectPath.createFromResponse(forgetFollowerResponse).evaluate("_shards"); + assertNull(shards.get("failures")); + assertThat(shards.get("total"), equalTo(1)); + assertThat(shards.get("successful"), equalTo(1)); + assertThat(shards.get("failed"), equalTo(0)); + + final Request retentionLeasesRequest = new Request("GET", "/" + forgetLeader + "/_stats"); + retentionLeasesRequest.addParameter("level", "shards"); + final Response retentionLeasesResponse = leaderClient.performRequest(retentionLeasesRequest); + final ArrayList shardsStats = + ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices." 
+ forgetLeader + ".shards.0"); + assertThat(shardsStats, hasSize(1)); + final Map shardStatsAsMap = (Map) shardsStats.get(0); + final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); + final List leases = (List) retentionLeasesStats.get("leases"); + assertThat(leases, empty()); + } + } + } + } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index d2d6d40a1009a..f415198f8da7f 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -253,16 +253,25 @@ protected RestClient buildLeaderClient() throws IOException { return buildClient(System.getProperty("tests.leader_host")); } + protected RestClient buildLeaderClient(final Settings settings) throws IOException { + assert "leader".equals(targetCluster) == false; + return buildClient(System.getProperty("tests.leader_host"), settings); + } + protected RestClient buildMiddleClient() throws IOException { assert "middle".equals(targetCluster) == false; return buildClient(System.getProperty("tests.middle_host")); } private RestClient buildClient(final String url) throws IOException { + return buildClient(url, restAdminSettings()); + } + + private RestClient buildClient(final String url, final Settings settings) throws IOException { int portSeparator = url.lastIndexOf(':'); HttpHost httpHost = new HttpHost(url.substring(0, portSeparator), - Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); - return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); + Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); + return buildClient(settings, new HttpHost[]{httpHost}); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index e0ff248f1d539..4bedfdf962fa4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; @@ -76,6 +77,7 @@ import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; @@ -90,6 +92,7 @@ import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; @@ -230,7 +233,9 @@ public List> getPersistentTasksExecutor(ClusterServic // auto-follow actions new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, 
TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), - new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class)); + new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class), + // forget follower action + new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class)); } public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, @@ -254,7 +259,9 @@ public List getRestHandlers(Settings settings, RestController restC // auto-follow APIs new RestDeleteAutoFollowPatternAction(settings, restController), new RestPutAutoFollowPatternAction(settings, restController), - new RestGetAutoFollowPatternAction(settings, restController)); + new RestGetAutoFollowPatternAction(settings, restController), + // forget follower API + new RestForgetFollowerAction(settings, restController)); } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java new file mode 100644 index 0000000000000..3be5ee94dcef6 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Assertions; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.PlainShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction< + ForgetFollowerAction.Request, + BroadcastResponse, + 
TransportBroadcastByNodeAction.EmptyResult> { + + private final ClusterService clusterService; + private final IndicesService indicesService; + + @Inject + public TransportForgetFollowerAction( + final Settings settings, + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + settings, + ForgetFollowerAction.NAME, + Objects.requireNonNull(threadPool), + Objects.requireNonNull(clusterService), + Objects.requireNonNull(transportService), + Objects.requireNonNull(actionFilters), + Objects.requireNonNull(indexNameExpressionResolver), + ForgetFollowerAction.Request::new, + ThreadPool.Names.MANAGEMENT); + this.clusterService = clusterService; + this.indicesService = Objects.requireNonNull(indicesService); + } + + @Override + protected EmptyResult readShardResult(final StreamInput in) { + return EmptyResult.readEmptyResultFrom(in); + } + + @Override + protected BroadcastResponse newResponse( + final ForgetFollowerAction.Request request, + final int totalShards, + final int successfulShards, + final int failedShards, List emptyResults, + final List shardFailures, + final ClusterState clusterState) { + return new BroadcastResponse(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + protected ForgetFollowerAction.Request readRequestFrom(final StreamInput in) throws IOException { + return new ForgetFollowerAction.Request(in); + } + + @Override + protected EmptyResult shardOperation(final ForgetFollowerAction.Request request, final ShardRouting shardRouting) { + final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); + final Index leaderIndex = clusterService.state().metaData().index(request.leaderIndex()).getIndex(); + final String id = CcrRetentionLeases.retentionLeaseId( + request.followerCluster(), + 
followerIndex, + request.leaderRemoteCluster(), + leaderIndex); + + final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id()); + + final PlainActionFuture permit = new PlainActionFuture<>(); + indexShard.acquirePrimaryOperationPermit(permit, ThreadPool.Names.SAME, request); + try (Releasable ignored = permit.get()) { + final PlainActionFuture future = new PlainActionFuture<>(); + indexShard.removeRetentionLease(id, future); + future.get(); + } catch (final ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + return EmptyResult.INSTANCE; + } + + @Override + protected ShardsIterator shards( + final ClusterState clusterState, + final ForgetFollowerAction.Request request, + final String[] concreteIndices) { + final GroupShardsIterator activePrimaryShards = + clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, false); + final List shardRoutings = new ArrayList<>(); + final Iterator it = activePrimaryShards.iterator(); + while (it.hasNext()) { + final ShardIterator shardIterator = it.next(); + final ShardRouting primaryShard = shardIterator.nextOrNull(); + assert primaryShard != null; + shardRoutings.add(primaryShard); + if (Assertions.ENABLED) { + final ShardRouting maybeNextPrimaryShard = shardIterator.nextOrNull(); + assert maybeNextPrimaryShard == null : maybeNextPrimaryShard; + } + } + return new PlainShardsIterator(shardRoutings); + } + + @Override + protected ClusterBlockException checkGlobalBlock(final ClusterState state, final ForgetFollowerAction.Request request) { + return null; + } + + @Override + protected ClusterBlockException checkRequestBlock( + final ClusterState state, + final ForgetFollowerAction.Request request, + final String[] concreteIndices) { + return null; + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java new file mode 100644 index 0000000000000..dc39aea372d81 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; + +import java.io.IOException; +import java.util.Objects; + +public class RestForgetFollowerAction extends BaseRestHandler { + + public RestForgetFollowerAction(final Settings settings, final RestController restController) { + super(Objects.requireNonNull(settings)); + Objects.requireNonNull(restController); + restController.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/forget_follower", this); + } + + @Override + public String getName() { + return "forget_follower_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final String leaderIndex = restRequest.param("index"); + + return channel -> { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + final ForgetFollowerAction.Request request = ForgetFollowerAction.Request.fromXContent(parser, leaderIndex); + 
client.execute(ForgetFollowerAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } catch (final IOException e) { + channel.sendResponse(new BytesRestResponse(channel, RestStatus.INTERNAL_SERVER_ERROR, e)); + } + }; + + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 61aea8d47af55..14385bf4d6812 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -286,7 +286,6 @@ public void testRateLimitingIsEmployed() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39245") public void testIndividualActionsTimeout() throws Exception { ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); TimeValue timeValue = TimeValue.timeValueMillis(100); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 44b38ddbadfa0..5a93291dacd1b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -53,6 +54,7 @@ import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import 
org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; @@ -80,6 +82,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -918,6 +921,79 @@ public void onResponseReceived( } } + public void testForgetFollower() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final String leaderIndexSettings = + getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + 
senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Remove.ACTION_NAME).equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (randomBoolean()) { + throw new ConnectTransportException(connection.getNode(), "connection failed"); + } else { + throw new IndexShardClosedException(removeRequest.getShardId()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + expectThrows( + ElasticsearchException.class, + () -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final BroadcastResponse forgetFollowerResponse = leaderClient().execute( + ForgetFollowerAction.INSTANCE, + new ForgetFollowerAction.Request( + getFollowerCluster().getClusterName(), + followerIndex, + followerUUID, + "leader_cluster", + leaderIndex)).actionGet(); + + assertThat(forgetFollowerResponse.getTotalShards(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.getSuccessfulShards(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.getFailedShards(), equalTo(0)); + assertThat(forgetFollowerResponse.getShardFailures(), emptyArray()); + + final IndicesStatsResponse afterForgetFollowerStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterForgetFollowerShardsStats = getShardsStats(afterForgetFollowerStats); + for (final ShardStats shardStats : afterForgetFollowerShardsStats) { + 
assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + } + private void assertRetentionLeaseRenewal( final int numberOfShards, final int numberOfReplicas, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index f2f94978c2985..22e02e868e675 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -122,7 +122,6 @@ public void testFailOverOnFollower() throws Exception { pauseFollow("follower-index"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java new file mode 100644 index 0000000000000..d19166eb1fa49 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +public class ForgetFollowerAction extends Action { + + public static final String NAME = "indices:admin/xpack/ccr/forget_follower"; + public static final ForgetFollowerAction INSTANCE = new ForgetFollowerAction(); + + private ForgetFollowerAction() { + super(NAME); + } + + @Override + public BroadcastResponse newResponse() { + return new BroadcastResponse(); + } + + /** + * Represents a forget follower request. Note that this is an expert API intended to be used only when unfollowing a follower index + * fails to remove the follower retention leases. Please be sure that you understand the purpose of this API before using. 
+ */ + public static class Request extends BroadcastRequest { + + private static final ParseField FOLLOWER_CLUSTER = new ParseField("follower_cluster"); + private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index"); + private static final ParseField FOLLOWER_INDEX_UUID = new ParseField("follower_index_uuid"); + private static final ParseField LEADER_REMOTE_CLUSTER = new ParseField("leader_remote_cluster"); + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, () -> new String[4]); + + static { + PARSER.declareString((parameters, value) -> parameters[0] = value, FOLLOWER_CLUSTER); + PARSER.declareString((parameters, value) -> parameters[1] = value, FOLLOWER_INDEX); + PARSER.declareString((parameters, value) -> parameters[2] = value, FOLLOWER_INDEX_UUID); + PARSER.declareString((parameters, value) -> parameters[3] = value, LEADER_REMOTE_CLUSTER); + } + + public static ForgetFollowerAction.Request fromXContent( + final XContentParser parser, + final String leaderIndex) throws IOException { + final String[] parameters = PARSER.parse(parser, null); + return new Request(parameters[0], parameters[1], parameters[2], parameters[3], leaderIndex); + } + + private String followerCluster; + + /** + * The name of the cluster containing the follower index. + * + * @return the name of the cluster containing the follower index + */ + public String followerCluster() { + return followerCluster; + } + + private String followerIndex; + + /** + * The name of the follower index. + * + * @return the name of the follower index + */ + public String followerIndex() { + return followerIndex; + } + + private String followerIndexUUID; + + /** + * The UUID of the follower index. + * + * @return the UUID of the follower index + */ + public String followerIndexUUID() { + return followerIndexUUID; + } + + private String leaderRemoteCluster; + + /** + * The alias of the remote cluster containing the leader index. 
+ * + * @return the alias of the remote cluster + */ + public String leaderRemoteCluster() { + return leaderRemoteCluster; + } + + private String leaderIndex; + + /** + * The name of the leader index. + * + * @return the name of the leader index + */ + public String leaderIndex() { + return leaderIndex; + } + + public Request() { + + } + + /** + * Construct a forget follower request. + * + * @param followerCluster the name of the cluster containing the follower index to forget + * @param followerIndex the name of follower index + * @param followerIndexUUID the UUID of the follower index + * @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index + * @param leaderIndex the name of the leader index + */ + public Request( + final String followerCluster, + final String followerIndex, + final String followerIndexUUID, + final String leaderRemoteCluster, + final String leaderIndex) { + super(new String[]{leaderIndex}); + this.followerCluster = Objects.requireNonNull(followerCluster); + this.leaderIndex = Objects.requireNonNull(leaderIndex); + this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster); + this.followerIndex = Objects.requireNonNull(followerIndex); + this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID); + } + + public Request(final StreamInput in) throws IOException { + super.readFrom(in); + followerCluster = in.readString(); + leaderIndex = in.readString(); + leaderRemoteCluster = in.readString(); + followerIndex = in.readString(); + followerIndexUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followerCluster); + out.writeString(leaderIndex); + out.writeString(leaderRemoteCluster); + out.writeString(followerIndex); + out.writeString(followerIndexUUID); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + } + + @Override + 
public RequestBuilder newRequestBuilder(final ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + public static class RequestBuilder extends ActionRequestBuilder { + + public RequestBuilder(final ElasticsearchClient client, final Action action) { + super(client, action, new Request()); + } + + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index f35a14314338c..00b115131d1fb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; @@ -96,6 +98,16 @@ public ActionFuture unfollow(final UnfollowAction.Request return listener; } + public void forgetFollower(final ForgetFollowerAction.Request request, final ActionListener listener) { + client.execute(ForgetFollowerAction.INSTANCE, request, listener); + } + + public ActionFuture forgetFollower(final ForgetFollowerAction.Request request) { + final PlainActionFuture 
listener = PlainActionFuture.newFuture(); + client.execute(ForgetFollowerAction.INSTANCE, request, listener); + return listener; + } + public void putAutoFollowPattern( final PutAutoFollowPatternAction.Request request, final ActionListener listener) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index d24863d6d53c4..e20e76ee47b37 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction; @@ -62,6 +63,7 @@ public final class IndexPrivilege extends Privilege { ExplainLifecycleAction.NAME); private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME, CloseIndexAction.NAME + "*"); + private static final Automaton MANAGE_LEADER_INDEX_AUTOMATON = patterns(ForgetFollowerAction.NAME + "*"); private static final Automaton MANAGE_ILM_AUTOMATON = patterns("indices:admin/ilm/*"); public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY); @@ -78,6 +80,7 @@ public final class IndexPrivilege extends Privilege { public static final IndexPrivilege CREATE_INDEX = new IndexPrivilege("create_index", CREATE_INDEX_AUTOMATON); public static final IndexPrivilege VIEW_METADATA = new 
IndexPrivilege("view_index_metadata", VIEW_METADATA_AUTOMATON); public static final IndexPrivilege MANAGE_FOLLOW_INDEX = new IndexPrivilege("manage_follow_index", MANAGE_FOLLOW_INDEX_AUTOMATON); + public static final IndexPrivilege MANAGE_LEADER_INDEX = new IndexPrivilege("manage_leader_index", MANAGE_LEADER_INDEX_AUTOMATON); public static final IndexPrivilege MANAGE_ILM = new IndexPrivilege("manage_ilm", MANAGE_ILM_AUTOMATON); private static final Map VALUES = MapBuilder.newMapBuilder() @@ -95,6 +98,7 @@ public final class IndexPrivilege extends Privilege { .put("view_index_metadata", VIEW_METADATA) .put("read_cross_cluster", READ_CROSS_CLUSTER) .put("manage_follow_index", MANAGE_FOLLOW_INDEX) + .put("manage_leader_index", MANAGE_LEADER_INDEX) .put("manage_ilm", MANAGE_ILM) .immutableMap(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json new file mode 100644 index 0000000000000..92d38e5e999e8 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json @@ -0,0 +1,21 @@ +{ + "ccr.forget_follower": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "POST" ], + "url": { + "path": "/{index}/_ccr/forget_follower", + "paths": [ "/{index}/_ccr/forget_follower" ], + "parts": { + "index": { + "type": "string", + "required": true, + "description": "the name of the leader index for which specified follower retention leases should be removed" + } + } + }, + "body": { + "description" : "the name and UUID of the follower index, the name of the cluster containing the follower index, and the alias from the perspective of that cluster for the remote cluster containing the leader index", + "required" : true + } + } +}