Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Update ML results mappings on process start #37758

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,24 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
Expand Down Expand Up @@ -38,10 +54,16 @@
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Static methods to create Elasticsearch index mappings for the autodetect
Expand Down Expand Up @@ -107,6 +129,8 @@ public class ElasticsearchMappings {

static final String RAW = "raw";

private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class);

// Private constructor: this class is a holder for static mapping helpers and is not meant to be instantiated.
private ElasticsearchMappings() {
}

Expand Down Expand Up @@ -968,4 +992,94 @@ public static XContentBuilder auditMessageMapping() throws IOException {
.endObject()
.endObject();
}

/**
 * Works out which of the given indices need their {@code DOC_TYPE} mapping to be (re)applied.
 *
 * An index is reported when any of the following holds:
 * <ul>
 *   <li>no {@code DOC_TYPE} mapping is found for it in the cluster state,</li>
 *   <li>the mapping has no {@code _meta} section or no {@code version} entry in it,</li>
 *   <li>the recorded version is before {@code minVersion},</li>
 *   <li>the recorded version cannot be read (e.g. it is not a string or does not parse as a version).</li>
 * </ul>
 *
 * @param state           cluster state whose metadata holds the current mappings
 * @param concreteIndices names of the indices to check
 * @param minVersion      the oldest mapping version that is considered up to date
 * @return the subset of {@code concreteIndices} requiring an update, in the order they were passed in
 * @throws IOException if the mappings cannot be looked up from the cluster state metadata
 */
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException {
    List<String> indicesToUpdate = new ArrayList<>();

    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
            new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER);

    for (String index : concreteIndices) {
        ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
        // Check for a missing type mapping explicitly rather than relying on an NPE being caught below
        MappingMetaData metaData = innerMap == null ? null : innerMap.get(DOC_TYPE);
        if (metaData == null) {
            logger.info("No mappings found for [{}], recreating", index);
            indicesToUpdate.add(index);
            continue;
        }

        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
            // A non-string version triggers a ClassCastException, which is treated as unreadable below
            String versionString = meta == null ? null : (String) meta.get("version");
            if (versionString == null) {
                logger.info("Version of mappings for [{}] not found, recreating", index);
                indicesToUpdate.add(index);
                continue;
            }

            Version mappingVersion = Version.fromString(versionString);
            if (mappingVersion.onOrAfter(minVersion) == false) {
                logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
                indicesToUpdate.add(index);
            }
        } catch (Exception e) {
            // Best effort: any failure to read the version means the mapping is simply re-applied
            logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
            indicesToUpdate.add(index);
        }
    }
    return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
}

/**
 * Brings the {@code DOC_TYPE} mapping of every index behind {@code alias} up to date, if required.
 *
 * If the alias does not resolve to anything, or all mappings are already at {@code Version.CURRENT},
 * the listener is notified with {@code true} straight away. Otherwise a put-mapping request is sent
 * for the outdated indices using the mapping produced by {@code mappingSupplier}.
 *
 * @param alias           alias (or index name) to resolve in the cluster state
 * @param mappingSupplier supplies the mapping to apply; only invoked when an update is needed
 * @param client          client used to execute the put-mapping request with the ML origin
 * @param state           cluster state used to resolve the alias and read the current mappings
 * @param listener        receives {@code true} on success, or the failure if the mappings could not be
 *                        checked, built, or the put-mapping request failed or was not acknowledged
 */
