Skip to content

Commit

Permalink
Add updated upsertMonitorFromQueries call
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Feb 14, 2024
1 parent 2734d2a commit dbb5596
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ public void createMappingAction(String indexName, String logType, String aliasMa
String index = indexName;
boolean shouldUpsertIndexTemplate = IndexUtils.isConcreteIndex(indexName, this.clusterService.state()) == false;
if (IndexUtils.isDataStream(indexName, this.clusterService.state()) || IndexUtils.isAlias(indexName, this.clusterService.state())) {
log.debug("PERF_DEBUG_SAP: {} is an alias or datastream. Fetching write index.", indexName);
String writeIndex = IndexUtils.getWriteIndex(indexName, this.clusterService.state());
if (writeIndex != null) {
log.debug("PERF_DEBUG_SAP: Write index for {} is {}",indexName, writeIndex );
index = writeIndex;
}
}
Expand All @@ -91,6 +93,7 @@ public void onResponse(GetMappingsResponse getMappingsResponse) {
applyAliasMappings(getMappingsResponse.getMappings(), logType, aliasMappings, partial, new ActionListener<>() {
@Override
public void onResponse(Collection<CreateMappingResult> createMappingResponse) {
log.debug("PERF_DEBUG_SAP: Completed create mappings for {}", indexName);
// We will return ack==false if one of the requests returned that
// else return ack==true
Optional<AcknowledgedResponse> notAckd = createMappingResponse.stream()
Expand All @@ -109,6 +112,7 @@ public void onResponse(Collection<CreateMappingResult> createMappingResponse) {

@Override
public void onFailure(Exception e) {
log.debug("PERF_DEBUG_SAP: Failed to create mappings for {}", indexName );
actionListener.onFailure(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1361,16 +1360,14 @@ public void onFailure(Exception e) {
});
}

private void upsertMonitorFromQueries(List<Pair<String, Rule>> queries, Detector detector, String logIndex, ActionListener<List<IndexMonitorResponse>> listener) throws Exception {
logger.error("PERF_DEBUG: Fetching alias path pairs to construct rule_field_names");
private void upsertMonitorFromQueries(List<Pair<String, Rule>> queries, Detector detector, String logIndex, ActionListener<List<IndexMonitorResponse>> listener) {
logger.error("PERF_DEBUG_SAP: Fetching alias path pairs to construct rule_field_names");
long start = System.currentTimeMillis();
Set<String> ruleFieldNames = new HashSet<>();
for (Pair<String, Rule> query : queries) {
List<String> queryFieldNames = query.getValue().getQueryFieldNames().stream().map(Value::getValue).collect(Collectors.toList());
ruleFieldNames.addAll(queryFieldNames);
}

CountDownLatch indexMappingsLatch = new CountDownLatch(1);
client.execute(GetIndexMappingsAction.INSTANCE, new GetIndexMappingsRequest(logIndex), new ActionListener<>() {
@Override
public void onResponse(GetIndexMappingsResponse getMappingsViewResponse) {
Expand All @@ -1384,13 +1381,18 @@ public void onResponse(GetIndexMappingsResponse getMappingsViewResponse) {
ruleFieldNames.add(aliasPathPair.getRight());
}
}
long took = System.currentTimeMillis() - start;
log.error("PERF_DEBUG_SAP: completed collecting rule_field_names in {} millis", took);
if (request.getMethod() == Method.POST) {
createMonitorFromQueries(queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames));
} else if (request.getMethod() == Method.PUT) {
updateMonitorFromQueries(logIndex, queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames));
}
} catch (Exception e) {
logger.error("Failure in parsing rule field names/aliases while " +
logger.error("PERF_DEBUG_SAP: Failure in parsing rule field names/aliases while " +
detector.getId() == null ? "creating" : "updating" +
" detector. Not optimizing detector queries with relevant fields", e);
ruleFieldNames.clear();
} finally {
indexMappingsLatch.countDown();
}

}
Expand All @@ -1399,17 +1401,8 @@ public void onResponse(GetIndexMappingsResponse getMappingsViewResponse) {
public void onFailure(Exception e) {
log.error("Failed to fetch mappings view response for log index " + logIndex, e);
listener.onFailure(e);
indexMappingsLatch.countDown();
}
});
indexMappingsLatch.await();
long took = System.currentTimeMillis() - start;
log.error("PERF_DEBUG: completed collecting rule_field_names in {} millis", took);
if (request.getMethod() == Method.POST) {
createMonitorFromQueries(queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames));
} else if (request.getMethod() == Method.PUT) {
updateMonitorFromQueries(logIndex, queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames));
}
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit dbb5596

Please sign in to comment.