Skip to content

Commit

Permalink
Add max_shard_size parameter for Shrink API (#5229)
Browse files Browse the repository at this point in the history
* Add max_shard_size parameter for Shrink API

Signed-off-by: Gao Binlong <[email protected]>

* add change log

Signed-off-by: Gao Binlong <[email protected]>

* fix yaml test failed

Signed-off-by: Gao Binlong <[email protected]>

* optimize the code

Signed-off-by: Gao Binlong <[email protected]>

* fix test failed

Signed-off-by: Gao Binlong <[email protected]>

* optimize changelog & code

Signed-off-by: Gao Binlong <[email protected]>

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored Nov 30, 2022
1 parent 0210b76 commit 953a3d6
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069))
- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229))

### Dependencies
- Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -58,6 +59,7 @@ public class ResizeRequest extends TimedRequest implements Validatable, ToXConte
private final String targetIndex;
private Settings settings = Settings.EMPTY;
private Set<Alias> aliases = new HashSet<>();
private ByteSizeValue maxShardSize;

/**
* Creates a new resize request
Expand Down Expand Up @@ -155,6 +157,24 @@ public ActiveShardCount getWaitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
* This parameter can be used to calculate the lowest factor of the source index's shards number
* which satisfies the maximum shard size requirement.
*
* @param maxShardSize the maximum size of a primary shard in the new shrunken index
*/
public void setMaxShardSize(ByteSizeValue maxShardSize) {
this.maxShardSize = maxShardSize;
}

/**
* Returns the maximum size of a primary shard in the new shrunken index.
*/
public ByteSizeValue getMaxShardSize() {
return maxShardSize;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -701,6 +702,8 @@ private void resizeTest(ResizeType resizeType, CheckedFunction<ResizeRequest, Re
RequestConvertersTests.setRandomWaitForActiveShards(resizeRequest::setWaitForActiveShards, expectedParams);
if (resizeType == ResizeType.SPLIT) {
resizeRequest.setSettings(Settings.builder().put("index.number_of_shards", 2).build());
} else if (resizeType == ResizeType.SHRINK) {
resizeRequest.setMaxShardSize(new ByteSizeValue(randomIntBetween(1, 1000)));
}

Request request = function.apply(resizeRequest);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
"Shrink index with max_shard_size":
# shrink index with max_shard_size parameter, which is used to generate an optimum
# number_of_shards for the target index.

- skip:
version: " - 2.9.99"
reason: "only available in 3.0+"
features: allowed_warnings

- do:
nodes.info:
node_id: data:true
- set:
nodes._arbitrary_key_: node_id

- do:
indices.create:
index: source
wait_for_active_shards: 1
body:
settings:
# ensure everything is allocated on the same data node
index.routing.allocation.include._id: $node_id
index.number_of_shards: 3
index.number_of_replicas: 0
- do:
index:
index: source
id: "1"
body: { "foo": "hello world" }

- do:
get:
index: source
id: "1"

- match: { _index: source }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }

# make it read-only
- do:
indices.put_settings:
index: source
body:
index.blocks.write: true
index.number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green
index: source

# shrink with max_shard_size
- do:
allowed_warnings:
- "Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead."
indices.shrink:
index: "source"
target: "new_shrunken_index"
wait_for_active_shards: 1
master_timeout: 10s
body:
settings:
index.number_of_replicas: 0
max_shard_size: "10gb"

- do:
cluster.health:
wait_for_status: green

- do:
get:
index: "new_shrunken_index"
id: "1"

- do:
indices.get_settings:
index: "new_shrunken_index"

- match: { new_shrunken_index.settings.index.number_of_shards: "1" }
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.Index;
Expand All @@ -75,8 +76,8 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.Arrays;
Expand Down Expand Up @@ -760,4 +761,72 @@ public void testShrinkThenSplitWithFailedNode() throws Exception {
);
ensureGreen("splitagain");
}

public void testCreateShrinkIndexWithMaxShardSize() {
internalCluster().ensureAtLeastNumDataNodes(2);
final String shrinkNode = internalCluster().startDataOnlyNode();

final int shardCount = between(2, 5);
prepareCreate("source").setSettings(
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount)
).get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
client().admin().indices().prepareFlush("source").get();
ensureGreen();