public static void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier,
                                          Client client, ClusterState state, ActionListener<Boolean> listener) {
    AliasOrIndex resolved = state.metaData().getAliasAndIndexLookup().get(alias);
    if (resolved == null) {
        // The index has never been created yet
        listener.onResponse(true);
        return;
    }

    String[] backingIndices = resolved.getIndices().stream()
            .map(indexMetaData -> indexMetaData.getIndex().getName())
            .toArray(String[]::new);

    String[] outdatedIndices;
    try {
        outdatedIndices = mappingRequiresUpdate(state, backingIndices, Version.CURRENT);
    } catch (IOException e) {
        listener.onFailure(e);
        return;
    }

    if (outdatedIndices.length == 0) {
        logger.trace("Mappings are up to date.");
        listener.onResponse(true);
        return;
    }

    try (XContentBuilder mapping = mappingSupplier.get()) {
        PutMappingRequest request = new PutMappingRequest(outdatedIndices);
        request.type(DOC_TYPE);
        request.source(mapping);

        ActionListener<?> unused = null;
        executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, request,
                ActionListener.wrap(response -> {
                    if (response.isAcknowledged() == false) {
                        listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
                                + Arrays.toString(outdatedIndices) + " was not acknowledged"));
                        return;
                    }
                    listener.onResponse(true);
                }, listener::onFailure));
    } catch (IOException e) {
        listener.onFailure(e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
Expand All @@ -30,6 +38,8 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,6 +138,110 @@ public void testTermFieldMapping() throws IOException {
assertNull(instanceMapping);
}


/**
 * With a cluster state that contains no index metadata at all, the requested index has no mappings
 * and must be reported as requiring an update.
 */
public void testMappingRequiresUpdateNoMapping() throws IOException {
    ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build();
    String[] requested = { "no_index" };

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(emptyState, requested, Version.CURRENT);

    assertArrayEquals(new String[] { "no_index" }, outdated);
}

/**
 * An index whose mapping exists but carries no version (a {@code null} version is passed to the
 * cluster state helper, which then leaves the version out of {@code _meta}) must be reported as
 * requiring an update.
 */
public void testMappingRequiresUpdateNullMapping() throws IOException {
    ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
    // Query the index that was actually created; a different name would only exercise the "no mappings" path
    String[] indices = new String[] { "null_mapping" };
    assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
}

/**
 * A mapping whose {@code _meta} section has no {@code version} entry must be reported as requiring an update.
 */
public void testMappingRequiresUpdateNoVersion() throws IOException {
    Map<String, Object> indexWithoutVersion = Collections.singletonMap("no_version_field", "NO_VERSION_FIELD");
    ClusterState clusterState = getClusterStateWithMappingsWithMetaData(indexWithoutVersion);
    String[] requested = { "no_version_field" };

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(clusterState, requested, Version.CURRENT);

    assertArrayEquals(requested, outdated);
}

/**
 * A mapping stamped with the current version needs no update when the current version is the minimum.
 */
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
    Map<String, Object> currentVersionIndex = Collections.singletonMap("version_current", Version.CURRENT.toString());
    ClusterState clusterState = getClusterStateWithMappingsWithMetaData(currentVersionIndex);
    String[] requested = { "version_current" };

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(clusterState, requested, Version.CURRENT);

    assertArrayEquals(new String[] {}, outdated);
}

/**
 * A mapping whose {@code version} is a nested object rather than a string cannot be read as a
 * version and must be reported as requiring an update.
 */
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
    // The index name must match the one queried below, otherwise only the "no mappings" path is exercised
    ClusterState cs = getClusterStateWithMappingsWithMetaData(
            Collections.singletonMap("version_nested", Collections.singletonMap("nested", "1.0")));
    String[] indices = new String[] { "version_nested" };
    assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
}

/**
 * A mapping stamped with the bogus version string "0.0" must be reported as requiring an update.
 */
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
    Map<String, Object> bogusVersionIndex = Collections.singletonMap("version_bogus", "0.0");
    ClusterState clusterState = getClusterStateWithMappingsWithMetaData(bogusVersionIndex);
    String[] requested = { "version_bogus" };

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(clusterState, requested, Version.CURRENT);

    assertArrayEquals(requested, outdated);
}

/**
 * A mapping stamped with the current version needs no update when the required minimum is the
 * previous version.
 */
