-
Notifications
You must be signed in to change notification settings - Fork 138
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enable in-place update model (#1673)
* Enable in-place update model Signed-off-by: Sicheng Song <[email protected]> * Refactor inplace update api as well as adding more doc/comments/annotations for clarification Signed-off-by: Sicheng Song <[email protected]> * Address review concern Signed-off-by: Sicheng Song <[email protected]> * Refactor updatemodelcache pacakge Signed-off-by: Sicheng Song <[email protected]> * Fix UT Signed-off-by: Sicheng Song <[email protected]> --------- Signed-off-by: Sicheng Song <[email protected]>
- Loading branch information
Showing
26 changed files
with
2,171 additions
and
508 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
...c/main/java/org/opensearch/ml/common/transport/update_cache/MLUpdateModelCacheAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.update_cache; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
public class MLUpdateModelCacheAction extends ActionType<MLUpdateModelCacheNodesResponse> { | ||
public static final MLUpdateModelCacheAction INSTANCE = new MLUpdateModelCacheAction(); | ||
public static final String NAME = "cluster:admin/opensearch/ml/models/update_model_cache"; | ||
|
||
private MLUpdateModelCacheAction() { super(NAME, MLUpdateModelCacheNodesResponse::new);} | ||
} |
32 changes: 32 additions & 0 deletions
32
...n/java/org/opensearch/ml/common/transport/update_cache/MLUpdateModelCacheNodeRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.update_cache; | ||
|
||
import org.opensearch.transport.TransportRequest; | ||
import java.io.IOException; | ||
import lombok.Getter; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
|
||
public class MLUpdateModelCacheNodeRequest extends TransportRequest { | ||
@Getter | ||
private MLUpdateModelCacheNodesRequest updateModelCacheNodesRequest; | ||
|
||
public MLUpdateModelCacheNodeRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.updateModelCacheNodesRequest = new MLUpdateModelCacheNodesRequest(in); | ||
} | ||
|
||
public MLUpdateModelCacheNodeRequest(MLUpdateModelCacheNodesRequest request) { | ||
this.updateModelCacheNodesRequest = request; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
updateModelCacheNodesRequest.writeTo(out); | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
.../java/org/opensearch/ml/common/transport/update_cache/MLUpdateModelCacheNodeResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.update_cache; | ||
|
||
import lombok.Getter; | ||
import lombok.extern.log4j.Log4j2; | ||
import org.opensearch.action.support.nodes.BaseNodeResponse; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.xcontent.ToXContentFragment; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
@Getter | ||
@Log4j2 | ||
public class MLUpdateModelCacheNodeResponse extends BaseNodeResponse implements ToXContentFragment { | ||
private Map<String, String> modelUpdateStatus; | ||
|
||
public MLUpdateModelCacheNodeResponse(DiscoveryNode node, Map<String, String> modelUpdateStatus) { | ||
super(node); | ||
this.modelUpdateStatus = modelUpdateStatus; | ||
} | ||
|
||
public MLUpdateModelCacheNodeResponse(StreamInput in) throws IOException { | ||
super(in); | ||
if (in.readBoolean()) { | ||
this.modelUpdateStatus = in.readMap(StreamInput::readString, StreamInput::readString); | ||
} | ||
} | ||
|
||
public static MLUpdateModelCacheNodeResponse readStats(StreamInput in) throws IOException { | ||
return new MLUpdateModelCacheNodeResponse(in); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
|
||
if (!isModelUpdateStatusEmpty()) { | ||
out.writeBoolean(true); | ||
out.writeMap(modelUpdateStatus, StreamOutput::writeString, StreamOutput::writeString); | ||
} else { | ||
out.writeBoolean(false); | ||
} | ||
} | ||
|
||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
// Similar to deploy on node or undeploy on node response, this stat map is used to track update status on each node. | ||
builder.startObject("stats"); | ||
if (!isModelUpdateStatusEmpty()) { | ||
for (Map.Entry<String, String> stat : modelUpdateStatus.entrySet()) { | ||
builder.field(stat.getKey(), stat.getValue()); | ||
} | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
public boolean isModelUpdateStatusEmpty() { | ||
return modelUpdateStatus == null || modelUpdateStatus.size() == 0; | ||
} | ||
} |
46 changes: 46 additions & 0 deletions
46
.../java/org/opensearch/ml/common/transport/update_cache/MLUpdateModelCacheNodesRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.update_cache; | ||
|
||
import lombok.Getter; | ||
import org.opensearch.action.support.nodes.BaseNodesRequest; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import java.io.IOException; | ||
|
||
public class MLUpdateModelCacheNodesRequest extends BaseNodesRequest<MLUpdateModelCacheNodesRequest> { | ||
|
||
@Getter | ||
private String modelId; | ||
@Getter | ||
private boolean predictorUpdate; | ||
|
||
public MLUpdateModelCacheNodesRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.modelId = in.readString(); | ||
this.predictorUpdate = in.readBoolean(); | ||
} | ||
|
||
public MLUpdateModelCacheNodesRequest(String[] nodeIds, String modelId, boolean predictorUpdate) { | ||
super(nodeIds); | ||
this.modelId = modelId; | ||
this.predictorUpdate = predictorUpdate; | ||
} | ||
|
||
public MLUpdateModelCacheNodesRequest(DiscoveryNode[] nodeIds, String modelId, boolean predictorUpdate) { | ||
super(nodeIds); | ||
this.modelId = modelId; | ||
this.predictorUpdate = predictorUpdate; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeString(modelId); | ||
out.writeBoolean(predictorUpdate); | ||
} | ||
} |
Oops, something went wrong.