From 10c0a4d7e22e2b797a2af5ee5e521e20a7f5be55 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 1 Jun 2016 14:35:51 +0200 Subject: [PATCH 1/9] Allow `_shrink` to N shards if source shards is a multiple of N Today we allow to shrink to 1 shard but that might not be possible due to too many documnent or a single shard doesn't meet the requirements for the index. The logic can be expanded to N shards if the source index shards is a mutiple of N. This guarantees that there are not hotspots created due to different number of shards being shrunk into one. --- .../admin/indices/shrink/ShrinkRequest.java | 2 +- .../indices/shrink/ShrinkRequestBuilder.java | 2 +- .../indices/shrink/TransportShrinkAction.java | 56 +++--- .../cluster/metadata/IndexMetaData.java | 87 ++++++++- .../metadata/MetaDataCreateIndexService.java | 23 ++- .../cluster/routing/OperationRouting.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 7 +- .../admin/indices/RestShrinkIndexAction.java | 2 +- .../admin/indices/create/CreateIndexIT.java | 70 +++++++ .../shrink/TransportShrinkActionTests.java | 37 ++-- .../MetaDataCreateIndexServiceTests.java | 33 ++-- .../routing/OperationRoutingTests.java | 173 ++++++++++++++++++ 12 files changed, 429 insertions(+), 67 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java index 069c27ce4751c..a14a3f37a8698 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -94,7 +94,7 @@ public void setShrinkIndex(CreateIndexRequest shrinkIndexRequest) { /** * Returns the {@link CreateIndexRequest} for the shrink index */ - public CreateIndexRequest getShrinkIndexReqeust() { + public CreateIndexRequest getShrinkIndexRequest() { return shrinkIndexRequest; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java index a098215b7503f..ab392a7f8249d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -41,7 +41,7 @@ public ShrinkRequestBuilder setSourceIndex(String index) { } public ShrinkRequestBuilder setSettings(Settings settings) { - this.request.getShrinkIndexReqeust().settings(settings); + this.request.getShrinkIndexRequest().settings(settings); return this; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index ca007e312163a..7aebcf3911fce 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -34,27 +35,17 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; +import java.util.function.IntFunction; /** * Main class to initiate shrinking an index into a new index with a single shard @@ -87,7 +78,7 @@ protected ShrinkResponse newResponse() { @Override protected ClusterBlockException checkBlock(ShrinkRequest request, ClusterState state) { - return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexReqeust().index()); + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getShrinkIndexRequest().index()); } @Override @@ -98,7 +89,10 @@ protected void masterOperation(final ShrinkRequest shrinkRequest, final ClusterS @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(shrinkRequest, state, - indicesStatsResponse.getTotal().getDocs(), indexNameExpressionResolver); + (i) -> { + IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); + return shard == null ? null : shard.getPrimary().getDocs(); + }, indexNameExpressionResolver); createIndexService.createIndex(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { @@ -127,24 +121,36 @@ public void onFailure(Throwable e) { // static for unittesting this method static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ShrinkRequest shrinkReqeust, final ClusterState state - , final DocsStats docsStats, IndexNameExpressionResolver indexNameExpressionResolver) { + , final IntFunction perShardDocStats, IndexNameExpressionResolver indexNameExpressionResolver) { final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(shrinkReqeust.getSourceIndex()); - final CreateIndexRequest targetIndex = shrinkReqeust.getShrinkIndexReqeust(); + final CreateIndexRequest targetIndex = shrinkReqeust.getShrinkIndexRequest(); final String targetIndexName = indexNameExpressionResolver.resolveDateMathExpression(targetIndex.index()); final IndexMetaData metaData = state.metaData().index(sourceIndex); final Settings targetIndexSettings = Settings.builder().put(targetIndex.settings()) .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build(); - long count = docsStats.getCount(); - if (count >= IndexWriter.MAX_DOCS) { - throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS - + "] docs - too many documents"); + int numShards = 1; + if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { + numShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings); + } + for (int i = 0; i < numShards; i++) { + Set shardIds = IndexMetaData.selectShrinkShards(i, metaData, numShards); + long count = 0; + for (ShardId id : shardIds) { + DocsStats docsStats = perShardDocStats.apply(id.id()); + if (docsStats != null) { + count += docsStats.getCount(); + } + if (count >= IndexWriter.MAX_DOCS) { + throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS + + "] docs - too many documents in shards " + shardIds); + } + } + } targetIndex.cause("shrink_index"); - targetIndex.settings(Settings.builder() - .put(targetIndexSettings) - // we can only shrink to 1 index so far! - .put("index.number_of_shards", 1) - ); + Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings); + settingsBuilder.put("index.number_of_shards", numShards); + targetIndex.settings(settingsBuilder); return new CreateIndexClusterStateUpdateRequest(targetIndex, "shrink_index", targetIndexName, true) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index a87c887a4a1d8..fee583ebe54d2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -56,17 +56,20 @@ import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.io.IOException; import java.text.ParseException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -234,6 +237,13 @@ public static State fromString(String state) { public static final String INDEX_STATE_FILE_PREFIX = "state-"; + + public static final Setting INDEX_ROUTING_NUM_SHARDS = Setting.intSetting("index.routing.num_shards", INDEX_NUMBER_OF_SHARDS_SETTING, 1); + public static final Setting INDEX_ROUTING_FACTOR = Setting.intSetting("index.routing.factor", 1, 1); + + private final int routingNumShards; + private final int routingFactor; + private final int numberOfShards; private final int numberOfReplicas; @@ -268,7 +278,8 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat ImmutableOpenMap mappings, ImmutableOpenMap aliases, ImmutableOpenMap customs, ImmutableOpenIntMap> activeAllocationIds, DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, - Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion) { + Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion, + int routingNumShards, int routingFactor) { this.index = index; this.version = version; @@ -290,6 +301,8 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat this.indexCreatedVersion = indexCreatedVersion; this.indexUpgradedVersion = indexUpgradedVersion; this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion; + this.routingNumShards = routingNumShards; + this.routingFactor = routingFactor; } public Index getIndex() { @@ -533,6 +546,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws private static class IndexMetaDataDiff implements Diff { private final String index; + private final int routingFactor; + private final int routingNumShards; private final long version; private final long[] primaryTerms; private final State state; @@ -545,6 +560,8 @@ private static class IndexMetaDataDiff implements Diff { public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) { index = after.index.getName(); version = after.version; + routingFactor = after.routingFactor; + routingNumShards = after.routingNumShards; state = after.state; settings = after.settings; primaryTerms = after.primaryTerms; @@ -557,6 +574,8 @@ public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) { public IndexMetaDataDiff(StreamInput in) throws IOException { index = in.readString(); + routingFactor = in.readInt(); + routingNumShards = in.readInt(); version = in.readLong(); state = State.fromId(in.readByte()); settings = Settings.readSettingsFromStream(in); @@ -582,6 +601,8 @@ public Diff readDiff(StreamInput in, String key) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); + out.writeInt(routingFactor); + out.writeInt(routingNumShards); out.writeLong(version); out.writeByte(state.id); Settings.writeSettingsToStream(settings, out); @@ -596,6 +617,8 @@ public void writeTo(StreamOutput out) throws IOException { public IndexMetaData apply(IndexMetaData part) { Builder builder = builder(index); builder.version(version); + builder.setRoutingFactor(routingFactor); + builder.setRoutingNumShards(routingNumShards); builder.state(state); builder.settings(settings); builder.primaryTerms(primaryTerms); @@ -611,6 +634,8 @@ public IndexMetaData apply(IndexMetaData part) { public IndexMetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(in.readString()); builder.version(in.readLong()); + builder.setRoutingFactor(in.readInt()); + builder.setRoutingNumShards(in.readInt()); builder.state(State.fromId(in.readByte())); builder.settings(readSettingsFromStream(in)); builder.primaryTerms(in.readVLongArray()); @@ -643,6 +668,8 @@ public IndexMetaData readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(index.getName()); // uuid will come as part of settings out.writeLong(version); + out.writeInt(routingFactor); + out.writeInt(routingNumShards); out.writeByte(state.id()); writeSettingsToStream(settings, out); out.writeVLongArray(primaryTerms); @@ -685,6 +712,8 @@ public static class Builder { private final ImmutableOpenMap.Builder aliases; private final ImmutableOpenMap.Builder customs; private final ImmutableOpenIntMap.Builder> activeAllocationIds; + private Integer routingNumShards; + private int routingFactor = 1; public Builder(String index) { this.index = index; @@ -703,6 +732,8 @@ public Builder(IndexMetaData indexMetaData) { this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings); this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases); this.customs = ImmutableOpenMap.builder(indexMetaData.customs); + this.routingFactor = indexMetaData.routingFactor; + this.routingNumShards = indexMetaData.routingNumShards; this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds); } @@ -720,6 +751,24 @@ public Builder numberOfShards(int numberOfShards) { return this; } + public Builder setRoutingNumShards(int routingNumShards) { + this.routingNumShards = routingNumShards; + return this; + } + + public Builder setRoutingFactor(int routingFactor) { + this.routingFactor = routingFactor; + return this; + } + + public int getRoutingNumShards() { + return routingNumShards == null ? numberOfShards() : routingNumShards; + } + + public int getRoutingFactor() { + return routingFactor; + } + public int numberOfShards() { return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1); } @@ -934,13 +983,15 @@ public IndexMetaData build() { final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters, - indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion); + indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), routingFactor); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(indexMetaData.getIndex().getName()); builder.field(KEY_VERSION, indexMetaData.getVersion()); + builder.field("routing_factor", indexMetaData.getRoutingFactor()); + builder.field("routing_num_shards", indexMetaData.getRoutingNumShards()); builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH)); boolean binary = params.paramAsBoolean("binary", false); @@ -1101,6 +1152,10 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti builder.state(State.fromString(parser.text())); } else if (KEY_VERSION.equals(currentFieldName)) { builder.version(parser.longValue()); + } else if ("routing_factor".equals(currentFieldName)) { + builder.setRoutingFactor(parser.intValue()); + } else if ("routing_num_shards".equals(currentFieldName)) { + builder.setRoutingNumShards(parser.intValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } @@ -1175,4 +1230,32 @@ public IndexMetaData fromXContent(XContentParser parser) throws IOException { return Builder.fromXContent(parser); } }; + + public int getRoutingNumShards() { + return routingNumShards; + } + + public int getRoutingFactor() { + return routingFactor; + } + + + public static Set selectShrinkShards(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) { + int shrinkFactor = getShrinkFactor(sourceIndexMetadata, numTargetShards); + Set shards = new HashSet<>(shrinkFactor); + for (int i = shardId * shrinkFactor; i < shrinkFactor*shardId + shrinkFactor; i++) { + shards.add(new ShardId(sourceIndexMetadata.getIndex(), i)); + } + return shards; + } + + public static int getShrinkFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) { + int sourceNumberOfShards = sourceIndexMetadata.getNumberOfShards(); + int factor = sourceNumberOfShards / targetNumberOfShards; + if (factor * targetNumberOfShards != sourceNumberOfShards || factor <= 1) { + throw new IllegalStateException("the number of source shards [" + sourceNumberOfShards + "] must be a must be a multiple of [" + + targetNumberOfShards + "]"); + } + return factor; + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 47528428928ad..13b5fc35984c1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -299,15 +299,23 @@ public ClusterState execute(ClusterState currentState) throws Exception { indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); final Index shrinkFromIndex = request.shrinkFrom(); + int routingFactor = 1; + int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());; if (shrinkFromIndex != null) { prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index()); + IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); + int shrinkFactor = IndexMetaData.getShrinkFactor(sourceMetaData, routingNumShards); + routingFactor = sourceMetaData.getRoutingFactor() * shrinkFactor; + routingNumShards = sourceMetaData.getRoutingNumShards(); } Settings actualIndexSettings = indexSettingsBuilder.build(); - + IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()) + .setRoutingFactor(routingFactor) + .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply - final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build(); + final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); // create the index here (on the master) to validate it can be created, as well as adding the mapping final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); createdIndex = indexService.index(); @@ -339,7 +347,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { mappingsMetaData.put(mapper.type(), mappingMd); } - final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings); + final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings) + .setRoutingFactor(routingFactor) + .setRoutingNumShards(routingNumShards); for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); } @@ -499,9 +509,8 @@ static List validateShrinkIndex(ClusterState state, String sourceIndex, throw new IllegalArgumentException("mappings are not allowed when shrinking indices" + ", all mappings are copied from the source index"); } - if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings) - && IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings) > 1) { - throw new IllegalArgumentException("can not shrink index into more than one shard"); + if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { + IndexMetaData.getShrinkFactor(sourceMetaData, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings)); } // now check that index is all on one node @@ -533,8 +542,6 @@ static void prepareShrinkIndexSettings(ClusterState currentState, Set ma final Predicate analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.") || s.startsWith("index.analysis."); indexSettingsBuilder - // we can only shrink to 1 shard so far! - .put("index.number_of_shards", 1) // we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away // once we are allocated. .put("index.routing.allocation.initial_recovery._id", diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 3fb6f55a91943..be689fa40e512 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -218,14 +218,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null return new ShardId(indexMetaData.getIndex(), generateShardId(indexMetaData, id, routing)); } - private int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) { + static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) { final int hash; if (routing == null) { hash = Murmur3HashFunction.hash(id); } else { hash = Murmur3HashFunction.hash(routing); } - return Math.floorMod(hash, indexMetaData.getNumberOfShards()); + return Math.floorDiv(Math.floorMod(hash, indexMetaData.getRoutingNumShards()), indexMetaData.getRoutingFactor()); } private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 33d1aec7ed0ae..e55fc48a7b3d4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -126,6 +126,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -133,6 +134,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.BiConsumer; +import java.util.stream.Collectors; public class IndexShard extends AbstractIndexShardComponent { @@ -1411,7 +1413,10 @@ public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, Rec markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { - if (recoverFromLocalShards(mappingUpdateConsumer, startedShards)) { + Set shards = indexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(), + indexMetaData.getNumberOfShards()); + if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() + .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Throwable t) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java index b5cc861b65eae..42117cbf328ff 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java @@ -52,7 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, } ShrinkRequest shrinkIndexRequest = new ShrinkRequest(request.param("target"), request.param("index")); if (request.hasContent()) { - shrinkIndexRequest.getShrinkIndexReqeust().source(request.content()); + shrinkIndexRequest.getShrinkIndexRequest().source(request.content()); } shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout())); shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout())); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 60c4d0f486e7f..a4e75f4a8cc07 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -289,6 +289,76 @@ public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { ensureGreen("test"); } + public void testCreateShrinkIndexToN() { + int[][] possibleShardSplits = new int[][] { + {8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1} + }; + int[] shardSplits = randomFrom(possibleShardSplits); + assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]); + assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + String mergeNode = discoveryNodes[0].getName(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source") + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true)).get(); + ensureGreen(); + // now merge source into a 4 shard index + assertAcked(client().admin().indices().prepareShrinkIndex("source", "first_shrink") + .setSettings(Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", shardSplits[1]).build()).get()); + ensureGreen(); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + for (int i = 0; i < 20; i++) { // now update + client().prepareIndex("first_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("first_shrink") + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true)).get(); + ensureGreen(); + // now merge source into a 2 shard index + assertAcked(client().admin().indices().prepareShrinkIndex("first_shrink", "second_shrink") + .setSettings(Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", shardSplits[2]).build()).get()); + ensureGreen(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + // let it be allocated anywhere and bump replicas + client().admin().indices().prepareUpdateSettings("second_shrink") + .setSettings(Settings.builder() + .putNull("index.routing.allocation.include._id") + .put("index.number_of_replicas", 1)).get(); + ensureGreen(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + for (int i = 0; i < 20; i++) { // now update + client().prepareIndex("second_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + + } + public void testCreateShrinkIndex() { internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index 50bf8715f198f..fa6c5e5453b4e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -39,9 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.DocsStats; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.gateway.NoopGatewayAllocator; @@ -70,15 +67,26 @@ private ClusterState createClusterState(String name, int numShards, int numRepli } public void testErrorCondition() { - ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + ClusterState state = createClusterState("source", randomIntBetween(2, 42), randomIntBetween(0, 10), Settings.builder().put("index.blocks.write", true).build()); - DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); - - assertEquals("Can't merge index with more than [2147483519] docs - too many documents", + assertTrue( expectThrows(IllegalStateException.class, () -> TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), state, - new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY)) - ).getMessage()); + (i) -> new DocsStats(Integer.MAX_VALUE, randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY)) + ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards ")); + + + assertTrue( + expectThrows(IllegalStateException.class, () -> { + ShrinkRequest req = new ShrinkRequest("target", "source"); + req.getShrinkIndexRequest().settings(Settings.builder().put("index.number_of_shards", 4)); + ClusterState clusterState = createClusterState("source", 8, 1, + Settings.builder().put("index.blocks.write", true).build()); + TransportShrinkAction.prepareCreateIndexRequest(req, clusterState, + (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE/2, randomIntBetween(1, 1000)) : null, + new IndexNameExpressionResolver(Settings.EMPTY)); + } + ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards ")); // create one that won't fail @@ -96,8 +104,8 @@ public void testErrorCondition() { routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, stats, - new IndexNameExpressionResolver(Settings.EMPTY)); + TransportShrinkAction.prepareCreateIndexRequest(new ShrinkRequest("target", "source"), clusterState, + (i) -> new DocsStats(randomIntBetween(1, 1000), randomIntBetween(1, 1000)), new IndexNameExpressionResolver(Settings.EMPTY)); } public void testShrinkIndexSettings() { @@ -118,11 +126,12 @@ public void testShrinkIndexSettings() { routingTable = service.applyStartedShards(clusterState, routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - - DocsStats stats = new DocsStats(randomIntBetween(0, IndexWriter.MAX_DOCS-1), randomIntBetween(1, 1000)); + int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); + DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS-1) / numSourceShards), randomIntBetween(1, 1000)); ShrinkRequest target = new ShrinkRequest("target", indexName); CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( - target, clusterState, stats, new IndexNameExpressionResolver(Settings.EMPTY)); + target, clusterState, (i) -> stats, + new IndexNameExpressionResolver(Settings.EMPTY)); assertNotNull(request.shrinkFrom()); assertEquals(indexName, request.shrinkFrom().getName()); assertEquals("1", request.settings().get("index.number_of_shards")); diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index aa226e311ddf9..416d00fd1541b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -66,8 +66,15 @@ private ClusterState createClusterState(String name, int numShards, int numRepli return clusterState; } + public static boolean isShrinkable(int source, int target) { + int x = source / target; + assert source > target : source + " <= " + target; + return target * x == source; + } + public void testValidateShrinkIndex() { - ClusterState state = createClusterState("source", randomIntBetween(2, 100), randomIntBetween(0, 10), + int numShards = randomIntBetween(2, 42); + ClusterState state = createClusterState("source", numShards, randomIntBetween(0, 10), Settings.builder().put("index.blocks.write", true).build()); assertEquals("index [source] already exists", @@ -81,8 +88,7 @@ public void testValidateShrinkIndex() { ).getMessage()); assertEquals("can't shrink an index with only one shard", - expectThrows(IllegalArgumentException.class, () -> - MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0, + expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0, Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), "target", Settings.EMPTY) ).getMessage()); @@ -99,11 +105,11 @@ public void testValidateShrinkIndex() { MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", Settings.EMPTY) ).getMessage()); - - assertEquals("can not shrink index into more than one shard", - expectThrows(IllegalArgumentException.class, () -> - MetaDataCreateIndexService.validateShrinkIndex(state, "source", Collections.emptySet(), "target", - Settings.builder().put("index.number_of_shards", 2).build()) + assertEquals("the number of source shards [8] must be a must be a multiple of [3]", + expectThrows(IllegalStateException.class, () -> + MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 8, randomIntBetween(0, 10), + Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), "target", + Settings.builder().put("index.number_of_shards", 3).build()) ).getMessage()); assertEquals("mappings are not allowed when shrinking indices, all mappings are copied from the source index", @@ -114,7 +120,7 @@ public void testValidateShrinkIndex() { ).getMessage()); // create one that won't fail - ClusterState clusterState = ClusterState.builder(createClusterState("source", randomIntBetween(2, 10), 0, + ClusterState clusterState = ClusterState.builder(createClusterState("source", numShards, 0, Settings.builder().put("index.blocks.write", true).build())).nodes(DiscoveryNodes.builder().put(newNode("node1"))) .build(); AllocationService service = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY, @@ -127,8 +133,12 @@ public void testValidateShrinkIndex() { routingTable = service.applyStartedShards(clusterState, routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - - MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", Settings.EMPTY); + int targetShards; + do { + targetShards = randomIntBetween(1, numShards/2); + } while (isShrinkable(numShards, targetShards) == false); + MetaDataCreateIndexService.validateShrinkIndex(clusterState, "source", Collections.emptySet(), "target", + Settings.builder().put("index.number_of_shards", targetShards).build()); } public void testShrinkIndexSettings() { @@ -155,7 +165,6 @@ public void testShrinkIndexSettings() { Settings.Builder builder = Settings.builder(); MetaDataCreateIndexService.prepareShrinkIndexSettings( clusterState, Collections.emptySet(), builder, clusterState.metaData().index(indexName).getIndex(), "target"); - assertEquals("1", builder.build().get("index.number_of_shards")); assertEquals("similarity settings must be copied", "BM25", builder.build().get("index.similarity.default.type")); assertEquals("analysis settings must be copied", "keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer")); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java new file mode 100644 index 0000000000000..23ca5d703e62d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class OperationRoutingTests extends ESTestCase{ + + public void testGenerateShardId() { + int[][] possibleValues = new int[][] { + {8,4,2}, {20, 10, 2}, {36, 12, 3}, {15,5,1} + }; + for (int i = 0; i < 10; i++) { + int[] shardSplits = randomFrom(possibleValues); + assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]); + assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]); + IndexMetaData metaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[0]) + .numberOfReplicas(1).build(); + String term = randomAsciiOfLength(10); + final int shard = OperationRouting.generateShardId(metaData, term, null); + IndexMetaData shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[1]) + .numberOfReplicas(1) + .setRoutingFactor(shardSplits[0] / shardSplits[1]).setRoutingNumShards(shardSplits[0]).build(); + int shrunkShard = OperationRouting.generateShardId(shrunk, term, null); + Set shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards()); + assertEquals(1, shardIds.stream().filter((sid) -> sid.id() == shard).count()); + + shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[2]).numberOfReplicas(1) + .setRoutingFactor((shardSplits[0] / shardSplits[1]) * (shardSplits[1] / shardSplits[2])).setRoutingNumShards(shardSplits[0]).build(); + shrunkShard = OperationRouting.generateShardId(shrunk, term, null); + shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards()); + assertEquals(Arrays.toString(shardSplits), 1, shardIds.stream().filter((sid) -> sid.id() == shard).count()); + } + } + + /** + * Ensures that all changes to the hash-function / shard selection are BWC + */ + public void testBWC() { + Map termToShard = new TreeMap<>(); + termToShard.put("sEERfFzPSI", 1); + termToShard.put("cNRiIrjzYd", 7); + termToShard.put("BgfLBXUyWT", 5); + termToShard.put("cnepjZhQnb", 3); + termToShard.put("OKCmuYkeCK", 6); + termToShard.put("OutXGRQUja", 5); + termToShard.put("yCdyocKWou", 1); + termToShard.put("KXuNWWNgVj", 2); + termToShard.put("DGJOYrpESx", 4); + termToShard.put("upLDybdTGs", 5); + termToShard.put("yhZhzCPQby", 1); + termToShard.put("EyCVeiCouA", 1); + termToShard.put("tFyVdQauWR", 6); + termToShard.put("nyeRYDnDQr", 6); + termToShard.put("hswhrppvDH", 0); + termToShard.put("BSiWvDOsNE", 5); + termToShard.put("YHicpFBSaY", 1); + termToShard.put("EquPtdKaBZ", 4); + termToShard.put("rSjLZHCDfT", 5); + termToShard.put("qoZALVcite", 7); + termToShard.put("yDCCPVBiCm", 7); + termToShard.put("ngizYtQgGK", 5); + termToShard.put("FYQRIBcNqz", 0); + termToShard.put("EBzEDAPODe", 2); + termToShard.put("YePigbXgKb", 1); + termToShard.put("PeGJjomyik", 3); + termToShard.put("cyQIvDmyYD", 7); + termToShard.put("yIEfZrYfRk", 5); + termToShard.put("kblouyFUbu", 7); + termToShard.put("xvIGbRiGJF", 3); + termToShard.put("KWimwsREPf", 4); + termToShard.put("wsNavvIcdk", 7); + termToShard.put("xkWaPcCmpT", 0); + termToShard.put("FKKTOnJMDy", 7); + termToShard.put("RuLzobYixn", 2); + termToShard.put("mFohLeFRvF", 4); + termToShard.put("aAMXnamRJg", 7); + termToShard.put("zKBMYJDmBI", 0); + termToShard.put("ElSVuJQQuw", 7); + termToShard.put("pezPtTQAAm", 7); + termToShard.put("zBjjNEjAex", 2); + termToShard.put("PGgHcLNPYX", 7); + termToShard.put("hOkpeQqTDF", 3); + termToShard.put("chZXraUPBH", 7); + termToShard.put("FAIcSmmNXq", 5); + termToShard.put("EZmDicyayC", 0); + termToShard.put("GRIueBeIyL", 7); + termToShard.put("qCChjGZYLp", 3); + termToShard.put("IsSZQwwnUT", 3); + termToShard.put("MGlxLFyyCK", 3); + termToShard.put("YmscwrKSpB", 0); + termToShard.put("czSljcjMop", 5); + termToShard.put("XhfGWwNlng", 1); + termToShard.put("cWpKJjlzgj", 7); + termToShard.put("eDzIfMKbvk", 1); + termToShard.put("WFFWYBfnTb", 0); + termToShard.put("oDdHJxGxja", 7); + termToShard.put("PDOQQqgIKE", 1); + termToShard.put("bGEIEBLATe", 6); + termToShard.put("xpRkJPWVpu", 2); + termToShard.put("kTwZnPEeIi", 2); + termToShard.put("DifcuqSsKk", 1); + termToShard.put("CEmLmljpXe", 5); + termToShard.put("cuNKtLtyJQ", 7); + termToShard.put("yNjiAnxAmt", 5); + termToShard.put("bVDJDCeaFm", 2); + termToShard.put("vdnUhGLFtl", 0); + termToShard.put("LnqSYezXbr", 5); + termToShard.put("EzHgydDCSR", 3); + termToShard.put("ZSKjhJlcpn", 1); + termToShard.put("WRjUoZwtUz", 3); + termToShard.put("RiBbcCdIgk", 4); + termToShard.put("yizTqyjuDn", 4); + termToShard.put("QnFjcpcZUT", 4); + termToShard.put("agYhXYUUpl", 7); + termToShard.put("UOjiTugjNC", 7); + termToShard.put("nICGuWTdfV", 0); + termToShard.put("NrnSmcnUVF", 2); + termToShard.put("ZSzFcbpDqP", 3); + termToShard.put("YOhahLSzzE", 5); + termToShard.put("iWswCilUaT", 1); + termToShard.put("zXAamKsRwj", 2); + termToShard.put("aqGsrUPHFq", 5); + termToShard.put("eDItImYWTS", 1); + termToShard.put("JAYDZMRcpW", 4); + termToShard.put("lmvAaEPflK", 7); + termToShard.put("IKuOwPjKCx", 5); + termToShard.put("schsINzlYB", 1); + termToShard.put("OqbFNxrKrF", 2); + termToShard.put("QrklDfvEJU", 6); + termToShard.put("VLxKRKdLbx", 4); + termToShard.put("imoydNTZhV", 1); + termToShard.put("uFZyTyOMRO", 4); + termToShard.put("nVAZVMPNNx", 3); + termToShard.put("rPIdESYaAO", 5); + termToShard.put("nbZWPWJsIM", 0); + termToShard.put("wRZXPSoEgd", 3); + termToShard.put("nGzpgwsSBc", 4); + termToShard.put("AITyyoyLLs", 4); + IndexMetaData metaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(8) + .numberOfReplicas(1).build(); + for (Map.Entry entry : termToShard.entrySet()) { + String key = entry.getKey(); + int shard = randomBoolean() ? + OperationRouting.generateShardId(metaData, key, null) : OperationRouting.generateShardId(metaData, "foobar", key); + assertEquals(shard, entry.getValue().intValue()); + } + } +} From e9d36b4dd520522c90cd654810868a5d144787d6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Jun 2016 10:29:51 +0200 Subject: [PATCH 2/9] add javadocs and simplify code --- .../indices/shrink/TransportShrinkAction.java | 2 +- .../cluster/metadata/IndexMetaData.java | 77 ++++++++++--------- .../metadata/MetaDataCreateIndexService.java | 11 +-- .../cluster/routing/OperationRouting.java | 4 +- .../admin/indices/create/CreateIndexIT.java | 6 +- .../shrink/TransportShrinkActionTests.java | 2 +- .../routing/OperationRoutingTests.java | 4 +- 7 files changed, 51 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index 7aebcf3911fce..a00dd23b76423 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -140,7 +140,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Shri if (docsStats != null) { count += docsStats.getCount(); } - if (count >= IndexWriter.MAX_DOCS) { + if (count > IndexWriter.MAX_DOCS) { throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS + "] docs - too many documents in shards " + shardIds); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index fee583ebe54d2..4318f280113b6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -63,13 +63,11 @@ import java.io.IOException; import java.text.ParseException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -229,6 +227,7 @@ public static State fromString(String state) { public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations"; static final String KEY_VERSION = "version"; + static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards"; static final String KEY_SETTINGS = "settings"; static final String KEY_STATE = "state"; static final String KEY_MAPPINGS = "mappings"; @@ -236,11 +235,6 @@ public static State fromString(String state) { public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; - - - public static final Setting INDEX_ROUTING_NUM_SHARDS = Setting.intSetting("index.routing.num_shards", INDEX_NUMBER_OF_SHARDS_SETTING, 1); - public static final Setting INDEX_ROUTING_FACTOR = Setting.intSetting("index.routing.factor", 1, 1); - private final int routingNumShards; private final int routingFactor; @@ -279,7 +273,7 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat ImmutableOpenMap customs, ImmutableOpenIntMap> activeAllocationIds, DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion, - int routingNumShards, int routingFactor) { + int routingNumShards) { this.index = index; this.version = version; @@ -302,7 +296,9 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat this.indexUpgradedVersion = indexUpgradedVersion; this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion; this.routingNumShards = routingNumShards; - this.routingFactor = routingFactor; + this.routingFactor = routingNumShards / numberOfShards; + assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; + } public Index getIndex() { @@ -546,7 +542,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws private static class IndexMetaDataDiff implements Diff { private final String index; - private final int routingFactor; private final int routingNumShards; private final long version; private final long[] primaryTerms; @@ -560,7 +555,6 @@ private static class IndexMetaDataDiff implements Diff { public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) { index = after.index.getName(); version = after.version; - routingFactor = after.routingFactor; routingNumShards = after.routingNumShards; state = after.state; settings = after.settings; @@ -574,7 +568,6 @@ public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) { public IndexMetaDataDiff(StreamInput in) throws IOException { index = in.readString(); - routingFactor = in.readInt(); routingNumShards = in.readInt(); version = in.readLong(); state = State.fromId(in.readByte()); @@ -601,7 +594,6 @@ public Diff readDiff(StreamInput in, String key) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); - out.writeInt(routingFactor); out.writeInt(routingNumShards); out.writeLong(version); out.writeByte(state.id); @@ -617,7 +609,6 @@ public void writeTo(StreamOutput out) throws IOException { public IndexMetaData apply(IndexMetaData part) { Builder builder = builder(index); builder.version(version); - builder.setRoutingFactor(routingFactor); builder.setRoutingNumShards(routingNumShards); builder.state(state); builder.settings(settings); @@ -634,7 +625,6 @@ public IndexMetaData apply(IndexMetaData part) { public IndexMetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(in.readString()); builder.version(in.readLong()); - builder.setRoutingFactor(in.readInt()); builder.setRoutingNumShards(in.readInt()); builder.state(State.fromId(in.readByte())); builder.settings(readSettingsFromStream(in)); @@ -668,7 +658,6 @@ public IndexMetaData readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(index.getName()); // uuid will come as part of settings out.writeLong(version); - out.writeInt(routingFactor); out.writeInt(routingNumShards); out.writeByte(state.id()); writeSettingsToStream(settings, out); @@ -713,7 +702,6 @@ public static class Builder { private final ImmutableOpenMap.Builder customs; private final ImmutableOpenIntMap.Builder> activeAllocationIds; private Integer routingNumShards; - private int routingFactor = 1; public Builder(String index) { this.index = index; @@ -732,7 +720,6 @@ public Builder(IndexMetaData indexMetaData) { this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings); this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases); this.customs = ImmutableOpenMap.builder(indexMetaData.customs); - this.routingFactor = indexMetaData.routingFactor; this.routingNumShards = indexMetaData.routingNumShards; this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds); } @@ -756,19 +743,10 @@ public Builder setRoutingNumShards(int routingNumShards) { return this; } - public Builder setRoutingFactor(int routingFactor) { - this.routingFactor = routingFactor; - return this; - } - public int getRoutingNumShards() { return routingNumShards == null ? numberOfShards() : routingNumShards; } - public int getRoutingFactor() { - return routingFactor; - } - public int numberOfShards() { return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1); } @@ -983,15 +961,14 @@ public IndexMetaData build() { final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters, - indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), routingFactor); + indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards()); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(indexMetaData.getIndex().getName()); builder.field(KEY_VERSION, indexMetaData.getVersion()); - builder.field("routing_factor", indexMetaData.getRoutingFactor()); - builder.field("routing_num_shards", indexMetaData.getRoutingNumShards()); + builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards()); builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH)); boolean binary = params.paramAsBoolean("binary", false); @@ -1152,9 +1129,7 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti builder.state(State.fromString(parser.text())); } else if (KEY_VERSION.equals(currentFieldName)) { builder.version(parser.longValue()); - } else if ("routing_factor".equals(currentFieldName)) { - builder.setRoutingFactor(parser.intValue()); - } else if ("routing_num_shards".equals(currentFieldName)) { + } else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) { builder.setRoutingNumShards(parser.intValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); @@ -1231,25 +1206,53 @@ public IndexMetaData fromXContent(XContentParser parser) throws IOException { } }; + /** + * Returns the number of shards that should be used for routing. This basically defines the hash space we use in + * {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetaData, String, String)} to route documents + * to shards based on their ID or their specific routing value. The default value is {@link #getNumberOfShards()}. This value only + * changes if and index is shrunk. + */ public int getRoutingNumShards() { return routingNumShards; } + /** + * Returns the routing factor for this index. The default is 1. + * + * @see #getRoutingFactor(IndexMetaData, int) for details + */ public int getRoutingFactor() { return routingFactor; } + /** + * Returns the source shard ids to shrink into the given shard id. + * @param shardId the id of the target shard to shrink to + * @param sourceIndexMetadata the source index metadata + * @param numTargetShards the total number of shards in the target index + * @return a set of shard IDs to shrink into the given shard ID. + */ public static Set selectShrinkShards(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) { - int shrinkFactor = getShrinkFactor(sourceIndexMetadata, numTargetShards); - Set shards = new HashSet<>(shrinkFactor); - for (int i = shardId * shrinkFactor; i < shrinkFactor*shardId + shrinkFactor; i++) { + int routingFactor = getRoutingFactor(sourceIndexMetadata, numTargetShards); + Set shards = new HashSet<>(routingFactor); + for (int i = shardId * routingFactor; i < routingFactor*shardId + routingFactor; i++) { shards.add(new ShardId(sourceIndexMetadata.getIndex(), i)); } return shards; } - public static int getShrinkFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) { + /** + * Returns the routing factor for and shrunk index with the given number of target shards. + * This factor is used in the hash function in + * {@link org.elasticsearch.cluster.routing.OperationRouting#generateShardId(IndexMetaData, String, String)} to guarantee consistent + * hashing / routing of documents even if the number of shards changed (ie. a shrunk index). + * + * @param sourceIndexMetadata the metadata of the source index + * @param targetNumberOfShards the total number of shards in the target index + * @return the routing factor for and shrunk index with the given number of target shards. + */ + public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) { int sourceNumberOfShards = sourceIndexMetadata.getNumberOfShards(); int factor = sourceNumberOfShards / targetNumberOfShards; if (factor * targetNumberOfShards != sourceNumberOfShards || factor <= 1) { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 13b5fc35984c1..932286f1f91a0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.apache.lucene.index.IndexWriter; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -65,7 +64,6 @@ import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; -import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; @@ -305,14 +303,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index()); IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); - int shrinkFactor = IndexMetaData.getShrinkFactor(sourceMetaData, routingNumShards); - routingFactor = sourceMetaData.getRoutingFactor() * shrinkFactor; routingNumShards = sourceMetaData.getRoutingNumShards(); } Settings actualIndexSettings = indexSettingsBuilder.build(); IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()) - .setRoutingFactor(routingFactor) .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); @@ -347,8 +342,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { mappingsMetaData.put(mapper.type(), mappingMd); } - final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings) - .setRoutingFactor(routingFactor) + final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()) + .settings(actualIndexSettings) .setRoutingNumShards(routingNumShards); for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); @@ -510,7 +505,7 @@ static List validateShrinkIndex(ClusterState state, String sourceIndex, ", all mappings are copied from the source index"); } if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { - IndexMetaData.getShrinkFactor(sourceMetaData, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings)); + IndexMetaData.getRoutingFactor(sourceMetaData, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings)); } // now check that index is all on one node diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index be689fa40e512..9b1c82e7c0678 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -225,7 +225,9 @@ static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable Str } else { hash = Murmur3HashFunction.hash(routing); } - return Math.floorDiv(Math.floorMod(hash, indexMetaData.getRoutingNumShards()), indexMetaData.getRoutingFactor()); + // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size + // of original index to hash documents + return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor(); } private void ensureNodeIdExists(DiscoveryNodes nodes, String nodeId) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index a4e75f4a8cc07..f5333d1288efa 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -290,9 +290,7 @@ public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { } public void testCreateShrinkIndexToN() { - int[][] possibleShardSplits = new int[][] { - {8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1} - }; + int[][] possibleShardSplits = new int[][] {{8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1}}; int[] shardSplits = randomFrom(possibleShardSplits); assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]); assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]); @@ -355,8 +353,6 @@ public void testCreateShrinkIndexToN() { assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); - - } public void testCreateShrinkIndex() { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index fa6c5e5453b4e..01be8da894463 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -127,7 +127,7 @@ public void testShrinkIndexSettings() { routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); - DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS-1) / numSourceShards), randomIntBetween(1, 1000)); + DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000)); ShrinkRequest target = new ShrinkRequest("target", indexName); CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( target, clusterState, (i) -> stats, diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 23ca5d703e62d..e515e0f9bf443 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -44,13 +44,13 @@ public void testGenerateShardId() { final int shard = OperationRouting.generateShardId(metaData, term, null); IndexMetaData shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[1]) .numberOfReplicas(1) - .setRoutingFactor(shardSplits[0] / shardSplits[1]).setRoutingNumShards(shardSplits[0]).build(); + .setRoutingNumShards(shardSplits[0]).build(); int shrunkShard = OperationRouting.generateShardId(shrunk, term, null); Set shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards()); assertEquals(1, shardIds.stream().filter((sid) -> sid.id() == shard).count()); shrunk = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(shardSplits[2]).numberOfReplicas(1) - .setRoutingFactor((shardSplits[0] / shardSplits[1]) * (shardSplits[1] / shardSplits[2])).setRoutingNumShards(shardSplits[0]).build(); + .setRoutingNumShards(shardSplits[0]).build(); shrunkShard = OperationRouting.generateShardId(shrunk, term, null); shardIds = IndexMetaData.selectShrinkShards(shrunkShard, metaData, shrunk.getNumberOfShards()); assertEquals(Arrays.toString(shardSplits), 1, shardIds.stream().filter((sid) -> sid.id() == shard).count()); From 69b346c2c6549cf7dea5bb5b7f9c19e51a23a43a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Jun 2016 10:58:12 +0200 Subject: [PATCH 3/9] fix docs --- .../cluster/metadata/IndexMetaData.java | 5 +++ .../metadata/MetaDataCreateIndexService.java | 3 ++ .../MetaDataCreateIndexServiceTests.java | 11 +++++-- docs/reference/indices/shrink-index.asciidoc | 33 +++++++++++++------ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 4318f280113b6..1214f122d2a18 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -1251,9 +1251,14 @@ public static Set selectShrinkShards(int shardId, IndexMetaData sourceI * @param sourceIndexMetadata the metadata of the source index * @param targetNumberOfShards the total number of shards in the target index * @return the routing factor for and shrunk index with the given number of target shards. + * @throw IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards + * are not divisible by the number of target shards. */ public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) { int sourceNumberOfShards = sourceIndexMetadata.getNumberOfShards(); + if (sourceNumberOfShards < targetNumberOfShards) { + throw new IllegalArgumentException("the number of target shards must be less that the number of source shards"); + } int factor = sourceNumberOfShards / targetNumberOfShards; if (factor * targetNumberOfShards != sourceNumberOfShards || factor <= 1) { throw new IllegalStateException("the number of source shards [" + sourceNumberOfShards + "] must be a must be a multiple of [" diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 932286f1f91a0..fe5bf7049620a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -499,12 +499,15 @@ static List validateShrinkIndex(ClusterState state, String sourceIndex, throw new IllegalArgumentException("can't shrink an index with only one shard"); } + if ((targetIndexMappingsTypes.size() > 1 || (targetIndexMappingsTypes.isEmpty() || targetIndexMappingsTypes.contains(MapperService.DEFAULT_MAPPING)) == false)) { throw new IllegalArgumentException("mappings are not allowed when shrinking indices" + ", all mappings are copied from the source index"); } if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { + // this method applies all necessary checks ie. if the target shards are less than the source shards + // of if the source shards are divisible by the number of target shards IndexMetaData.getRoutingFactor(sourceMetaData, IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings)); } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index 416d00fd1541b..fc0d409f1cd59 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -88,11 +88,18 @@ public void testValidateShrinkIndex() { ).getMessage()); assertEquals("can't shrink an index with only one shard", - expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 1, 0, - Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), + expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", + 1, 0, Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), "target", Settings.EMPTY) ).getMessage()); + assertEquals("the number of target shards must be less that the number of source shards", + expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", + 5, 0, Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), + "target", Settings.builder().put("index.number_of_shards", 10).build()) + ).getMessage()); + + assertEquals("index source must be read-only to shrink index. use \"index.blocks.write=true\"", expectThrows(IllegalStateException.class, () -> MetaDataCreateIndexService.validateShrinkIndex( diff --git a/docs/reference/indices/shrink-index.asciidoc b/docs/reference/indices/shrink-index.asciidoc index d531477e77c57..7d5ec6fc2b4fb 100644 --- a/docs/reference/indices/shrink-index.asciidoc +++ b/docs/reference/indices/shrink-index.asciidoc @@ -2,13 +2,18 @@ == Shrink Index The shrink index API allows you to shrink an existing index into a new index -with a single primary shard. Before shrinking, a (primary or replica) copy of -every shard in the index must be present on the same node. +with fewer primary shards. The number of primary shards in the target index +must be a factor of the shards in the source index. For example an index with +`8` primary shards can be shrunk into `4`, `2` or `1` primary shard or an index +with `15` primary shards can be shrunk into `5`, `3` or `1`. If the number +of shards in the index is a prime number it can only be shrunk into a single +primary shard. Before shrinking, a (primary or replica) copy of every shard +in the index must be present on the same node. Shrinking works as follows: * First, it creates a new target index with the same definition as the source - index, but with a single primary shard. + index, but with a viewer number of primary shards. * Then it hard-links segments from the source index into the target index. (If the file system doesn't support hard-linking, then all segments are copied @@ -64,15 +69,19 @@ the cluster state -- it doesn't wait for the shrink operation to start. [IMPORTANT] ===================================== -Indices can only be shrunk into a single shard if they satisfy the following requirements: +Indices can only be shrunk into a a viewer number of shard if they satisfy the following requirements: - * the target index must not exist +* the target index must not exist -* The index must have more than one primary shard. +* The index must have more primary shards than the target index. + +* The number of primary shards in the target index must be a factor of the + number of primary shards in the source index. must have more primary shards + than the target index. * The index must not contain more than `2,147,483,519` documents in total - across all shards as this is the maximum number of docs that can fit into a - single shard. + across all shards that will be shrunk into a single shard on the target index + as this is the maximum number of docs that can fit into a single shard. * The node handling the shrink process must have sufficient free disk space to accommodate a second copy of the existing index. @@ -88,7 +97,8 @@ POST my_source_index/_shrink/my_target_index { "settings": { "index.number_of_replicas": 1, - "index.codec": "best_compression" <1> + "index.number_of_shards": 1, <1> + "index.codec": "best_compression" <2> }, "aliases": { "my_search_indices": {} @@ -96,10 +106,13 @@ POST my_source_index/_shrink/my_target_index } -------------------------------------------------- -<1> Best compression will only take affect when new writes are made to the +<1> The number of shards in the target index. This must be a factor of the + number of shards in the source index. +<2> Best compression will only take affect when new writes are made to the index, such as when <> the shard to a single segment. + NOTE: Mappings may not be specified in the `_shrink` request, and all `index.analysis.*` and `index.similarity.*` settings will be overwritten with the settings from the source index. From 63b2e0074cc8ed4149a94cd9f6af0ed6b2c4f691 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Jun 2016 11:16:06 +0200 Subject: [PATCH 4/9] qualify by class --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e55fc48a7b3d4..105d1e5b057cf 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1413,7 +1413,7 @@ public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, Rec markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { - Set shards = indexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(), + final Set shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards()); if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) { From 4e9265430492ebebca6321c63b00fb5abb17499b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Jun 2016 12:42:06 +0200 Subject: [PATCH 5/9] add additional tests for IndexMetaData --- .../cluster/metadata/IndexMetaData.java | 30 ++++- .../cluster/metadata/IndexMetaDataTests.java | 126 ++++++++++++++++++ .../MetaDataCreateIndexServiceTests.java | 2 +- 3 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 1214f122d2a18..89d8f704c7748 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -30,10 +30,8 @@ import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeFilters; import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -298,7 +296,6 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat this.routingNumShards = routingNumShards; this.routingFactor = routingNumShards / numberOfShards; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; - } public Index getIndex() { @@ -493,7 +490,12 @@ public boolean equals(Object o) { if (!customs.equals(that.customs)) { return false; } - + if (routingNumShards != that.routingNumShards) { + return false; + } + if (routingFactor != that.routingFactor) { + return false; + } if (Arrays.equals(primaryTerms, that.primaryTerms) == false) { return false; } @@ -512,6 +514,8 @@ public int hashCode() { result = 31 * result + settings.hashCode(); result = 31 * result + mappings.hashCode(); result = 31 * result + customs.hashCode(); + result = 31 * result + Long.hashCode(routingFactor); + result = 31 * result + Long.hashCode(routingNumShards); result = 31 * result + Arrays.hashCode(primaryTerms); result = 31 * result + activeAllocationIds.hashCode(); return result; @@ -738,11 +742,22 @@ public Builder numberOfShards(int numberOfShards) { return this; } + /** + * Sets the number of shards that should be used for routing. This should only be used if the number of shards in + * an index has changed ie if the index is shrunk. + */ public Builder setRoutingNumShards(int routingNumShards) { this.routingNumShards = routingNumShards; return this; } + /** + * Returns number of shards that should be used for routing. By default this method will return the number of shards + * for this index. + * + * @see #setRoutingNumShards(int) + * @see #numberOfShards() + */ public int getRoutingNumShards() { return routingNumShards == null ? numberOfShards() : routingNumShards; } @@ -1225,7 +1240,6 @@ public int getRoutingFactor() { return routingFactor; } - /** * Returns the source shard ids to shrink into the given shard id. * @param shardId the id of the target shard to shrink to @@ -1234,6 +1248,10 @@ public int getRoutingFactor() { * @return a set of shard IDs to shrink into the given shard ID. */ public static Set selectShrinkShards(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) { + if (shardId >= numTargetShards) { + throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: " + + shardId); + } int routingFactor = getRoutingFactor(sourceIndexMetadata, numTargetShards); Set shards = new HashSet<>(routingFactor); for (int i = shardId * routingFactor; i < routingFactor*shardId + routingFactor; i++) { @@ -1261,7 +1279,7 @@ public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int target } int factor = sourceNumberOfShards / targetNumberOfShards; if (factor * targetNumberOfShards != sourceNumberOfShards || factor <= 1) { - throw new IllegalStateException("the number of source shards [" + sourceNumberOfShards + "] must be a must be a multiple of [" + throw new IllegalArgumentException("the number of source shards [" + sourceNumberOfShards + "] must be a must be a multiple of [" + targetNumberOfShards + "]"); } return factor; diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java new file mode 100644 index 0000000000000..0c9827587ea18 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/IndexMetaDataTests.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Set; + +public class IndexMetaDataTests extends ESTestCase { + + public void testIndexMetaDataSerialization() throws IOException { + Integer numShard = randomFrom(1, 2, 4, 8, 16); + int numberOfReplicas = randomIntBetween(0, 10); + IndexMetaData metaData = IndexMetaData.builder("foo") + .settings(Settings.builder() + .put("index.version.created", 1) + .put("index.number_of_shards", numShard) + .put("index.number_of_replicas", numberOfReplicas) + .build()) + .creationDate(randomLong()) + .primaryTerm(0, 2) + .setRoutingNumShards(32) + .build(); + + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + metaData.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes()); + final IndexMetaData fromXContentMeta = IndexMetaData.PROTO.fromXContent(parser, null); + assertEquals(metaData, fromXContentMeta); + assertEquals(metaData.hashCode(), fromXContentMeta.hashCode()); + + assertEquals(metaData.getNumberOfReplicas(), fromXContentMeta.getNumberOfReplicas()); + assertEquals(metaData.getNumberOfShards(), fromXContentMeta.getNumberOfShards()); + assertEquals(metaData.getCreationVersion(), fromXContentMeta.getCreationVersion()); + assertEquals(metaData.getRoutingNumShards(), fromXContentMeta.getRoutingNumShards()); + assertEquals(metaData.getCreationDate(), fromXContentMeta.getCreationDate()); + assertEquals(metaData.getRoutingFactor(), fromXContentMeta.getRoutingFactor()); + assertEquals(metaData.primaryTerm(0), fromXContentMeta.primaryTerm(0)); + + final BytesStreamOutput out = new BytesStreamOutput(); + metaData.writeTo(out); + IndexMetaData deserialized = IndexMetaData.PROTO.readFrom(StreamInput.wrap(out.bytes())); + assertEquals(metaData, deserialized); + assertEquals(metaData.hashCode(), deserialized.hashCode()); + + assertEquals(metaData.getNumberOfReplicas(), deserialized.getNumberOfReplicas()); + assertEquals(metaData.getNumberOfShards(), deserialized.getNumberOfShards()); + assertEquals(metaData.getCreationVersion(), deserialized.getCreationVersion()); + assertEquals(metaData.getRoutingNumShards(), deserialized.getRoutingNumShards()); + assertEquals(metaData.getCreationDate(), deserialized.getCreationDate()); + assertEquals(metaData.getRoutingFactor(), deserialized.getRoutingFactor()); + assertEquals(metaData.primaryTerm(0), deserialized.primaryTerm(0)); + } + + public void testGetRoutingFactor() { + int numberOfReplicas = randomIntBetween(0, 10); + IndexMetaData metaData = IndexMetaData.builder("foo") + .settings(Settings.builder() + .put("index.version.created", 1) + .put("index.number_of_shards", 32) + .put("index.number_of_replicas", numberOfReplicas) + .build()) + .creationDate(randomLong()) + .build(); + Integer numShard = randomFrom(1, 2, 4, 8, 16); + int routingFactor = IndexMetaData.getRoutingFactor(metaData, numShard); + assertEquals(routingFactor * numShard, metaData.getNumberOfShards()); + + Integer brokenNumShards = randomFrom(3, 5, 9, 12, 29, 42, 64); + expectThrows(IllegalArgumentException.class, () -> IndexMetaData.getRoutingFactor(metaData, brokenNumShards)); + } + + public void testSelectShrinkShards() { + int numberOfReplicas = randomIntBetween(0, 10); + IndexMetaData metaData = IndexMetaData.builder("foo") + .settings(Settings.builder() + .put("index.version.created", 1) + .put("index.number_of_shards", 32) + .put("index.number_of_replicas", numberOfReplicas) + .build()) + .creationDate(randomLong()) + .build(); + Set shardIds = IndexMetaData.selectShrinkShards(0, metaData, 8); + assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 0), new ShardId(metaData.getIndex(), 1), + new ShardId(metaData.getIndex(), 2), new ShardId(metaData.getIndex(), 3))); + shardIds = IndexMetaData.selectShrinkShards(1, metaData, 8); + assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 4), new ShardId(metaData.getIndex(), 5), + new ShardId(metaData.getIndex(), 6), new ShardId(metaData.getIndex(), 7))); + shardIds = IndexMetaData.selectShrinkShards(7, metaData, 8); + assertEquals(shardIds, Sets.newHashSet(new ShardId(metaData.getIndex(), 28), new ShardId(metaData.getIndex(), 29), + new ShardId(metaData.getIndex(), 30), new ShardId(metaData.getIndex(), 31))); + + assertEquals("the number of target shards (8) must be greater than the shard id: 8", + expectThrows(IllegalArgumentException.class, () -> IndexMetaData.selectShrinkShards(8, metaData, 8)).getMessage()); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index fc0d409f1cd59..53dffc799b1f5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -113,7 +113,7 @@ public void testValidateShrinkIndex() { ).getMessage()); assertEquals("the number of source shards [8] must be a must be a multiple of [3]", - expectThrows(IllegalStateException.class, () -> + expectThrows(IllegalArgumentException.class, () -> MetaDataCreateIndexService.validateShrinkIndex(createClusterState("source", 8, randomIntBetween(0, 10), Settings.builder().put("index.blocks.write", true).build()), "source", Collections.emptySet(), "target", Settings.builder().put("index.number_of_shards", 3).build()) From 6cbad6469535057e60b406ccbaefd41b34715dfe Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Jun 2016 23:27:47 +0200 Subject: [PATCH 6/9] apply feedback --- .../cluster/metadata/MetaDataCreateIndexService.java | 1 - docs/reference/indices/shrink-index.asciidoc | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index fe5bf7049620a..2b4826358793b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -297,7 +297,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); final Index shrinkFromIndex = request.shrinkFrom(); - int routingFactor = 1; int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());; if (shrinkFromIndex != null) { prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, diff --git a/docs/reference/indices/shrink-index.asciidoc b/docs/reference/indices/shrink-index.asciidoc index 7d5ec6fc2b4fb..39c5134e23c1c 100644 --- a/docs/reference/indices/shrink-index.asciidoc +++ b/docs/reference/indices/shrink-index.asciidoc @@ -4,7 +4,7 @@ The shrink index API allows you to shrink an existing index into a new index with fewer primary shards. The number of primary shards in the target index must be a factor of the shards in the source index. For example an index with -`8` primary shards can be shrunk into `4`, `2` or `1` primary shard or an index +`8` primary shards can be shrunk into `4`, `2` or `1` primary shards or an index with `15` primary shards can be shrunk into `5`, `3` or `1`. If the number of shards in the index is a prime number it can only be shrunk into a single primary shard. Before shrinking, a (primary or replica) copy of every shard @@ -13,7 +13,7 @@ in the index must be present on the same node. Shrinking works as follows: * First, it creates a new target index with the same definition as the source - index, but with a viewer number of primary shards. + index, but with a smaller number of primary shards. * Then it hard-links segments from the source index into the target index. (If the file system doesn't support hard-linking, then all segments are copied @@ -69,7 +69,7 @@ the cluster state -- it doesn't wait for the shrink operation to start. [IMPORTANT] ===================================== -Indices can only be shrunk into a a viewer number of shard if they satisfy the following requirements: +Indices can only be shrunk if they satisfy the following requirements: * the target index must not exist From bb911759a1860c625b873251ba7ce9d2fe8d5169 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 6 Jun 2016 15:49:19 +0200 Subject: [PATCH 7/9] fix expected shard size calculation for shard shrinking --- .../decider/DiskThresholdDecider.java | 9 ++++++-- .../DiskThresholdDeciderUnitTests.java | 23 +++++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index f64791d834a89..ca341451fed57 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import java.util.Set; @@ -653,10 +654,14 @@ public static final long getExpectedShardSize(ShardRouting shard, RoutingAllocat if (metaData.getMergeSourceIndex() != null && shard.allocatedPostIndexCreate(metaData) == false) { // in the shrink index case we sum up the source index shards since we basically make a copy of the shard in // the worst case - Index mergeSourceIndex = metaData.getMergeSourceIndex(); long targetShardSize = 0; + final Index mergeSourceIndex = metaData.getMergeSourceIndex(); + final IndexMetaData sourceIndexMeta = allocation.metaData().getIndexSafe(metaData.getMergeSourceIndex()); + final Set shardIds = IndexMetaData.selectShrinkShards(shard.id(), sourceIndexMeta, metaData.getNumberOfShards()); for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) { - targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0); + if (shardIds.contains(shardRoutingTable.shardId())) { + targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0); + } } return targetShardSize == 0 ? defaultValue : targetShardSize; } else { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 52ba45ab7ab6f..b2578510fbaf4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -298,17 +298,22 @@ public void testSizeShrinkIndex() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); shardSizes.put("[test][1][p]", 100L); - shardSizes.put("[test][2][p]", 1000L); + shardSizes.put("[test][2][p]", 500L); + shardSizes.put("[test][3][p]", 500l); + ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build()); MetaData.Builder metaBuilder = MetaData.builder(); metaBuilder.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put("index.uuid", "1234")) - .numberOfShards(3).numberOfReplicas(0)); + .numberOfShards(4).numberOfReplicas(0)); metaBuilder.put(IndexMetaData.builder("target").settings(settings(Version.CURRENT).put("index.uuid", "5678") .put("index.shrink.source.name", "test").put("index.shrink.source.uuid", "1234")).numberOfShards(1).numberOfReplicas(0)); + metaBuilder.put(IndexMetaData.builder("target2").settings(settings(Version.CURRENT).put("index.uuid", "9101112") + .put("index.shrink.source.name", "test").put("index.shrink.source.uuid", "1234")).numberOfShards(2).numberOfReplicas(0)); MetaData metaData = metaBuilder.build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); routingTableBuilder.addAsNew(metaData.index("test")); routingTableBuilder.addAsNew(metaData.index("target")); + routingTableBuilder.addAsNew(metaData.index("target2")); ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) .metaData(metaData).routingTable(routingTableBuilder.build()).build(); @@ -339,18 +344,26 @@ public void testSizeShrinkIndex() { new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_2 = ShardRoutingHelper.initialize(test_2, "node1"); - assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); + ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), null, true, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + test_3 = ShardRoutingHelper.initialize(test_3, "node1"); + assertEquals(500l, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); + assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0), null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - - assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0)); + ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0), + null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + assertEquals(110L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); + target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1), + null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); } } From c4061fb2b68f336ed793bf7842bf55a2ee38ad7f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 6 Jun 2016 16:49:14 +0200 Subject: [PATCH 8/9] make checkstyle happy --- .../allocation/decider/DiskThresholdDeciderUnitTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index b2578510fbaf4..aaed025a33e06 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -299,7 +299,7 @@ public void testSizeShrinkIndex() { shardSizes.put("[test][0][p]", 10L); shardSizes.put("[test][1][p]", 100L); shardSizes.put("[test][2][p]", 500L); - shardSizes.put("[test][3][p]", 500l); + shardSizes.put("[test][3][p]", 500L); ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build()); MetaData.Builder metaBuilder = MetaData.builder(); From c7f7cdbbd1d4b07fb0955038bd7caf224a4d2e41 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 7 Jun 2016 10:04:15 +0200 Subject: [PATCH 9/9] add extra ensureGreen() --- .../action/admin/indices/create/CreateIndexIT.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index f5333d1288efa..8fbb489d9c263 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -304,6 +304,10 @@ public void testCreateShrinkIndexToN() { assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); String mergeNode = discoveryNodes[0].getName(); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); // relocate all shards to one node such that we can merge it. client().admin().indices().prepareUpdateSettings("source") .setSettings(Settings.builder()