Skip to content

Commit

Permalink
Fix unassigned ml system shard replicas (#1315) (#1324)
Browse files Browse the repository at this point in the history
* Fix unassigned ml system shard replicas



* Adjust auto replica settings to keep it consistent with AOS default setting



* Update plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java




* Modify exception handling



* Modify exception messages



* Add response check



* Add response check and exception handling



* Keep error message consistent



* Keep error message consistent



* Keep error message consistent



---------

Signed-off-by: Sicheng Song <[email protected]>
Co-authored-by: Yaliang Wu <[email protected]>
  • Loading branch information
b4sjoo and ylwu-amzn authored Sep 12, 2023
1 parent a436169 commit daaf476
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
10 changes: 5 additions & 5 deletions common/src/main/java/org/opensearch/ml/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public class CommonValue {
public static final String ML_MODEL_GROUP_INDEX = ".plugins-ml-model-group";
public static final String ML_MODEL_INDEX = ".plugins-ml-model";
public static final String ML_TASK_INDEX = ".plugins-ml-task";
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 6;
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 7;
public static final String ML_CONNECTOR_INDEX = ".plugins-ml-connector";
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 1;
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 2;
public static final String ML_CONFIG_INDEX = ".plugins-ml-config";
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 2;
public static final String USER_FIELD_MAPPING = " \""
+ CommonValue.USER
+ "\": {\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.ml.indices;

import static org.opensearch.ml.common.CommonValue.META;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
import static org.opensearch.ml.common.CommonValue.SCHEMA_VERSION_FIELD;

import java.util.HashMap;
Expand All @@ -17,6 +15,7 @@
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -38,11 +37,13 @@ public class MLIndicesHandler {

ClusterService clusterService;
Client client;

private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-5");
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

static {
indexMappingUpdated.put(ML_MODEL_INDEX, new AtomicBoolean(false));
indexMappingUpdated.put(ML_TASK_INDEX, new AtomicBoolean(false));
for (MLIndex mlIndex : MLIndex.values()) {
indexMappingUpdated.put(mlIndex.getIndexName(), new AtomicBoolean(false));
}
}

public void initModelGroupIndexIfAbsent(ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -83,7 +84,7 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping);
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
Expand All @@ -98,8 +99,23 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new MLException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
Expand Down

0 comments on commit daaf476

Please sign in to comment.