From 750defabbb7fa619eacbe83f22c350b6946f3f86 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Thu, 24 Jun 2021 12:47:38 -0700 Subject: [PATCH] Bug fix in EntityColdStartWorker EntityColdStartWorker used COLD_ENTITY_QUEUE_CONCURRENCY instead of ENTITY_COLD_START_QUEUE_CONCURRENCY. This PR fixes the bug. This PR also made changes to AnomalyDetectorSettings.java by * removing unused setting COLD_ENTITY_QUEUE_CONCURRENCY * renaming setting from entity_coldstart to entity_cold_start to be consistent Testing done: 1. Added unit tests for the changed code. --- .../opensearch/ad/AnomalyDetectorPlugin.java | 3 +- .../ad/ratelimit/EntityColdStartWorker.java | 4 +- .../ad/settings/AnomalyDetectorSettings.java | 17 +-- .../ratelimit/AbstractRateLimitingTest.java | 5 + .../ratelimit/CheckpointReadWorkerTests.java | 5 - .../ad/ratelimit/ColdEntityWorkerTests.java | 16 +++ .../ratelimit/EntityColdStartWorkerTests.java | 119 ++++++++++++++++++ .../AnomalyDetectorSettingsTests.java | 3 +- 8 files changed, 146 insertions(+), 26 deletions(-) create mode 100644 src/test/java/org/opensearch/ad/ratelimit/ColdEntityWorkerTests.java diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java index ca518bca4..e8fdb2852 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java @@ -824,12 +824,11 @@ public List> getSettings() { // rate limiting AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_CONCURRENCY, AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_CONCURRENCY, - AnomalyDetectorSettings.ENTITY_COLDSTART_QUEUE_CONCURRENCY, + AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_CONCURRENCY, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_CONCURRENCY, AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_BATCH_SIZE, AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_BATCH_SIZE, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_BATCH_SIZE, - AnomalyDetectorSettings.COLD_ENTITY_QUEUE_CONCURRENCY, AnomalyDetectorSettings.COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT, AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT, AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT, diff --git a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java index 9533466c7..b02fbc36a 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java @@ -11,7 +11,7 @@ package org.opensearch.ad.ratelimit; -import static org.opensearch.ad.settings.AnomalyDetectorSettings.COLD_ENTITY_QUEUE_CONCURRENCY; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_CONCURRENCY; import java.time.Clock; import java.time.Duration; @@ -83,7 +83,7 @@ public EntityColdStartWorker( mediumSegmentPruneRatio, lowSegmentPruneRatio, maintenanceFreqConstant, - COLD_ENTITY_QUEUE_CONCURRENCY, + ENTITY_COLD_START_QUEUE_CONCURRENCY, executionTtl, stateTtl, nodeStateManager diff --git a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java index 7f308f86f..0d67978d6 100644 --- a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java @@ -564,9 +564,9 @@ private AnomalyDetectorSettings() {} /** * Max concurrent entity cold starts per node */ - public static final Setting ENTITY_COLDSTART_QUEUE_CONCURRENCY = Setting + public static final Setting ENTITY_COLD_START_QUEUE_CONCURRENCY = Setting .intSetting( - "plugins.anomaly_detection.entity_coldstart_queue_concurrency", + "plugins.anomaly_detection.entity_cold_start_queue_concurrency", 1, 1, 10, @@ -614,19 +614,6 @@ private AnomalyDetectorSettings() {} Setting.Property.Dynamic ); - /** - * Max concurrent cold entity processing per node - */ - public static final Setting COLD_ENTITY_QUEUE_CONCURRENCY = Setting - .intSetting( - "plugins.anomaly_detection.cold_entity_queue_concurrency", - 1, - 1, - 10, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - /** * Assume each checkpoint takes roughly 200KB. 25 requests are of 5 MB. */ diff --git a/src/test/java/org/opensearch/ad/ratelimit/AbstractRateLimitingTest.java b/src/test/java/org/opensearch/ad/ratelimit/AbstractRateLimitingTest.java index cd01e60d8..bf587c15b 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/AbstractRateLimitingTest.java +++ b/src/test/java/org/opensearch/ad/ratelimit/AbstractRateLimitingTest.java @@ -26,6 +26,7 @@ import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.TestHelpers; import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.Entity; import org.opensearch.threadpool.ThreadPool; public class AbstractRateLimitingTest extends AbstractADTest { @@ -34,6 +35,7 @@ public class AbstractRateLimitingTest extends AbstractADTest { NodeStateManager nodeStateManager; String detectorId; String categoryField; + Entity entity, entity2; @SuppressWarnings("unchecked") @Override @@ -56,5 +58,8 @@ public void setUp() throws Exception { listener.onResponse(Optional.of(detector)); return null; }).when(nodeStateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + entity = Entity.createSingleAttributeEntity(detectorId, categoryField, "value"); + entity2 = Entity.createSingleAttributeEntity(detectorId, categoryField, "value2"); } } diff --git a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java index ed7efd3d1..f995a2170 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java @@ -50,7 +50,6 @@ import org.opensearch.ad.ml.ModelManager; import org.opensearch.ad.ml.ModelState; import org.opensearch.ad.ml.ThresholdingResult; -import org.opensearch.ad.model.Entity; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -80,7 +79,6 @@ public class CheckpointReadWorkerTests extends AbstractRateLimitingTest { ResultWriteWorker resultWriteQueue; AnomalyDetectionIndices anomalyDetectionIndices; CacheProvider cacheProvider; - Entity entity, entity2; EntityCache entityCache; EntityFeatureRequest request, request2; @@ -154,9 +152,6 @@ public void setUp() throws Exception { checkpointWriteQueue ); - entity = Entity.createSingleAttributeEntity(detectorId, categoryField, "value"); - entity2 = Entity.createSingleAttributeEntity(detectorId, categoryField, "value2"); - request = new EntityFeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity, new double[] { 0 }, 0); request2 = new EntityFeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity2, new double[] { 0 }, 0); } diff --git a/src/test/java/org/opensearch/ad/ratelimit/ColdEntityWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/ColdEntityWorkerTests.java new file mode 100644 index 000000000..ffb19e8a2 --- /dev/null +++ b/src/test/java/org/opensearch/ad/ratelimit/ColdEntityWorkerTests.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad.ratelimit; + +public class ColdEntityWorkerTests { + +} diff --git a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java index 56cb98339..77dc78015 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java @@ -11,9 +11,128 @@ package org.opensearch.ad.ratelimit; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Random; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.ml.EntityColdStarter; +import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.rest.RestStatus; public class EntityColdStartWorkerTests extends AbstractRateLimitingTest { ClusterService clusterService; + EntityColdStartWorker worker; + EntityColdStarter entityColdStarter; + + @Override + public void setUp() throws Exception { + super.setUp(); + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT, + AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_CONCURRENCY + ) + ) + ) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + entityColdStarter = mock(EntityColdStarter.class); + + // Integer.MAX_VALUE makes a huge heap + worker = new EntityColdStartWorker( + Integer.MAX_VALUE, + AnomalyDetectorSettings.ENTITY_REQUEST_SIZE_IN_BYTES, + AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT, + clusterService, + new Random(42), + mock(ADCircuitBreakerService.class), + threadPool, + Settings.EMPTY, + AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO, + clock, + AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO, + AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO, + AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT, + AnomalyDetectorSettings.QUEUE_MAINTENANCE, + entityColdStarter, + AnomalyDetectorSettings.HOURLY_MAINTENANCE, + nodeStateManager + ); + } + + public void testEmptyModelId() { + EntityRequest request = mock(EntityRequest.class); + when(request.getPriority()).thenReturn(RequestPriority.LOW); + when(request.getModelId()).thenReturn(Optional.empty()); + worker.put(request); + verify(entityColdStarter, never()).trainModel(any(), anyString(), any(), any()); + verify(request, times(1)).getModelId(); + } + + public void testOverloaded() { + EntityRequest request = new EntityRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + listener.onFailure(new OpenSearchRejectedExecutionException("blah", true)); + + return null; + }).when(entityColdStarter).trainModel(any(), anyString(), any(), any()); + + worker.put(request); + + verify(entityColdStarter, times(1)).trainModel(any(), anyString(), any(), any()); + verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class)); + + // 2nd put request won't trigger anything as we are in cooldown mode + worker.put(request); + verify(entityColdStarter, times(1)).trainModel(any(), anyString(), any(), any()); + } + + public void testException() { + EntityRequest request = new EntityRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + listener.onFailure(new OpenSearchStatusException("blah", RestStatus.REQUEST_TIMEOUT)); + + return null; + }).when(entityColdStarter).trainModel(any(), anyString(), any(), any()); + + worker.put(request); + + verify(entityColdStarter, times(1)).trainModel(any(), anyString(), any(), any()); + verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchStatusException.class)); + // 2nd put request triggers another setException + worker.put(request); + verify(entityColdStarter, times(2)).trainModel(any(), anyString(), any(), any()); + verify(nodeStateManager, times(2)).setException(eq(detectorId), any(OpenSearchStatusException.class)); + } } diff --git a/src/test/java/org/opensearch/ad/settings/AnomalyDetectorSettingsTests.java b/src/test/java/org/opensearch/ad/settings/AnomalyDetectorSettingsTests.java index 09dd19a37..8c2b40b0a 100644 --- a/src/test/java/org/opensearch/ad/settings/AnomalyDetectorSettingsTests.java +++ b/src/test/java/org/opensearch/ad/settings/AnomalyDetectorSettingsTests.java @@ -118,12 +118,11 @@ public void testAllOpenSearchSettingsReturned() { AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE, AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_CONCURRENCY, AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_CONCURRENCY, - AnomalyDetectorSettings.ENTITY_COLDSTART_QUEUE_CONCURRENCY, + AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_CONCURRENCY, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_CONCURRENCY, AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_BATCH_SIZE, AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_BATCH_SIZE, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_BATCH_SIZE, - AnomalyDetectorSettings.COLD_ENTITY_QUEUE_CONCURRENCY, AnomalyDetectorSettings.DEDICATED_CACHE_SIZE, AnomalyDetectorSettings.COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT, AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT,