Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky test and log level/msgs and enable auto-expand replication #202

Merged
merged 5 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.common.exception.ResourceNotFoundException',
'org.opensearch.ad.task.ADTaskSlotLimit',
'org.opensearch.ad.task.ADTaskCacheManager',

// TODO: to be fixed by kaituo
'org.opensearch.ad.indices.AnomalyDetectionIndices'
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ protected void runAdJob(
);
return;
}
indexUtil.updateMappingIfNecessary();
indexUtil.update();
/*
* We need to handle 3 cases:
 * 1. Detectors created by older versions and never updated. These detectors won't have User details in the
Expand Down Expand Up @@ -643,10 +643,11 @@ private void updateLatestRealtimeTask(
Long detectorIntervalInMinutes,
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTask(detectorId, taskState, rcfTotalUpdates, detectorIntervalInMinutes, error, ActionListener.wrap(r -> {
log
.info(
.debug(
"Updated latest realtime task successfully for detector {}, taskState: {},"
+ " RCF total update: {}, detectorIntervalInMinutes: {}",
detectorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,14 @@ public Collection<Object> createComponents(
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
this.clusterService = clusterService;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ private void processResponse(SearchResponse response, Runnable retry, ActionList

try {
Page page = analyzePage(response);
if (totalResults < maxEntities && afterKey != null) {
// we can process at most maxEntities entities
if (totalResults <= maxEntities && afterKey != null) {
updateCompositeAfterKey(response, source);
listener.onResponse(page);
} else {
Expand Down
247 changes: 223 additions & 24 deletions src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.opensearch.ad.NodeStateManager;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.caching.CacheProvider;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
Expand Down Expand Up @@ -168,6 +170,10 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
// lazy init since we don't expect retryable requests to happen often
Set<String> retryableRequests = null;
Set<String> notFoundModels = null;
boolean printedUnexpectedFailure = false;
// contain requests that we will set the detector's exception to
// EndRunException (stop now = false)
Map<String, Exception> stopDetectorRequests = null;
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
for (MultiGetItemResponse itemResponse : itemResponses) {
String modelId = itemResponse.getId();
if (itemResponse.isFailed()) {
Expand All @@ -189,7 +195,17 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
LOG.error("too many get AD model checkpoint requests or shard not available");
setCoolDownStart();
} else {
LOG.error("Unexpected failure", failure);
// some unexpected bug occurred or cluster is unstable (e.g., ClusterBlockException) or index is red (e.g.
// NoShardAvailableActionException) while fetching a checkpoint. As this might happen for a large amount
// of entities, we don't want to flood logs with such exception trace. Only print it once.
if (!printedUnexpectedFailure) {
LOG.error("Unexpected failure", failure);
printedUnexpectedFailure = true;
}
if (stopDetectorRequests == null) {
stopDetectorRequests = new HashMap<>();
}
stopDetectorRequests.put(modelId, failure);
}
} else if (!itemResponse.getResponse().isExists()) {
// lazy init as we don't expect retrying happens often
Expand All @@ -213,6 +229,22 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
}
}

// deal with failures that we will retry for a limited number of times
// before stopping the detector
if (stopDetectorRequests != null) {
for (EntityRequest origRequest : toProcess) {
Optional<String> modelId = origRequest.getModelId();
if (modelId.isPresent() && stopDetectorRequests.containsKey(modelId.get())) {
String adID = origRequest.detectorId;
nodeStateManager
.setException(
adID,
new EndRunException(adID, CommonErrorMessages.BUG_RESPONSE, stopDetectorRequests.get(modelId.get()), false)
);
}
}
}

if (successfulRequests.isEmpty() && (retryableRequests == null || retryableRequests.isEmpty())) {
// don't need to proceed further since no checkpoint is available
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,4 +788,9 @@ private AnomalyDetectorSettings() {}
// such as "there are at least 10000 entities", the default is set to 10,000. That is, requests will count the
// total entities up to 10,000.
public static final int MAX_TOTAL_ENTITIES_TO_TRACK = 10_000;

// ======================================
// AD Index setting
// ======================================
public static int MAX_UPDATE_RETRY_TIMES = 10_000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -793,13 +794,16 @@ private void findException(Throwable cause, String adID, AtomicReference<Excepti
}
} else if (causeException instanceof IndexNotFoundException
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME)) {
// checkpoint index does not exist
// ResourceNotFoundException will trigger cold start later
failure.set(new ResourceNotFoundException(adID, causeException.getMessage()));
} else if (causeException instanceof OpenSearchTimeoutException) {
// we can have OpenSearchTimeoutException when a node tries to load RCF or
// threshold model
failure.set(new InternalFailure(adID, causeException));
} else {
// some unexpected bugs occur while predicting anomaly
// some unexpected bug occurred or cluster is unstable (e.g., ClusterBlockException) or index is red (e.g.
// NoShardAvailableActionException) while predicting anomaly
failure.set(new EndRunException(adID, CommonErrorMessages.BUG_RESPONSE, causeException, false));
}
}
Expand Down Expand Up @@ -1099,6 +1103,7 @@ private boolean hasConnectionIssue(Throwable e) {
|| e instanceof ReceiveTimeoutTransportException
|| e instanceof NodeNotConnectedException
|| e instanceof ConnectException
|| NetworkExceptionHelper.isCloseConnectionException(e)
|| e instanceof ActionNotFoundTransportException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected void adExecute(
ThreadContext.StoredContext storedContext,
ActionListener<IndexAnomalyDetectorResponse> listener
) {
anomalyDetectionIndices.updateMappingIfNecessary();
anomalyDetectionIndices.update();
String detectorId = request.getDetectorID();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -72,7 +73,14 @@ public void setup() {

nodeFilter = new DiscoveryNodeFilterer(clusterService());

indices = new AnomalyDetectionIndices(client(), clusterService(), client().threadPool(), settings, nodeFilter);
indices = new AnomalyDetectionIndices(
client(),
clusterService(),
client().threadPool(),
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
}

public void testAnomalyDetectorIndexNotExists() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,14 @@ public void setUp() throws Exception {
clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build();
when(clusterService.state()).thenReturn(clusterState);

adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
adIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -175,7 +182,11 @@ private void adaptivePrimaryShardsIndexCreationTemplate(String index) throws IOE
}

Settings settings = request.settings();
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(numberOfHotNodes)));
if (index.equals(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)) {
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(1)));
} else {
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(numberOfHotNodes)));
}

ActionListener<CreateIndexResponse> listener = (ActionListener<CreateIndexResponse>) invocation.getArgument(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,14 @@ public void setUp() throws Exception {
numberOfNodes = 2;
when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes);

adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
adIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);

clusterAdminClient = mock(ClusterAdminClient.class);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
Expand Down
Loading