diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml index 9ae4922b8e634..700a2416c6609 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml @@ -2,7 +2,7 @@ ccruser: cluster: - manage_ccr indices: - - names: [ 'index1' ] + - names: [ 'allowed-index' ] privileges: - monitor - read diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 2fcc0890f2bc0..e8f4abd0bc849 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -60,37 +60,28 @@ protected boolean preserveClusterUponCompletion() { public void testFollowIndex() throws Exception { final int numDocs = 16; - final String indexName1 = "index1"; - final String indexName2 = "index2"; + final String allowedIndex = "allowed-index"; + final String unallowedIndex = "unallowed-index"; if (runningAgainstLeaderCluster) { logger.info("Running against leader cluster"); - Settings indexSettings = Settings.builder() - .put("index.soft_deletes.enabled", true) - .build(); - createIndex(indexName1, indexSettings); - createIndex(indexName2, indexSettings); + Settings indexSettings = Settings.builder().put("index.soft_deletes.enabled", true).build(); + createIndex(allowedIndex, indexSettings); + createIndex(unallowedIndex, indexSettings); for (int i = 0; i < numDocs; i++) { logger.info("Indexing doc [{}]", i); - index(indexName1, Integer.toString(i), "field", i); + index(allowedIndex, Integer.toString(i), "field", i); } for (int i = 0; i < numDocs; i++) { logger.info("Indexing doc [{}]", i); - index(indexName2, Integer.toString(i), "field", i); + index(unallowedIndex, Integer.toString(i), "field", i); } - refresh(indexName1); - verifyDocuments(adminClient(), indexName1, numDocs); + refresh(allowedIndex); + verifyDocuments(adminClient(), allowedIndex, numDocs); } else { - logger.info("Running against follow cluster"); - Settings indexSettings = Settings.builder() - .put("index.xpack.ccr.following_index", true) - .build(); - // TODO: remove mapping here when ccr syncs mappings too - createIndex(indexName1, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}"); - ensureYellow(indexName1); - followIndex("leader_cluster:" + indexName1, indexName1); - assertBusy(() -> verifyDocuments(client(), indexName1, numDocs)); + createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex); + assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); assertThat(countCcrNodeTasks(), equalTo(1)); - assertOK(client().performRequest("POST", "/" + indexName1 + "/_xpack/ccr/_unfollow")); + assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow")); // Make sure that there are no other ccr relates operations running: assertBusy(() -> { Map clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state")); @@ -98,16 +89,28 @@ public void testFollowIndex() throws Exception { assertThat(tasks.size(), equalTo(0)); assertThat(countCcrNodeTasks(), equalTo(0)); }); - - // TODO: remove mapping here when ccr syncs mappings too - createIndex(indexName2, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}"); - ensureYellow(indexName2); - followIndex("leader_cluster:" + indexName2, indexName2); + + followIndex("leader_cluster:" + allowedIndex, allowedIndex); + assertThat(countCcrNodeTasks(), equalTo(1)); + assertOK(client().performRequest("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow")); + // Make sure that there are no other ccr relates operations running: + assertBusy(() -> { + Map clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state")); + List tasks = (List) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState); + assertThat(tasks.size(), equalTo(0)); + assertThat(countCcrNodeTasks(), equalTo(0)); + }); + + createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex); // Verify that nothing has been replicated and no node tasks are running // These node tasks should have been failed due to the fact that the user // has no sufficient priviledges. assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - verifyDocuments(adminClient(), indexName2, 0); + verifyDocuments(adminClient(), unallowedIndex, 0); + + followIndex("leader_cluster:" + unallowedIndex, unallowedIndex); + assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); + verifyDocuments(adminClient(), unallowedIndex, 0); } } @@ -148,6 +151,11 @@ private static void followIndex(String leaderIndex, String followIndex) throws I assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params)); } + private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { + Map params = Collections.singletonMap("leader_index", leaderIndex); + assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params)); + } + void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException { Map params = new HashMap<>(); params.put("size", Integer.toString(expectedNumDocs)); @@ -184,13 +192,4 @@ protected static void createIndex(String name, Settings settings, String mapping + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON))); } - private static void ensureYellow(String index) throws IOException { - Map params = new HashMap<>(); - params.put("wait_for_status", "yellow"); - params.put("wait_for_no_relocating_shards", "true"); - params.put("timeout", "30s"); - params.put("level", "shards"); - assertOK(adminClient().performRequest("GET", "_cluster/health/" + index, params)); - } - } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 61889e62484f4..69aa9b2542b8f 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -57,23 +57,17 @@ public void testFollowIndex() throws Exception { } else { logger.info("Running against follow cluster"); final String followIndexName = "test_index2"; - Settings indexSettings = Settings.builder() - .put("index.xpack.ccr.following_index", true) - .build(); - // TODO: remove mapping here when ccr syncs mappings too - createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}"); - ensureYellow(followIndexName); - - followIndex("leader_cluster:" + leaderIndexName, followIndexName); + createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName); assertBusy(() -> verifyDocuments(followIndexName, numDocs)); - + // unfollow and then follow and then index a few docs in leader index: + unfollowIndex(followIndexName); + followIndex("leader_cluster:" + leaderIndexName, followIndexName); try (RestClient leaderClient = buildLeaderClient()) { int id = numDocs; index(leaderClient, leaderIndexName, Integer.toString(id), "field", id); index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1); index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2); } - assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); } } @@ -97,6 +91,15 @@ private static void followIndex(String leaderIndex, String followIndex) throws I assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_follow", params)); } + private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { + Map params = Collections.singletonMap("leader_index", leaderIndex); + assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow", params)); + } + + private static void unfollowIndex(String followIndex) throws IOException { + assertOK(client().performRequest("POST", "/" + followIndex + "/_xpack/ccr/_unfollow")); + } + private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { Map params = new HashMap<>(); params.put("size", Integer.toString(expectedNumDocs)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index c5dd6f861fc1a..e98904c9b28f0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -34,7 +34,8 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -43,7 +44,8 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; -import org.elasticsearch.xpack.ccr.rest.RestFollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction; +import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -90,9 +92,10 @@ public List> getPersistentTasksExecutor(ClusterServic return Arrays.asList( new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class), - new ActionHandler<>(FollowExistingIndexAction.INSTANCE, FollowExistingIndexAction.TransportAction.class), + new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class), new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class), - new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class) + new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class), + new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class) ); } @@ -102,7 +105,8 @@ public List getRestHandlers(Settings settings, RestController restC Supplier nodesInCluster) { return Arrays.asList( new RestUnfollowIndexAction(settings, restController), - new RestFollowExistingIndexAction(settings, restController) + new RestFollowIndexAction(settings, restController), + new RestCreateAndFollowIndexAction(settings, restController) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java new file mode 100644 index 0000000000000..423dba222eab1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -0,0 +1,309 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +public class CreateAndFollowIndexAction extends Action { + + public static final CreateAndFollowIndexAction INSTANCE = new CreateAndFollowIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/create_and_follow_index"; + + private CreateAndFollowIndexAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends AcknowledgedRequest { + + private FollowIndexAction.Request followRequest; + + public FollowIndexAction.Request getFollowRequest() { + return followRequest; + } + + public void setFollowRequest(FollowIndexAction.Request followRequest) { + this.followRequest = followRequest; + } + + @Override + public ActionRequestValidationException validate() { + return followRequest.validate(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + followRequest = new FollowIndexAction.Request(); + followRequest.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + followRequest.writeTo(out); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private boolean followIndexCreated; + private boolean followIndexShardsAcked; + private boolean indexFollowingStarted; + + Response() { + } + + Response(boolean followIndexCreated, boolean followIndexShardsAcked, boolean indexFollowingStarted) { + this.followIndexCreated = followIndexCreated; + this.followIndexShardsAcked = followIndexShardsAcked; + this.indexFollowingStarted = indexFollowingStarted; + } + + public boolean isFollowIndexCreated() { + return followIndexCreated; + } + + public boolean isFollowIndexShardsAcked() { + return followIndexShardsAcked; + } + + public boolean isIndexFollowingStarted() { + return indexFollowingStarted; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + followIndexCreated = in.readBoolean(); + followIndexShardsAcked = in.readBoolean(); + indexFollowingStarted = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(followIndexCreated); + out.writeBoolean(followIndexShardsAcked); + out.writeBoolean(indexFollowingStarted); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("follow_index_created", followIndexCreated); + builder.field("follow_index_shards_acked", followIndexShardsAcked); + builder.field("index_following_started", indexFollowingStarted); + } + builder.endObject(); + return builder; + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final Client client; + private final AllocationService allocationService; + private final RemoteClusterService remoteClusterService; + private final ActiveShardsObserver activeShardsObserver; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, + AllocationService allocationService) { + super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.allocationService = allocationService; + this.remoteClusterService = transportService.getRemoteClusterService(); + this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + String[] indices = new String[]{request.getFollowRequest().getLeaderIndex()}; + Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); + if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: + IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); + createFollowIndex(leaderIndexMetadata, request, listener); + } else { + // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: + assert remoteClusterIndices.size() == 1; + Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); + assert entry.getValue().size() == 1; + String clusterNameAlias = entry.getKey(); + String leaderIndex = entry.getValue().get(0); + + Client remoteClient = client.getRemoteClusterClient(clusterNameAlias); + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> { + ClusterState remoteClusterState = r.getState(); + IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); + createFollowIndex(leaderIndexMetadata, request, listener); + }, listener::onFailure)); + } + } + + private void createFollowIndex(IndexMetaData leaderIndexMetaData, Request request, ActionListener listener) { + if (leaderIndexMetaData == null) { + listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() + + "] does not exist")); + return; + } + + ActionListener handler = ActionListener.wrap( + result -> { + if (result) { + initiateFollowing(request, listener); + } else { + listener.onResponse(new Response(true, false, false)); + } + }, + listener::onFailure); + // Can't use create index api here, because then index templates can alter the mappings / settings. + // And index templates could introduce settings / mappings that are incompatible with the leader index. + clusterService.submitStateUpdateTask("follow_index_action", new AckedClusterStateUpdateTask(request, handler) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + IndexMetaData currentIndex = currentState.metaData().index(request.getFollowRequest().getFollowIndex()); + if (currentIndex != null) { + throw new ResourceAlreadyExistsException(currentIndex.getIndex()); + } + + MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(request.getFollowRequest().getFollowIndex()); + + // Copy all settings, but overwrite a few settings. + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(leaderIndexMetaData.getSettings()); + // Overwriting UUID here, because otherwise we can't follow indices in the same cluster + settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); + settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowIndex()); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + imdBuilder.settings(settingsBuilder); + + // Copy mappings from leader IMD to follow IMD + for (ObjectObjectCursor cursor : leaderIndexMetaData.getMappings()) { + imdBuilder.putMapping(cursor.value); + } + mdBuilder.put(imdBuilder.build(), false); + + ClusterState.Builder builder = ClusterState.builder(currentState); + builder.metaData(mdBuilder.build()); + ClusterState updatedState = builder.build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) + .addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowIndex())); + updatedState = allocationService.reroute( + ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(), + "follow index [" + request.getFollowRequest().getFollowIndex() + "] created"); + + return updatedState; + } + }); + } + + private void initiateFollowing(Request request, ActionListener listener) { + activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowIndex()}, + ActiveShardCount.DEFAULT, request.timeout(), result -> { + if (result) { + client.execute(FollowIndexAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( + r -> listener.onResponse(new Response(true, true, r.isAcknowledged())), + listener::onFailure + )); + } else { + listener.onResponse(new Response(true, false, false)); + } + }, listener::onFailure); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowIndex()); + } + + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java similarity index 97% rename from x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java rename to x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 2ebf1fc52b424..4a2d6767e88f4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; -public class FollowExistingIndexAction extends Action { +public class FollowIndexAction extends Action { - public static final FollowExistingIndexAction INSTANCE = new FollowExistingIndexAction(); - public static final String NAME = "cluster:admin/xpack/ccr/follow_existing_index"; + public static final FollowIndexAction INSTANCE = new FollowIndexAction(); + public static final String NAME = "cluster:admin/xpack/ccr/follow_index"; - private FollowExistingIndexAction() { + private FollowIndexAction() { super(NAME); } @@ -134,7 +134,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static class RequestBuilder extends ActionRequestBuilder { + public static class RequestBuilder extends ActionRequestBuilder { RequestBuilder(ElasticsearchClient client, Action action) { super(client, action, new Request()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java new file mode 100644 index 0000000000000..22d3390671935 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; + +import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.Request; + +public class RestCreateAndFollowIndexAction extends BaseRestHandler { + + public RestCreateAndFollowIndexAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, "/{index}/_xpack/ccr/_create_and_follow", this); + } + + @Override + public String getName() { + return "xpack_ccr_create_and_follow_index_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = new Request(); + request.setFollowRequest(RestFollowIndexAction.createRequest(restRequest)); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java similarity index 82% rename from x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java rename to x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java index 538cf646a7ea1..235308f902926 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowExistingIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java @@ -15,12 +15,12 @@ import java.io.IOException; -import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE; -import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request; +import static org.elasticsearch.xpack.ccr.action.FollowIndexAction.INSTANCE; +import static org.elasticsearch.xpack.ccr.action.FollowIndexAction.Request; -public class RestFollowExistingIndexAction extends BaseRestHandler { +public class RestFollowIndexAction extends BaseRestHandler { - public RestFollowExistingIndexAction(Settings settings, RestController controller) { + public RestFollowIndexAction(Settings settings, RestController controller) { super(settings); controller.registerHandler(RestRequest.Method.POST, "/{index}/_xpack/ccr/_follow", this); } @@ -32,6 +32,11 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Request request = createRequest(restRequest); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + + static Request createRequest(RestRequest restRequest) { Request request = new Request(); request.setLeaderIndex(restRequest.param("leader_index")); request.setFollowIndex(restRequest.param("index")); @@ -45,6 +50,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); request.setProcessorMaxTranslogBytes(value); } - return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 4ebe757938928..b8b3d4450c073 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -28,7 +27,8 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.discovery.TestZenDiscovery; -import org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction; +import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -143,21 +143,18 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("index1"); - final String followerIndexSettings = - getIndexSettings(numberOfPrimaryShards, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); - assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); - - ensureGreen("index1", "index2"); - - final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); followRequest.setFollowIndex("index2"); - client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); + + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); + createAndFollowRequest.setFollowRequest(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); final int firstBatchNumDocs = randomIntBetween(2, 64); for (int i = 0; i < firstBatchNumDocs; i++) { @@ -180,6 +177,8 @@ public void testFollowIndex() throws Exception { assertBusy(assertExpectedDocumentRunnable(i)); } + unfollowIndex("index2"); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); @@ -200,27 +199,7 @@ public void testFollowIndex() throws Exception { for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); - - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); - - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; - } - } - assertThat(numNodeTasks, equalTo(0)); - }); + unfollowIndex("index2"); } public void testFollowIndexWithNestedField() throws Exception { @@ -234,10 +213,10 @@ public void testFollowIndexWithNestedField() throws Exception { ensureGreen("index1", "index2"); - final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); followRequest.setFollowIndex("index2"); - client().execute(FollowExistingIndexAction.INSTANCE, followRequest).get(); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); final int numDocs = randomIntBetween(2, 64); for (int i = 0; i < numDocs; i++) { @@ -287,19 +266,19 @@ public void testUnfollowNonExistingIndex() { public void testFollowNonExistentIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("test-leader").get()); assertAcked(client().admin().indices().prepareCreate("test-follower").get()); - final FollowExistingIndexAction.Request followRequest = new FollowExistingIndexAction.Request(); + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); // Leader index does not exist. followRequest.setLeaderIndex("non-existent-leader"); followRequest.setFollowIndex("test-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); // Follower index does not exist. followRequest.setLeaderIndex("test-leader"); followRequest.setFollowIndex("non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); // Both indices do not exist. followRequest.setLeaderIndex("non-existent-leader"); followRequest.setFollowIndex("non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowExistingIndexAction.INSTANCE, followRequest).actionGet()); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); } private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { @@ -338,6 +317,28 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f }; } + private void unfollowIndex(String index) throws Exception { + final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); + unfollowRequest.setFollowIndex(index); + client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks().size(), equalTo(0)); + + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); + int numNodeTasks = 0; + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { + numNodeTasks++; + } + } + assertThat(numNodeTasks, equalTo(0)); + }); + } + private CheckedRunnable assertExpectedDocumentRunnable(final int value) { return () -> { final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java similarity index 82% rename from x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexActionTests.java rename to x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 5c6e3e3f9543d..e67119bd76b2a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -14,41 +14,41 @@ import static org.hamcrest.Matchers.equalTo; -public class FollowExistingIndexActionTests extends ESTestCase { +public class FollowIndexActionTests extends ESTestCase { public void testValidation() { - FollowExistingIndexAction.Request request = new FollowExistingIndexAction.Request(); + FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex("index1"); request.setFollowIndex("index2"); { - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(null, null, request)); + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(null, null, request)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { IndexMetaData leaderIMD = createIMD("index1", 5); - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(leaderIMD, null, request)); + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(leaderIMD, null, request)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } { IndexMetaData leaderIMD = createIMD("index1", 5); IndexMetaData followIMD = createIMD("index2", 5); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request)); + () -> FollowIndexAction.validate(leaderIMD, followIMD, request)); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 4); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request)); + () -> FollowIndexAction.validate(leaderIMD, followIMD, request)); assertThat(e.getMessage(), equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 5); - FollowExistingIndexAction.validate(leaderIMD, followIMD, request); + FollowIndexAction.validate(leaderIMD, followIMD, request); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.create_and_follow_index.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.create_and_follow_index.json new file mode 100644 index 0000000000000..236e8e6759cb1 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.create_and_follow_index.json @@ -0,0 +1,24 @@ +{ + "xpack.ccr.create_and_follow_index": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "POST" ], + "url": { + "path": "/{index}/_xpack/ccr/_create_and_follow", + "paths": [ "/{index}/_xpack/ccr/_create_and_follow" ], + "parts": { + "index": { + "type": "string", + "required": true, + "description": "The name of the index that follows the leader index." + } + }, + "params": { + "leader_index": { + "type": "string", + "required": true, + "description": "The name of the index to read the changes from." + } + } + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_existing_index.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_index.json similarity index 94% rename from x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_existing_index.json rename to x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_index.json index 68b29adecb2c1..9decf2537bdec 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_existing_index.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ccr.follow_index.json @@ -1,5 +1,5 @@ { - "xpack.ccr.follow_existing_index": { + "xpack.ccr.follow_index": { "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", "methods": [ "POST" ], "url": { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index 2d0a361f2d492..74b5d7e756c8f 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -16,18 +16,20 @@ - is_true: acknowledged - do: - indices.create: + xpack.ccr.create_and_follow_index: + leader_index: foo + index: bar + - is_true: follow_index_created + - is_true: follow_index_shards_acked + - is_true: index_following_started + + - do: + xpack.ccr.unfollow_index: index: bar - body: - mappings: - doc: - properties: - field: - type: keyword - is_true: acknowledged - do: - xpack.ccr.follow_existing_index: + xpack.ccr.follow_index: leader_index: foo index: bar - is_true: acknowledged