client().admin()
.indices()
.prepareUpdateSettings("source")
.setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), shrinkNode)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
)
.get();
ensureGreen();

// Cannot set max_shard_size and index.number_of_shards at the same time
IllegalArgumentException exc = expectThrows(
IllegalArgumentException.class,
() -> client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
.setMaxShardSize(new ByteSizeValue(1))
.setResizeType(ResizeType.SHRINK)
.get()
);
assertEquals(exc.getMessage(), "Cannot set max_shard_size and index.number_of_shards at the same time!");

// use max_shard_size to calculate the target index's shards number
// set max_shard_size to 1 then the target index's shards number will be same with the source index's
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.putNull(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey())
.build()
)
.setMaxShardSize(new ByteSizeValue(1))
.setResizeType(ResizeType.SHRINK)
.get()
);
ensureGreen();

GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(String.valueOf(shardCount), target.getIndexToSettings().get("target").get("index.number_of_shards"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.admin.indices.shrink;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.admin.indices.alias.Alias;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -60,6 +62,8 @@
public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements IndicesRequest, ToXContentObject {

public static final ObjectParser<ResizeRequest, Void> PARSER = new ObjectParser<>("resize_request");
private static final ParseField MAX_SHARD_SIZE = new ParseField("max_shard_size");

static {
PARSER.declareField(
(parser, request, context) -> request.getTargetIndexRequest().settings(parser.map()),
Expand All @@ -71,19 +75,29 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
new ParseField("aliases"),
ObjectParser.ValueType.OBJECT
);
PARSER.declareField(
ResizeRequest::setMaxShardSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SHARD_SIZE.getPreferredName()),
MAX_SHARD_SIZE,
ObjectParser.ValueType.STRING
);
}

private CreateIndexRequest targetIndexRequest;
private String sourceIndex;
private ResizeType type = ResizeType.SHRINK;
private Boolean copySettings = true;
private ByteSizeValue maxShardSize;

public ResizeRequest(StreamInput in) throws IOException {
super(in);
targetIndexRequest = new CreateIndexRequest(in);
sourceIndex = in.readString();
type = in.readEnum(ResizeType.class);
copySettings = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
maxShardSize = in.readOptionalWriteable(ByteSizeValue::new);
}
}

ResizeRequest() {}
Expand All @@ -108,6 +122,9 @@ public ActionRequestValidationException validate() {
if (type == ResizeType.SPLIT && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) {
validationException = addValidationError("index.number_of_shards is required for split operations", validationException);
}
if (maxShardSize != null && maxShardSize.getBytes() <= 0) {
validationException = addValidationError("max_shard_size must be greater than 0", validationException);
}
assert copySettings == null || copySettings;
return validationException;
}
Expand All @@ -123,6 +140,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(sourceIndex);
out.writeEnum(type);
out.writeOptionalBoolean(copySettings);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(maxShardSize);
}
}

@Override
Expand Down Expand Up @@ -205,6 +225,24 @@ public Boolean getCopySettings() {
return copySettings;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
* This parameter can be used to calculate the lowest factor of the source index's shards number
* which satisfies the maximum shard size requirement.
*
* @param maxShardSize the maximum size of a primary shard in the new shrunken index
*/
public void setMaxShardSize(ByteSizeValue maxShardSize) {
this.maxShardSize = maxShardSize;
}

/**
* Returns the maximum size of a primary shard in the new shrunken index.
*/
public ByteSizeValue getMaxShardSize() {
return maxShardSize;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -221,6 +259,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endObject();
if (maxShardSize != null) {
builder.field(MAX_SHARD_SIZE.getPreferredName(), maxShardSize);
}
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.master.AcknowledgedRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;

/**
* Transport request builder for resizing an index
Expand Down Expand Up @@ -95,4 +96,12 @@ public ResizeRequestBuilder setResizeType(ResizeType type) {
this.request.setResizeType(type);
return this;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
*/
public ResizeRequestBuilder setMaxShardSize(ByteSizeValue maxShardSize) {
this.request.setMaxShardSize(maxShardSize);
return this;
}
}
Loading

0 comments on commit 953a3d6

Please sign in to comment.