diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index d5461d855998b..5e7b7fbf68ebd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -162,7 +162,7 @@ import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeController; @@ -357,9 +357,9 @@ public Collection createComponents(Client client, ClusterService cluster } Auditor auditor = new Auditor(client, clusterService.nodeName()); - JobProvider jobProvider = new JobProvider(client, settings); + JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); - JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService, auditor, client, notifier); + JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client); JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client); @@ -390,10 +390,10 @@ public Collection createComponents(Client client, ClusterService cluster NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool, - jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); this.autodetectProcessManager.set(autodetectProcessManager); - DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis); + DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, System::currentTimeMillis, auditor); this.datafeedManager.set(datafeedManager); @@ -408,7 +408,7 @@ public Collection createComponents(Client client, ClusterService cluster return Arrays.asList( mlLifeCycleService, - jobProvider, + jobResultsProvider, jobManager, autodetectProcessManager, new MlInitializationService(settings, threadPool, clusterService, client), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java index 9c712efe693ca..c171acd71b59b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java @@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.function.Supplier; @@ -35,16 +35,17 @@ public class TransportDeleteCalendarAction extends HandledTransportAction) DeleteCalendarAction.Request::new); this.client = client; this.jobManager = jobManager; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; } @Override @@ -70,7 +71,7 @@ protected void doExecute(Task task, DeleteCalendarAction.Request request, Action listener::onFailure ); - jobProvider.calendar(calendarId, calendarListener); + jobResultsProvider.calendar(calendarId, calendarListener); } private DeleteByQueryRequest buildDeleteByQuery(String calendarId) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java index 52896751de1d3..c52f144923e08 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.Map; @@ -37,16 +37,16 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction { private final Client client; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @Inject public TransportDeleteCalendarEventAction(Settings settings, TransportService transportService, ActionFilters actionFilters, - Client client, JobProvider jobProvider, JobManager jobManager) { + Client client, JobResultsProvider jobResultsProvider, JobManager jobManager) { super(settings, DeleteCalendarEventAction.NAME, transportService, actionFilters, DeleteCalendarEventAction.Request::new); this.client = client; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; } @@ -87,7 +87,7 @@ protected void doExecute(Task task, DeleteCalendarEventAction.Request request, }, listener::onFailure); // Get the calendar first so we check the calendar exists before checking the event exists - jobProvider.calendar(request.getCalendarId(), calendarListener); + jobResultsProvider.calendar(request.getCalendarId(), calendarListener); } private void deleteEvent(String eventId, Calendar calendar, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java index c63f8a4405b89..398b9930f7d45 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java @@ -22,7 +22,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Collections; @@ -32,17 +32,18 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction { private final Client client; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final ClusterService clusterService; private final Auditor auditor; @Inject public TransportDeleteModelSnapshotAction(Settings settings, TransportService transportService, ActionFilters actionFilters, - JobProvider jobProvider, ClusterService clusterService, Client client, Auditor auditor) { + JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client, + Auditor auditor) { super(settings, DeleteModelSnapshotAction.NAME, transportService, actionFilters, DeleteModelSnapshotAction.Request::new); this.client = client; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.clusterService = clusterService; this.auditor = auditor; } @@ -51,7 +52,7 @@ public TransportDeleteModelSnapshotAction(Settings settings, TransportService tr protected void doExecute(Task task, DeleteModelSnapshotAction.Request request, ActionListener listener) { // Verify the snapshot exists - jobProvider.modelSnapshots( + jobResultsProvider.modelSnapshots( request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(), page -> { List deleteCandidates = page.results(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java index f42f7003b909c..ad9e6a7c2630a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java @@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; @@ -40,15 +40,15 @@ public class TransportForecastJobAction extends TransportJobTaskAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; private final Client client; @Inject public TransportGetBucketsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager, Client client) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, + JobManager jobManager, Client client) { super(settings, GetBucketsAction.NAME, transportService, actionFilters, (Supplier) GetBucketsAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; this.client = client; } @@ -59,7 +60,7 @@ protected void doExecute(Task task, GetBucketsAction.Request request, ActionList query.start(request.getStart()); query.end(request.getEnd()); } - jobProvider.buckets(request.getJobId(), query, q -> + jobResultsProvider.buckets(request.getJobId(), query, q -> listener.onResponse(new GetBucketsAction.Response(q)), listener::onFailure, client); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java index 2e30ad80d859a..96ba9e6fbbebf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java @@ -21,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import java.util.Collections; @@ -31,15 +31,16 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final ClusterService clusterService; @Inject public TransportGetCalendarEventsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, ClusterService clusterService, JobProvider jobProvider) { + ActionFilters actionFilters, ClusterService clusterService, + JobResultsProvider jobResultsProvider) { super(settings, GetCalendarEventsAction.NAME, transportService, actionFilters, (Supplier) GetCalendarEventsAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.clusterService = clusterService; } @@ -85,9 +86,9 @@ protected void doExecute(Task task, GetCalendarEventsAction.Request request, jobGroups = job.getGroups(); } - jobProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener); + jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener); } else { - jobProvider.scheduledEvents(query, eventsListener); + jobResultsProvider.scheduledEvents(query, eventsListener); } }, listener::onFailure); @@ -101,7 +102,7 @@ private void checkCalendarExists(String calendarId, ActionListener list return; } - jobProvider.calendar(calendarId, ActionListener.wrap( + jobResultsProvider.calendar(calendarId, ActionListener.wrap( c -> listener.onResponse(true), listener::onFailure )); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java index ed837139ade1c..e9a9cd06d92c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java @@ -17,20 +17,20 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.Collections; public class TransportGetCalendarsAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; @Inject public TransportGetCalendarsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider) { super(settings, GetCalendarsAction.NAME, transportService, actionFilters, GetCalendarsAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; } @Override @@ -49,7 +49,7 @@ protected void doExecute(Task task, GetCalendarsAction.Request request, ActionLi private void getCalendar(String calendarId, ActionListener listener) { - jobProvider.calendar(calendarId, ActionListener.wrap( + jobResultsProvider.calendar(calendarId, ActionListener.wrap( calendar -> { QueryPage page = new QueryPage<>(Collections.singletonList(calendar), 1, Calendar.RESULTS_FIELD); listener.onResponse(new GetCalendarsAction.Response(page)); @@ -60,7 +60,7 @@ private void getCalendar(String calendarId, ActionListener listener) { CalendarQueryBuilder query = new CalendarQueryBuilder().pageParams(pageParams).sort(true); - jobProvider.calendars(query, ActionListener.wrap( + jobResultsProvider.calendars(query, ActionListener.wrap( calendars -> { listener.onResponse(new GetCalendarsAction.Response(calendars)); }, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java index 0e0481f394ccf..4bac6321a3ebf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java @@ -15,22 +15,23 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.function.Supplier; public class TransportGetCategoriesAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final Client client; private final JobManager jobManager; @Inject public TransportGetCategoriesAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, Client client, JobManager jobManager) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, + Client client, JobManager jobManager) { super(settings, GetCategoriesAction.NAME, transportService, actionFilters, (Supplier) GetCategoriesAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.client = client; this.jobManager = jobManager; } @@ -41,7 +42,7 @@ protected void doExecute(Task task, GetCategoriesAction.Request request, ActionL Integer from = request.getPageParams() != null ? request.getPageParams().getFrom() : null; Integer size = request.getPageParams() != null ? request.getPageParams().getSize() : null; - jobProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), true, from, size, + jobResultsProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), true, from, size, r -> listener.onResponse(new GetCategoriesAction.Response(r)), listener::onFailure, client); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java index 125e31fcf63cf..e62538831b5cb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java @@ -16,22 +16,23 @@ import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.function.Supplier; public class TransportGetInfluencersAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final Client client; private final JobManager jobManager; @Inject public TransportGetInfluencersAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, Client client, JobManager jobManager) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, + Client client, JobManager jobManager) { super(settings, GetInfluencersAction.NAME, transportService, actionFilters, (Supplier) GetInfluencersAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.client = client; this.jobManager = jobManager; } @@ -49,7 +50,7 @@ protected void doExecute(Task task, GetInfluencersAction.Request request, Action .influencerScoreThreshold(request.getInfluencerScore()) .sortField(request.getSort()) .sortDescending(request.isDescending()).build(); - jobProvider.influencers(request.getJobId(), query, + jobResultsProvider.influencers(request.getJobId(), query, page -> listener.onResponse(new GetInfluencersAction.Response(page)), listener::onFailure, client); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 28034d757dac2..ab1ef73780e5e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import java.io.IOException; @@ -52,18 +52,18 @@ public class TransportGetJobsStatsAction extends TransportTasksAction handler, Consumer errorHandler) { - jobProvider.getForecastStats(jobId, handler, errorHandler); + jobResultsProvider.getForecastStats(jobId, handler, errorHandler); } void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer handler, Consumer errorHandler) { - jobProvider.dataCounts(jobId, dataCounts -> { - jobProvider.modelSizeStats(jobId, modelSizeStats -> { + jobResultsProvider.dataCounts(jobId, dataCounts -> { + jobResultsProvider.modelSizeStats(jobId, modelSizeStats -> { handler.accept(dataCounts, modelSizeStats); }, errorHandler); }, errorHandler); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java index b69db8d48d60f..7a0e0b1c4deb1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java @@ -16,22 +16,22 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.stream.Collectors; public class TransportGetModelSnapshotsAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @Inject public TransportGetModelSnapshotsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, JobManager jobManager) { super(settings, GetModelSnapshotsAction.NAME, transportService, actionFilters, GetModelSnapshotsAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; } @@ -45,7 +45,7 @@ protected void doExecute(Task task, GetModelSnapshotsAction.Request request, jobManager.getJobOrThrowIfUnknown(request.getJobId()); - jobProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(), request.getPageParams().getSize(), + jobResultsProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(), request.getPageParams().getSize(), request.getStart(), request.getEnd(), request.getSort(), request.getDescOrder(), request.getSnapshotId(), page -> { listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java index b1556ba6e45c7..15a78efd9fda2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java @@ -15,23 +15,24 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import java.util.function.Supplier; public class TransportGetRecordsAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; private final Client client; @Inject public TransportGetRecordsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager, Client client) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, + JobManager jobManager, Client client) { super(settings, GetRecordsAction.NAME, transportService, actionFilters, (Supplier) GetRecordsAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; this.client = client; } @@ -50,7 +51,7 @@ protected void doExecute(Task task, GetRecordsAction.Request request, ActionList .recordScore(request.getRecordScoreFilter()) .sortField(request.getSort()) .sortDescending(request.isDescending()); - jobProvider.records(request.getJobId(), query, page -> + jobResultsProvider.records(request.getJobId(), query, page -> listener.onResponse(new GetRecordsAction.Response(page)), listener::onFailure, client); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index d378b19aad57a..019715b30853e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -64,7 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import java.io.IOException; @@ -95,19 +95,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction { + jobResultsProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> { if (establishedModelMemory != null && establishedModelMemory > 0) { JobUpdate update = new JobUpdate.Builder(job.getId()) .setEstablishedModelMemory(establishedModelMemory).build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index c1279248908a2..7284a490eaa8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.io.IOException; import java.util.Collections; @@ -40,16 +40,17 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction { private final Client client; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @Inject public TransportPostCalendarEventsAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, Client client, JobProvider jobProvider, JobManager jobManager) { + ActionFilters actionFilters, Client client, + JobResultsProvider jobResultsProvider, JobManager jobManager) { super(settings, PostCalendarEventsAction.NAME, transportService, actionFilters, PostCalendarEventsAction.Request::new); this.client = client; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; } @@ -92,6 +93,6 @@ public void onFailure(Exception e) { }, listener::onFailure); - jobProvider.calendar(request.getCalendarId(), calendarListener); + jobResultsProvider.calendar(request.getCalendarId(), calendarListener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 68125b8f5cc3c..07b9dade4d8fa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -17,8 +17,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -28,7 +30,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.Date; import java.util.function.Consumer; @@ -38,19 +40,19 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio private final Client client; private final JobManager jobManager; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobDataCountsPersister jobDataCountsPersister; @Inject public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, JobProvider jobProvider, + JobManager jobManager, JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client, JobDataCountsPersister jobDataCountsPersister) { super(settings, RevertModelSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, RevertModelSnapshotAction.Request::new); this.client = client; this.jobManager = jobManager; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobDataCountsPersister = jobDataCountsPersister; } @@ -66,17 +68,19 @@ protected RevertModelSnapshotAction.Response newResponse() { @Override protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}", request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults()); - Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state()); - JobState jobState = jobManager.getJobState(job.getId()); + Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), state); + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + JobState jobState = MlTasks.getJobState(job.getId(), tasks); + if (jobState.equals(JobState.CLOSED) == false) { throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)); } - getModelSnapshot(request, jobProvider, modelSnapshot -> { + getModelSnapshot(request, jobResultsProvider, modelSnapshot -> { ActionListener wrappedListener = listener; if (request.getDeleteInterveningResults()) { wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId()); @@ -86,7 +90,7 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste }, listener::onFailure); } - private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobProvider provider, Consumer handler, + private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer handler, Consumer errorHandler) { logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'"); @@ -134,7 +138,7 @@ private ActionListener wrapRevertDataCountsL return ActionListener.wrap(response -> { - jobProvider.dataCounts(jobId, counts -> { + jobResultsProvider.dataCounts(jobId, counts -> { counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp()); jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java index c7c9488c26825..adb3d35765c03 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java @@ -16,20 +16,20 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.util.Set; public class TransportUpdateCalendarJobAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobManager jobManager; @Inject public TransportUpdateCalendarJobAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, JobManager jobManager) { super(settings, UpdateCalendarJobAction.NAME, transportService, actionFilters, UpdateCalendarJobAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobManager = jobManager; } @@ -38,7 +38,7 @@ protected void doExecute(Task task, UpdateCalendarJobAction.Request request, Act Set jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression()); Set jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression()); - jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, + jobResultsProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, c -> { jobManager.updateProcessOnCalendarChanged(c.getJobIds()); listener.onResponse(new PutCalendarAction.Response(c)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java index 8000eaacd4fbe..9517bedc9a0f0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.io.IOException; import java.util.function.Consumer; @@ -38,15 +38,15 @@ public class TransportUpdateModelSnapshotAction extends HandledTransportAction { - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final Client client; @Inject public TransportUpdateModelSnapshotAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, JobProvider jobProvider, Client client) { + ActionFilters actionFilters, JobResultsProvider jobResultsProvider, Client client) { super(settings, UpdateModelSnapshotAction.NAME, transportService, actionFilters, UpdateModelSnapshotAction.Request::new); - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.client = client; } @@ -54,7 +54,7 @@ public TransportUpdateModelSnapshotAction(Settings settings, TransportService tr protected void doExecute(Task task, UpdateModelSnapshotAction.Request request, ActionListener listener) { logger.debug("Received request to update model snapshot [{}] for job [{}]", request.getSnapshotId(), request.getJobId()); - jobProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> { + jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> { if (modelSnapshot == null) { listener.onFailure(new ResourceNotFoundException(Messages.getMessage( Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), request.getJobId()))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index ecc6573e7bb87..efe332346efec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -18,7 +18,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Collections; @@ -29,13 +29,13 @@ public class DatafeedJobBuilder { private final Client client; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final Auditor auditor; private final Supplier currentTimeSupplier; - public DatafeedJobBuilder(Client client, JobProvider jobProvider, Auditor auditor, Supplier currentTimeSupplier) { + public DatafeedJobBuilder(Client client, JobResultsProvider jobResultsProvider, Auditor auditor, Supplier currentTimeSupplier) { this.client = client; - this.jobProvider = Objects.requireNonNull(jobProvider); + this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.auditor = Objects.requireNonNull(auditor); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } @@ -79,7 +79,7 @@ void build(Job job, DatafeedConfig datafeed, ActionListener listene TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); context.latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1; } - jobProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure); + jobResultsProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure); }; // Step 1. Collect latest bucket @@ -87,7 +87,7 @@ void build(Job job, DatafeedConfig datafeed, ActionListener listene .sortField(Result.TIMESTAMP.getPreferredName()) .sortDescending(true).size(1) .includeInterim(false); - jobProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> { + jobResultsProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> { if (e instanceof ResourceNotFoundException) { QueryPage empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD); bucketsHandler.accept(empty); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index fcc9151b755a6..c18c2c940a4c6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -52,7 +52,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -86,7 +86,7 @@ public class JobManager extends AbstractComponent { new DeprecationLogger(Loggers.getLogger(JobManager.class)); private final Environment environment; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final ClusterService clusterService; private final Auditor auditor; private final Client client; @@ -97,11 +97,12 @@ public class JobManager extends AbstractComponent { /** * Create a JobManager */ - public JobManager(Environment environment, Settings settings, JobProvider jobProvider, ClusterService clusterService, Auditor auditor, + public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider, + ClusterService clusterService, Auditor auditor, Client client, UpdateJobProcessNotifier updateJobProcessNotifier) { super(settings); this.environment = environment; - this.jobProvider = Objects.requireNonNull(jobProvider); + this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.clusterService = Objects.requireNonNull(clusterService); this.auditor = Objects.requireNonNull(auditor); this.client = Objects.requireNonNull(client); @@ -167,11 +168,6 @@ public QueryPage expandJobs(String expression, boolean allowNoJobs, Cluster return new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD); } - public JobState getJobState(String jobId) { - PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.getJobState(jobId, tasks); - } - /** * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any). * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the @@ -249,12 +245,12 @@ public void onFailure(Exception e) { ActionListener checkForLeftOverDocs = ActionListener.wrap( response -> { - jobProvider.createJobResultIndex(job, state, putJobListener); + jobResultsProvider.createJobResultIndex(job, state, putJobListener); }, actionListener::onFailure ); - jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs); + jobResultsProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs); } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { @@ -275,14 +271,14 @@ private void validate(JobUpdate jobUpdate, Job job, ActionListener handler private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, ChainTaskExecutor chainTaskExecutor) { if (modelSnapshotId != null) { chainTaskExecutor.add(listener -> { - jobProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> { + jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> { if (newModelSnapshot == null) { String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, job.getId()); listener.onFailure(new ResourceNotFoundException(message)); return; } - jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> { + jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> { if (oldModelSnapshot != null && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) { String message = "Job [" + job.getId() + "] has a more recent model snapshot [" + @@ -307,7 +303,7 @@ private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, Cha + " while the job is open")); return; } - jobProvider.modelSizeStats(job.getId(), modelSizeStats -> { + jobResultsProvider.modelSizeStats(job.getId(), modelSizeStats -> { if (modelSizeStats != null) { ByteSizeValue modelSize = new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES); if (newModelMemoryLimit < modelSize.getMb()) { @@ -539,7 +535,7 @@ public ClusterState execute(ClusterState currentState) { // Step 2. Remove the job from any calendars CheckedConsumer removeFromCalendarsHandler = response -> { - jobProvider.removeJobFromCalendars(jobId, ActionListener.wrap(deleteJobStateHandler::accept, + jobResultsProvider.removeJobFromCalendars(jobId, ActionListener.wrap(deleteJobStateHandler::accept, actionListener::onFailure )); }; @@ -607,7 +603,7 @@ public ClusterState execute(ClusterState currentState) { // Step 0. Find the appropriate established model memory for the reverted job // ------- - jobProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler, + jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler, actionListener::onFailure); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index 7513cb5a5bbc0..e850d737d3183 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -129,8 +129,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; -public class JobProvider { - private static final Logger LOGGER = Loggers.getLogger(JobProvider.class); +public class JobResultsProvider { + private static final Logger LOGGER = Loggers.getLogger(JobResultsProvider.class); private static final int RECORDS_SIZE_PARAM = 10000; public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20; @@ -139,7 +139,7 @@ public class JobProvider { private final Client client; private final Settings settings; - public JobProvider(Client client, Settings settings) { + public JobResultsProvider(Client client, Settings settings) { this.client = Objects.requireNonNull(client); this.settings = settings; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 77e7fe1471611..063ab3b49d146 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -44,7 +44,7 @@ import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; @@ -112,7 +112,7 @@ public class AutodetectProcessManager extends AbstractComponent { private final Environment environment; private final ThreadPool threadPool; private final JobManager jobManager; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final AutodetectProcessFactory autodetectProcessFactory; private final NormalizerFactory normalizerFactory; @@ -132,7 +132,7 @@ public class AutodetectProcessManager extends AbstractComponent { private final Auditor auditor; public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, - JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, + JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, NamedXContentRegistry xContentRegistry, Auditor auditor) { @@ -145,7 +145,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.autodetectProcessFactory = autodetectProcessFactory; this.normalizerFactory = normalizerFactory; this.jobManager = jobManager; - this.jobProvider = jobProvider; + this.jobResultsProvider = jobResultsProvider; this.jobResultsPersister = jobResultsPersister; this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; @@ -362,7 +362,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); DataCounts dataCounts = getStatistics(jobTask).get().v1(); ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); - jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); + jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); } else { eventsListener.onResponse(null); } @@ -403,7 +403,7 @@ public void openJob(JobTask jobTask, Consumer handler) { logger.info("Opening job [{}]", jobId); processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); - jobProvider.getAutodetectParams(job, params -> { + jobResultsProvider.getAutodetectParams(job, params -> { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override @@ -495,7 +495,7 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(), jobDataCountsPersister); - ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, + ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, @@ -504,7 +504,7 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, auditor, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), + client, auditor, jobId, renormalizer, jobResultsPersister, jobResultsProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index da5e70112f045..6bec0cad1e52f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -88,7 +88,7 @@ public class AutoDetectResultProcessor { private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); @@ -107,20 +107,22 @@ public class AutoDetectResultProcessor { private volatile boolean haveNewLatestModelSizeStats; private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods - public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, - JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { - this(client, auditor, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener()); + public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, + JobResultsPersister persister, JobResultsProvider jobResultsProvider, + ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { + this(client, auditor, jobId, renormalizer, persister, jobResultsProvider, latestModelSizeStats, + restoredSnapshot, new FlushListener()); } AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, - JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot, + JobResultsProvider jobResultsProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot, FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); - this.jobProvider = Objects.requireNonNull(jobProvider); + this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); this.restoredSnapshot = restoredSnapshot; @@ -214,7 +216,7 @@ void processResult(Context context, AutodetectResult result) { // if we haven't previously set established model memory, consider trying again after // a reasonable number of buckets have elapsed since the last model size stats update - long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; + long minEstablishedTimespanMs = JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); @@ -314,7 +316,7 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and // we'll NEVER consider memory usage to be established during this period - if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + if (restoredSnapshot || bucketCount >= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); } } @@ -429,7 +431,7 @@ private void updateEstablishedModelMemoryOnJob() { // We need to make all results written up to and including these stats available for the established memory calculation persister.commitResultWrites(jobId); - jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { + jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index ccda255206478..cfb5660c911b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import java.util.ArrayList; @@ -43,17 +43,17 @@ public class ScoresUpdater { private static final long MILLISECONDS_IN_SECOND = 1000; private final String jobId; - private final JobProvider jobProvider; + private final JobResultsProvider jobResultsProvider; private final JobRenormalizedResultsPersister updatesPersister; private final NormalizerFactory normalizerFactory; private int bucketSpan; private long normalizationWindow; private volatile boolean shutdown; - public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister, + public ScoresUpdater(Job job, JobResultsProvider jobResultsProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister, NormalizerFactory normalizerFactory) { jobId = job.getId(); - this.jobProvider = Objects.requireNonNull(jobProvider); + this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister); this.normalizerFactory = Objects.requireNonNull(normalizerFactory); bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue(); @@ -96,7 +96,7 @@ public void update(String quantilesState, long endBucketEpochMs, long windowExte private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { BatchedDocumentsIterator> bucketsIterator = - jobProvider.newBatchedBucketsIterator(jobId) + jobResultsProvider.newBatchedBucketsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); @@ -145,7 +145,7 @@ private void normalizeBuckets(Normalizer normalizer, List no private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { - BatchedDocumentsIterator> recordsIterator = jobProvider.newBatchedRecordsIterator(jobId) + BatchedDocumentsIterator> recordsIterator = jobResultsProvider.newBatchedRecordsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); @@ -168,7 +168,7 @@ private void updateRecords(Normalizer normalizer, String quantilesState, long en private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { - BatchedDocumentsIterator> influencersIterator = jobProvider.newBatchedInfluencersIterator(jobId) + BatchedDocumentsIterator> influencersIterator = jobResultsProvider.newBatchedInfluencersIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 6dd52626f7de1..f37deef12d083 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -19,7 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -41,7 +41,7 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Client client; private Auditor auditor; - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private Consumer taskHandler; private DatafeedJobBuilder datafeedJobBuilder; @@ -54,9 +54,9 @@ public void init() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(client.settings()).thenReturn(Settings.EMPTY); auditor = mock(Auditor.class); - jobProvider = mock(JobProvider.class); + jobResultsProvider = mock(JobResultsProvider.class); taskHandler = mock(Consumer.class); - datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis); + datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); Mockito.doAnswer(invocationOnMock -> { String jobId = (String) invocationOnMock.getArguments()[0]; @@ -64,14 +64,14 @@ public void init() { Consumer handler = (Consumer) invocationOnMock.getArguments()[1]; handler.accept(new DataCounts(jobId)); return null; - }).when(jobProvider).dataCounts(any(), any(), any()); + }).when(jobResultsProvider).dataCounts(any(), any(), any()); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") Consumer consumer = (Consumer) invocationOnMock.getArguments()[3]; consumer.accept(new ResourceNotFoundException("dummy")); return null; - }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any()); + }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -157,7 +157,7 @@ public void testBuild_GivenBucketsRequestFails() { Consumer consumer = (Consumer) invocationOnMock.getArguments()[3]; consumer.accept(error); return null; - }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any()); + }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, ActionListener.wrap(datafeedJob -> fail(), taskHandler)); @@ -173,7 +173,7 @@ private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimes dataCounts.setLatestRecordTimeStamp(new Date(latestRecordTimestamp)); handler.accept(dataCounts); return null; - }).when(jobProvider).dataCounts(any(), any(), any()); + }).when(jobResultsProvider).dataCounts(any(), any(), any()); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -183,6 +183,6 @@ private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimes QueryPage bucketQueryPage = new QueryPage(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD); consumer.accept(bucketQueryPage); return null; - }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any()); + }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 09bb3f7591677..8f6005a216115 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -72,7 +72,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { private static final String JOB_ID = "autodetect-result-processor-it-job"; - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private List capturedUpdateModelSnapshotOnJobRequests; private AutoDetectResultProcessor resultProcessor; private Renormalizer renormalizer; @@ -98,11 +98,11 @@ public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); Auditor auditor = new Auditor(client(), "test_node"); - jobProvider = new JobProvider(client(), builder.build()); + jobResultsProvider = new JobResultsProvider(client(), builder.build()); renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, - new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { + new JobResultsPersister(nodeSettings(), client()), jobResultsProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); @@ -159,7 +159,7 @@ public void testProcessResults() throws Exception { assertEquals(1, persistedDefinition.count()); assertEquals(categoryDefinition, persistedDefinition.results().get(0)); - QueryPage persistedModelPlot = jobProvider.modelPlot(JOB_ID, 0, 100); + QueryPage persistedModelPlot = jobResultsProvider.modelPlot(JOB_ID, 0, 100); assertEquals(1, persistedModelPlot.count()); assertEquals(modelPlot, persistedModelPlot.results().get(0)); @@ -443,7 +443,7 @@ private QueryPage getBucketQueryPage(BucketsQueryBuilder bucketsQuery) t AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.buckets(JOB_ID, bucketsQuery, r -> { + jobResultsProvider.buckets(JOB_ID, bucketsQuery, r -> { resultHolder.set(r); latch.countDown(); }, e -> { @@ -461,7 +461,7 @@ private QueryPage getCategoryDefinition(long categoryId) thr AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.categoryDefinitions(JOB_ID, categoryId, false, null, null, r -> { + jobResultsProvider.categoryDefinitions(JOB_ID, categoryId, false, null, null, r -> { resultHolder.set(r); latch.countDown(); }, e -> { @@ -479,7 +479,7 @@ private ModelSizeStats getModelSizeStats() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.modelSizeStats(JOB_ID, modelSizeStats -> { + jobResultsProvider.modelSizeStats(JOB_ID, modelSizeStats -> { resultHolder.set(modelSizeStats); latch.countDown(); }, e -> { @@ -497,7 +497,7 @@ private QueryPage getInfluencers() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> { + jobResultsProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> { resultHolder.set(page); latch.countDown(); }, e -> { @@ -515,7 +515,7 @@ private QueryPage getRecords(RecordsQueryBuilder recordsQuery) th AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.records(JOB_ID, recordsQuery, page -> { + jobResultsProvider.records(JOB_ID, recordsQuery, page -> { resultHolder.set(page); latch.countDown(); }, e -> { @@ -533,7 +533,7 @@ private QueryPage getModelSnapshots() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.modelSnapshots(JOB_ID, 0, 100, page -> { + jobResultsProvider.modelSnapshots(JOB_ID, 0, 100, page -> { resultHolder.set(page); latch.countDown(); }, e -> { @@ -551,7 +551,7 @@ private Optional getQuantiles() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(),params -> { + jobResultsProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(), params -> { resultHolder.set(Optional.ofNullable(params.quantiles())); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index b8e04a8922c5d..df3af13f71403 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.results.Bucket; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; @@ -26,13 +26,13 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { private long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis(); - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private JobResultsPersister jobResultsPersister; @Before public void createComponents() { Settings settings = nodeSettings(0); - jobProvider = new JobProvider(client(), settings); + jobResultsProvider = new JobResultsProvider(client(), settings); jobResultsPersister = new JobResultsPersister(settings, client()); } @@ -251,7 +251,7 @@ private Long queryEstablishedMemoryUsage(String jobId, Integer bucketNum, ModelS CountDownLatch latch = new CountDownLatch(1); Date latestBucketTimestamp = (bucketNum != null) ? new Date(bucketSpan * bucketNum) : null; - jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> { + jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> { establishedModelMemoryUsage.set(memUse); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 856b930ac49b5..303fce3d81321 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -45,7 +45,7 @@ import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; @@ -72,9 +72,9 @@ import static org.hamcrest.core.Is.is; -public class JobProviderIT extends MlSingleNodeTestCase { +public class JobResultsProviderIT extends MlSingleNodeTestCase { - private JobProvider jobProvider; + private JobResultsProvider jobProvider; @Override protected Settings nodeSettings() { @@ -95,7 +95,7 @@ protected Collection> getPlugins() { public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); - jobProvider = new JobProvider(client(), builder.build()); + jobProvider = new JobResultsProvider(client(), builder.build()); waitForMlTemplates(); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index e15b5828df96a..a9162cb2ae4df 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -68,7 +68,7 @@ public class JobManagerTests extends ESTestCase { private AnalysisRegistry analysisRegistry; private Client client; private ClusterService clusterService; - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private Auditor auditor; private UpdateJobProcessNotifier updateJobProcessNotifier; @@ -79,7 +79,7 @@ public void setup() throws Exception { analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); client = mock(Client.class); clusterService = mock(ClusterService.class); - jobProvider = mock(JobProvider.class); + jobResultsProvider = mock(JobResultsProvider.class); auditor = mock(Auditor.class); updateJobProcessNotifier = mock(UpdateJobProcessNotifier.class); } @@ -131,7 +131,7 @@ public void testPutJob_AddsCreateTime() throws IOException { ActionListener listener = (ActionListener) invocation.getArguments()[2]; listener.onResponse(true); return null; - }).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class)); + }).when(jobResultsProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class)); ClusterState clusterState = createClusterState(); @@ -404,7 +404,8 @@ private JobManager createJobManager() { ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, updateJobProcessNotifier); + return new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, client, updateJobProcessNotifier); } private ClusterState createClusterState() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java similarity index 95% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index e33dbc69db607..2c0681c33f22a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -72,7 +72,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class JobProviderTests extends ESTestCase { +public class JobResultsProviderTests extends ESTestCase { private static final String CLUSTER_NAME = "myCluster"; @SuppressWarnings("unchecked") @@ -87,7 +87,7 @@ public void testCreateJobResultsIndex() { clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.resultsWriteAlias("foo")); Job.Builder job = buildJobBuilder("foo"); - JobProvider provider = createProvider(clientBuilder.build()); + JobResultsProvider provider = createProvider(clientBuilder.build()); AtomicReference resultHolder = new AtomicReference<>(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -141,7 +141,7 @@ public void testCreateJobWithExistingIndex() { Job.Builder job = buildJobBuilder("foo123"); job.setResultsIndexName("foo"); - JobProvider provider = createProvider(clientBuilder.build()); + JobResultsProvider provider = createProvider(clientBuilder.build()); Index index = mock(Index.class); when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsAliasedName("foo")); @@ -202,7 +202,7 @@ public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() { Job.Builder job = buildJobBuilder("foo"); job.setResultsIndexName("bar"); Client client = clientBuilder.build(); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); ImmutableOpenMap indexMap = ImmutableOpenMap.builder().build(); @@ -248,7 +248,7 @@ public void testBuckets_OneBucketNoInterim() throws IOException { int from = 0; int size = 10; Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(1.0); @@ -281,7 +281,7 @@ public void testBuckets_OneBucketInterim() throws IOException { int size = 17; Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(5.1) .includeInterim(true); @@ -314,7 +314,7 @@ public void testBuckets_UsingBuilder() throws IOException { int size = 17; Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); BucketsQueryBuilder bq = new BucketsQueryBuilder(); bq.from(from); @@ -341,7 +341,7 @@ public void testBucket_NoBucketNoExpand() throws IOException { SearchResponse response = createSearchResponse(source); Client client = getMockedClient(queryBuilder -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); BucketsQueryBuilder bq = new BucketsQueryBuilder(); bq.timestamp(Long.toString(timestamp)); @@ -363,7 +363,7 @@ public void testBucket_OneBucketNoExpand() throws IOException { SearchResponse response = createSearchResponse(source); Client client = getMockedClient(queryBuilder -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); BucketsQueryBuilder bq = new BucketsQueryBuilder(); bq.timestamp(Long.toString(now.getTime())); @@ -403,7 +403,7 @@ public void testRecords() throws IOException { String sortfield = "minefield"; SearchResponse response = createSearchResponse(source); Client client = getMockedClient(qb -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); RecordsQueryBuilder rqb = new RecordsQueryBuilder().from(from).size(size).epochStart(String.valueOf(now.getTime())) .epochEnd(String.valueOf(now.getTime())).includeInterim(true).sortField(sortfield) @@ -451,7 +451,7 @@ public void testRecords_UsingBuilder() throws IOException { SearchResponse response = createSearchResponse(source); Client client = getMockedClient(qb -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); RecordsQueryBuilder rqb = new RecordsQueryBuilder(); rqb.from(from); @@ -505,7 +505,7 @@ public void testBucketRecords() throws IOException { String sortfield = "minefield"; SearchResponse response = createSearchResponse(source); Client client = getMockedClient(qb -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; @@ -542,7 +542,7 @@ public void testexpandBucket() throws IOException { SearchResponse response = createSearchResponse(source); Client client = getMockedClient(qb -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); Integer[] holder = new Integer[1]; provider.expandBucket(jobId, false, bucket, records -> holder[0] = records, RuntimeException::new, client); @@ -567,7 +567,7 @@ public void testCategoryDefinitions() throws IOException { int size = 10; Client client = getMockedClient(q -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; provider.categoryDefinitions(jobId, null, false, from, size, r -> holder[0] = r, @@ -589,7 +589,7 @@ public void testCategoryDefinition() throws IOException { SearchResponse response = createSearchResponse(Collections.singletonList(source)); Client client = getMockedClient(q -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; provider.categoryDefinitions(jobId, categoryId, false, null, null, @@ -630,7 +630,7 @@ public void testInfluencers_NoInterim() throws IOException { QueryBuilder[] qbHolder = new QueryBuilder[1]; SearchResponse response = createSearchResponse(source); Client client = getMockedClient(q -> qbHolder[0] = q, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; @@ -690,7 +690,7 @@ public void testInfluencers_WithInterim() throws IOException { QueryBuilder[] qbHolder = new QueryBuilder[1]; SearchResponse response = createSearchResponse(source); Client client = getMockedClient(q -> qbHolder[0] = q, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; @@ -745,7 +745,7 @@ public void testModelSnapshots() throws IOException { int size = 3; SearchResponse response = createSearchResponse(source); Client client = getMockedClient(qb -> {}, response); - JobProvider provider = createProvider(client); + JobResultsProvider provider = createProvider(client); @SuppressWarnings({"unchecked"}) QueryPage[] holder = new QueryPage[1]; @@ -783,11 +783,11 @@ public void testViolatedFieldCountLimit() throws Exception { MetaData metaData = MetaData.builder() .put(indexMetaData1) .build(); - boolean result = JobProvider.violatedFieldCountLimit("index1", 0, 10, + boolean result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 10, ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); assertFalse(result); - result = JobProvider.violatedFieldCountLimit("index1", 1, 10, + result = JobResultsProvider.violatedFieldCountLimit("index1", 1, 10, ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); assertTrue(result); @@ -801,7 +801,7 @@ public void testViolatedFieldCountLimit() throws Exception { metaData = MetaData.builder() .put(indexMetaData2) .build(); - result = JobProvider.violatedFieldCountLimit("index1", 0, 19, + result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 19, ClusterState.builder(new ClusterName("_name")).metaData(metaData).build()); assertTrue(result); } @@ -811,7 +811,7 @@ public void testCountFields() { mapping.put("field1", Collections.singletonMap("type", "string")); mapping.put("field2", Collections.singletonMap("type", "string")); mapping.put("field3", Collections.singletonMap("type", "string")); - assertEquals(3, JobProvider.countFields(Collections.singletonMap("properties", mapping))); + assertEquals(3, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping))); Map objectProperties = new HashMap<>(); objectProperties.put("field4", Collections.singletonMap("type", "string")); @@ -822,15 +822,15 @@ public void testCountFields() { objectField.put("properties", objectProperties); mapping.put("field4", objectField); - assertEquals(7, JobProvider.countFields(Collections.singletonMap("properties", mapping))); + assertEquals(7, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping))); } private Bucket createBucketAtEpochTime(long epoch) { return new Bucket("foo", new Date(epoch), 123); } - private JobProvider createProvider(Client client) { - return new JobProvider(client, Settings.EMPTY); + private JobResultsProvider createProvider(Client client) { + return new JobResultsProvider(client, Settings.EMPTY); } private static GetResponse createGetResponse(boolean exists, Map source) throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 313f449cadd81..43cc909e392ea 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -101,7 +101,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private Environment environment; private AnalysisRegistry analysisRegistry; private JobManager jobManager; - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private JobResultsPersister jobResultsPersister; private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; @@ -119,7 +119,7 @@ public void setup() throws Exception { environment = TestEnvironment.newEnvironment(settings); analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); jobManager = mock(JobManager.class); - jobProvider = mock(JobProvider.class); + jobResultsProvider = mock(JobResultsProvider.class); jobResultsPersister = mock(JobResultsPersister.class); when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); @@ -132,7 +132,7 @@ public void setup() throws Exception { Consumer handler = (Consumer) invocationOnMock.getArguments()[1]; handler.accept(buildAutodetectParams()); return null; - }).when(jobProvider).getAutodetectParams(any(), any(), any()); + }).when(jobResultsProvider).getAutodetectParams(any(), any(), any()); } public void testMaxOpenJobsSetting_givenDefault() { @@ -227,7 +227,7 @@ public void testOpenJob_exceedMaxNumJobs() { Settings.Builder settings = Settings.builder(); settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool, - jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor)); doReturn(executorService).when(manager).createAutodetectExecutorService(any()); @@ -582,7 +582,7 @@ public void testCreate_notEnoughThreads() throws IOException { AutodetectProcessFactory autodetectProcessFactory = (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, - client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); JobTask jobTask = mock(JobTask.class); @@ -655,7 +655,7 @@ private AutodetectProcessManager createNonSpyManager(String jobId) { AutodetectProcessFactory autodetectProcessFactory = (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager, - jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); } @@ -680,7 +680,7 @@ private AutodetectProcessManager createManager(AutodetectCommunicator communicat when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY, - client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, + client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); manager = spy(manager); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 8eb0317ba0dbe..3b74c6e661596 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -75,7 +75,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private Auditor auditor; private Renormalizer renormalizer; private JobResultsPersister persister; - private JobProvider jobProvider; + private JobResultsProvider jobResultsProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; private ScheduledThreadPoolExecutor executor; @@ -90,9 +90,9 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); - jobProvider = mock(JobProvider.class); + jobResultsProvider = mock(JobResultsProvider.class); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider, + processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider, new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener); } @@ -294,8 +294,8 @@ public void testProcessResult_modelSizeStats() { verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verifyNoMoreInteractions(persister); - // No interactions with the jobProvider confirms that the established memory calculation did not run - verifyNoMoreInteractions(jobProvider, auditor); + // No interactions with the jobResultsProvider confirms that the established memory calculation did not run + verifyNoMoreInteractions(jobResultsProvider, auditor); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); } @@ -347,7 +347,7 @@ public void testProcessResult_modelSizeStatsAfterManyBuckets() throws Exception AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; - for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { + for (int i = 0; i < JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); when(result.getBucket()).thenReturn(bucket); @@ -366,9 +366,9 @@ public void testProcessResult_modelSizeStatsAfterManyBuckets() throws Exception verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verify(persister, times(1)).commitResultWrites(JOB_ID); verifyNoMoreInteractions(persister); - verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), eq(modelSizeStats), any(Consumer.class), - any(Consumer.class)); - verifyNoMoreInteractions(jobProvider); + verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), + eq(modelSizeStats), any(Consumer.class), any(Consumer.class)); + verifyNoMoreInteractions(jobResultsProvider); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); }); } @@ -383,13 +383,13 @@ public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Excep AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; ModelSizeStats modelSizeStats = null; - for (int i = 1; i <= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { + for (int i = 1; i <= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); when(result.getBucket()).thenReturn(bucket); processorUnderTest.processResult(context, result); - if (i > JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + if (i > JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { result = mock(AutodetectResult.class); modelSizeStats = mock(ModelSizeStats.class); when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); @@ -409,9 +409,9 @@ public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Excep // ...but only the last should trigger an established model memory update verify(persister, times(1)).commitResultWrites(JOB_ID); verifyNoMoreInteractions(persister); - verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), + verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobProvider); + verifyNoMoreInteractions(jobResultsProvider); assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); }); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java index 5f8b685f8442c..2acaf97359477 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java @@ -16,7 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator; import org.junit.Before; @@ -52,7 +52,7 @@ public class ScoresUpdaterTests extends ESTestCase { private static final long DEFAULT_BUCKET_SPAN = 3600; private static final long DEFAULT_START_TIME = 0; - private JobProvider jobProvider = mock(JobProvider.class); + private JobResultsProvider jobResultsProvider = mock(JobResultsProvider.class); private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class); private Normalizer normalizer = mock(Normalizer.class); private NormalizerFactory normalizerFactory = mock(NormalizerFactory.class); @@ -78,7 +78,7 @@ public void setUpMocks() throws IOException { job = jobBuilder.build(new Date()); - scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory); + scoresUpdater = new ScoresUpdater(job, jobResultsProvider, jobRenormalizedResultsPersister, normalizerFactory); givenProviderReturnsNoBuckets(); givenProviderReturnsNoRecords(); @@ -210,7 +210,7 @@ public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondN MockBatchedDocumentsIterator recordIter = new MockBatchedDocumentsIterator<>( recordBatches, AnomalyRecord.RESULT_TYPE_VALUE); recordIter.requireIncludeInterim(false); - when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); + when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); @@ -376,7 +376,7 @@ private void givenBuckets(List> batches) { MockBatchedDocumentsIterator bucketIter = new MockBatchedDocumentsIterator<>(batchesWithIndex, Bucket.RESULT_TYPE_VALUE); bucketIter.requireIncludeInterim(false); - when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter); + when(jobResultsProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter); } private void givenProviderReturnsNoRecords() { @@ -394,7 +394,7 @@ private void givenProviderReturnsRecords(Deque records) { MockBatchedDocumentsIterator recordIter = new MockBatchedDocumentsIterator<>( batches, AnomalyRecord.RESULT_TYPE_VALUE); recordIter.requireIncludeInterim(false); - when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); + when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); } private void givenProviderReturnsNoInfluencers() { @@ -410,7 +410,7 @@ private void givenProviderReturnsInfluencers(Deque influencers) { batches.add(queue); MockBatchedDocumentsIterator iterator = new MockBatchedDocumentsIterator<>(batches, Influencer.RESULT_TYPE_VALUE); iterator.requireIncludeInterim(false); - when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); + when(jobResultsProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator); } private void verifyNormalizerWasInvoked(int times) throws IOException {