From 892e1757b2c597e2d0d58574e4b01ea84d756c8e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 13 Jun 2018 12:05:09 +0200 Subject: [PATCH] iter --- .../xpack/ccr/action/FollowIndexAction.java | 105 +++++++++--------- .../ccr/action/FollowIndexActionTests.java | 12 +- 2 files changed, 64 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index f7c271c8bc6bb..1dca0522eb739 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -193,6 +193,7 @@ protected void doExecute(Request request, ActionListener listener) { start(request, null, leaderIndexMetadata, followIndexMetadata, listener); } catch (IOException e) { listener.onFailure(e); + return; } } else { // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: @@ -229,75 +230,75 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe ActionListener handler) throws IOException { MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); - final int numShards = followIndexMetadata.getNumberOfShards(); - final AtomicInteger counter = new AtomicInteger(numShards); - final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); - Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { - final int shardId = i; - String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); - persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - responses.set(shardId, task); - finalizeResponse(); - } - + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); + persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, + new ActionListener>() { @Override - public void onFailure(Exception e) { - responses.set(shardId, e); + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); finalizeResponse(); } - void finalizeResponse() { - Exception error = null; - if (counter.decrementAndGet() == 0) { - for (int j = 0; j < responses.length(); j++) { - Object response = responses.get(j); - if (response instanceof Exception) { - if (error == null) { - error = (Exception) response; - } else { - error.addSuppressed((Throwable) response); - } + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); } } + } - if (error == null) { - // include task ids? - handler.onResponse(new Response(true)); - } else { - // TODO: cancel all started tasks - handler.onFailure(error); - } + if (error == null) { + // include task ids? + handler.onResponse(new Response(true)); + } else { + // TODO: cancel all started tasks + handler.onFailure(error); } } } + } ); } } } - + private static final Set> WHITELISTED_SETTINGS; - + static { Set> whiteListedSettings = new HashSet<>(); whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING); whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING); - + whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING); whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING); whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING); whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING); whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING); whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING); - + whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING); whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING); whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING); @@ -311,7 +312,7 @@ void finalizeResponse() { whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED); whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER); whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING); - + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING); whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING); whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING); @@ -328,10 +329,10 @@ void finalizeResponse() { whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING); whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING); whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING); - + whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING); whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); - + WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings); } @@ -356,19 +357,19 @@ static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData f if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) { throw new IllegalArgumentException("leader and follow index must be open"); } - + // Make a copy, remove settings that are allowed to be different and then compare if the settings are equal. Settings leaderSettings = filter(leaderIndex.getSettings()); Settings followerSettings = filter(followIndex.getSettings()); if (leaderSettings.equals(followerSettings) == false) { throw new IllegalArgumentException("the leader and follower index settings must be identical"); } - + // Validates if the current follower mapping is mergable with the leader mapping. // This also validates for example whether specific mapper plugins have been installed followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY); } - + private static Settings filter(Settings originalSettings) { Settings.Builder settings = Settings.builder().put(originalSettings); // Remove settings that are always going to be different between leader and follow index: @@ -376,7 +377,7 @@ private static Settings filter(Settings originalSettings) { settings.remove(IndexMetaData.SETTING_INDEX_UUID); settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME); settings.remove(IndexMetaData.SETTING_CREATION_DATE); - + Iterator iterator = settings.keys().iterator(); while (iterator.hasNext()) { String key = iterator.next(); @@ -389,5 +390,5 @@ private static Settings filter(Settings originalSettings) { } return settings.build(); } - + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 313a21a414444..a27294ccf2df5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -27,15 +27,18 @@ public void testValidation() throws IOException { request.setFollowIndex("index2"); { + // should fail, because leader index does not exist Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { + // should fail, because follow index does not exist IndexMetaData leaderIMD = createIMD("index1", 5); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } { + // should fail because leader index does not have soft deletes enabled IndexMetaData leaderIMD = createIMD("index1", 5); IndexMetaData followIMD = createIMD("index2", 5); Exception e = expectThrows(IllegalArgumentException.class, @@ -43,6 +46,7 @@ public void testValidation() throws IOException { assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { + // should fail because the number of primary shards between leader and follow index are not equal IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 4); Exception e = expectThrows(IllegalArgumentException.class, @@ -51,6 +55,7 @@ public void testValidation() throws IOException { equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { + // should fail, because leader index is closed IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, @@ -60,6 +65,7 @@ public void testValidation() throws IOException { assertThat(e.getMessage(), equalTo("leader and follow index must be open")); } { + // should fail, because leader has a field with the same name mapped as keyword and follower as text IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5); @@ -70,6 +76,7 @@ public void testValidation() throws IOException { assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]")); } { + // should fail because of non whitelisted settings not the same between leader and follow index String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), @@ -83,6 +90,7 @@ public void testValidation() throws IOException { assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical")); } { + // should succeed IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 5); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); @@ -90,6 +98,7 @@ public void testValidation() throws IOException { FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); } { + // should succeed, index settings are identical String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), @@ -104,6 +113,7 @@ public void testValidation() throws IOException { FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); } { + // should succeed despite whitelisted settings being different String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), @@ -124,7 +134,7 @@ public void testValidation() throws IOException { private static IndexMetaData createIMD(String index, int numShards, Tuple... settings) throws IOException { return createIMD(index, State.OPEN, "{\"properties\": {}}", numShards, settings); } - + private static IndexMetaData createIMD(String index, State state, String mapping, int numShards, Tuple... settings) throws IOException { Settings.Builder settingsBuilder = settings(Version.CURRENT);