Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix notifications listener leak in threat intel monitor #1361

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
sendNotificationResponse -> {
if (sendNotificationResponse.getStatus() == RestStatus.OK) {
logger.info("Successfully sent a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent());
listener.onResponse(null);

Check warning on line 84 in src/main/java/org/opensearch/securityanalytics/correlation/alert/notifications/NotificationService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/correlation/alert/notifications/NotificationService.java#L84

Added line #L84 was not covered by tests
} else {
listener.onFailure(new Exception("Error while sending a notification, Notification Event: " + sendNotificationResponse.getNotificationEvent()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,19 @@
}
}
actionListener.onResponse(null);
}, actionListener::onFailure), bulkRequestList.size());
}, e1 -> {
log.error("Failed to bulk index " + getEntityName(), e1);
actionListener.onFailure(e1);

Check warning on line 120 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java#L119-L120

Added lines #L119 - L120 were not covered by tests
}), bulkRequestList.size());

for (BulkRequest req : bulkRequestList) {
try {
client.bulk(req, groupedListener); //todo why stash context here?
client.bulk(req, groupedListener);
} catch (Exception e) {
log.error(
() -> new ParameterizedMessage("Failed to bulk save {} {}.", req.batchSize(), getEntityName()),
e);
groupedListener.onFailure(e);

Check warning on line 130 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java#L130

Added line #L130 was not covered by tests
}
}
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@
(iocFindings, e1) -> {
if (e1 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings/ ",
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to create ioc findings",

Check warning on line 59 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java#L59

Added line #L59 was not covered by tests
iocScanContext.getMonitor().getId(), data.size()),
e1);
scanCallback.accept(null, e1);
scanCallback.accept(data, e1);

Check warning on line 62 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java#L62

Added line #L62 was not covered by tests
} else {
BiConsumer<List<ThreatIntelAlert>, Exception> triggerResultConsumer = (alerts, e2) -> {
if (e2 != null) {
log.error(
() -> new ParameterizedMessage("Threat intel monitor {}: Failed to execute threat intel triggers/ ",
iocScanContext.getMonitor().getId(), data.size()),
e2);
scanCallback.accept(null, e2);
return;
// if findings are generated successfully but alerts/notifications fail we mark execution as succeeded, so that duplicate findings are not created
scanCallback.accept(data, null);
} else {
scanCallback.accept(data, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
} else {
fetchExistingAlertsForTrigger(monitor, triggerMatchedFindings, trigger, ActionListener.wrap(
existingAlerts -> {
executeActionsAndSaveAlerts(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
saveAlertsAndExecuteActions(iocFindings, trigger, monitor, existingAlerts, triggerMatchedFindings, threatIntelTrigger, listener);
},
e -> {
log.error(() -> new ParameterizedMessage(
Expand All @@ -132,7 +132,7 @@
}
}

private void executeActionsAndSaveAlerts(List<IocFinding> iocFindings,
private void saveAlertsAndExecuteActions(List<IocFinding> iocFindings,
Trigger trigger,
Monitor monitor,
List<ThreatIntelAlert> existingAlerts,
Expand All @@ -147,36 +147,38 @@
newAlerts,
existingAlerts);
if (false == trigger.getActions().isEmpty()) {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
monitor,
(threatIntelAlerts, e) -> {
if (e != null) {
log.error(String.format("Threat intel monitor %s: Failed to save alerts for trigger %s", monitor.getId(), trigger.getId()), e);
listener.onFailure(e);

Check warning on line 156 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java#L155-L156

Added lines #L155 - L156 were not covered by tests
} else {
GroupedActionListener<Void> notifsListener = new GroupedActionListener<>(ActionListener.wrap(
r -> {
listener.onResponse(threatIntelAlerts);
}, ex -> {

Check warning on line 161 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java#L161

Added line #L161 was not covered by tests
log.error(String.format("Threat intel monitor {}: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), ex);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));
}
});
}, e -> {
log.error(String.format("Threat intel monitor %s: Failed to send notification for trigger {}", monitor.getId(), trigger.getId()), e);
listener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());
for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception e) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), e);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, e));
}
), trigger.getActions().size());

for (Action action : trigger.getActions()) {
try {
String transformedSubject = NotificationService.compileTemplate(ctx, action.getSubjectTemplate());
String transformedMessage = NotificationService.compileTemplate(ctx, action.getMessageTemplate());
String configId = action.getDestinationId();
notificationService.sendNotification(configId, trigger.getSeverity(), transformedSubject, transformedMessage, notifsListener);
} catch (Exception ex) {
log.error(String.format("Threat intel monitor %s: Failed to send notification to %s for trigger %s", monitor.getId(), action.getDestinationId(), trigger.getId()), ex);
notifsListener.onFailure(new SecurityAnalyticsException("Failed to send notification", RestStatus.INTERNAL_SERVER_ERROR, ex));

Check warning on line 175 in src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java#L173-L175

Added lines #L173 - L175 were not covered by tests
}

}
}
});

}
} else {
saveAlerts(new ArrayList<>(iocToUpdatedAlertsMap.values()),
newAlerts,
Expand Down Expand Up @@ -235,7 +237,7 @@
r -> {
List<ThreatIntelAlert> list = new ArrayList<>();
r.forEach(list::addAll);
triggerResultConsumer.accept(list, null); //todo change emptylist to actual response
triggerResultConsumer.accept(list, null);
}, e -> {
log.error(() -> new ParameterizedMessage(
"Threat intel monitor {} Failed to execute triggers {}", monitor.getId()),
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/org/opensearch/securityanalytics/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,16 @@ public static Action randomAction(String destinationId) {
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Action randomThreatInteMonitorAction(String destinationId) {
String name = OpenSearchRestTestCase.randomUnicodeOfLength(10);
Script template = randomTemplateScript("Threat intel Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n" +
" - Trigger: {{ctx.trigger.name}}\n" +
" - Severity: {{ctx.trigger.severity}}", null);
Boolean throttleEnabled = false;
Throttle throttle = randomThrottle(null, null);
return new Action(name, destinationId, template, template, throttleEnabled, throttle, OpenSearchRestTestCase.randomAlphaOfLength(10), null);
}

public static Script randomTemplateScript(String source, Map<String, Object> params) {
if (params == null) {
params = new HashMap<>();
Expand Down
Loading
Loading