Skip to content

Commit

Permalink
[ML] Rename JobProvider to JobResultsProvider (#32551)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Aug 2, 2018
1 parent 5d804d0 commit 5ac551c
Show file tree
Hide file tree
Showing 33 changed files with 244 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeController;
Expand Down Expand Up @@ -362,9 +362,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}

Auditor auditor = new Auditor(client, clusterService.nodeName());
JobProvider jobProvider = new JobProvider(client, settings);
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService, auditor, client, notifier);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);

JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
Expand Down Expand Up @@ -395,10 +395,10 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor);
this.datafeedManager.set(datafeedManager);
Expand All @@ -413,7 +413,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster

return Arrays.asList(
mlLifeCycleService,
jobProvider,
jobResultsProvider,
jobManager,
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
Expand All @@ -34,18 +34,18 @@ public class TransportDeleteCalendarAction extends HandledTransportAction<Delete

private final Client client;
private final JobManager jobManager;
private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;

@Inject
public TransportDeleteCalendarAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobManager jobManager, JobProvider jobProvider) {
Client client, JobManager jobManager, JobResultsProvider jobResultsProvider) {
super(settings, DeleteCalendarAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, DeleteCalendarAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
}

@Override
Expand All @@ -71,7 +71,7 @@ protected void doExecute(DeleteCalendarAction.Request request, ActionListener<De
listener::onFailure
);

jobProvider.calendar(calendarId, calendarListener);
jobResultsProvider.calendar(calendarId, calendarListener);
}

private DeleteByQueryRequest buildDeleteByQuery(String calendarId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import java.util.Map;

Expand All @@ -38,18 +38,18 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
DeleteCalendarEventAction.Response> {

private final Client client;
private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;

@Inject
public TransportDeleteCalendarEventAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobProvider jobProvider, JobManager jobManager) {
Client client, JobResultsProvider jobResultsProvider, JobManager jobManager) {
super(settings, DeleteCalendarEventAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, DeleteCalendarEventAction.Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
this.jobManager = jobManager;
}

Expand Down Expand Up @@ -89,7 +89,7 @@ protected void doExecute(DeleteCalendarEventAction.Request request, ActionListen
}, listener::onFailure);

// Get the calendar first so we check the calendar exists before checking the event exists
jobProvider.calendar(request.getCalendarId(), calendarListener);
jobResultsProvider.calendar(request.getCalendarId(), calendarListener);
}

private void deleteEvent(String eventId, Calendar calendar, ActionListener<DeleteCalendarEventAction.Response> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Collections;
Expand All @@ -33,26 +33,27 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
DeleteModelSnapshotAction.Response> {

private final Client client;
private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;
private final ClusterService clusterService;
private final Auditor auditor;

@Inject
public TransportDeleteModelSnapshotAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider, ClusterService clusterService, Client client, Auditor auditor) {
JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client,
Auditor auditor) {
super(settings, DeleteModelSnapshotAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
DeleteModelSnapshotAction.Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
this.clusterService = clusterService;
this.auditor = auditor;
}

@Override
protected void doExecute(DeleteModelSnapshotAction.Request request, ActionListener<DeleteModelSnapshotAction.Response> listener) {
// Verify the snapshot exists
jobProvider.modelSnapshots(
jobResultsProvider.modelSnapshots(
request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(),
page -> {
List<ModelSnapshot> deleteCandidates = page.results();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;

Expand All @@ -41,16 +41,16 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ

private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);

private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;
@Inject
public TransportForecastJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
IndexNameExpressionResolver indexNameExpressionResolver, JobResultsProvider jobResultsProvider,
AutodetectProcessManager processManager) {
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new,
ThreadPool.Names.SAME, processManager);
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}

Expand Down Expand Up @@ -109,7 +109,7 @@ protected void taskOperation(ForecastJobAction.Request request, TransportOpenJob
}
};

jobProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
jobResultsProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
forecastRequestStatsHandler, listener::onFailure);
} else {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

public class TransportGetBucketsAction extends HandledTransportAction<GetBucketsAction.Request, GetBucketsAction.Response> {

private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
private final Client client;

@Inject
public TransportGetBucketsAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider, JobManager jobManager, Client client) {
JobResultsProvider jobResultsProvider, JobManager jobManager, Client client) {
super(settings, GetBucketsAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
GetBucketsAction.Request::new);
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
this.jobManager = jobManager;
this.client = client;
}
Expand Down Expand Up @@ -59,7 +59,7 @@ protected void doExecute(GetBucketsAction.Request request, ActionListener<GetBuc
query.start(request.getStart());
query.end(request.getEnd());
}
jobProvider.buckets(request.getJobId(), query, q ->
jobResultsProvider.buckets(request.getJobId(), query, q ->
listener.onResponse(new GetBucketsAction.Response(q)), listener::onFailure, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import java.util.Collections;
import java.util.List;

public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
GetCalendarEventsAction.Response> {

private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;
private final ClusterService clusterService;

@Inject
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobProvider jobProvider) {
ClusterService clusterService, JobResultsProvider jobResultsProvider) {
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
this.clusterService = clusterService;
}

Expand Down Expand Up @@ -87,9 +87,9 @@ protected void doExecute(GetCalendarEventsAction.Request request,
jobGroups = job.getGroups();
}

jobProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
} else {
jobProvider.scheduledEvents(query, eventsListener);
jobResultsProvider.scheduledEvents(query, eventsListener);
}
},
listener::onFailure);
Expand All @@ -103,7 +103,7 @@ private void checkCalendarExists(String calendarId, ActionListener<Boolean> list
return;
}

jobProvider.calendar(calendarId, ActionListener.wrap(
jobResultsProvider.calendar(calendarId, ActionListener.wrap(
c -> listener.onResponse(true),
listener::onFailure
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import java.util.Collections;

public class TransportGetCalendarsAction extends HandledTransportAction<GetCalendarsAction.Request, GetCalendarsAction.Response> {

private final JobProvider jobProvider;
private final JobResultsProvider jobResultsProvider;

@Inject
public TransportGetCalendarsAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider) {
JobResultsProvider jobResultsProvider) {
super(settings, GetCalendarsAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, GetCalendarsAction.Request::new);
this.jobProvider = jobProvider;
this.jobResultsProvider = jobResultsProvider;
}

@Override
Expand All @@ -52,7 +52,7 @@ protected void doExecute(GetCalendarsAction.Request request, ActionListener<GetC

private void getCalendar(String calendarId, ActionListener<GetCalendarsAction.Response> listener) {

jobProvider.calendar(calendarId, ActionListener.wrap(
jobResultsProvider.calendar(calendarId, ActionListener.wrap(
calendar -> {
QueryPage<Calendar> page = new QueryPage<>(Collections.singletonList(calendar), 1, Calendar.RESULTS_FIELD);
listener.onResponse(new GetCalendarsAction.Response(page));
Expand All @@ -63,7 +63,7 @@ private void getCalendar(String calendarId, ActionListener<GetCalendarsAction.Re

private void getCalendars(PageParams pageParams, ActionListener<GetCalendarsAction.Response> listener) {
CalendarQueryBuilder query = new CalendarQueryBuilder().pageParams(pageParams).sort(true);
jobProvider.calendars(query, ActionListener.wrap(
jobResultsProvider.calendars(query, ActionListener.wrap(
calendars -> {
listener.onResponse(new GetCalendarsAction.Response(calendars));
},
Expand Down
Loading

0 comments on commit 5ac551c

Please sign in to comment.