Commit 1ee02e2 (parent eb9649e)

Add execute streaming detectors API

Signed-off-by: Chase Engelbrecht <[email protected]>
Committed by engechas on Feb 12, 2024

Showing 24 changed files with 1,439 additions and 43 deletions.
SecurityAnalyticsPlugin.java
@@ -13,6 +13,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.cluster.routing.Preference;
+import org.opensearch.commons.alerting.model.Monitor;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.action.ActionRequest;
 import org.opensearch.core.action.ActionResponse;
@@ -183,13 +184,15 @@ public List<RestHandler> getRestHandlers(Settings settings,
             new RestSearchCorrelationRuleAction(),
             new RestIndexCustomLogTypeAction(),
             new RestSearchCustomLogTypeAction(),
-            new RestDeleteCustomLogTypeAction()
+            new RestDeleteCustomLogTypeAction(),
+            new RestExecuteStreamingDetectorsAction(settings)
         );
     }

     @Override
     public List<NamedXContentRegistry.Entry> getNamedXContent() {
         return List.of(
+            Monitor.Companion.getXCONTENT_REGISTRY(),
             Detector.XCONTENT_REGISTRY,
             DetectorInput.XCONTENT_REGISTRY,
             Rule.XCONTENT_REGISTRY,
@@ -248,7 +251,8 @@ public List<Setting<?>> getSettings() {
             SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
             SecurityAnalyticsSettings.ENABLE_AUTO_CORRELATIONS,
             SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
-            SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE
+            SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE,
+            SecurityAnalyticsSettings.ENABLE_STREAMING_DETECTORS
         );
     }

@@ -279,7 +283,8 @@ public List<Setting<?>> getSettings() {
             new ActionPlugin.ActionHandler<>(SearchCorrelationRuleAction.INSTANCE, TransportSearchCorrelationRuleAction.class),
             new ActionHandler<>(IndexCustomLogTypeAction.INSTANCE, TransportIndexCustomLogTypeAction.class),
             new ActionHandler<>(SearchCustomLogTypeAction.INSTANCE, TransportSearchCustomLogTypeAction.class),
-            new ActionHandler<>(DeleteCustomLogTypeAction.INSTANCE, TransportDeleteCustomLogTypeAction.class)
+            new ActionHandler<>(DeleteCustomLogTypeAction.INSTANCE, TransportDeleteCustomLogTypeAction.class),
+            new ActionHandler<>(ExecuteStreamingDetectorsAction.INSTANCE, TransportExecuteStreamingDetectorsAction.class)
         );
     }

ExecuteStreamingDetectorsAction.java (new file)
@@ -0,0 +1,17 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.securityanalytics.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.bulk.BulkResponse;

public class ExecuteStreamingDetectorsAction extends ActionType<BulkResponse> {
    public static final ExecuteStreamingDetectorsAction INSTANCE = new ExecuteStreamingDetectorsAction();
    public static final String NAME = "cluster:admin/opensearch/securityanalytics/detectors/streaming/execute";

    public ExecuteStreamingDetectorsAction() {
        super(NAME, BulkResponse::new);
    }
}
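This action type pins down only the response shape (a standard BulkResponse); the request type is defined by the transport action, which is not shown in this file. A minimal caller sketch, assuming the transport layer accepts an ordinary BulkRequest — the class and method names below are illustrative, not part of the commit:

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;

public class StreamingDetectorCaller {
    private final Client client;

    public StreamingDetectorCaller(final Client client) {
        this.client = client;
    }

    // Hypothetical invocation: BulkRequest as the request type is an assumption,
    // inferred from the BulkResponse return type of the action above.
    public void executeStreamingDetectors(final BulkRequest bulkRequest,
                                          final ActionListener<BulkResponse> listener) {
        client.execute(ExecuteStreamingDetectorsAction.INSTANCE, bulkRequest, listener);
    }
}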
ExecuteStreamingWorkflowRequestConverter.java (new file)
@@ -0,0 +1,75 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.securityanalytics.converters;

import org.opensearch.common.inject.Inject;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.commons.alerting.action.ExecuteStreamingWorkflowRequest;
import org.opensearch.commons.alerting.model.IdDocPair;
import org.opensearch.commons.alerting.model.StreamingIndex;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.securityanalytics.model.DocData;
import org.opensearch.securityanalytics.model.StreamingDetectorMetadata;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ExecuteStreamingWorkflowRequestConverter {
    private final NamedXContentRegistry xContentRegistry;

    @Inject
    public ExecuteStreamingWorkflowRequestConverter(final NamedXContentRegistry xContentRegistry) {
        this.xContentRegistry = xContentRegistry;
    }

    public ExecuteStreamingWorkflowRequest convert(final StreamingDetectorMetadata streamingDetectorMetadata) {
        final List<StreamingIndex> streamingIndices = streamingDetectorMetadata.getIndexToDocData().entrySet().stream()
                .map(entry -> createStreamingIndex(entry, streamingDetectorMetadata.getQueryFields()))
                .collect(Collectors.toList());

        return new ExecuteStreamingWorkflowRequest(streamingDetectorMetadata.getWorkflowId(), streamingIndices);
    }

    private StreamingIndex createStreamingIndex(final Map.Entry<String, List<DocData>> indexToDocData, final Set<String> fieldNames) {
        final List<IdDocPair> filteredIdDocPairs = getFilteredIdDocPairs(indexToDocData.getValue(), fieldNames);
        return new StreamingIndex(indexToDocData.getKey(), filteredIdDocPairs);
    }

    private List<IdDocPair> getFilteredIdDocPairs(final List<DocData> indexToDocData, final Set<String> fieldNames) {
        return indexToDocData.stream()
                .map(DocData::getIdDocPair)
                .map(idDocPair -> {
                    final String docId = idDocPair.getDocId();
                    final BytesReference filteredDocument = getFilteredDocument(idDocPair.getDocument(), fieldNames);
                    return new IdDocPair(docId, filteredDocument);
                })
                .collect(Collectors.toList());
    }

    // TODO - this logic is consuming ~40% of the CPU. Is there a more efficient way to filter the docs?
    private BytesReference getFilteredDocument(final BytesReference document, final Set<String> fieldNames) {
        try {
            final XContentParser xcp = XContentType.JSON.xContent().createParser(
                    xContentRegistry, LoggingDeprecationHandler.INSTANCE, document.streamInput());
            final Map<String, ?> documentAsMap = xcp.map();
            final Map<String, Object> filteredDocumentAsMap = XContentMapValues.filter(
                    documentAsMap, fieldNames.toArray(String[]::new), new String[0]);

            final XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.map(filteredDocumentAsMap);
            return BytesReference.bytes(builder);
        } catch (final Exception e) {
            throw new MapperParsingException("Exception parsing document to map", e);
        }
    }
}
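The filtering above round-trips each document through a map and XContentMapValues.filter, which is also where the TODO locates the CPU cost. A small standalone sketch of the same include-only filter behavior; the field names and document contents here are made up for illustration:

import org.opensearch.common.xcontent.support.XContentMapValues;

import java.util.Map;
import java.util.Set;

public class FilterExample {
    public static void main(String[] args) {
        // Hypothetical document and query fields, for illustration only.
        final Map<String, Object> document = Map.of(
                "source_ip", "10.0.0.1",
                "dest_ip", "10.0.0.2",
                "payload", "large blob that no rule queries");
        final Set<String> queryFields = Set.of("source_ip", "dest_ip");

        // Keep only the fields the detector's queries reference; no excludes.
        final Map<String, Object> filtered = XContentMapValues.filter(
                document, queryFields.toArray(String[]::new), new String[0]);

        System.out.println(filtered); // e.g. {source_ip=10.0.0.1, dest_ip=10.0.0.2}
    }
}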
IndexNameToDocDataConverter.java (new file)
@@ -0,0 +1,69 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.securityanalytics.converters;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.commons.alerting.model.IdDocPair;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.securityanalytics.model.DocData;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class IndexNameToDocDataConverter {
    public Map<String, List<DocData>> convert(final BulkRequest bulkRequest, final BulkResponse bulkResponse) {
        if (bulkRequest.requests().size() != bulkResponse.getItems().length) {
            throw new SecurityAnalyticsException(
                    "BulkRequest item length did not match BulkResponse item length. Unable to proceed.",
                    RestStatus.INTERNAL_SERVER_ERROR,
                    null
            );
        }

        final Map<String, List<DocData>> indexToDocData = new HashMap<>();
        IntStream.range(0, bulkRequest.requests().size()).forEach(requestIndex -> {
            final DocWriteRequest<?> request = bulkRequest.requests().get(requestIndex);
            final BulkItemResponse response = bulkResponse.getItems()[requestIndex];

            // No work for SAP to do if doc is being deleted or DocWriteRequest failed
            if (isDeleteOperation(request) || response.isFailed()) {
                return;
            }

            indexToDocData.putIfAbsent(request.index(), new ArrayList<>());
            final BytesReference document = getDocument(request);
            final String docId = response.getId();
            final IdDocPair idDocPair = new IdDocPair(docId, document);
            final DocData docData = new DocData(idDocPair, requestIndex);

            indexToDocData.get(request.index()).add(docData);
        });

        return indexToDocData;
    }

    private boolean isDeleteOperation(final DocWriteRequest<?> docWriteRequest) {
        return DocWriteRequest.OpType.DELETE.equals(docWriteRequest.opType());
    }

    private BytesReference getDocument(final DocWriteRequest<?> docWriteRequest) {
        switch (docWriteRequest.opType()) {
            case CREATE:
            case INDEX:
                return ((IndexRequest) docWriteRequest).source();
            case UPDATE:
                return ((UpdateRequest) docWriteRequest).doc().source();
            default:
                throw new UnsupportedOperationException("No handler for operation type: " + docWriteRequest.opType());
        }
    }
}
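The natural call pattern for this converter is to pair the original BulkRequest with the BulkResponse the cluster returned, so each surviving document keeps its position (requestIndex) in the bulk. A hedged wiring sketch; the surrounding class, method, and listener handling are assumptions, not taken from the commit:

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.securityanalytics.model.DocData;

import java.util.List;
import java.util.Map;

public class BulkThenConvert {
    private final Client client;
    private final IndexNameToDocDataConverter converter = new IndexNameToDocDataConverter();

    public BulkThenConvert(final Client client) {
        this.client = client;
    }

    // Hypothetical flow: index the docs first, then group the successful,
    // non-delete writes by index name for the streaming detectors.
    public void indexThenGroup(final BulkRequest bulkRequest) {
        client.bulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
            final Map<String, List<DocData>> indexToDocData = converter.convert(bulkRequest, bulkResponse);
            // ... hand indexToDocData to the streaming detector pipeline
        }, e -> { /* surface the bulk failure to the caller */ }));
    }
}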
StreamingDetectorMetadataConverter.java (new file)
@@ -0,0 +1,53 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.securityanalytics.converters;

import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.DetectorInput;
import org.opensearch.securityanalytics.model.DocData;
import org.opensearch.securityanalytics.model.StreamingDetectorMetadata;
import org.opensearch.securityanalytics.validators.StreamingDetectorValidators;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamingDetectorMetadataConverter {
    public List<StreamingDetectorMetadata> convert(final List<Detector> detectors, final Map<String, List<DocData>> indexToDocData) {
        return detectors.stream()
                .peek(StreamingDetectorValidators::validateDetector)
                .filter(Detector::isStreamingDetector)
                .filter(detector -> doesDetectorHaveIndexAsInput(detector, indexToDocData.keySet()))
                .map(detector -> createStreamingDetectorMetadata(detector, indexToDocData))
                .collect(Collectors.toList());
    }

    // TODO - some edge cases here since index patterns and IndexRequests directly to a write index are not considered
    private boolean doesDetectorHaveIndexAsInput(final Detector detector, final Set<String> indexNames) {
        final DetectorInput detectorInput = detector.getInputs().get(0);
        return detectorInput.getIndices().stream().anyMatch(indexNames::contains);
    }

    private StreamingDetectorMetadata createStreamingDetectorMetadata(final Detector detector,
                                                                      final Map<String, List<DocData>> indexToDocData) {
        final Map<String, List<DocData>> indexToDocDataForDetectorIndices = getIndexToDocDataForDetectorIndices(
                detector.getInputs().get(0).getIndices(), indexToDocData);

        return new StreamingDetectorMetadata(
                detector.getName(),
                indexToDocDataForDetectorIndices,
                detector.getWorkflowIds().get(0),
                detector.getMonitorIds().get(0)
        );
    }

    private Map<String, List<DocData>> getIndexToDocDataForDetectorIndices(final List<String> detectorIndices,
                                                                           final Map<String, List<DocData>> indexToDocData) {
        return indexToDocData.entrySet().stream()
                .filter(entry -> detectorIndices.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
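Taken together, the converters form a small pipeline: group bulk documents by index, match them against streaming detectors, and emit one workflow request per detector. A sketch of that chaining; the orchestrating class and method are illustrative, and it assumes the metadata's query fields are populated elsewhere (this commit constructs the metadata without them):

import org.opensearch.commons.alerting.action.ExecuteStreamingWorkflowRequest;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.DocData;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamingDetectorPipeline {
    private final StreamingDetectorMetadataConverter metadataConverter;
    private final ExecuteStreamingWorkflowRequestConverter requestConverter;

    public StreamingDetectorPipeline(final StreamingDetectorMetadataConverter metadataConverter,
                                     final ExecuteStreamingWorkflowRequestConverter requestConverter) {
        this.metadataConverter = metadataConverter;
        this.requestConverter = requestConverter;
    }

    // Illustrative orchestration: one workflow request per matching streaming detector.
    public List<ExecuteStreamingWorkflowRequest> buildRequests(final List<Detector> detectors,
                                                               final Map<String, List<DocData>> indexToDocData) {
        return metadataConverter.convert(detectors, indexToDocData).stream()
                .map(requestConverter::convert)
                .collect(Collectors.toList());
    }
}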
MapperService.java
@@ -77,7 +77,7 @@ public void createMappingAction(String indexName, String logType, String aliasMa
         // since you can't update documents in non-write indices
         String index = indexName;
         boolean shouldUpsertIndexTemplate = IndexUtils.isConcreteIndex(indexName, this.clusterService.state()) == false;
-        if (IndexUtils.isDataStream(indexName, this.clusterService.state())) {
+        if (IndexUtils.isDataStream(indexName, this.clusterService.state()) || IndexUtils.isAlias(indexName, this.clusterService.state())) {
             String writeIndex = IndexUtils.getWriteIndex(indexName, this.clusterService.state());
             if (writeIndex != null) {
                 index = writeIndex;
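This one-line change extends the existing data-stream branch to aliases: for either abstraction, mapping updates are applied to the current write index, since documents in non-write backing indices can't be updated. The resolution logic isolated into a sketch; the IndexUtils calls are the ones used above, while the surrounding class is illustrative:

import org.opensearch.cluster.ClusterState;
import org.opensearch.securityanalytics.util.IndexUtils;

public class WriteIndexResolver {
    // Resolve the concrete index that mapping updates should target.
    public String resolve(final String indexName, final ClusterState state) {
        if (IndexUtils.isDataStream(indexName, state) || IndexUtils.isAlias(indexName, state)) {
            final String writeIndex = IndexUtils.getWriteIndex(indexName, state);
            if (writeIndex != null) {
                return writeIndex;
            }
        }
        // Concrete index (or an alias/data stream without a write index): use the name as-is.
        return indexName;
    }
}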