From 7e489cef1b248f485da51a7185ecbee2eb85b592 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Fri, 9 Dec 2022 22:35:59 +0800 Subject: [PATCH] Add max_shard_size parameter for Shrink API (#5229) (#5498) * Add max_shard_size parameter for Shrink API Signed-off-by: Gao Binlong * add change log Signed-off-by: Gao Binlong * fix yaml test failed Signed-off-by: Gao Binlong * optimize the code Signed-off-by: Gao Binlong * fix test failed Signed-off-by: Gao Binlong * optimize changelog & code Signed-off-by: Gao Binlong Signed-off-by: Gao Binlong (cherry picked from commit 953a3d6851632c9b09c8dc15dbff02c8aafac6c1) --- CHANGELOG.md | 1 + .../client/indices/ResizeRequest.java | 20 ++ .../client/IndicesRequestConvertersTests.java | 3 + .../test/indices.shrink/40_max_shard_size.yml | 82 ++++++ .../admin/indices/create/ShrinkIndexIT.java | 71 ++++- .../admin/indices/shrink/ResizeRequest.java | 41 +++ .../indices/shrink/ResizeRequestBuilder.java | 9 + .../indices/shrink/TransportResizeAction.java | 58 ++++- .../shrink/TransportResizeActionTests.java | 245 +++++++++++++++++- 9 files changed, 522 insertions(+), 8 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/40_max_shard_size.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ab0531c52bc8..3ce004c561afc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069)) - Update to Gradle 7.6 ([#5382](https://github.com/opensearch-project/OpenSearch/pull/5382)) - Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299)) +- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) ### Dependencies - Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148)) - Bumps `commons-compress` from 1.21 to 1.22 ([#5104](https://github.com/opensearch-project/OpenSearch/pull/5104)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/indices/ResizeRequest.java b/client/rest-high-level/src/main/java/org/opensearch/client/indices/ResizeRequest.java index 2a22c8d7d19e9..ebbd813c9fe15 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/indices/ResizeRequest.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/indices/ResizeRequest.java @@ -39,6 +39,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.unit.ByteSizeValue; import java.io.IOException; import java.util.Collections; @@ -58,6 +59,7 @@ public class ResizeRequest extends TimedRequest implements Validatable, ToXConte private final String targetIndex; private Settings settings = Settings.EMPTY; private Set aliases = new HashSet<>(); + private ByteSizeValue maxShardSize; /** * Creates a new resize request @@ -155,6 +157,24 @@ public ActiveShardCount getWaitForActiveShards() { return waitForActiveShards; } + /** + * Sets the maximum size of a primary shard in the new shrunken index. + * This parameter can be used to calculate the lowest factor of the source index's shards number + * which satisfies the maximum shard size requirement. + * + * @param maxShardSize the maximum size of a primary shard in the new shrunken index + */ + public void setMaxShardSize(ByteSizeValue maxShardSize) { + this.maxShardSize = maxShardSize; + } + + /** + * Returns the maximum size of a primary shard in the new shrunken index. + */ + public ByteSizeValue getMaxShardSize() { + return maxShardSize; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java index f853378e789fa..be3dc76cd0e97 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/IndicesRequestConvertersTests.java @@ -79,6 +79,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.test.OpenSearchTestCase; import org.junit.Assert; +import org.opensearch.common.unit.ByteSizeValue; import java.io.IOException; import java.util.Arrays; @@ -701,6 +702,8 @@ private void resizeTest(ResizeType resizeType, CheckedFunction client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .setMaxShardSize(new ByteSizeValue(1)) + .setResizeType(ResizeType.SHRINK) + .get() + ); + assertEquals(exc.getMessage(), "Cannot set max_shard_size and index.number_of_shards at the same time!"); + + // use max_shard_size to calculate the target index's shards number + // set max_shard_size to 1 then the target index's shards number will be same with the source index's + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .putNull(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey()) + .build() + ) + .setMaxShardSize(new ByteSizeValue(1)) + .setResizeType(ResizeType.SHRINK) + .get() + ); + ensureGreen(); + + GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); + assertEquals(String.valueOf(shardCount), target.getIndexToSettings().get("target").get("index.number_of_shards")); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java index 50784e60a3f19..20e9eb5e6b7d6 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.indices.shrink; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.IndicesRequest; import org.opensearch.action.admin.indices.alias.Alias; @@ -47,6 +48,7 @@ import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.unit.ByteSizeValue; import java.io.IOException; import java.util.Objects; @@ -61,6 +63,8 @@ public class ResizeRequest extends AcknowledgedRequest implements IndicesRequest, ToXContentObject { public static final ObjectParser PARSER = new ObjectParser<>("resize_request"); + private static final ParseField MAX_SHARD_SIZE = new ParseField("max_shard_size"); + static { PARSER.declareField( (parser, request, context) -> request.getTargetIndexRequest().settings(parser.map()), @@ -72,12 +76,19 @@ public class ResizeRequest extends AcknowledgedRequest implements new ParseField("aliases"), ObjectParser.ValueType.OBJECT ); + PARSER.declareField( + ResizeRequest::setMaxShardSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SHARD_SIZE.getPreferredName()), + MAX_SHARD_SIZE, + ObjectParser.ValueType.STRING + ); } private CreateIndexRequest targetIndexRequest; private String sourceIndex; private ResizeType type = ResizeType.SHRINK; private Boolean copySettings = true; + private ByteSizeValue maxShardSize; public ResizeRequest(StreamInput in) throws IOException { super(in); @@ -85,6 +96,9 @@ public ResizeRequest(StreamInput in) throws IOException { sourceIndex = in.readString(); type = in.readEnum(ResizeType.class); copySettings = in.readOptionalBoolean(); + if (in.getVersion().onOrAfter(Version.V_2_5_0)) { + maxShardSize = in.readOptionalWriteable(ByteSizeValue::new); + } } ResizeRequest() {} @@ -109,6 +123,9 @@ public ActionRequestValidationException validate() { if (type == ResizeType.SPLIT && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) { validationException = addValidationError("index.number_of_shards is required for split operations", validationException); } + if (maxShardSize != null && maxShardSize.getBytes() <= 0) { + validationException = addValidationError("max_shard_size must be greater than 0", validationException); + } assert copySettings == null || copySettings; return validationException; } @@ -127,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeEnum(type); out.writeOptionalBoolean(copySettings); + if (out.getVersion().onOrAfter(Version.V_2_5_0)) { + out.writeOptionalWriteable(maxShardSize); + } } @Override @@ -209,6 +229,24 @@ public Boolean getCopySettings() { return copySettings; } + /** + * Sets the maximum size of a primary shard in the new shrunken index. + * This parameter can be used to calculate the lowest factor of the source index's shards number + * which satisfies the maximum shard size requirement. + * + * @param maxShardSize the maximum size of a primary shard in the new shrunken index + */ + public void setMaxShardSize(ByteSizeValue maxShardSize) { + this.maxShardSize = maxShardSize; + } + + /** + * Returns the maximum size of a primary shard in the new shrunken index. + */ + public ByteSizeValue getMaxShardSize() { + return maxShardSize; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -225,6 +263,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } builder.endObject(); + if (maxShardSize != null) { + builder.field(MAX_SHARD_SIZE.getPreferredName(), maxShardSize); + } } builder.endObject(); return builder; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequestBuilder.java index 418e83a5431ec..eb05c0a69b78b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequestBuilder.java @@ -37,6 +37,7 @@ import org.opensearch.action.support.master.AcknowledgedRequestBuilder; import org.opensearch.client.OpenSearchClient; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; /** * Transport request builder for resizing an index @@ -95,4 +96,12 @@ public ResizeRequestBuilder setResizeType(ResizeType type) { this.request.setResizeType(type); return this; } + + /** + * Sets the maximum size of a primary shard in the new shrunken index. + */ + public ResizeRequestBuilder setMaxShardSize(ByteSizeValue maxShardSize) { + this.request.setMaxShardSize(maxShardSize); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java index ba079aeb03921..7f55e5efe801b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java @@ -57,6 +57,8 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.index.store.StoreStats; import java.io.IOException; import java.util.Locale; @@ -141,11 +143,12 @@ protected void clusterManagerOperation( .prepareStats(sourceIndex) .clear() .setDocs(true) + .setStore(true) .execute(ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> { CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); - }, sourceIndex, targetIndex); + }, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex); createIndexService.createIndex( updateRequest, ActionListener.map( @@ -162,6 +165,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( final ResizeRequest resizeRequest, final ClusterState state, final IntFunction perShardDocStats, + final StoreStats primaryShardsStoreStats, String sourceIndexName, String targetIndexName ) { @@ -176,12 +180,22 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( targetIndexSettingsBuilder.remove(IndexMetadata.SETTING_HISTORY_UUID); final Settings targetIndexSettings = targetIndexSettingsBuilder.build(); final int numShards; + + // max_shard_size is only supported for shrink + ByteSizeValue maxShardSize = resizeRequest.getMaxShardSize(); + if (resizeRequest.getResizeType() != ResizeType.SHRINK && maxShardSize != null) { + throw new IllegalArgumentException("Unsupported parameter [max_shard_size]"); + } + if (IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { numShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings); + if (resizeRequest.getResizeType() == ResizeType.SHRINK && maxShardSize != null) { + throw new IllegalArgumentException("Cannot set max_shard_size and index.number_of_shards at the same time!"); + } } else { assert resizeRequest.getResizeType() != ResizeType.SPLIT : "split must specify the number of shards explicitly"; if (resizeRequest.getResizeType() == ResizeType.SHRINK) { - numShards = 1; + numShards = calculateTargetIndexShardsNum(maxShardSize, primaryShardsStoreStats, metadata); } else { assert resizeRequest.getResizeType() == ResizeType.CLONE; numShards = metadata.getNumberOfShards(); @@ -250,6 +264,46 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( .copySettings(resizeRequest.getCopySettings() == null ? false : resizeRequest.getCopySettings()); } + /** + * Calculate target index's shards count according to max_shard_ize and the source index's storage(only primary shards included) + * for shrink. Target index's shards count is the lowest factor of the source index's primary shards count which satisfies the + * maximum shard size requirement. If max_shard_size is less than the source index's single shard size, then target index's shards count + * will be equal to the source index's shards count. + * @param maxShardSize the maximum size of a primary shard in the target index + * @param sourceIndexShardStoreStats primary shards' store stats of the source index + * @param sourceIndexMetaData source index's metadata + * @return target index's shards number + */ + protected static int calculateTargetIndexShardsNum( + ByteSizeValue maxShardSize, + StoreStats sourceIndexShardStoreStats, + IndexMetadata sourceIndexMetaData + ) { + if (maxShardSize == null + || sourceIndexShardStoreStats == null + || maxShardSize.getBytes() == 0 + || sourceIndexShardStoreStats.getSizeInBytes() == 0) { + return 1; + } + + int sourceIndexShardsNum = sourceIndexMetaData.getNumberOfShards(); + // calculate the minimum shards count according to source index's storage, ceiling ensures that the minimum shards count is never + // less than 1 + int minValue = (int) Math.ceil((double) sourceIndexShardStoreStats.getSizeInBytes() / maxShardSize.getBytes()); + // if minimum shards count is greater than the source index's shards count, then the source index's shards count will be returned + if (minValue >= sourceIndexShardsNum) { + return sourceIndexShardsNum; + } + + // find the lowest factor of the source index's shards count here, because minimum shards count may not be a factor + for (int i = minValue; i < sourceIndexShardsNum; i++) { + if (sourceIndexShardsNum % i == 0) { + return i; + } + } + return sourceIndexShardsNum; + } + @Override protected String getClusterManagerActionName(DiscoveryNode node) { return super.getClusterManagerActionName(node); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java index e4b79ac54f8fd..5705362cc73f4 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java @@ -38,8 +38,8 @@ import org.opensearch.action.support.ActiveShardCount; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; @@ -52,7 +52,9 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.index.shard.DocsStats; +import org.opensearch.index.store.StoreStats; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; @@ -107,6 +109,7 @@ public void testErrorCondition() { new ResizeRequest("target", "source"), state, (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), + new StoreStats(between(1, 10000), between(1, 10000)), "source", "target" ) @@ -121,6 +124,7 @@ public void testErrorCondition() { req, clusterState, (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null, + new StoreStats(between(1, 10000), between(1, 10000)), "source", "target" ); @@ -139,6 +143,7 @@ public void testErrorCondition() { req, clusterState, (i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), + new StoreStats(between(1, 10000), between(1, 10000)), "source", "target" ); @@ -167,6 +172,7 @@ public void testErrorCondition() { new ResizeRequest("target", "source"), clusterState, (i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), + new StoreStats(between(1, 10000), between(1, 10000)), "source", "target" ); @@ -193,13 +199,27 @@ public void testPassNumRoutingShards() { ResizeRequest resizeRequest = new ResizeRequest("target", "source"); resizeRequest.setResizeType(ResizeType.SPLIT); resizeRequest.getTargetIndexRequest().settings(Settings.builder().put("index.number_of_shards", 2).build()); - TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target"); + TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + "source", + "target" + ); resizeRequest.getTargetIndexRequest() .settings( Settings.builder().put("index.number_of_routing_shards", randomIntBetween(2, 10)).put("index.number_of_shards", 2).build() ); - TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target"); + TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + "source", + "target" + ); } public void testPassNumRoutingShardsAndFail() { @@ -224,7 +244,14 @@ public void testPassNumRoutingShardsAndFail() { ResizeRequest resizeRequest = new ResizeRequest("target", "source"); resizeRequest.setResizeType(ResizeType.SPLIT); resizeRequest.getTargetIndexRequest().settings(Settings.builder().put("index.number_of_shards", numShards * 2).build()); - TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState, null, "source", "target"); + TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + "source", + "target" + ); resizeRequest.getTargetIndexRequest() .settings( @@ -233,7 +260,14 @@ public void testPassNumRoutingShardsAndFail() { ClusterState finalState = clusterState; IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, - () -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState, null, "source", "target") + () -> TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + finalState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + "source", + "target" + ) ); assertEquals("cannot provide index.number_of_routing_shards on resize", iae.getMessage()); } @@ -266,6 +300,7 @@ public void testShrinkIndexSettings() { target, clusterState, (i) -> stats, + new StoreStats(between(1, 10000), between(1, 10000)), indexName, "target" ); @@ -276,6 +311,206 @@ public void testShrinkIndexSettings() { assertEquals(request.waitForActiveShards(), activeShardCount); } + public void testShrinkWithMaxShardSize() { + String indexName = randomAlphaOfLength(10); + // create one that won't fail + ClusterState clusterState = ClusterState.builder( + createClusterState(indexName, 10, 0, Settings.builder().put("index.blocks.write", true).build()) + ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + + // Cannot set max_shard_size when split or clone + ResizeRequest resizeRequestForFailure = new ResizeRequest("target", indexName); + ResizeType resizeType = ResizeType.SPLIT; + if (randomBoolean()) { + resizeType = ResizeType.CLONE; + } + resizeRequestForFailure.setResizeType(resizeType); + resizeRequestForFailure.setMaxShardSize(new ByteSizeValue(100)); + resizeRequestForFailure.getTargetIndexRequest() + .settings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 100)).build()); + ClusterState finalState = clusterState; + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> TransportResizeAction.prepareCreateIndexRequest( + resizeRequestForFailure, + finalState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + indexName, + "target" + ) + ); + assertEquals("Unsupported parameter [max_shard_size]", iae.getMessage()); + + // Cannot set max_shard_size and index.number_of_shards at the same time + ResizeRequest resizeRequest = new ResizeRequest("target", indexName); + resizeRequest.setResizeType(ResizeType.SHRINK); + resizeRequest.setMaxShardSize(new ByteSizeValue(100)); + resizeRequest.getTargetIndexRequest().settings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 100)).build()); + iae = expectThrows( + IllegalArgumentException.class, + () -> TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + finalState, + null, + new StoreStats(between(1, 10000), between(1, 10000)), + indexName, + "target" + ) + ); + assertEquals("Cannot set max_shard_size and index.number_of_shards at the same time!", iae.getMessage()); + + AllocationService service = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = OpenSearchAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, indexName).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + int numSourceShards = clusterState.metadata().index(indexName).getNumberOfShards(); + DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000)); + + // target index's shards number must be the lowest factor of the source index's shards number + int expectedShardsNum = 5; + resizeRequest.setMaxShardSize(new ByteSizeValue(25)); + // clear index settings + resizeRequest.getTargetIndexRequest().settings(Settings.builder().build()); + resizeRequest.setWaitForActiveShards(expectedShardsNum); + CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + (i) -> stats, + new StoreStats(100, between(1, 10000)), + indexName, + "target" + ); + assertNotNull(request.recoverFrom()); + assertEquals(indexName, request.recoverFrom().getName()); + assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards")); + assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), ActiveShardCount.from(expectedShardsNum)); + + // if max_shard_size is greater than whole of the source primary shards' storage, + // then the target index will only have one primary shard. + expectedShardsNum = 1; + resizeRequest.setMaxShardSize(new ByteSizeValue(1000)); + // clear index settings + resizeRequest.getTargetIndexRequest().settings(Settings.builder().build()); + resizeRequest.setWaitForActiveShards(expectedShardsNum); + request = TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + (i) -> stats, + new StoreStats(100, between(1, 10000)), + indexName, + "target" + ); + assertNotNull(request.recoverFrom()); + assertEquals(indexName, request.recoverFrom().getName()); + assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards")); + assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), ActiveShardCount.from(expectedShardsNum)); + + // if max_shard_size is less than the primary shard's storage of the source index, + // then the target index's shards number will be equal to the source index's. + expectedShardsNum = numSourceShards; + resizeRequest.setMaxShardSize(new ByteSizeValue(1)); + // clear index settings + resizeRequest.getTargetIndexRequest().settings(Settings.builder().build()); + resizeRequest.setWaitForActiveShards(expectedShardsNum); + request = TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + (i) -> stats, + new StoreStats(100, between(1, 10000)), + indexName, + "target" + ); + assertNotNull(request.recoverFrom()); + assertEquals(indexName, request.recoverFrom().getName()); + assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards")); + assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), ActiveShardCount.from(expectedShardsNum)); + } + + public void testCalculateTargetIndexShardsNum() { + String indexName = randomAlphaOfLength(10); + ClusterState clusterState = ClusterState.builder( + createClusterState(indexName, randomIntBetween(2, 10), 0, Settings.builder().put("index.blocks.write", true).build()) + ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + IndexMetadata indexMetadata = clusterState.metadata().index(indexName); + + assertEquals(TransportResizeAction.calculateTargetIndexShardsNum(null, new StoreStats(100, between(1, 10000)), indexMetadata), 1); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(0), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + 1 + ); + assertEquals(TransportResizeAction.calculateTargetIndexShardsNum(new ByteSizeValue(1), null, indexMetadata), 1); + assertEquals(TransportResizeAction.calculateTargetIndexShardsNum(new ByteSizeValue(1), new StoreStats(0, 0), indexMetadata), 1); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(1000), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + 1 + ); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(1), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + indexMetadata.getNumberOfShards() + ); + + clusterState = ClusterState.builder( + createClusterState(indexName, 10, 0, Settings.builder().put("index.blocks.write", true).build()) + ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + indexMetadata = clusterState.metadata().index(indexName); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(10), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + 10 + ); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(12), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + indexMetadata.getNumberOfShards() + ); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(20), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + 5 + ); + assertEquals( + TransportResizeAction.calculateTargetIndexShardsNum( + new ByteSizeValue(50), + new StoreStats(100, between(1, 10000)), + indexMetadata + ), + 2 + ); + } + private DiscoveryNode newNode(String nodeId) { final Set roles = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE))