Skip to content

Commit

Permalink
[ML] adds new undocumented xpack.ml.delayed_data_check_freq setting f…
Browse files Browse the repository at this point in the history
…or increasing the delayed data check frequency in datafeeds (#76243) (#76253)
  • Loading branch information
benwtrent authored Aug 9, 2021
1 parent 8e5ff21 commit c321932
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,19 @@ public Set<DiscoveryNodeRole> getRoles() {
Property.OperatorDynamic,
Property.NodeScope);

/**
* This is the global setting for how often datafeeds should check for delayed data.
*
* This is usually only modified by tests that require all datafeeds to check for delayed data more quickly
*/
public static final Setting<TimeValue> DELAYED_DATA_CHECK_FREQ = Setting.timeSetting(
"xpack.ml.delayed_data_check_freq",
TimeValue.timeValueMinutes(15),
TimeValue.timeValueSeconds(1),
Property.Dynamic,
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(MachineLearning.class);

private final Settings settings;
Expand Down Expand Up @@ -584,7 +597,8 @@ public List<Setting<?>> getSettings() {
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
USE_AUTO_MACHINE_MEMORY_PERCENT,
MAX_ML_NODE_SIZE));
MAX_ML_NODE_SIZE,
DELAYED_DATA_CHECK_FREQ));
}

