forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add rest, transport layer changes for Hot to warm tiering - dedicated…
… setup (opensearch-project#13980) (opensearch-project#14889) Signed-off-by: Neetika Singhal <[email protected]> Signed-off-by: kkewwei <[email protected]>
- Loading branch information
1 parent
1c21b08
commit 1b8a407
Showing
17 changed files
with
1,630 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
server/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.tiering; | ||
|
||
import org.opensearch.action.ActionType; | ||
import org.opensearch.common.annotation.ExperimentalApi; | ||
|
||
/** | ||
* Tiering action to move indices from hot to warm | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public class HotToWarmTieringAction extends ActionType<HotToWarmTieringResponse> { | ||
|
||
public static final HotToWarmTieringAction INSTANCE = new HotToWarmTieringAction(); | ||
public static final String NAME = "indices:admin/tier/hot_to_warm"; | ||
|
||
private HotToWarmTieringAction() { | ||
super(NAME, HotToWarmTieringResponse::new); | ||
} | ||
} |
157 changes: 157 additions & 0 deletions
157
...r/src/main/java/org/opensearch/action/admin/indices/tiering/HotToWarmTieringResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.tiering; | ||
|
||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.core.common.Strings; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.common.io.stream.Writeable; | ||
import org.opensearch.core.xcontent.MediaTypeRegistry; | ||
import org.opensearch.core.xcontent.ToXContentFragment; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request | ||
* by the backend service. The format of the response object will be as below: | ||
* | ||
* { | ||
* "acknowledged": true/false, | ||
* "failed_indices": [ | ||
* { | ||
* "index": "index1", | ||
* "error": "Low disk threshold watermark breached" | ||
* }, | ||
* { | ||
* "index": "index2", | ||
* "error": "Index is not a remote store backed index" | ||
* } | ||
* ] | ||
* } | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public class HotToWarmTieringResponse extends AcknowledgedResponse { | ||
|
||
private final List<IndexResult> failedIndices; | ||
|
||
public HotToWarmTieringResponse(boolean acknowledged) { | ||
super(acknowledged); | ||
this.failedIndices = Collections.emptyList(); | ||
} | ||
|
||
public HotToWarmTieringResponse(boolean acknowledged, List<IndexResult> indicesResults) { | ||
super(acknowledged); | ||
this.failedIndices = (indicesResults == null) | ||
? Collections.emptyList() | ||
: indicesResults.stream().sorted(Comparator.comparing(IndexResult::getIndex)).collect(Collectors.toList()); | ||
} | ||
|
||
public HotToWarmTieringResponse(StreamInput in) throws IOException { | ||
super(in); | ||
failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new)); | ||
} | ||
|
||
public List<IndexResult> getFailedIndices() { | ||
return this.failedIndices; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeList(this.failedIndices); | ||
} | ||
|
||
@Override | ||
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { | ||
super.addCustomFields(builder, params); | ||
builder.startArray("failed_indices"); | ||
|
||
for (IndexResult failedIndex : failedIndices) { | ||
failedIndex.toXContent(builder, params); | ||
} | ||
builder.endArray(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return Strings.toString(MediaTypeRegistry.JSON, this); | ||
} | ||
|
||
/** | ||
* Inner class to represent the result of a failed index for tiering. | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public static class IndexResult implements Writeable, ToXContentFragment { | ||
private final String index; | ||
private final String failureReason; | ||
|
||
public IndexResult(String index, String failureReason) { | ||
this.index = index; | ||
this.failureReason = failureReason; | ||
} | ||
|
||
IndexResult(StreamInput in) throws IOException { | ||
this.index = in.readString(); | ||
this.failureReason = in.readString(); | ||
} | ||
|
||
public String getIndex() { | ||
return index; | ||
} | ||
|
||
public String getFailureReason() { | ||
return failureReason; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(index); | ||
out.writeString(failureReason); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
builder.field("index", index); | ||
builder.field("error", failureReason); | ||
return builder.endObject(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
IndexResult that = (IndexResult) o; | ||
return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = Objects.hashCode(index); | ||
result = 31 * result + Objects.hashCode(failureReason); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return Strings.toString(MediaTypeRegistry.JSON, this); | ||
} | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
server/src/main/java/org/opensearch/action/admin/indices/tiering/RestWarmTieringAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.tiering; | ||
|
||
import org.opensearch.action.support.IndicesOptions; | ||
import org.opensearch.client.node.NodeClient; | ||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.rest.BaseRestHandler; | ||
import org.opensearch.rest.RestHandler; | ||
import org.opensearch.rest.RestRequest; | ||
import org.opensearch.rest.action.RestToXContentListener; | ||
|
||
import java.util.List; | ||
|
||
import static java.util.Collections.singletonList; | ||
import static org.opensearch.core.common.Strings.splitStringByCommaToArray; | ||
import static org.opensearch.rest.RestRequest.Method.POST; | ||
|
||
/** | ||
* Rest Tiering API action to move indices to warm tier | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public class RestWarmTieringAction extends BaseRestHandler { | ||
|
||
private static final String TARGET_TIER = "warm"; | ||
|
||
@Override | ||
public List<RestHandler.Route> routes() { | ||
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER)); | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return "warm_tiering_action"; | ||
} | ||
|
||
@Override | ||
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { | ||
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest( | ||
TARGET_TIER, | ||
splitStringByCommaToArray(request.param("index")) | ||
); | ||
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout())); | ||
tieringIndexRequest.clusterManagerNodeTimeout( | ||
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout()) | ||
); | ||
tieringIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, tieringIndexRequest.indicesOptions())); | ||
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", tieringIndexRequest.waitForCompletion())); | ||
return channel -> client.admin() | ||
.cluster() | ||
.execute(HotToWarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel)); | ||
} | ||
} |
Oops, something went wrong.