public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
    Map<String, Object> newerVersionIndex = Collections.singletonMap("version_newer", Version.CURRENT);
    ClusterState clusterState = getClusterStateWithMappingsWithMetaData(newerVersionIndex);
    String[] requested = { "version_newer" };
    Version minimum = VersionUtils.getPreviousVersion();

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(clusterState, requested, minimum);

    assertArrayEquals(new String[] {}, outdated);
}

/**
 * A mapping stamped with the current version needs no update when the required minimum is the
 * previous minor version.
 */
public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
    Map<String, Object> newerVersionIndex = Collections.singletonMap("version_newer_minor", Version.CURRENT);
    ClusterState clusterState = getClusterStateWithMappingsWithMetaData(newerVersionIndex);
    String[] requested = { "version_newer_minor" };
    Version minimum = VersionUtils.getPreviousMinorVersion();

    String[] outdated = ElasticsearchMappings.mappingRequiresUpdate(clusterState, requested, minimum);

    assertArrayEquals(new String[] {}, outdated);
}

/**
 * With a mix of outdated, unversioned, bogus and current mappings, only the indices that are not
 * at the current version are reported, in the order in which they were requested.
 */
public void testMappingRequiresUpdateSomeVersionMix() throws IOException {
    Map<String, Object> versionMix = new HashMap<>();
    versionMix.put("version_54", Version.V_5_4_0);
    versionMix.put("version_current", Version.CURRENT);
    versionMix.put("version_null", null);
    versionMix.put("version_current2", Version.CURRENT);
    versionMix.put("version_bogus", "0.0.0");
    versionMix.put("version_current3", Version.CURRENT);
    versionMix.put("version_bogus2", "0.0.0");

    ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix);

    // Ask about every index, including the up-to-date ones, so the test also proves those are filtered out
    String[] allIndices = new String[] { "version_54", "version_current", "version_null", "version_current2",
            "version_bogus", "version_current3", "version_bogus2" };
    String[] expectedOutdated = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" };

    assertArrayEquals(expectedOutdated, ElasticsearchMappings.mappingRequiresUpdate(cs, allIndices, Version.CURRENT));
}

/**
 * Builds a cluster state with one index per map entry. Each index gets a {@code DOC_TYPE} mapping
 * with ten dummy fields and a {@code _meta} section.
 *
 * The entry value is stored as {@code _meta.version}, unless it is {@code null} or equal to the
 * marker string {@code "NO_VERSION_FIELD"}, in which case {@code _meta} is left empty.
 *
 * @param namesAndVersions index name to the version value to record in that index's mapping
 * @return a cluster state named {@code "_name"} containing the generated index metadata
 * @throws IOException if a mapping cannot be converted into {@code MappingMetaData}
 */
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
    MetaData.Builder allIndices = MetaData.builder();

    for (Map.Entry<String, Object> nameAndVersion : namesAndVersions.entrySet()) {
        Object version = nameAndVersion.getValue();

        Map<String, Object> fields = new HashMap<>();
        for (int fieldNumber = 0; fieldNumber < 10; fieldNumber++) {
            fields.put("field" + fieldNumber, Collections.singletonMap("type", "string"));
        }

        Map<String, Object> meta = new HashMap<>();
        boolean omitVersion = version == null || version.equals("NO_VERSION_FIELD");
        if (omitVersion == false) {
            meta.put("version", version);
        }

        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", fields);
        mapping.put("_meta", meta);

        Settings.Builder indexSettings = Settings.builder()
                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);

        IndexMetaData.Builder indexMetaData = IndexMetaData.builder(nameAndVersion.getKey());
        indexMetaData.settings(indexSettings);
        indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));

        allIndices.put(indexMetaData);
    }

    ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("_name"));
    clusterState.metaData(allIndices.build());
    return clusterState.build();
}

private Set<String> collectResultsDocFieldNames() throws IOException {
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
return collectFieldNames(ElasticsearchMappings.resultsMapping());
Expand Down
Loading