Skip to content

Commit

Permalink
[ML] Jindex: Rolling upgrade tests (#35700)
Browse files Browse the repository at this point in the history
Adds rolling upgrade tests for jobs and datafeeds defined both in
the cluster state and as index documents.
  • Loading branch information
davidkyle authored Nov 23, 2018
1 parent f003557 commit 4fd00d6
Show file tree
Hide file tree
Showing 28 changed files with 681 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,13 @@ public long estimateMemoryFootprint() {
if (establishedModelMemory != null && establishedModelMemory > 0) {
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
}
return ByteSizeUnit.MB.toBytes(analysisLimits.getModelMemoryLimit()) + PROCESS_MEMORY_OVERHEAD.getBytes();
// Pre v6.1 jobs may have a null analysis limits object or
// a null model memory limit
long modelMemoryLimit = AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB;
if (analysisLimits != null && analysisLimits.getModelMemoryLimit() != null) {
modelMemoryLimit = analysisLimits.getModelMemoryLimit();
}
return ByteSizeUnit.MB.toBytes(modelMemoryLimit) + PROCESS_MEMORY_OVERHEAD.getBytes();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,44 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public final class XPackRestTestHelper {

/**
 * Unmodifiable list of the ML template names that does not include the
 * config index template: notifications, meta, job state and job results.
 */
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix()));

/**
 * Unmodifiable list holding the same names as {@link #ML_PRE_V660_TEMPLATES}
 * plus {@code AnomalyDetectorsIndex.configIndexName()}.
 */
public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

// Private constructor: this final class only exposes static members and is not meant to be instantiated.
private XPackRestTestHelper() {
}

/**
* Waits for the Machine Learning templates to be created
* and checks that their version is up to date
*/
public static void waitForMlTemplates(RestClient client) throws InterruptedException {


/**
* For each template name wait for the template to be created and
* for the template version to be equal to the master node version.
*
* @param client The rest client
* @param templateNames Names of the templates to wait for
* @throws InterruptedException If the wait is interrupted
*/
public static void waitForTemplates(RestClient client, List<String> templateNames) throws InterruptedException {
AtomicReference<Version> masterNodeVersion = new AtomicReference<>();
ESTestCase.awaitBusy(() -> {
String response;
Expand All @@ -53,8 +77,6 @@ public static void waitForMlTemplates(RestClient client) throws InterruptedExcep
return false;
});

final List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String template : templateNames) {
ESTestCase.awaitBusy(() -> {
Map<?, ?> response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,8 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -48,10 +50,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

private final ClusterService clusterService;
private final Client client;
private final Auditor auditor;
private final PersistentTasksService persistentTasksService;
private final DatafeedConfigProvider datafeedConfigProvider;
Expand All @@ -61,11 +67,12 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
public TransportCloseJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, Auditor auditor, PersistentTasksService persistentTasksService,
DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager) {
DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager, Client client) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
this.clusterService = clusterService;
this.client = client;
this.auditor = auditor;
this.persistentTasksService = persistentTasksService;
this.datafeedConfigProvider = datafeedConfigProvider;
Expand Down Expand Up @@ -419,7 +426,10 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
    // The wait for the jobs to close has completed. Run the finalize action for the
    // jobs collected in jobsToFinalize first, and answer the caller only from the
    // finalize callback so that the listener is notified exactly once - either with
    // the close response or with the finalization failure.
    FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
            waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
    executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
            ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -24,23 +28,36 @@
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// This action is only called from nodes before version 6.6.0
public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportFinalizeJobExecutionAction extends
TransportMasterNodeAction<FinalizeJobExecutionAction.Request, AcknowledgedResponse> {

private final Client client;
/**
 * Creates the transport action.
 *
 * @param settings                    passed through to the master node action base class
 * @param transportService            passed through to the master node action base class
 * @param clusterService              passed through to the master node action base class
 * @param threadPool                  passed through to the master node action base class
 * @param actionFilters               passed through to the master node action base class
 * @param indexNameExpressionResolver passed through to the master node action base class
 * @param client                      client kept by this action for issuing job document updates
 */
@Inject
public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService,
                                           ClusterService clusterService, ThreadPool threadPool,
                                           ActionFilters actionFilters,
                                           IndexNameExpressionResolver indexNameExpressionResolver,
                                           Client client) {
    super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters,
            indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new);
    this.client = client;
}

@Override
Expand All @@ -58,20 +75,60 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
ActionListener<AcknowledgedResponse> listener) {

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
List<String> jobsInClusterState = Arrays.stream(request.getJobIds())
Set<String> jobsInClusterState = Arrays.stream(request.getJobIds())
.filter(id -> mlMetadata.getJobs().containsKey(id))
.collect(Collectors.toList());

// This action should not be called for jobs that have
// their configuration in index documents
.collect(Collectors.toSet());

if (jobsInClusterState.isEmpty()) {
// This action is a no-op for jobs not defined in the cluster state.
listener.onResponse(new AcknowledgedResponse(true));
return;
finalizeIndexJobs(Arrays.asList(request.getJobIds()), listener);
} else {
ActionListener<AcknowledgedResponse> finalizeClusterStateJobsListener = ActionListener.wrap(
ack -> finalizeClusterStateJobs(jobsInClusterState, listener),
listener::onFailure
);

Set<String> jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds()));
jobsInIndex.removeAll(jobsInClusterState);

finalizeIndexJobs(jobsInIndex, finalizeClusterStateJobsListener);
}
}

