Skip to content

Commit

Permalink
fix bugs; tune max running entities setting; add more UT
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn committed Sep 3, 2021
1 parent f63ba3e commit 96b914a
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ private AnomalyDetectorSettings() {}
public static final Setting<Integer> MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS = Setting
.intSetting(
"plugins.anomaly_detection.max_running_entities_per_detector_for_historical_analysis",
1,
2,
1,
1000,
Setting.Property.NodeScope,
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/opensearch/ad/task/ADTaskCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,26 @@

public class ADTaskCacheManager {
private final Logger logger = LogManager.getLogger(ADTaskCacheManager.class);
private final Map<String, ADBatchTaskCache> taskCaches;

private volatile Integer maxAdBatchTaskPerNode;
private volatile Integer maxCachedDeletedTask;
private final MemoryTracker memoryTracker;
private final int numberSize = 8;
public static final int TASK_RETRY_LIMIT = 3;

// This field is to record all batch tasks. Both single entity detector task
// and HC entity task will be cached in this field.
// Key: task id
private final Map<String, ADBatchTaskCache> taskCaches;

// We use this field to record all detector level tasks which running on the
// coordinating node to resolve race condition. We will check if
// detector id exists in cache or not first. If user starts
// multiple tasks for the same detector, we will put the first
// task in cache. For other tasks, we find the detector id exists,
// that means there is already one task running for this detector,
// so we will reject the task.
// Key: detector id; Value: detector level task id
private Map<String, String> detectorTasks;

// Use this field to cache all HC tasks. Key is detector id
Expand All @@ -91,7 +97,7 @@ public class ADTaskCacheManager {

// This field is to cache all realtime tasks. Key is detector id
private Map<String, ADRealtimeTaskCache> realtimeTaskCaches;
// This field is to cache all detectors' task slot and task lane limit
// This field is to cache all detectors' task slot and task lane limit. Key is detector id
private Map<String, ADTaskSlotLimit> detectorTaskSlotLimit;

/**
Expand Down Expand Up @@ -399,6 +405,18 @@ public void removeHistoricalTaskCache(String detectorId) {
taskCache.clear();
hcTaskCaches.remove(detectorId);
}
List<String> tasksOfDetector = getTasksOfDetector(detectorId);
for (String taskId : tasksOfDetector) {
remove(taskId);
}
if (tasksOfDetector.size() > 0) {
logger
.warn(
"Removed historical AD task from cache for detector {}, taskId: {}",
detectorId,
Arrays.toString(tasksOfDetector.toArray(new String[0]))
);
}
if (detectorTasks.containsKey(detectorId)) {
detectorTasks.remove(detectorId);
logger.info("Removed detector from AD task coordinating node cache, detectorId: " + detectorId);
Expand Down Expand Up @@ -681,6 +699,7 @@ public synchronized int scaleDownHCDetectorTaskSlots(String detectorId, int delt
if (adTaskSlotLimit != null && delta > 0) {
int newTaskSlots = taskSlots - delta;
if (newTaskSlots > 0) {
taskSlots = newTaskSlots;
logger.info("Scale down task slots of detector {} from {} to {}", detectorId, taskSlots, newTaskSlots);
adTaskSlotLimit.setDetectorTaskSlots(newTaskSlots);
return newTaskSlots;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener
}, e -> listener.onFailure(e)));
break;
case FINISHED:
logger.debug("Received FINISHED action for detector {}", detectorId);
boolean historicalTask = adTask.isHistoricalTask();
logger
.debug(
"Received FINISHED action for detector {}, taskId: {}, historical: {}",
detectorId,
adTask.getTaskId(),
historicalTask
);
// Historical analysis finished, so we need to remove detector cache. Only single entity detectors use this.
adTaskCacheManager.removeHistoricalTaskCache(detectorId);
adTaskCacheManager.removeRealtimeTaskCache(detectorId);
if (historicalTask) {
adTaskCacheManager.removeHistoricalTaskCache(detectorId);
} else {
adTaskCacheManager.removeRealtimeTaskCache(detectorId);
}
listener.onResponse(new AnomalyDetectorJobResponse(detector.getDetectorId(), 0, 0, 0, RestStatus.OK));
break;
case NEXT_ENTITY:
Expand Down
20 changes: 18 additions & 2 deletions src/test/java/org/opensearch/ad/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -926,13 +926,28 @@ public static ADTask randomAdTask(
Instant executionEndTime,
String stoppedBy,
String detectorId,
AnomalyDetector detector
AnomalyDetector detector,
ADTaskType adTaskType
) {
executionEndTime = executionEndTime == null ? null : executionEndTime.truncatedTo(ChronoUnit.SECONDS);
Entity entity = null;
if (ADTaskType.HISTORICAL_HC_ENTITY == adTaskType) {
List<String> categoryField = detector.getCategoryField();
if (categoryField != null) {
if (categoryField.size() == 1) {
entity = Entity.createSingleAttributeEntity(categoryField.get(0), randomAlphaOfLength(5));
} else if (categoryField.size() == 2) {
entity = Entity
.createEntityByReordering(
ImmutableMap.of(categoryField.get(0), randomAlphaOfLength(5), categoryField.get(1), randomAlphaOfLength(5))
);
}
}
}
ADTask task = ADTask
.builder()
.taskId(taskId)
.taskType(ADTaskType.HISTORICAL_SINGLE_ENTITY.name())
.taskType(adTaskType.name())
.detectorId(detectorId)
.detector(detector)
.state(state.name())
Expand All @@ -947,6 +962,7 @@ public static ADTask randomAdTask(
.lastUpdateTime(Instant.now().truncatedTo(ChronoUnit.SECONDS))
.startedBy(randomAlphaOfLength(5))
.stoppedBy(stoppedBy)
.entity(entity)
.build();
return task;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.opensearch.ad.TestHelpers.AD_BASE_STATS_URI;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS;
import static org.opensearch.ad.stats.StatNames.AD_TOTAL_BATCH_TASK_EXECUTION_COUNT;
import static org.opensearch.ad.stats.StatNames.MULTI_ENTITY_DETECTOR_COUNT;
import static org.opensearch.ad.stats.StatNames.SINGLE_ENTITY_DETECTOR_COUNT;
Expand Down Expand Up @@ -60,7 +61,9 @@ public class HistoricalAnalysisRestApiIT extends HistoricalAnalysisRestTestCase
@Before
@Override
public void setUp() throws Exception {
super.categoryFieldDocCount = 3;
super.setUp();
updateClusterSettings(MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.getKey(), 2);
updateClusterSettings(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5);
updateClusterSettings(MAX_BATCH_TASK_PER_NODE.getKey(), 10);
}
Expand All @@ -86,11 +89,11 @@ public void testHistoricalAnalysisForMultiCategoryHC() throws Exception {
checkIfTaskCanFinishCorrectly(detectorId, taskId, ADTaskState.FINISHED);
}

private void checkIfTaskCanFinishCorrectly(String detectorId, String taskId, ADTaskState finished) throws InterruptedException {
private void checkIfTaskCanFinishCorrectly(String detectorId, String taskId, ADTaskState state) throws InterruptedException {
ADTaskProfile endTaskProfile = waitUntilTaskDone(detectorId);
ADTask stoppedAdTask = endTaskProfile.getAdTask();
assertEquals(taskId, stoppedAdTask.getTaskId());
assertEquals(finished.name(), stoppedAdTask.getState());
assertEquals(state.name(), stoppedAdTask.getState());
}

@SuppressWarnings("unchecked")
Expand All @@ -107,7 +110,7 @@ private List<String> startHistoricalAnalysis(int categoryFieldSize) throws Excep
if (!ADTaskState.RUNNING.name().equals(adTaskProfile.getAdTask().getState())) {
adTaskProfile = waitUntilTaskReachState(detectorId, ImmutableSet.of(ADTaskState.RUNNING.name()));
}
assertEquals(categoryFieldSize * categoryFieldDocCount, adTaskProfile.getTotalEntitiesCount().intValue());
assertEquals((int) Math.pow(categoryFieldDocCount, categoryFieldSize), adTaskProfile.getTotalEntitiesCount().intValue());
assertTrue(adTaskProfile.getPendingEntitiesCount() > 0);
assertTrue(adTaskProfile.getRunningEntitiesCount() > 0);
}
Expand Down
161 changes: 160 additions & 1 deletion src/test/java/org/opensearch/ad/task/ADTaskCacheManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.opensearch.ad.task.ADTaskCacheManager.TASK_RETRY_LIMIT;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -53,6 +54,7 @@
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -131,12 +133,49 @@ public void testPutDuplicateTask() throws IOException {
adTask1.getExecutionEndTime(),
adTask1.getStoppedBy(),
adTask1.getDetectorId(),
adTask1.getDetector()
adTask1.getDetector(),
ADTaskType.HISTORICAL_SINGLE_ENTITY
);
DuplicateTaskException e2 = expectThrows(DuplicateTaskException.class, () -> adTaskCacheManager.add(adTask2));
assertEquals(DETECTOR_IS_RUNNING, e2.getMessage());
}

public void testPutMultipleEntityTasks() throws IOException {
when(memoryTracker.canAllocateReserved(anyLong())).thenReturn(true);
AnomalyDetector detector = TestHelpers
.randomAnomalyDetector(
ImmutableList.of(TestHelpers.randomFeature(true)),
null,
Instant.now(),
true,
ImmutableList.of(randomAlphaOfLength(5))
);
ADTask adTask1 = TestHelpers
.randomAdTask(
randomAlphaOfLength(5),
ADTaskState.CREATED,
Instant.now(),
null,
detector.getDetectorId(),
detector,
ADTaskType.HISTORICAL_HC_ENTITY
);
ADTask adTask2 = TestHelpers
.randomAdTask(
randomAlphaOfLength(5),
ADTaskState.CREATED,
Instant.now(),
null,
detector.getDetectorId(),
detector,
ADTaskType.HISTORICAL_HC_ENTITY
);
adTaskCacheManager.add(adTask1);
adTaskCacheManager.add(adTask2);
List<String> tasks = adTaskCacheManager.getTasksOfDetector(detector.getDetectorId());
assertEquals(2, tasks.size());
}

public void testAddDetector() throws IOException {
String detectorId = randomAlphaOfLength(10);
ADTask adTask = TestHelpers.randomAdTask(ADTaskType.HISTORICAL_HC_DETECTOR);
Expand Down Expand Up @@ -368,4 +407,124 @@ public void testAcquireTaskUpdatingSemaphore() throws IOException {
assertTrue(adTaskCacheManager.tryAcquireTaskUpdatingSemaphore(detectorId));
assertFalse(adTaskCacheManager.tryAcquireTaskUpdatingSemaphore(detectorId));
}

public void testGetTasksOfDetectorWithNonExistingDetectorId() throws IOException {
List<String> tasksOfDetector = adTaskCacheManager.getTasksOfDetector(randomAlphaOfLength(10));
assertEquals(0, tasksOfDetector.size());
}

public void testHistoricalTaskCache() throws IOException, InterruptedException {
List<String> result = addHCDetectorCache();
String detectorId = result.get(0);
String detectorTaskId = result.get(1);
assertTrue(adTaskCacheManager.containsTaskOfDetector(detectorId));
assertTrue(adTaskCacheManager.isHCTaskCoordinatingNode(detectorId));
assertTrue(adTaskCacheManager.isHCTaskRunning(detectorId));
assertEquals(detectorTaskId, adTaskCacheManager.getDetectorTaskId(detectorId));
Instant lastScaleEntityTaskLaneTime = adTaskCacheManager.getLastScaleEntityTaskLaneTime(detectorId);
assertNotNull(lastScaleEntityTaskLaneTime);
Thread.sleep(500);
adTaskCacheManager.refreshLastScaleEntityTaskLaneTime(detectorId);
assertTrue(lastScaleEntityTaskLaneTime.isBefore(adTaskCacheManager.getLastScaleEntityTaskLaneTime(detectorId)));

adTaskCacheManager.removeHistoricalTaskCache(detectorId);
assertFalse(adTaskCacheManager.containsTaskOfDetector(detectorId));
assertFalse(adTaskCacheManager.isHCTaskCoordinatingNode(detectorId));
assertFalse(adTaskCacheManager.isHCTaskRunning(detectorId));
assertNull(adTaskCacheManager.getDetectorTaskId(detectorId));
assertNull(adTaskCacheManager.getLastScaleEntityTaskLaneTime(detectorId));
}

private List<String> addHCDetectorCache() throws IOException {
when(memoryTracker.canAllocateReserved(anyLong())).thenReturn(true);
AnomalyDetector detector = TestHelpers
.randomAnomalyDetector(
ImmutableList.of(TestHelpers.randomFeature(true)),
null,
Instant.now(),
true,
ImmutableList.of(randomAlphaOfLength(5))
);
String detectorId = detector.getDetectorId();
ADTask adDetectorTask = TestHelpers
.randomAdTask(
randomAlphaOfLength(5),
ADTaskState.CREATED,
Instant.now(),
null,
detectorId,
detector,
ADTaskType.HISTORICAL_HC_DETECTOR
);
ADTask adEntityTask = TestHelpers
.randomAdTask(
randomAlphaOfLength(5),
ADTaskState.CREATED,
Instant.now(),
null,
detectorId,
detector,
ADTaskType.HISTORICAL_HC_ENTITY
);
adTaskCacheManager.add(detectorId, adDetectorTask);
adTaskCacheManager.add(adEntityTask);
assertEquals(adEntityTask.getEntity(), adTaskCacheManager.getEntity(adEntityTask.getTaskId()));
String entityValue = randomAlphaOfLength(5);
adTaskCacheManager.addPendingEntities(detectorId, ImmutableList.of(entityValue));
assertEquals(1, adTaskCacheManager.getUnfinishedEntityCount(detectorId));
return ImmutableList.of(detectorId, adDetectorTask.getTaskId(), adEntityTask.getTaskId(), entityValue);
}

public void testCancelHCDetector() throws IOException {
List<String> result = addHCDetectorCache();
String detectorId = result.get(0);
String entityTaskId = result.get(2);
assertFalse(adTaskCacheManager.isCancelled(entityTaskId));
adTaskCacheManager.cancelByDetectorId(detectorId, "testReason", "testUser");
assertTrue(adTaskCacheManager.isCancelled(entityTaskId));
}

public void testTempEntity() throws IOException {
List<String> result = addHCDetectorCache();
String detectorId = result.get(0);
String entityValue = result.get(3);
assertEquals(1, adTaskCacheManager.getPendingEntityCount(detectorId));
assertEquals(0, adTaskCacheManager.getTempEntityCount(detectorId));
adTaskCacheManager.pollEntity(detectorId);
assertEquals(0, adTaskCacheManager.getPendingEntityCount(detectorId));
assertEquals(1, adTaskCacheManager.getTempEntityCount(detectorId));
adTaskCacheManager.addPendingEntity(detectorId, entityValue);
assertEquals(1, adTaskCacheManager.getPendingEntityCount(detectorId));
assertEquals(0, adTaskCacheManager.getTempEntityCount(detectorId));
assertNotNull(adTaskCacheManager.pollEntity(detectorId));
assertNull(adTaskCacheManager.pollEntity(detectorId));
}

public void testScaleTaskSlots() throws IOException {
List<String> result = addHCDetectorCache();
String detectorId = result.get(0);
int taskSlots = randomIntBetween(6, 10);
int taskLaneLimit = randomIntBetween(1, 10);
adTaskCacheManager.setDetectorTaskLaneLimit(detectorId, taskLaneLimit);
adTaskCacheManager.setDetectorTaskSlots(detectorId, taskSlots);
assertEquals(taskSlots, adTaskCacheManager.getDetectorTaskSlots(detectorId));
int scaleUpDelta = randomIntBetween(1, 5);
adTaskCacheManager.scaleUpDetectorTaskSlots(detectorId, scaleUpDelta);
assertEquals(taskSlots + scaleUpDelta, adTaskCacheManager.getDetectorTaskSlots(detectorId));
int scaleDownDelta = randomIntBetween(1, 5);
int newTaskSlots = adTaskCacheManager.scaleDownHCDetectorTaskSlots(detectorId, scaleDownDelta);
assertEquals(taskSlots + scaleUpDelta - scaleDownDelta, newTaskSlots);
assertEquals(taskSlots + scaleUpDelta - scaleDownDelta, adTaskCacheManager.getDetectorTaskSlots(detectorId));
int newTaskSlots2 = adTaskCacheManager.scaleDownHCDetectorTaskSlots(detectorId, taskSlots * 10);
assertEquals(newTaskSlots, adTaskCacheManager.getDetectorTaskSlots(detectorId));
assertEquals(newTaskSlots, newTaskSlots2);
}

public void testTaskLanes() throws IOException {
List<String> result = addHCDetectorCache();
String detectorId = result.get(0);
int maxTaskLanes = randomIntBetween(1, 10);
adTaskCacheManager.setAllowedRunningEntities(detectorId, maxTaskLanes);
assertEquals(maxTaskLanes, adTaskCacheManager.getAvailableNewEntityTaskLanes(detectorId));
}
}
Loading

0 comments on commit 96b914a

Please sign in to comment.