diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index a7dbb9d4f93b6..510d7f411a9e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -131,15 +131,21 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterS return; } - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) { - deleteDatafeedFromMetadata(request, listener); - } else { - datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( - deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), - listener::onFailure - )); - } + + datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( + deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // is the datafeed in the clusterstate + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) { + deleteDatafeedFromMetadata(request, listener); + return; + } + } + listener.onFailure(e); + } + )); } private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index c9315a178148d..b676abe1a1fad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; @@ -692,47 +693,53 @@ public void onTimeout(TimeValue timeout) { private void clearJobFinishedTime(String jobId, ActionListener listener) { - boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId); - if (jobIsInClusterState) { - clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(null); - - mlMetadataBuilder.putJob(jobBuilder.build(), true); - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder.metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - } + JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - @Override - public void onFailure(String source, Exception e) { - logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + job -> listener.onResponse(new AcknowledgedResponse(true)), + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // Maybe the config is in the clusterstate + if 
(ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { + clearJobFinishedTimeClusterState(jobId, listener); + return; + } + } + logger.error("[" + jobId + "] Failed to clear finished_time", e); + // Not a critical error so continue listener.onResponse(new AcknowledgedResponse(true)); } + )); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); - } - }); - } else { - JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - - jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( - job -> listener.onResponse(new AcknowledgedResponse(true)), - e -> { - logger.error("[" + jobId + "] Failed to clear finished_time", e); - // Not a critical error so continue - listener.onResponse(new AcknowledgedResponse(true)); - } - )); - } + private void clearJobFinishedTimeClusterState(String jobId, ActionListener listener) { + clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(null); + + mlMetadataBuilder.putJob(jobBuilder.build(), true); + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void clusterStateProcessed(String source, 
ClusterState oldState, + ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); } private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 8ae6afdb0578c..115774cd351d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -69,19 +70,6 @@ protected PutDatafeedAction.Response newResponse() { protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) throws Exception { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null; - if (datafeedConfigIsInClusterState) { - updateDatafeedInClusterState(request, listener); - } else { - updateDatafeedInIndex(request, state, listener); - } - } - - private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state, - ActionListener listener) throws Exception { - final Map headers = threadPool.getThreadContext().getHeaders(); - // Check datafeed is stopped PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) { @@ -91,6 +79,12 @@ private void 
updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster return; } + updateDatafeedInIndex(request, state, listener); + } + + private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state, + ActionListener listener) throws Exception { + final Map headers = threadPool.getThreadContext().getHeaders(); String datafeedId = request.getUpdate().getId(); CheckedConsumer updateConsumer = ok -> { @@ -98,7 +92,17 @@ private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster jobConfigProvider::validateDatafeedJob, ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), - listener::onFailure + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // try the clusterstate + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + if (mlMetadata.getDatafeed(request.getUpdate().getId()) != null) { + updateDatafeedInClusterState(request, listener); + return; + } + } + listener.onFailure(e); + } )); }; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index 6e55552fb6b24..959a9ca3c3a78 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -50,17 +51,22 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { * @param listener DatafeedConfig listener */ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener listener) { - MlMetadata mlMetadata = 
MlMetadata.getMlMetadata(state); - DatafeedConfig config = mlMetadata.getDatafeed(datafeedId); - - if (config != null) { - listener.onResponse(config); - } else { - datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap( - builder -> listener.onResponse(builder.build()), - listener::onFailure - )); - } + + datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap( + builder -> listener.onResponse(builder.build()), + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // look in the clusterstate + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + DatafeedConfig config = mlMetadata.getDatafeed(datafeedId); + if (config != null) { + listener.onResponse(config); + return; + } + } + listener.onFailure(e); + } + )); } /** @@ -70,23 +76,16 @@ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState, ActionListener> listener) { - Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); - requiredMatches.filterMatchedIds(clusterStateDatafeedIds); datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck(expression, ActionListener.wrap( expandedDatafeedIds -> { - // Check for duplicate Ids - expandedDatafeedIds.forEach(id -> { - if (clusterStateDatafeedIds.contains(id)) { - listener.onFailure(new IllegalStateException("Datafeed [" + id + "] configuration " + - "exists in both clusterstate and index")); - return; - } - }); - requiredMatches.filterMatchedIds(expandedDatafeedIds); + // now read from the clusterstate + Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); + requiredMatches.filterMatchedIds(clusterStateDatafeedIds); + if (requiredMatches.hasUnmatchedIds()) { 
listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { @@ -105,33 +104,32 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Clust public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ClusterState clusterState, ActionListener> listener) { - Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); - ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); - requiredMatches.filterMatchedIds(clusterStateConfigs.keySet()); datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap( datafeedBuilders -> { - // Check for duplicate Ids - datafeedBuilders.forEach(datafeedBuilder -> { - if (clusterStateConfigs.containsKey(datafeedBuilder.getId())) { - listener.onFailure(new IllegalStateException("Datafeed [" + datafeedBuilder.getId() + "] configuration " + - "exists in both clusterstate and index")); - return; - } - }); - List datafeedConfigs = new ArrayList<>(); for (DatafeedConfig.Builder builder : datafeedBuilders) { datafeedConfigs.add(builder.build()); } + Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); + + // Duplicate configs existing in both the clusterstate and index documents are ok + // this may occur during migration of configs. + // Prefer the index configs and filter duplicates from the clusterstate configs. 
+ Set indexConfigIds = datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toSet()); + for (String clusterStateDatafeedId : clusterStateConfigs.keySet()) { + if (indexConfigIds.contains(clusterStateDatafeedId) == false) { + datafeedConfigs.add(clusterStateConfigs.get(clusterStateDatafeedId)); + } + } + requiredMatches.filterMatchedIds(datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toList())); if (requiredMatches.hasUnmatchedIds()) { listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { - datafeedConfigs.addAll(clusterStateConfigs.values()); Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId)); listener.onResponse(datafeedConfigs); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 2972b27dda789..900267581d8dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -234,7 +234,11 @@ public void onResponse(DeleteResponse deleteResponse) { } @Override public void onFailure(Exception e) { - actionListener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + actionListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + } else { + actionListener.onFailure(e); + } } }); } @@ -303,7 +307,11 @@ public void onResponse(GetResponse getResponse) { @Override public void onFailure(Exception e) { - updatedConfigListener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + } else { + updatedConfigListener.onFailure(e); + } } }); } diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index be7acda15e8d9..92337c47e4b21 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -212,28 +213,10 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); - requiredMatches.filterMatchedIds(clusterStateJobs.keySet()); - - // If expression contains a group Id it has been expanded to its - // constituent job Ids but Ids matcher needs to know the group - // has been matched - Set groupIds = clusterStateJobs.values().stream() - .filter(job -> job.getGroups() != null).flatMap(j -> j.getGroups().stream()).collect(Collectors.toSet()); - requiredMatches.filterMatchedIds(groupIds); jobConfigProvider.expandJobsWithoutMissingcheck(expression, false, ActionListener.wrap( jobBuilders -> { - // Check for duplicate jobs - for (Job.Builder jb : jobBuilders) { - if (clusterStateJobs.containsKey(jb.getId())) { - jobsListener.onFailure(new IllegalStateException("Job [" + jb.getId() + "] configuration " + - "exists in both clusterstate and index")); - return; - } - } - Set jobAndGroupIds = new 
HashSet<>(); // Merge cluster state and index jobs @@ -241,16 +224,32 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); + for (String clusterStateJobId : clusterStateJobs.keySet()) { + boolean isDuplicate = jobAndGroupIds.contains(clusterStateJobId); + if (isDuplicate == false) { + Job csJob = clusterStateJobs.get(clusterStateJobId); + jobs.add(csJob); + jobAndGroupIds.add(csJob.getId()); + jobAndGroupIds.addAll(csJob.getGroups()); + } + } + requiredMatches.filterMatchedIds(jobAndGroupIds); if (requiredMatches.hasUnmatchedIds()) { jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); } else { - jobs.addAll(clusterStateJobs.values()); Collections.sort(jobs, Comparator.comparing(Job::getId)); jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); } @@ -277,27 +276,21 @@ private Map expandJobsFromClusterState(String expression, ClusterSt * @param jobsListener The jobs listener */ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); - requiredMatches.filterMatchedIds(clusterStateJobIds); - // If expression contains a group Id it has been expanded to its - // constituent job Ids but Ids matcher needs to know the group - // has been matched - requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); + jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( jobIdsAndGroups -> { - // Check for duplicate job Ids - for (String id : jobIdsAndGroups.getJobs()) { - if (clusterStateJobIds.contains(id)) { - jobsListener.onFailure(new IllegalStateException("Job [" + id + "] 
configuration " + - "exists in both clusterstate and index")); - return; - } - } - requiredMatches.filterMatchedIds(jobIdsAndGroups.getJobs()); + + Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); + requiredMatches.filterMatchedIds(clusterStateJobIds); + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched requiredMatches.filterMatchedIds(jobIdsAndGroups.getGroups()); + requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); + if (requiredMatches.hasUnmatchedIds()) { jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); } else { @@ -311,38 +304,50 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< } /** - * Mark the job as being deleted. First looks in the cluster state for the - * job configuration then the index + * Mark the job as being deleted. 
First looks in the ml-config index + for the job configuration then the clusterstate * * @param jobId To to mark * @param force Allows an open job to be marked * @param listener listener */ public void markJobAsDeleting(String jobId, boolean force, ActionListener listener) { - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { - ClusterStateJobUpdate.markJobAsDeleting(jobId, force, clusterService, listener); - } else { - jobConfigProvider.markJobAsDeleting(jobId, listener); - } + jobConfigProvider.markJobAsDeleting(jobId, ActionListener.wrap( + listener::onResponse, + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // is the config in the cluster state + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { + ClusterStateJobUpdate.markJobAsDeleting(jobId, force, clusterService, listener); + return; + } + } + listener.onFailure(e); + } + )); + } /** - * First try to delete the job from the cluster state, if it does not exist - there try to delete the index job. + * First try to delete the job document from ml-config, if it does not exist + there try to delete it from the clusterstate. 
* * @param request The delete job request * @param listener Delete listener */ public void deleteJob(DeleteJobAction.Request request, ActionListener listener) { - - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { - ClusterStateJobUpdate.deleteJob(request, clusterService, listener); - } else { - jobConfigProvider.deleteJob(request.getJobId(), false, ActionListener.wrap( - deleteResponse -> listener.onResponse(Boolean.TRUE), - listener::onFailure - )); - } + jobConfigProvider.deleteJob(request.getJobId(), false, ActionListener.wrap( + deleteResponse -> { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + ClusterStateJobUpdate.deleteJob(request, clusterService, listener); + } + } else { + listener.onResponse(deleteResponse.getResult() == DocWriteResponse.Result.DELETED); + } + }, + listener::onFailure + )); } /** @@ -462,18 +467,21 @@ public void onFailure(Exception e) { } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { - updateJobClusterState(request, actionListener); - } else { - updateJobIndex(request, ActionListener.wrap( - updatedJob -> { - postJobUpdate(clusterService.state(), request); - actionListener.onResponse(new PutJobAction.Response(updatedJob)); - }, - actionListener::onFailure - )); - } + updateJobIndex(request, ActionListener.wrap( + updatedJob -> { + postJobUpdate(clusterService.state(), request); + actionListener.onResponse(new PutJobAction.Response(updatedJob)); + }, + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { + 
updateJobClusterState(request, actionListener); + return; + } + } + actionListener.onFailure(e); + })); } private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request request) { @@ -658,16 +666,16 @@ public void notifyFilterChanged(MlFilter filter, Set addedItems, Set { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - List allJobs = new ArrayList<>(); - // Check for duplicate jobs - for (Job indexJob : indexJobs) { - if (clusterStateJobs.containsKey(indexJob.getId())) { - logger.error("[" + indexJob.getId() + "] job configuration exists in both clusterstate and index"); - } else { - allJobs.add(indexJob); + List allJobs = new ArrayList<>(indexJobs); + // Duplicate configs existing in both the clusterstate and index documents are ok + // this may occur during migration of configs. + // Filter the duplicates so we don't update twice for duplicated jobs + for (String clusterStateJobId : clusterStateJobs.keySet()) { + boolean isDuplicate = allJobs.stream().anyMatch(job -> job.getId().equals(clusterStateJobId)); + if (isDuplicate == false) { + allJobs.add(clusterStateJobs.get(clusterStateJobId)); } } - allJobs.addAll(clusterStateJobs.values()); for (Job job: allJobs) { Set jobFilters = job.getAnalysisConfig().extractReferencedFilters(); @@ -740,7 +748,7 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expended group members with the request Ids which + // Merge the expanded group members with the request Ids // which are job ids rather than group Ids. expandedIds.addAll(calendarJobIds); @@ -798,39 +806,10 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. 
update the job // ------- - - Consumer updateJobHandler; - - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { - updateJobHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { - - @Override - protected Boolean newResponse(boolean acknowledged) { - if (acknowledged) { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - return true; - } - actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" - + request.getJobId() + "], not acknowledged by master.")); - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); - Job.Builder builder = new Job.Builder(job); - builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); - builder.setEstablishedModelMemory(response); - return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); - } - }); - } else { - updateJobHandler = response -> { + Consumer establishedMemoryHandler = modelMem -> { JobUpdate update = new JobUpdate.Builder(request.getJobId()) .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(response) + .setEstablishedModelMemory(modelMem) .build(); jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( @@ -839,14 +818,53 @@ public ClusterState execute(ClusterState currentState) { Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); updateHandler.accept(Boolean.TRUE); }, - actionListener::onFailure + e -> { + if (e.getClass() == ResourceNotFoundException.class) { + // Not found? 
maybe it's in the clusterstate + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + revertModelSnapshotClusterState(request, modelSnapshot, updateHandler, modelMem, actionListener); + return; + } + } + actionListener.onFailure(e); + } )); }; - } // Step 0. Find the appropriate established model memory for the reverted job // ------- - jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, updateJobHandler, - actionListener::onFailure); + jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, + establishedMemoryHandler, actionListener::onFailure); + } + + private void revertModelSnapshotClusterState(RevertModelSnapshotAction.Request request, + ModelSnapshot modelSnapshot, + CheckedConsumer updateHandler, Long modelMem, + ActionListener actionListener) { + clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + if (acknowledged) { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + return true; + } + // TODO is this an error? can actionListener.onFailure be called twice? 
+ actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" + + request.getJobId() + "], not acknowledged by master.")); + return false; + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); + Job.Builder builder = new Job.Builder(job); + builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); + builder.setEstablishedModelMemory(modelMem); + return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); + } + }); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 12cb5b5b00db3..e48bdfcd1f26a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -209,7 +209,11 @@ public void onResponse(MultiGetResponse multiGetResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + listener.onFailure(ExceptionsHelper.missingJobException(String.join(",", jobIds))); + } else { + listener.onFailure(e); + } } }, client::multiGet); } @@ -244,7 +248,11 @@ public void onResponse(DeleteResponse deleteResponse) { } @Override public void onFailure(Exception e) { - actionListener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + actionListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } else { + actionListener.onFailure(e); + } } }); } @@ -298,7 +306,11 @@ public void onResponse(GetResponse getResponse) { @Override public void onFailure(Exception e) { - updatedJobListener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + 
updatedJobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } else { + updatedJobListener.onFailure(e); + } } }); } @@ -366,7 +378,11 @@ public void onResponse(GetResponse getResponse) { @Override public void onFailure(Exception e) { - updatedJobListener.onFailure(e); + if (e.getClass() == IndexNotFoundException.class) { + updatedJobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } else { + updatedJobListener.onFailure(e); + } } }); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java index 986b0d0cf46ae..929c957695470 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.SortedSet; @@ -22,6 +23,7 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -132,11 +134,41 @@ public void testExpandDatafeedConfigs_SplitBetweenClusterStateAndIndex() { e -> fail(e.getMessage()) )); + assertThat(configHolder.get(), hasSize(3)); assertEquals("cs-df", configHolder.get().get(0).getId()); assertEquals("index-df", configHolder.get().get(1).getId()); assertEquals("ll-df", configHolder.get().get(2).getId()); } + public void testExpandDatafeedConfigs_DuplicateConfigReturnsIndexDocument() { + MlMetadata.Builder mlMetadata = new 
MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("datafeed-in-clusterstate").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("df1", "datafeed-in-clusterstate"), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + DatafeedConfig.Builder indexConfig1 = createDatafeedConfigBuilder("df1", "datafeed-in-index"); + DatafeedConfig.Builder indexConfig2 = createDatafeedConfigBuilder("df2", "job-c"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedConfig(provider, "_all", Arrays.asList(indexConfig1, indexConfig2)); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> configHolder = new AtomicReference<>(); + reader.expandDatafeedConfigs("_all", true, clusterState, ActionListener.wrap( + configHolder::set, + e -> fail(e.getMessage()) + )); + + assertThat(configHolder.get(), hasSize(2)); + assertEquals("df1", configHolder.get().get(0).getId()); + assertEquals("datafeed-in-index", configHolder.get().get(0).getJobId()); + assertEquals("df2", configHolder.get().get(1).getId()); + } + private ClusterState buildClusterStateWithJob(DatafeedConfig datafeed) { MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder(JOB_ID_FOO).build(), false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 4aba53d2614cb..76fc990995e95 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -86,7 +86,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static 
org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -243,6 +242,7 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { List docsAsBytes = new ArrayList<>(); Job.Builder indexJob = buildJobBuilder("dupe"); + indexJob.setCustomSettings(Collections.singletonMap("job-saved-in-index", Boolean.TRUE)); docsAsBytes.add(toBytesReference(indexJob.build())); MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); @@ -256,9 +256,10 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { exceptionHolder::set )); - assertNull(jobsHolder.get()); - assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); - assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + assertThat(jobsHolder.get().results(), hasSize(1)); + Job foundJob = jobsHolder.get().results().get(0); + assertTrue((Boolean)foundJob.getCustomSettings().get("job-saved-in-index")); + assertNull(exceptionHolder.get()); } public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { @@ -371,9 +372,8 @@ public void testExpandJobIds_GivenDuplicateConfig() { exceptionHolder::set )); - assertNull(jobIdsHolder.get()); - assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); - assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + assertThat(jobIdsHolder.get(), contains("dupe")); + assertNull(exceptionHolder.get()); } public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { @@ -870,6 +870,12 @@ public void testRevertSnapshot_GivenJobInClusterState() { when(clusterService.getClusterSettings()).thenReturn(clusterSettings); JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + doAnswer(invocationOnMock -> { + ActionListener 
listener = (ActionListener) invocationOnMock.getArguments()[3]; + listener.onFailure(new ResourceNotFoundException("missing job")); + return null; + }).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(ActionListener.class)); + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); @@ -888,7 +894,6 @@ public void testRevertSnapshot_GivenJobInClusterState() { jobManager.revertSnapshot(request, mock(ActionListener.class), modelSnapshot); verify(clusterService, times(1)).submitStateUpdateTask(eq("revert-snapshot-cs-revert"), any(AckedClusterStateUpdateTask.class)); - verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); } private Job.Builder createJob() {