/**
 * Writes a finished time to the configuration document of each of the given jobs.
 * One update request per job is built against the config index and queued on a
 * {@link ChainTaskExecutor} that runs on the ML utility thread pool.
 *
 * @param jobIds   ids of the jobs whose configuration documents are updated
 * @param listener receives an acknowledged response once the chained updates have
 *                 completed, or the failure reported by the executor
 */
private void finalizeIndexJobs(Collection<String> jobIds, ActionListener<AcknowledgedResponse> listener) {
    String jobIdString = String.join(",", jobIds);
    logger.debug("finalizing jobs [{}]", jobIdString);

    ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
            MachineLearning.UTILITY_THREAD_POOL_NAME), true);

    // A single timestamp is created up front, so every job in this request gets the same finished time.
    Map<Object, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());

    for (String jobId : jobIds) {
        UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
                ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
        updateRequest.retryOnConflict(3);
        updateRequest.doc(update);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        chainTaskExecutor.add(chainedListener -> {
            executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
                    updateResponse -> chainedListener.onResponse(null),
                    chainedListener::onFailure
            ));
        });
    }

    chainTaskExecutor.execute(ActionListener.wrap(
            aVoid -> {
                logger.debug("finalized job [{}]", jobIdString);
                listener.onResponse(new AcknowledgedResponse(true));
            },
            listener::onFailure
    ));
}

private void finalizeClusterStateJobs(Collection<String> jobIds, ActionListener<AcknowledgedResponse> listener) {
String jobIdString = String.join(",", jobIds);
String source = "finalize_job_execution [" + jobIdString + "]";
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
Expand All @@ -82,7 +139,7 @@ public ClusterState execute(ClusterState currentState) {
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

for (String jobId : jobsInClusterState) {
for (String jobId : jobIds) {
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(finishedTime);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -73,7 +74,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {

logger.debug("Get stats for job [{}]", request.getJobId());
jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
expandedIds -> {
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
Expand All @@ -96,6 +97,7 @@ protected GetJobsStatsAction.Response newResponse(GetJobsStatsAction.Request req
for (QueryPage<GetJobsStatsAction.Response.JobStats> task : tasks) {
stats.addAll(task.results());
}
Collections.sort(stats, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
return new GetJobsStatsAction.Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(),
Job.RESULTS_FIELD));
}
Expand All @@ -109,7 +111,6 @@ protected QueryPage<GetJobsStatsAction.Response.JobStats> readTaskResponse(Strea
protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<QueryPage<GetJobsStatsAction.Response.JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(task);
Expand Down Expand Up @@ -159,6 +160,7 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc
if (counter.decrementAndGet() == 0) {
List<GetJobsStatsAction.Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
Logger logger) {
if (job == null) {
logger.debug("[{}] select node job is null", jobId);
}

String resultsIndexName = job != null ? job.getResultsIndexName() : null;
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState);
if (unavailableIndices.size() != 0) {
Expand Down Expand Up @@ -236,6 +240,16 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
reasons.add(reason);
continue;
}

boolean jobConfigIsStoredInIndex = job.getJobVersion().onOrAfter(Version.V_6_6_0);
if (jobConfigIsStoredInIndex && node.getVersion().before(Version.V_6_6_0)) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node)
+ "] version [" + node.getVersion() + "], because this node does not support " +
"jobs of version [" + job.getJobVersion() + "]";
logger.trace(reason);
reasons.add(reason);
continue;
}
}

long numberOfAssignedJobs = 0;
Expand Down Expand Up @@ -820,8 +834,16 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS

@Override
public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) {
Job foundJob = params.getJob();
if (foundJob == null) {
// The job was added to the persistent task parameters in 6.6.0
// if the field is not present the task was created before 6.6.0.
// In which case the job should still be in the clusterstate
foundJob = MlMetadata.getMlMetadata(clusterState).getJobs().get(params.getJobId());
}

PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
params.getJob(),
foundJob,
clusterState,
maxConcurrentJobAllocations,
fallbackMaxNumberOfOpenJobs,
Expand Down
Loading

0 comments on commit 4fd00d6

Please sign in to comment.