Skip to content

Commit

Permalink
[ML] Make ML indices hidden when the node becomes master (elastic#77416)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Oct 11, 2021
1 parent 68ee6d4 commit 0a6b6d9
Show file tree
Hide file tree
Showing 5 changed files with 474 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlDailyMaintenanceService;
import org.elasticsearch.xpack.ml.MlInitializationService;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MlInitializationServiceIT extends MlNativeAutodetectIntegTestCase {

private ThreadPool threadPool;
private MlInitializationService mlInitializationService;

@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
MlDailyMaintenanceService mlDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
ClusterService clusterService = mock(ClusterService.class);
mlInitializationService = new MlInitializationService(client(), threadPool, mlDailyMaintenanceService, clusterService);
}

public void testThatMlIndicesBecomeHiddenWhenTheNodeBecomesMaster() throws Exception {
String[] mlHiddenIndexNames = {
".ml-anomalies-7",
".ml-state-000001",
".ml-stats-000001",
".ml-notifications-000002",
".ml-annotations-6"
};
String[] otherIndexNames = { "some-index-1", "some-other-index-2" };
String[] allIndexNames = Stream.concat(Arrays.stream(mlHiddenIndexNames), Arrays.stream(otherIndexNames)).toArray(String[]::new);

for (String indexName : mlHiddenIndexNames) {
try {
assertAcked(prepareCreate(indexName).setSettings(Collections.singletonMap(SETTING_INDEX_HIDDEN, randomBoolean())));
} catch (ResourceAlreadyExistsException e) {
logger.info("Index " + indexName + "already exists: {}", e.getDetailedMessage());
}
}
createIndex(otherIndexNames);

GetSettingsResponse settingsResponse =
client().admin().indices().prepareGetSettings(allIndexNames)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.get();
assertThat(settingsResponse, is(notNullValue()));
for (String indexName : mlHiddenIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
}
for (String indexName : otherIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected not to be hidden but was",
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(false)));
}

mlInitializationService.onMaster();
assertBusy(() -> assertTrue(mlInitializationService.areMlInternalIndicesHidden()));

settingsResponse =
client().admin().indices().prepareGetSettings(allIndexNames)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.get();
assertThat(settingsResponse, is(notNullValue()));
for (String indexName : mlHiddenIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected to be hidden but wasn't, settings = " + settings,
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(true)));
}
for (String indexName : otherIndexNames) {
Settings settings = settingsResponse.getIndexToSettings().get(indexName);
assertThat(settings, is(notNullValue()));
assertThat(
"Index " + indexName + " expected not to be hidden but was, settings = " + settings,
settings.getAsBoolean(SETTING_INDEX_HIDDEN, false), is(equalTo(false)));
}
}

@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings())
.put(IndexMetadata.SETTING_DATA_PATH, (String) null)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1419,15 +1419,29 @@ public static SystemIndexDescriptor getInferenceIndexSecurityDescriptor() {
.build();
}

@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
return List.of(
/**
* These are the ML hidden indices. They are "associated" in the sense that if the ML system indices
* are backed up or deleted then these hidden indices should also be backed up or deleted.
*/
private static Collection<AssociatedIndexDescriptor> ASSOCIATED_INDEX_DESCRIPTORS =
List.of(
new AssociatedIndexDescriptor(RESULTS_INDEX_PREFIX + "*", "Results indices"),
new AssociatedIndexDescriptor(STATE_INDEX_PREFIX + "*", "State indices"),
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
new AssociatedIndexDescriptor(".ml-annotations*", "Ml annotations indices")
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
);

@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
return ASSOCIATED_INDEX_DESCRIPTORS;
}

