Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unassigned ml system shard replicas #1315

Merged
merged 10 commits into from
Sep 9, 2023
10 changes: 5 additions & 5 deletions common/src/main/java/org/opensearch/ml/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public class CommonValue {
public static final String ML_MODEL_GROUP_INDEX = ".plugins-ml-model-group";
public static final String ML_MODEL_INDEX = ".plugins-ml-model";
public static final String ML_TASK_INDEX = ".plugins-ml-task";
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 6;
public static final Integer ML_MODEL_GROUP_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_MODEL_INDEX_SCHEMA_VERSION = 7;
public static final String ML_CONNECTOR_INDEX = ".plugins-ml-connector";
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 1;
public static final Integer ML_TASK_INDEX_SCHEMA_VERSION = 2;
public static final Integer ML_CONNECTOR_SCHEMA_VERSION = 2;
public static final String ML_CONFIG_INDEX = ".plugins-ml-config";
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 1;
public static final Integer ML_CONFIG_INDEX_SCHEMA_VERSION = 2;
public static final String USER_FIELD_MAPPING = " \""
+ CommonValue.USER
+ "\": {\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.ml.indices;

import static org.opensearch.ml.common.CommonValue.META;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
import static org.opensearch.ml.common.CommonValue.SCHEMA_VERSION_FIELD;

import java.util.HashMap;
Expand All @@ -17,6 +15,7 @@
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -41,8 +40,9 @@ public class MLIndicesHandler {

private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
static {
indexMappingUpdated.put(ML_MODEL_INDEX, new AtomicBoolean(false));
indexMappingUpdated.put(ML_TASK_INDEX, new AtomicBoolean(false));
for (MLIndex mlIndex : MLIndex.values()) {
b4sjoo marked this conversation as resolved.
Show resolved Hide resolved
indexMappingUpdated.put(mlIndex.getIndexName(), new AtomicBoolean(false));
}
}

public void initModelGroupIndexIfAbsent(ActionListener<Boolean> listener) {
Expand All @@ -68,6 +68,7 @@ public void initMLConfigIndex(ActionListener<Boolean> listener) {
public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();
final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-6");
b4sjoo marked this conversation as resolved.
Show resolved Hide resolved

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
Expand All @@ -83,7 +84,7 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping);
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
Expand All @@ -97,12 +98,19 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
b4sjoo marked this conversation as resolved.
Show resolved Hide resolved
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
b4sjoo marked this conversation as resolved.
Show resolved Hide resolved
}
}, internalListener::onFailure));
b4sjoo marked this conversation as resolved.
Show resolved Hide resolved
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
Expand Down