Skip to content

Commit

Permalink
[ML] fix updating opened jobs scheduled events (#31651) (#32881)
Browse files Browse the repository at this point in the history
* ML: fix updating opened jobs scheduled events (#31651)

* Adding UpdateParamsTests license header

* Adding integration test and addressing PR comments

* addressing test and job names
  • Loading branch information
benwtrent committed Aug 17, 2018
1 parent 7a2d700 commit ba38b2e
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public Version getJobVersion() {
}

public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
return modelPlotConfig != null || detectorUpdates != null || groups != null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ public void testIsAutodetectProcessUpdate() {
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build();
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setGroups(Arrays.asList("bar")).build();
assertTrue(update.isAutodetectProcessUpdate());
}

public void testUpdateAnalysisLimitWithValueGreaterThanMax() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) {
return new Builder(jobUpdate.getJobId())
.modelPlotConfig(jobUpdate.getModelPlotConfig())
.detectorUpdates(jobUpdate.getDetectorUpdates())
.updateScheduledEvents(jobUpdate.getGroups() != null)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;


public class UpdateParamsTests extends ESTestCase {

public void testFromJobUpdate() {
String jobId = "foo";
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
new RuleCondition(RuleCondition.AppliesTo.ACTUAL,
Operator.GT, 1.0))).build();
List<DetectionRule> rules = Arrays.asList(rule);
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(2, null, rules));
JobUpdate.Builder updateBuilder = new JobUpdate.Builder(jobId)
.setModelPlotConfig(new ModelPlotConfig())
.setDetectorUpdates(detectorUpdates);

UpdateParams params = UpdateParams.fromJobUpdate(updateBuilder.build());

assertFalse(params.isUpdateScheduledEvents());
assertEquals(params.getDetectorUpdates(), updateBuilder.build().getDetectorUpdates());
assertEquals(params.getModelPlotConfig(), updateBuilder.build().getModelPlotConfig());

params = UpdateParams.fromJobUpdate(updateBuilder.setGroups(Arrays.asList("bar")).build());

assertTrue(params.isUpdateScheduledEvents());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After;
Expand Down Expand Up @@ -193,9 +195,9 @@ public void testScheduledEventWithInterimResults() throws IOException {
/**
* Test an open job picks up changes to scheduled events/calendars
*/
public void testOnlineUpdate() throws Exception {
public void testAddEventsToOpenJob() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
Job.Builder job = createJob("scheduled-events-online-update", bucketSpan);
Job.Builder job = createJob("scheduled-events-add-events-to-open-job", bucketSpan);

long startTime = 1514764800000L;
final int bucketCount = 5;
Expand All @@ -209,7 +211,7 @@ public void testOnlineUpdate() throws Exception {

// Now create a calendar and events for the job while it is open
String calendarId = "test-calendar-online-update";
putCalendar(calendarId, Collections.singletonList(job.getId()), "testOnlineUpdate calendar");
putCalendar(calendarId, Collections.singletonList(job.getId()), "testAddEventsToOpenJob calendar");

List<ScheduledEvent> events = new ArrayList<>();
long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis();
Expand Down Expand Up @@ -257,6 +259,81 @@ public void testOnlineUpdate() throws Exception {
assertEquals(0, buckets.get(8).getScheduledEvents().size());
}

/**
* An open job that later gets added to a calendar, should take the scheduled events into account
*/
public void testAddOpenedJobToGroupWithCalendar() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
String groupName = "opened-calendar-job-group";
Job.Builder job = createJob("scheduled-events-add-opened-job-to-group-with-calendar", bucketSpan);

long startTime = 1514764800000L;
final int bucketCount = 5;

// Open the job
openJob(job.getId());

// write some buckets of data
postData(job.getId(), generateData(startTime, bucketSpan, bucketCount, bucketIndex -> randomIntBetween(100, 200))
.stream().collect(Collectors.joining()));

String calendarId = "test-calendar-open-job-update";

// Create a new calendar referencing groupName
putCalendar(calendarId, Collections.singletonList(groupName), "testAddOpenedJobToGroupWithCalendar calendar");

// Put events in the calendar
List<ScheduledEvent> events = new ArrayList<>();
long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis();
long eventEndTime = eventStartTime + (long)(1.5 * bucketSpan.millis());
events.add(new ScheduledEvent.Builder().description("Some Event")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventStartTime), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventEndTime), ZoneOffset.UTC))
.calendarId(calendarId).build());

postScheduledEvents(calendarId, events);

// Update the job to be a member of the group
UpdateJobAction.Request jobUpdateRequest = new UpdateJobAction.Request(job.getId(),
new JobUpdate.Builder(job.getId()).setGroups(Collections.singletonList(groupName)).build());
client().execute(UpdateJobAction.INSTANCE, jobUpdateRequest).actionGet();

// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
).get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Job updated: [groups]"));
});

// write some more buckets of data that cover the scheduled event period
postData(job.getId(), generateData(startTime + bucketCount * bucketSpan.millis(), bucketSpan, 5,
bucketIndex -> randomIntBetween(100, 200))
.stream().collect(Collectors.joining()));
// and close
closeJob(job.getId());

GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId());
List<Bucket> buckets = getBuckets(getBucketsRequest);

// the first 6 buckets have no events
for (int i=0; i<=bucketCount; i++) {
assertEquals(0, buckets.get(i).getScheduledEvents().size());
}
// 7th and 8th buckets have the event but the last one does not
assertEquals(1, buckets.get(6).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(6).getScheduledEvents().get(0));
assertEquals(1, buckets.get(7).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(7).getScheduledEvents().get(0));
assertEquals(0, buckets.get(8).getScheduledEvents().size());
}

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
Expand Down

0 comments on commit ba38b2e

Please sign in to comment.