diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..e90d2193d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5226b3d43..1e6b3f4f2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -476,6 +477,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -502,15 +507,41 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .string() logger.debug("Findings: $findingStr") - if (shouldCreateFinding) { - indexRequests += IndexRequest(monitor.dataSources.findingsIndex) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) - .opType(DocWriteRequest.OpType.INDEX) + if (indexRequests.size > monitorCtx.findingsIndexBatchSize) { + // make bulk indexing call here and flush the indexRequest object + bulkIndexFindings(monitor, monitorCtx, indexRequests) + indexRequests.clear() + } else { + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) + .opType(DocWriteRequest.OpType.INDEX) + } } } + if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + + try { + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } + } catch (e: Exception) { + // suppress exception + logger.error("Optional finding callback failed", e) + } + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { if (indexRequests.isNotEmpty()) { val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) @@ -518,23 +549,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> if (item.isFailed) { - logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") } } } else { logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } - - try { - findings.forEach { finding -> - publishFinding(monitor, monitorCtx, finding) - } - } catch (e: Exception) { - // suppress exception - logger.error("Optional finding callback failed", e) - } - return findingDocPairs } private fun publishFinding( @@ -658,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) } } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") + logger.error("Failed to run for shard $shard. Error: ${e.message}") } } return matchingDocs diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 41a26bb79..2c98495de 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index e23d44c5b..37f6d9468 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,7 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -153,5 +154,12 @@ class AlertingSettings { -1L, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 0, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } }