diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b3ee367699116..273f51bdf9af1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -530,6 +530,19 @@ public Set getRoles() { Property.OperatorDynamic, Property.NodeScope); + /** + * This is the global setting for how often datafeeds should check for delayed data. + * + * This is usually only modified by tests that require all datafeeds to check for delayed data more quickly + */ + public static final Setting DELAYED_DATA_CHECK_FREQ = Setting.timeSetting( + "xpack.ml.delayed_data_check_freq", + TimeValue.timeValueMinutes(15), + TimeValue.timeValueSeconds(1), + Property.Dynamic, + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(MachineLearning.class); private final Settings settings; @@ -584,7 +597,8 @@ public List> getSettings() { ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND, USE_AUTO_MACHINE_MEMORY_PERCENT, - MAX_ML_NODE_SIZE)); + MAX_ML_NODE_SIZE, + DELAYED_DATA_CHECK_FREQ)); } public Settings additionalSettings() { @@ -766,8 +780,7 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver); this.autodetectProcessManager.set(autodetectProcessManager); - DatafeedJobBuilder datafeedJobBuilder = - new DatafeedJobBuilder( + DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder( client, xContentRegistry, anomalyDetectionAuditor, @@ -775,7 +788,8 @@ public Collection createComponents(Client client, ClusterService cluster System::currentTimeMillis, jobResultsPersister, settings, - clusterService.getNodeName()); + clusterService + ); DatafeedContextProvider datafeedContextProvider = new DatafeedContextProvider(jobConfigProvider, datafeedConfigProvider, jobResultsProvider); DatafeedRunner datafeedRunner = new DatafeedRunner(threadPool, client, clusterService, datafeedJobBuilder, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 9646f08000dff..fe6c5f61f60a7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -53,7 +53,6 @@ class DatafeedJob { private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class); private static final int NEXT_TASK_DELAY_MS = 100; - static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms private final AnomalyDetectionAuditor auditor; private final AnnotationPersister annotationPersister; @@ -67,6 +66,7 @@ class DatafeedJob { private final Supplier currentTimeSupplier; private final DelayedDataDetector delayedDataDetector; private final Integer maxEmptySearches; + private final long delayedDataCheckFreq; private volatile long lookbackStartTimeMs; private volatile long latestFinalBucketEndTimeMs; @@ -82,7 +82,7 @@ class DatafeedJob { DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client, AnomalyDetectionAuditor auditor, AnnotationPersister annotationPersister, Supplier currentTimeSupplier, DelayedDataDetector delayedDataDetector, Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, - boolean haveSeenDataPreviously) { + boolean haveSeenDataPreviously, long delayedDataCheckFreq) { this.jobId = jobId; this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; @@ -101,6 +101,7 @@ class DatafeedJob { lastEndTimeMs = lastEndTime; } this.haveEverSeenData = haveSeenDataPreviously; + this.delayedDataCheckFreq = delayedDataCheckFreq; } void isolate() { @@ -294,7 +295,7 @@ private Annotation updateAnnotation(Annotation annotation) { } /** - * We wait a static interval of 15 minutes till the next missing data check. + * We wait for `delayedDataCheckFreq` interval till the next missing data check. * * However, if our delayed data window is smaller than that, we will probably want to check at every available window (if freq. allows). * This is to help to miss as few buckets in the delayed data check as possible. @@ -304,8 +305,7 @@ private Annotation updateAnnotation(Annotation annotation) { * probably not even be noticeable at such a large timescale. */ private boolean checkForMissingDataTriggered() { - return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs - + Math.min(MISSING_DATA_CHECK_INTERVAL_MS, delayedDataDetector.getWindow()); + return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs + Math.min(delayedDataCheckFreq, delayedDataDetector.getWindow()); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index ca885ac21bb64..6f57ae5866fec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -32,6 +33,8 @@ import java.util.Objects; import java.util.function.Supplier; +import static org.elasticsearch.xpack.ml.MachineLearning.DELAYED_DATA_CHECK_FREQ; + public class DatafeedJobBuilder { private final Client client; @@ -43,9 +46,11 @@ public class DatafeedJobBuilder { private final boolean remoteClusterClient; private final String nodeName; + private volatile long delayedDataCheckFreq; + public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, AnnotationPersister annotationPersister, Supplier currentTimeSupplier, - JobResultsPersister jobResultsPersister, Settings settings, String nodeName) { + JobResultsPersister jobResultsPersister, Settings settings, ClusterService clusterService) { this.client = client; this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.auditor = Objects.requireNonNull(auditor); @@ -53,7 +58,13 @@ public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); - this.nodeName = nodeName; + this.nodeName = clusterService.getNodeName(); + this.delayedDataCheckFreq = DELAYED_DATA_CHECK_FREQ.get(settings).millis(); + clusterService.getClusterSettings().addSettingsUpdateConsumer(DELAYED_DATA_CHECK_FREQ, this::setDelayedDataCheckFreq); + } + + private void setDelayedDataCheckFreq(TimeValue value) { + this.delayedDataCheckFreq = value.millis(); } void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener listener) { @@ -90,8 +101,7 @@ void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext conte TimeValue queryDelay = datafeedConfig.getQueryDelay(); DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, parentTaskAssigningClient, xContentRegistry); - DatafeedJob datafeedJob = - new DatafeedJob( + DatafeedJob datafeedJob = new DatafeedJob( job.getId(), buildDataDescription(job), frequency.millis(), @@ -106,7 +116,9 @@ void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext conte datafeedConfig.getMaxEmptySearches(), latestFinalBucketEndMs, latestRecordTimeMs, - context.getRestartTimeInfo().haveSeenDataPreviously()); + context.getRestartTimeInfo().haveSeenDataPreviously(), + delayedDataCheckFreq + ); listener.onResponse(datafeedJob); }, e -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index db4e69f1d4370..4e2a9300631cf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -8,8 +8,15 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.node.Node; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -18,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; @@ -25,10 +33,11 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.junit.Before; +import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import static org.elasticsearch.test.NodeRoles.nonRemoteClusterClientNode; import static org.hamcrest.Matchers.equalTo; @@ -42,8 +51,8 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Client client; private AnomalyDetectionAuditor auditor; private AnnotationPersister annotationPersister; - private Consumer taskHandler; private JobResultsPersister jobResultsPersister; + private ClusterService clusterService; private DatafeedJobBuilder datafeedJobBuilder; @@ -57,11 +66,21 @@ public void init() { when(client.settings()).thenReturn(Settings.EMPTY); auditor = mock(AnomalyDetectionAuditor.class); annotationPersister = mock(AnnotationPersister.class); - taskHandler = mock(Consumer.class); jobResultsPersister = mock(JobResultsPersister.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(MachineLearning.DELAYED_DATA_CHECK_FREQ, + MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + ClusterService.USER_DEFINED_METADATA, + ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING))); + clusterService = new ClusterService( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), + clusterSettings, + threadPool + ); - datafeedJobBuilder = - new DatafeedJobBuilder( + datafeedJobBuilder = new DatafeedJobBuilder( client, xContentRegistry(), auditor, @@ -69,7 +88,8 @@ public void init() { System::currentTimeMillis, jobResultsPersister, Settings.EMPTY, - "test_node"); + clusterService + ); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -169,16 +189,16 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec } public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception { - datafeedJobBuilder = - new DatafeedJobBuilder( - client, - xContentRegistry(), - auditor, - annotationPersister, - System::currentTimeMillis, - jobResultsPersister, - nonRemoteClusterClientNode(), - "test_node"); + datafeedJobBuilder = new DatafeedJobBuilder( + client, + xContentRegistry(), + auditor, + annotationPersister, + System::currentTimeMillis, + jobResultsPersister, + nonRemoteClusterClientNode(), + clusterService + ); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedRunnerTests.createDatafeedJob(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 452e50290e4d7..d80368bab1390 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; @@ -65,6 +66,7 @@ import java.util.Optional; import java.util.function.Supplier; +import static org.elasticsearch.xpack.ml.MachineLearning.DELAYED_DATA_CHECK_FREQ; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -85,6 +87,10 @@ public class DatafeedJobTests extends ESTestCase { private static final String jobId = "_job_id"; + private static final long DELAYED_DATA_WINDOW = TimeValue.timeValueMinutes(15).millis(); + private static final long DELAYED_DATA_FREQ = TimeValue.timeValueMinutes(2).millis(); + private static final long DELAYED_DATA_FREQ_HALF = TimeValue.timeValueMinutes(1).millis(); + private AnomalyDetectionAuditor auditor; private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; @@ -124,7 +130,7 @@ public void setup() throws Exception { annotationDocId = "AnnotationDocId"; flushJobResponse = new FlushJobAction.Response(true, Instant.now()); delayedDataDetector = mock(DelayedDataDetector.class); - when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS); + when(delayedDataDetector.getWindow()).thenReturn(DELAYED_DATA_WINDOW); currentTime = 0; xContentType = XContentType.JSON; @@ -252,10 +258,10 @@ public void testRealtimeRun() throws Exception { when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture); when(delayedDataDetector.detectMissingData(2000)) .thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(10, bucket))); - currentTime = 60000L; + currentTime = DELAYED_DATA_FREQ_HALF; long frequencyMs = 100; long queryDelayMs = 1000; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false, DELAYED_DATA_FREQ); long next = datafeedJob.runRealtime(); assertEquals(currentTime + frequencyMs + 100, next); @@ -268,7 +274,7 @@ public void testRealtimeRun() throws Exception { verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); // Execute a second valid time, but do so in a smaller window than the interval - currentTime = 62000L; + currentTime = currentTime + DELAYED_DATA_FREQ_HALF - 1; byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8); InputStream inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); @@ -278,7 +284,7 @@ public void testRealtimeRun() throws Exception { // Execute a third time, but this time make sure we exceed the data check interval, but keep the delayedDataDetector response // the same - currentTime = 62000L + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; + currentTime = currentTime + DELAYED_DATA_FREQ_HALF; inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); @@ -313,7 +319,7 @@ public void testRealtimeRun() throws Exception { IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME)); assertThat(indexRequest.id(), nullValue()); - assertThat(indexRequest.source(), equalTo(expectedSource)); + assertThat(indexRequest.source().utf8ToString(), equalTo(expectedSource.utf8ToString())); assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); } @@ -325,7 +331,7 @@ public void testRealtimeRun() throws Exception { when(delayedDataDetector.detectMissingData(2000)) .thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket), BucketWithMissingData.fromMissingAndBucket(5, bucket2))); - currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; + currentTime = currentTime + DELAYED_DATA_WINDOW + 1; inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); @@ -365,7 +371,7 @@ public void testRealtimeRun() throws Exception { } // Execute a fifth time, no changes should occur as annotation is the same - currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; + currentTime = currentTime + DELAYED_DATA_WINDOW + 1; inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); @@ -491,10 +497,16 @@ public void testFlushAnalysisProblemIsConflict() { private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) { + return createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously, + DELAYED_DATA_CHECK_FREQ.get(Settings.EMPTY).millis()); + } + + private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, + long latestRecordTimeMs, boolean haveSeenDataPreviously, long delayedDataFreq) { Supplier currentTimeSupplier = () -> currentTime; return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter, client, auditor, new AnnotationPersister(resultsPersisterService), currentTimeSupplier, - delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously); + delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously, delayedDataFreq); } @SuppressWarnings("unchecked")