public Settings additionalSettings() {
Expand Down Expand Up @@ -766,16 +780,16 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory,
normalizerFactory, nativeStorageProvider, indexNameExpressionResolver);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder =
new DatafeedJobBuilder(
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(
client,
xContentRegistry,
anomalyDetectionAuditor,
anomalyDetectionAnnotationPersister,
System::currentTimeMillis,
jobResultsPersister,
settings,
clusterService.getNodeName());
clusterService
);
DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(jobConfigProvider, datafeedConfigProvider,
jobResultsProvider);
DatafeedRunner datafeedRunner = new DatafeedRunner(threadPool, client, clusterService, datafeedJobBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class DatafeedJob {

private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class);
private static final int NEXT_TASK_DELAY_MS = 100;
static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms

private final AnomalyDetectionAuditor auditor;
private final AnnotationPersister annotationPersister;
Expand All @@ -67,6 +66,7 @@ class DatafeedJob {
private final Supplier<Long> currentTimeSupplier;
private final DelayedDataDetector delayedDataDetector;
private final Integer maxEmptySearches;
private final long delayedDataCheckFreq;

private volatile long lookbackStartTimeMs;
private volatile long latestFinalBucketEndTimeMs;
Expand All @@ -82,7 +82,7 @@ class DatafeedJob {
DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client,
AnomalyDetectionAuditor auditor, AnnotationPersister annotationPersister, Supplier<Long> currentTimeSupplier,
DelayedDataDetector delayedDataDetector, Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs,
boolean haveSeenDataPreviously) {
boolean haveSeenDataPreviously, long delayedDataCheckFreq) {
this.jobId = jobId;
this.dataDescription = Objects.requireNonNull(dataDescription);
this.frequencyMs = frequencyMs;
Expand All @@ -101,6 +101,7 @@ class DatafeedJob {
lastEndTimeMs = lastEndTime;
}
this.haveEverSeenData = haveSeenDataPreviously;
this.delayedDataCheckFreq = delayedDataCheckFreq;
}

void isolate() {
Expand Down Expand Up @@ -294,7 +295,7 @@ private Annotation updateAnnotation(Annotation annotation) {
}

/**
* We wait a static interval of 15 minutes till the next missing data check.
* We wait for `delayedDataCheckFreq` interval till the next missing data check.
*
* However, if our delayed data window is smaller than that, we will probably want to check at every available window (if freq. allows).
* This is to help to miss as few buckets in the delayed data check as possible.
Expand All @@ -304,8 +305,7 @@ private Annotation updateAnnotation(Annotation annotation) {
* probably not even be noticeable at such a large timescale.
*/
private boolean checkForMissingDataTriggered() {
return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs
+ Math.min(MISSING_DATA_CHECK_INTERVAL_MS, delayedDataDetector.getWindow());
return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs + Math.min(delayedDataCheckFreq, delayedDataDetector.getWindow());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -32,6 +33,8 @@
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.ml.MachineLearning.DELAYED_DATA_CHECK_FREQ;

public class DatafeedJobBuilder {

private final Client client;
Expand All @@ -43,17 +46,25 @@ public class DatafeedJobBuilder {
private final boolean remoteClusterClient;
private final String nodeName;

private volatile long delayedDataCheckFreq;

public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
AnnotationPersister annotationPersister, Supplier<Long> currentTimeSupplier,
JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
JobResultsPersister jobResultsPersister, Settings settings, ClusterService clusterService) {
this.client = client;
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
this.annotationPersister = Objects.requireNonNull(annotationPersister);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.nodeName = nodeName;
this.nodeName = clusterService.getNodeName();
this.delayedDataCheckFreq = DELAYED_DATA_CHECK_FREQ.get(settings).millis();
clusterService.getClusterSettings().addSettingsUpdateConsumer(DELAYED_DATA_CHECK_FREQ, this::setDelayedDataCheckFreq);
}

private void setDelayedDataCheckFreq(TimeValue value) {
this.delayedDataCheckFreq = value.millis();
}

void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener<DatafeedJob> listener) {
Expand Down Expand Up @@ -90,8 +101,7 @@ void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext conte
TimeValue queryDelay = datafeedConfig.getQueryDelay();
DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(job,
datafeedConfig, parentTaskAssigningClient, xContentRegistry);
DatafeedJob datafeedJob =
new DatafeedJob(
DatafeedJob datafeedJob = new DatafeedJob(
job.getId(),
buildDataDescription(job),
frequency.millis(),
Expand All @@ -106,7 +116,9 @@ void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext conte
datafeedConfig.getMaxEmptySearches(),
latestFinalBucketEndMs,
latestRecordTimeMs,
context.getRestartTimeInfo().haveSeenDataPreviously());
context.getRestartTimeInfo().haveSeenDataPreviously(),
delayedDataCheckFreq
);

listener.onResponse(datafeedJob);
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -18,17 +25,19 @@
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.RestartTimeInfo;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.elasticsearch.test.NodeRoles.nonRemoteClusterClientNode;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -42,8 +51,8 @@ public class DatafeedJobBuilderTests extends ESTestCase {
private Client client;
private AnomalyDetectionAuditor auditor;
private AnnotationPersister annotationPersister;
private Consumer<Exception> taskHandler;
private JobResultsPersister jobResultsPersister;
private ClusterService clusterService;

private DatafeedJobBuilder datafeedJobBuilder;

Expand All @@ -57,19 +66,30 @@ public void init() {
when(client.settings()).thenReturn(Settings.EMPTY);
auditor = mock(AnomalyDetectionAuditor.class);
annotationPersister = mock(AnnotationPersister.class);
taskHandler = mock(Consumer.class);
jobResultsPersister = mock(JobResultsPersister.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(MachineLearning.DELAYED_DATA_CHECK_FREQ,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
ClusterService.USER_DEFINED_METADATA,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
clusterService = new ClusterService(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(),
clusterSettings,
threadPool
);

datafeedJobBuilder =
new DatafeedJobBuilder(
datafeedJobBuilder = new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
annotationPersister,
System::currentTimeMillis,
jobResultsPersister,
Settings.EMPTY,
"test_node");
clusterService
);
}

public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
Expand Down Expand Up @@ -169,16 +189,16 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec
}

public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
annotationPersister,
System::currentTimeMillis,
jobResultsPersister,
nonRemoteClusterClientNode(),
"test_node");
datafeedJobBuilder = new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
annotationPersister,
System::currentTimeMillis,
jobResultsPersister,
nonRemoteClusterClientNode(),
clusterService
);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedRunnerTests.createDatafeedJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Optional;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.ml.MachineLearning.DELAYED_DATA_CHECK_FREQ;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand All @@ -85,6 +87,10 @@ public class DatafeedJobTests extends ESTestCase {

private static final String jobId = "_job_id";

private static final long DELAYED_DATA_WINDOW = TimeValue.timeValueMinutes(15).millis();
private static final long DELAYED_DATA_FREQ = TimeValue.timeValueMinutes(2).millis();
private static final long DELAYED_DATA_FREQ_HALF = TimeValue.timeValueMinutes(1).millis();

private AnomalyDetectionAuditor auditor;
private DataExtractorFactory dataExtractorFactory;
private DataExtractor dataExtractor;
Expand Down Expand Up @@ -124,7 +130,7 @@ public void setup() throws Exception {
annotationDocId = "AnnotationDocId";
flushJobResponse = new FlushJobAction.Response(true, Instant.now());
delayedDataDetector = mock(DelayedDataDetector.class);
when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
when(delayedDataDetector.getWindow()).thenReturn(DELAYED_DATA_WINDOW);
currentTime = 0;
xContentType = XContentType.JSON;

Expand Down Expand Up @@ -252,10 +258,10 @@ public void testRealtimeRun() throws Exception {
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
when(delayedDataDetector.detectMissingData(2000))
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(10, bucket)));
currentTime = 60000L;
currentTime = DELAYED_DATA_FREQ_HALF;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false, DELAYED_DATA_FREQ);
long next = datafeedJob.runRealtime();
assertEquals(currentTime + frequencyMs + 100, next);

Expand All @@ -268,7 +274,7 @@ public void testRealtimeRun() throws Exception {
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());

// Execute a second valid time, but do so in a smaller window than the interval
currentTime = 62000L;
currentTime = currentTime + DELAYED_DATA_FREQ_HALF - 1;
byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
InputStream inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
Expand All @@ -278,7 +284,7 @@ public void testRealtimeRun() throws Exception {

// Execute a third time, but this time make sure we exceed the data check interval, but keep the delayedDataDetector response
// the same
currentTime = 62000L + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
currentTime = currentTime + DELAYED_DATA_FREQ_HALF;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
Expand Down Expand Up @@ -313,7 +319,7 @@ public void testRealtimeRun() throws Exception {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME));
assertThat(indexRequest.id(), nullValue());
assertThat(indexRequest.source(), equalTo(expectedSource));
assertThat(indexRequest.source().utf8ToString(), equalTo(expectedSource.utf8ToString()));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}

Expand All @@ -325,7 +331,7 @@ public void testRealtimeRun() throws Exception {
when(delayedDataDetector.detectMissingData(2000))
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
currentTime = currentTime + DELAYED_DATA_WINDOW + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
Expand Down Expand Up @@ -365,7 +371,7 @@ public void testRealtimeRun() throws Exception {
}

// Execute a fifth time, no changes should occur as annotation is the same
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
currentTime = currentTime + DELAYED_DATA_WINDOW + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
Expand Down Expand Up @@ -491,10 +497,16 @@ public void testFlushAnalysisProblemIsConflict() {

private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs, boolean haveSeenDataPreviously) {
return createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously,
DELAYED_DATA_CHECK_FREQ.get(Settings.EMPTY).millis());
}

private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs, boolean haveSeenDataPreviously, long delayedDataFreq) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
client, auditor, new AnnotationPersister(resultsPersisterService), currentTimeSupplier,
delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously);
delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously, delayedDataFreq);
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit c321932

Please sign in to comment.