public static String[] getMlHiddenIndexPatterns() {
return ASSOCIATED_INDEX_DESCRIPTORS
.stream()
.map(AssociatedIndexDescriptor::getIndexPattern)
.toArray(String[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,54 @@
*/
package org.elasticsearch.xpack.ml;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

class MlInitializationService implements ClusterStateListener {
public class MlInitializationService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(MlInitializationService.class);

private final Client client;
private final ThreadPool threadPool;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
private final AtomicBoolean mlInternalIndicesHidden = new AtomicBoolean(false);

private final MlDailyMaintenanceService mlDailyMaintenanceService;

Expand All @@ -37,6 +62,7 @@ class MlInitializationService implements ClusterStateListener {
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
MlAssignmentNotifier mlAssignmentNotifier) {
this(client,
threadPool,
new MlDailyMaintenanceService(
settings,
Objects.requireNonNull(clusterService).getClusterName(),
Expand All @@ -49,8 +75,10 @@ class MlInitializationService implements ClusterStateListener {
}

// For testing
MlInitializationService(Client client, MlDailyMaintenanceService dailyMaintenanceService, ClusterService clusterService) {
public MlInitializationService(Client client, ThreadPool threadPool, MlDailyMaintenanceService dailyMaintenanceService,
ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.threadPool = threadPool;
this.mlDailyMaintenanceService = dailyMaintenanceService;
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
Expand All @@ -71,6 +99,7 @@ public void beforeStop() {

public void onMaster() {
mlDailyMaintenanceService.start();
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(this::makeMlInternalIndicesHidden);
}

public void offMaster() {
Expand Down Expand Up @@ -112,5 +141,106 @@ MlDailyMaintenanceService getDailyMaintenanceService() {
return mlDailyMaintenanceService;
}

/** For testing */
public boolean areMlInternalIndicesHidden() {
return mlInternalIndicesHidden.get();
}

private void makeMlInternalIndicesHidden() {
String[] mlHiddenIndexPatterns = MachineLearning.getMlHiddenIndexPatterns();

// Step 5: Handle errors encountered on the way.
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
updateAliasesResponse -> {
if (updateAliasesResponse.isAcknowledged() == false) {
logger.error("One or more of the ML internal aliases could not be made hidden.");
return;
}
mlInternalIndicesHidden.set(true);
},
e -> logger.error("An error occurred while making ML internal indices and aliases hidden", e)
);

// Step 4: Extract ML internal aliases that are not hidden and make them hidden.
ActionListener<GetAliasesResponse> getAliasesResponseListener = ActionListener.wrap(
getAliasesResponse -> {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
for (ObjectObjectCursor<String, List<AliasMetadata>> entry : getAliasesResponse.getAliases()) {
String index = entry.key;
String[] nonHiddenAliases = entry.value.stream()
.filter(metadata -> metadata.isHidden() == null || metadata.isHidden() == false)
.map(AliasMetadata::alias)
.toArray(String[]::new);
if (nonHiddenAliases.length == 0) {
continue;
}
indicesAliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(index)
.aliases(entry.value.stream().map(AliasMetadata::alias).toArray(String[]::new))
.isHidden(true));
}
if (indicesAliasesRequest.getAliasActions().isEmpty()) {
logger.debug("There are no ML internal aliases that need to be made hidden, [{}]", getAliasesResponse.getAliases());
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String indicesWithNonHiddenAliasesString =
indicesAliasesRequest.getAliasActions().stream()
.map(aliasAction -> aliasAction.indices()[0] + ": " + String.join(",", aliasAction.aliases()))
.collect(Collectors.joining("; "));
logger.debug("The following ML internal aliases will now be made hidden: [{}]", indicesWithNonHiddenAliasesString);
executeAsyncWithOrigin(client, ML_ORIGIN, IndicesAliasesAction.INSTANCE, indicesAliasesRequest, finalListener);
},
finalListener::onFailure
);

// Step 3: Once indices are hidden, fetch ML internal aliases to find out whether the aliases are hidden or not.
ActionListener<AcknowledgedResponse> updateSettingsListener = ActionListener.wrap(
updateSettingsResponse -> {
if (updateSettingsResponse.isAcknowledged() == false) {
logger.error("One or more of the ML internal indices could not be made hidden.");
return;
}
GetAliasesRequest getAliasesRequest = new GetAliasesRequest()
.indices(mlHiddenIndexPatterns)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
executeAsyncWithOrigin(client, ML_ORIGIN, GetAliasesAction.INSTANCE, getAliasesRequest, getAliasesResponseListener);
},
finalListener::onFailure
);

// Step 2: Extract ML internal indices that are not hidden and make them hidden.
ActionListener<GetSettingsResponse> getSettingsListener = ActionListener.wrap(
getSettingsResponse -> {
String[] nonHiddenIndices =
getSettingsResponse.getIndexToSettings().stream()
.filter(e -> e.getValue().getAsBoolean(SETTING_INDEX_HIDDEN, false) == false)
.map(Map.Entry::getKey)
.toArray(String[]::new);
if (nonHiddenIndices.length == 0) {
logger.debug("There are no ML internal indices that need to be made hidden, [{}]", getSettingsResponse);
updateSettingsListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String nonHiddenIndicesString = Arrays.stream(nonHiddenIndices).collect(Collectors.joining(", "));
logger.debug("The following ML internal indices will now be made hidden: [{}]", nonHiddenIndicesString);
UpdateSettingsRequest updateSettingsRequest =
new UpdateSettingsRequest()
.indices(nonHiddenIndices)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.settings(Collections.singletonMap(SETTING_INDEX_HIDDEN, true));
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateSettingsAction.INSTANCE, updateSettingsRequest, updateSettingsListener);
},
finalListener::onFailure
);

// Step 1: Fetch ML internal indices settings to find out whether they are already hidden or not.
GetSettingsRequest getSettingsRequest =
new GetSettingsRequest()
.indices(mlHiddenIndexPatterns)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
client.admin().indices().getSettings(getSettingsRequest, getSettingsListener);
}
}

Loading

0 comments on commit 0a6b6d9

Please sign in to comment.