diff --git a/alerting/build.gradle b/alerting/build.gradle index 47d0026f5..7a97478f2 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -111,6 +111,20 @@ testClusters.integTest { debugPort += 1 } } + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications-core").getSingleFile() } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() } + } + })) } testClusters.integTest.nodes.each { node -> @@ -187,7 +201,19 @@ task prepareBwcTests { dependsOn bundle doLast { plugins = [ - project.getObjects().fileProperty().value(bundle.getArchiveFile()) + project.getObjects().fileProperty().value(bundle.getArchiveFile()), + provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications-core").getSingleFile() } + } + }), + provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() } + } + }) ] } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 655bdea6b..c241fdaf1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -17,9 +17,6 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertError import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.firstFailureOrNull -import org.opensearch.alerting.elasticapi.retry -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket @@ -29,6 +26,9 @@ import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.opensearchapi.firstFailureOrNull +import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.getBucketKeysHash diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 7afe7c856..2e74e13bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -82,6 +82,7 @@ import org.opensearch.alerting.transport.TransportIndexMonitorAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction +import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes @@ -149,6 +150,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService + lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator override fun getRestHandlers( settings: Settings, @@ -248,9 +250,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) + destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool) this.threadPool = threadPool this.clusterService = clusterService - return listOf(sweeper, scheduler, runner, scheduledJobIndices) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, destinationMigrationCoordinator) } override fun getSettings(): List> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 4af66dc7c..a5916a73c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -10,11 +10,11 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.convertToMap -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.TriggerAfterKey +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.executeTransportAction diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index a109cce47..8ffe31bfe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -17,9 +17,6 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.moveAlerts import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.InjectorContextElement -import org.opensearch.alerting.elasticapi.retry -import org.opensearch.alerting.elasticapi.withClosableContext import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.AlertingConfigAccessor @@ -39,6 +36,9 @@ import org.opensearch.alerting.model.action.AlertCategory import org.opensearch.alerting.model.action.PerAlertActionScope import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.destination.DestinationContextFactory +import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt index 99f5be86c..72d788684 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt @@ -5,8 +5,8 @@ package org.opensearch.alerting.alerts -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index 85c20e0cc..4b13a3085 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -23,7 +23,7 @@ import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX -import org.opensearch.alerting.elasticapi.suspendUntil +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_INDEX_MAX_AGE diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt index 880faa332..06b9bea48 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt @@ -13,9 +13,9 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt index f44fcf337..ecdbd8ea4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt @@ -5,8 +5,8 @@ package org.opensearch.alerting.model -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt index 6fffd0032..e892bf560 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt @@ -6,9 +6,9 @@ package org.opensearch.alerting.model import org.opensearch.alerting.alerts.AlertError -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt index d96a4e566..7e982dd41 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt @@ -10,10 +10,10 @@ import kotlinx.coroutines.withContext import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 68bcf5966..8141ebb42 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -11,9 +11,9 @@ import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.Schedule import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS import org.opensearch.alerting.settings.SupportedClusterMetricsSettings @@ -289,7 +289,7 @@ data class Monitor( requireNotNull(schedule) { "Monitor schedule is null" }, lastUpdateTime ?: Instant.now(), enabledTime, - MonitorType.valueOf(monitorType.toUpperCase(Locale.ROOT)), + MonitorType.valueOf(monitorType.uppercase(Locale.ROOT)), user, schemaVersion, inputs.toList(), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt index 8203fb6cb..7e13f9281 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt @@ -8,7 +8,7 @@ package org.opensearch.alerting.model import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchException import org.opensearch.alerting.alerts.AlertError -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt index df6816718..177345b44 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt @@ -58,7 +58,7 @@ data class Throttle( xcp.nextToken() when (fieldName) { UNIT_FIELD -> { - val unitString = xcp.text().toUpperCase(Locale.ROOT) + val unitString = xcp.text().uppercase(Locale.ROOT) require(StringUtils.equals(unitString, ChronoUnit.MINUTES.name), { "Only support MINUTES throttle unit currently" }) unit = ChronoUnit.valueOf(unitString) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt index 513f4825e..b79997703 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.model.destination -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.Strings import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt index a03378bec..83e7b12a5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt @@ -12,11 +12,11 @@ import org.opensearch.alerting.destination.message.ChimeMessage import org.opensearch.alerting.destination.message.CustomWebhookMessage import org.opensearch.alerting.destination.message.EmailMessage import org.opensearch.alerting.destination.message.SlackMessage -import org.opensearch.alerting.elasticapi.convertToMap -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField import org.opensearch.alerting.model.destination.email.Email +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.alerting.util.isHostInDenylist @@ -193,7 +193,7 @@ data class Destination( schemaVersion, seqNo, primaryTerm, - DestinationType.valueOf(type.toUpperCase(Locale.ROOT)), + DestinationType.valueOf(type.uppercase(Locale.ROOT)), requireNotNull(name) { "Destination name is null" }, user, lastUpdateTime ?: Instant.now(), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt index 0341ca598..3d7ea6c9c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.model.destination -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.Strings import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt index 1968435f1..418172984 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt @@ -173,7 +173,7 @@ data class Recipient( } return Recipient( - RecipientType.valueOf(type.toUpperCase(Locale.ROOT)), + RecipientType.valueOf(type.uppercase(Locale.ROOT)), emailGroupID, email ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt index 908b895cd..1dbce69fd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt @@ -18,8 +18,8 @@ import org.opensearch.alerting.action.AcknowledgeAlertAction import org.opensearch.alerting.action.AcknowledgeAlertRequest import org.opensearch.alerting.action.AcknowledgeAlertResponse import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.optionalTimeField import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client import org.opensearch.common.inject.Inject diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 1df607015..d39fd1100 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -15,8 +15,8 @@ import org.opensearch.alerting.action.GetAlertsAction import org.opensearch.alerting.action.GetAlertsRequest import org.opensearch.alerting.action.GetAlertsResponse import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.addFilter import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt index cef192700..245a1bd87 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt @@ -15,8 +15,8 @@ import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetDestinationsRequest import org.opensearch.alerting.action.GetDestinationsResponse import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.addFilter import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index ed892619b..f97e382ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -13,7 +13,7 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.action.SearchMonitorRequest -import org.opensearch.alerting.elasticapi.addFilter +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt index cb70d578a..2e3027991 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt @@ -24,7 +24,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryRequest import org.opensearch.action.admin.indices.recovery.RecoveryResponse import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType -import org.opensearch.alerting.elasticapi.convertToMap +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt new file mode 100644 index 000000000..3dec5174e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.http.client.utils.URIBuilder +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.Recipient +import org.opensearch.alerting.util.DestinationType +import org.opensearch.common.Strings +import org.opensearch.commons.notifications.model.Chime +import org.opensearch.commons.notifications.model.ConfigType +import org.opensearch.commons.notifications.model.Email +import org.opensearch.commons.notifications.model.EmailGroup +import org.opensearch.commons.notifications.model.EmailRecipient +import org.opensearch.commons.notifications.model.HttpMethodType +import org.opensearch.commons.notifications.model.MethodType +import org.opensearch.commons.notifications.model.NotificationConfig +import org.opensearch.commons.notifications.model.Slack +import org.opensearch.commons.notifications.model.SmtpAccount +import org.opensearch.commons.notifications.model.Webhook +import java.net.URI +import java.net.URISyntaxException +import java.util.Locale + +class DestinationConversionUtils { + + companion object { + + fun convertDestinationToNotificationConfig(destination: Destination): NotificationConfig? { + when (destination.type) { + DestinationType.CHIME -> { + val alertChime = destination.chime ?: return null + val chime = Chime(alertChime.url) + val description = "Chime destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.CHIME, + chime + ) + } + DestinationType.SLACK -> { + val alertSlack = destination.slack ?: return null + val slack = Slack(alertSlack.url) + val description = "Slack destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.SLACK, + slack + ) + } + // TODO: Add this back after adding SNS to Destination data models +// DestinationType.SNS -> { +// val alertSNS = destination.sns ?: return null +// val sns = Sns(alertSNS.topicARN, alertSNS.roleARN) +// val description = "SNS destination created from the Alerting plugin" +// return NotificationConfig( +// destination.name, +// description, +// ConfigType.SNS, +// sns +// ) +// } + DestinationType.CUSTOM_WEBHOOK -> { + val alertWebhook = destination.customWebhook ?: return null + val uri = buildUri( + alertWebhook.url, + alertWebhook.scheme, + alertWebhook.host, + alertWebhook.port, + alertWebhook.path, + alertWebhook.queryParams + ).toString() + val methodType = when (alertWebhook.method?.uppercase(Locale.ENGLISH)) { + "POST" -> HttpMethodType.POST + "PUT" -> HttpMethodType.PUT + "PATCH" -> HttpMethodType.PATCH + else -> HttpMethodType.POST + } + val webhook = Webhook(uri, alertWebhook.headerParams, methodType) + val description = "Webhook destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.WEBHOOK, + webhook + ) + } + DestinationType.EMAIL -> { + val alertEmail = destination.email ?: return null + val recipients = mutableListOf() + val emailGroupIds = mutableListOf() + alertEmail.recipients.forEach { + if (it.type == Recipient.RecipientType.EMAIL_GROUP) + it.emailGroupID?.let { emailGroup -> emailGroupIds.add(emailGroup) } + else it.email?.let { emailRecipient -> recipients.add(EmailRecipient(emailRecipient)) } + } + + val email = Email(alertEmail.emailAccountID, recipients, emailGroupIds) + val description = "Email destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.EMAIL, + email + ) + } + else -> return null + } + } + + fun convertEmailAccountToNotificationConfig(emailAccount: EmailAccount): NotificationConfig { + val methodType = convertAlertingToNotificationMethodType(emailAccount.method) + val smtpAccount = SmtpAccount(emailAccount.host, emailAccount.port, methodType, emailAccount.email) + val description = "Email account created from the Alerting plugin" + return NotificationConfig( + emailAccount.name, + description, + ConfigType.SMTP_ACCOUNT, + smtpAccount + ) + } + + fun convertEmailGroupToNotificationConfig( + emailGroup: org.opensearch.alerting.model.destination.email.EmailGroup + ): NotificationConfig { + val recipients = mutableListOf() + emailGroup.emails.forEach { + recipients.add(EmailRecipient(it.email)) + } + val notificationEmailGroup = EmailGroup(recipients) + + val description = "Email group created from the Alerting plugin" + return NotificationConfig( + emailGroup.name, + description, + ConfigType.EMAIL_GROUP, + notificationEmailGroup + ) + } + + private fun buildUri( + endpoint: String?, + scheme: String?, + host: String?, + port: Int, + path: String?, + queryParams: Map + ): URI? { + return try { + if (Strings.isNullOrEmpty(endpoint)) { + if (host == null) { + throw IllegalStateException("No host was provided when endpoint was null") + } + var uriScheme = scheme + if (Strings.isNullOrEmpty(scheme)) { + uriScheme = "https" + } + val uriBuilder = URIBuilder() + if (queryParams.isNotEmpty()) { + for ((key, value) in queryParams) uriBuilder.addParameter(key, value) + } + return uriBuilder.setScheme(uriScheme).setHost(host).setPort(port).setPath(path).build() + } + URIBuilder(endpoint).build() + } catch (e: URISyntaxException) { + throw IllegalStateException("Error creating URI", e) + } + } + + private fun convertAlertingToNotificationMethodType(alertMethodType: EmailAccount.MethodType): MethodType { + return when (alertMethodType) { + EmailAccount.MethodType.NONE -> MethodType.NONE + EmailAccount.MethodType.SSL -> MethodType.SSL + EmailAccount.MethodType.TLS -> MethodType.START_TLS + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt new file mode 100644 index 000000000..a806e3aeb --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.component.LifecycleListener +import org.opensearch.common.unit.TimeValue +import org.opensearch.threadpool.Scheduler +import org.opensearch.threadpool.ThreadPool +import kotlin.coroutines.CoroutineContext + +class DestinationMigrationCoordinator( + private val client: Client, + private val clusterService: ClusterService, + private val threadPool: ThreadPool +) : ClusterStateListener, CoroutineScope, LifecycleListener() { + + private val logger = LogManager.getLogger(javaClass) + + override val coroutineContext: CoroutineContext + get() = Dispatchers.Default + CoroutineName("DestinationMigrationCoordinator") + + private var scheduledMigration: Scheduler.Cancellable? = null + + @Volatile + private var runningLock = false + + init { + clusterService.addListener(this) + clusterService.addLifecycleListener(this) + } + + override fun clusterChanged(event: ClusterChangedEvent) { + logger.info("Detected cluster change event for destination migration") + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Reset destination migration process.") + scheduledMigration?.cancel() + DestinationMigrationUtilService.finishFlag = false + } + if ( + event.localNodeMaster() && + !runningLock && + (scheduledMigration == null || scheduledMigration!!.isCancelled) + ) { + try { + runningLock = true + initMigrateDestinations() + } finally { + runningLock = false + } + } else if (!event.localNodeMaster()) { + scheduledMigration?.cancel() + } + } + + private fun initMigrateDestinations() { + if (!clusterService.state().nodes().isLocalNodeElectedMaster) { + scheduledMigration?.cancel() + return + } + + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Destination migration is already complete, cancelling migration process.") + scheduledMigration?.cancel() + return + } + + val scheduledJob = Runnable { + launch { + try { + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Cancel background destination migration process.") + scheduledMigration?.cancel() + } + + logger.info("Performing migration of destination data.") + DestinationMigrationUtilService.migrateDestinations(client as NodeClient) + } catch (e: Exception) { + logger.error("Failed to migrate destination data", e) + } + } + } + + scheduledMigration = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), ThreadPool.Names.MANAGEMENT) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt new file mode 100644 index 000000000..b225a1e93 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt @@ -0,0 +1,221 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.core.model.ScheduledJob +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertDestinationToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertEmailAccountToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertEmailGroupToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.createNotificationConfig +import org.opensearch.client.node.NodeClient +import org.opensearch.common.Strings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.notifications.action.CreateNotificationConfigRequest +import org.opensearch.commons.notifications.model.NotificationConfig +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext +import java.time.Instant + +class DestinationMigrationUtilService { + + companion object { + + private val logger = LogManager.getLogger(DestinationMigrationUtilService::class) + + @Volatile + private var runningLock = false // In case 2 migrateDestinations() processes are running + + // Used in DestinationMigrationCoordinator to cancel scheduled process + @Volatile + var finishFlag = false + internal set + + suspend fun migrateDestinations(client: NodeClient) { + if (runningLock) { + logger.info("There is already a migrate destination process running...") + return + } else if (finishFlag) { + logger.info("Destination migration has finished.") + return + } + try { + runningLock = true + + val destinationsToMigrate = retrieveDestinationsToMigrate(client) + logger.info("Need to migrate ${destinationsToMigrate.size} destinations") + if (destinationsToMigrate.isEmpty()) { + finishFlag = true + runningLock = false + return + } + val migratedDestinations = createNotificationChannelIfNotExists(client, destinationsToMigrate) + logger.info("Migrated ${migratedDestinations.size} destinations") + val failedDeletedDestinations = deleteOldDestinations(client, migratedDestinations) + logger.info("Failed to delete ${failedDeletedDestinations.size} destinations from migration process cleanup") + } finally { + runningLock = false + } + } + + private suspend fun deleteOldDestinations(client: NodeClient, destinationIds: List): List { + val bulkDeleteRequest = BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + destinationIds.forEach { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, it) + bulkDeleteRequest.add(deleteRequest) + } + + val failedToDeleteDestinations = mutableListOf() + try { + val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkDeleteRequest, it) } + failedToDeleteDestinations.addAll(bulkResponse.items.filter { it.isFailed }.map { it.id }) + } catch (e: Exception) { + logger.error("Failed to delete all destinations", e) + failedToDeleteDestinations.addAll(destinationIds) + } + return failedToDeleteDestinations + } + + private suspend fun createNotificationChannelIfNotExists( + client: NodeClient, + notificationConfigInfoList: List> + ): List { + val migratedNotificationConfigs = mutableListOf() + notificationConfigInfoList.forEach { + val notificationConfigInfo = it.first + val userStr = it.second + val createNotificationConfigRequest = CreateNotificationConfigRequest( + notificationConfigInfo.notificationConfig, + notificationConfigInfo.configId + ) + try { + // TODO: recreate user object to pass along the same permissions. Make sure this works when user based security is removed + client.threadPool().threadContext.stashContext().use { + if (userStr.isNotBlank()) { + client.threadPool().threadContext + .putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + } + val createResponse = createNotificationConfig(client, createNotificationConfigRequest) + migratedNotificationConfigs.add(createResponse.configId) + logger.debug(("Migrated destination: ${createResponse.configId}")) + } + } catch (e: Exception) { + if (e.message?.contains("version conflict, document already exists") == true) { + migratedNotificationConfigs.add(notificationConfigInfo.configId) + } else { + logger.warn( + "Failed to migrate over Destination ${notificationConfigInfo.configId} because failed to " + + "create channel in Notification plugin.", + e + ) + } + } + } + return migratedNotificationConfigs + } + + private suspend fun retrieveDestinationsToMigrate(client: NodeClient): List> { + var start = 0 + val size = 100 + val notificationConfigInfoList = mutableListOf>() + var hasMoreResults = true + + while (hasMoreResults) { + val searchSourceBuilder = SearchSourceBuilder() + .size(size) + .from(start) + .fetchSource(FetchSourceContext(true, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY)) + .seqNoAndPrimaryTerm(true) + .version(true) + val queryBuilder = QueryBuilders.boolQuery() + .should(QueryBuilders.existsQuery("email_account")) + .should(QueryBuilders.existsQuery("email_group")) + .should(QueryBuilders.existsQuery("destination")) + searchSourceBuilder.query(queryBuilder) + + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + + if (response.status() != RestStatus.OK) { + logger.error("Failed to retrieve destinations to migrate") + hasMoreResults = false + } else { + if (response.hits.hits.isEmpty()) { + hasMoreResults = false + } + for (hit in response.hits) { + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) + var notificationConfig: NotificationConfig? + var userStr = "" + when { + hit.sourceAsString.contains("\"email_group\"") -> { + val emailGroup = EmailGroup.parseWithType(xcp, hit.id, hit.version) + notificationConfig = convertEmailGroupToNotificationConfig(emailGroup) + } + hit.sourceAsString.contains("\"email_account\"") -> { + val emailAccount = EmailAccount.parseWithType(xcp, hit.id, hit.version) + notificationConfig = convertEmailAccountToNotificationConfig(emailAccount) + } + else -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val destination = Destination.parse( + xcp, + hit.id, + hit.version, + hit.seqNo.toInt(), + hit.primaryTerm.toInt() + ) + userStr = destination.user.toString() + notificationConfig = convertDestinationToNotificationConfig(destination) + } + } + + if (notificationConfig != null) + notificationConfigInfoList.add( + Pair( + NotificationConfigInfo( + hit.id, + Instant.now(), + Instant.now(), + notificationConfig + ), + userStr + ) + ) + } + } + + start += size + } + + return notificationConfigInfoList + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt new file mode 100644 index 000000000..225714da2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.alerting.opensearchapi.retryForNotification +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.client.node.NodeClient +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.CreateNotificationConfigRequest +import org.opensearch.commons.notifications.action.CreateNotificationConfigResponse +import org.opensearch.commons.notifications.action.DeleteNotificationConfigRequest +import org.opensearch.commons.notifications.action.DeleteNotificationConfigResponse +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationRequest +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.action.UpdateNotificationConfigRequest +import org.opensearch.commons.notifications.action.UpdateNotificationConfigResponse + +class NotificationApiUtils { + + companion object { + + private val logger = LogManager.getLogger(NotificationApiUtils::class) + + private val defaultRetryPolicy = + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 2) + + suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): GetNotificationConfigResponse { + lateinit var getNotificationConfigResponse: GetNotificationConfigResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + getNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + } + } + return getNotificationConfigResponse + } + + suspend fun createNotificationConfig( + client: NodeClient, + createNotificationConfigRequest: CreateNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): CreateNotificationConfigResponse { + lateinit var createNotificationConfigResponse: CreateNotificationConfigResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + createNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.createNotificationConfig( + client, + createNotificationConfigRequest, + it + ) + } + } + } + return createNotificationConfigResponse + } + + suspend fun updateNotificationConfig( + client: NodeClient, + updateNotificationConfigRequest: UpdateNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): UpdateNotificationConfigResponse { + lateinit var updateNotificationConfigResponse: UpdateNotificationConfigResponse + val userStr = client.threadPool() + .threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + updateNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.updateNotificationConfig( + client, + updateNotificationConfigRequest, + it + ) + } + } + } + return updateNotificationConfigResponse + } + + suspend fun deleteNotificationConfig( + client: NodeClient, + deleteNotificationConfigRequest: DeleteNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): DeleteNotificationConfigResponse { + lateinit var deleteNotificationConfigResponse: DeleteNotificationConfigResponse + val userStr = client.threadPool() + .threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + deleteNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.deleteNotificationConfig( + client, + deleteNotificationConfigRequest, + it + ) + } + } + } + return deleteNotificationConfigResponse + } + + suspend fun sendNotification( + client: NodeClient, + sendNotificationRequest: SendNotificationRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): SendNotificationResponse { + lateinit var sendNotificationResponse: SendNotificationResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + sendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + client, + sendNotificationRequest.eventSource, + sendNotificationRequest.channelMessage, + sendNotificationRequest.channelIds, + it + ) + } + } + } + return sendNotificationResponse + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index a12f187b1..d46325a50 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -20,14 +20,17 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.settings.ScheduledJobSettings -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger +import org.opensearch.alerting.model.destination.Chime +import org.opensearch.alerting.model.destination.CustomWebhook import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.Slack import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.util.DestinationType @@ -363,19 +366,59 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { ) } - protected fun getRandomDestination(salt: String): Destination { + fun getSlackDestination(): Destination { + val slack = Slack("https://hooks.slack.com/services/slackId") return Destination( - type = DestinationType.TEST_ACTION, - name = salt + "test", + type = DestinationType.SLACK, + name = "test", user = randomUser(), lastUpdateTime = Instant.now(), chime = null, + slack = slack, + customWebhook = null, + email = null + ) + } + + fun getChimeDestination(): Destination { + val chime = Chime("https://hooks.chime.aws/incomingwebhooks/chimeId") + return Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, slack = null, customWebhook = null, email = null ) } + fun getCustomWebhookDestination(): Destination { + val customWebhook = CustomWebhook( + "https://hooks.slack.com/services/customWebhookId", + null, + null, + 80, + null, + null, + emptyMap(), + emptyMap(), + null, + null + ) + return Destination( + type = DestinationType.CUSTOM_WEBHOOK, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = null, + slack = null, + customWebhook = customWebhook, + email = null + ) + } + private fun getTestEmailAccount(): EmailAccount { return EmailAccount( name = "test", @@ -670,7 +713,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return StringEntity(toJsonString(), APPLICATION_JSON) } - private fun Destination.toJsonString(): String { + protected fun Destination.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return shuffleXContent(toXContent(builder)).string() } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt index 710ccbb60..222637d6f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt @@ -45,6 +45,21 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() { return System.getProperty("security", "false")!!.toBoolean() } + @Suppress("UNCHECKED_CAST") + fun isNotificationPluginInstalled(): Boolean { + val response = entityAsMap(client().makeRequest("GET", "_nodes/plugins")) + val nodesInfo = response["nodes"] as Map> + for (nodeInfo in nodesInfo.values) { + val plugins = nodeInfo["plugins"] as List> + for (plugin in plugins) { + if (plugin["name"] == "opensearch-notifications") { + return true + } + } + } + return false + } + override fun getProtocol(): String { return if (isHttps()) { "https" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index f16ef1c40..8955d0766 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -15,7 +15,6 @@ import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket @@ -38,6 +37,7 @@ import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.Request import org.opensearch.client.RequestOptions diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt index d4280daaa..5d29f1107 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt @@ -7,8 +7,8 @@ package org.opensearch.alerting.action import org.junit.Assert import org.opensearch.alerting.builder -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.randomUser import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt index 0f5c5969b..edee16c5f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt @@ -7,13 +7,13 @@ package org.opensearch.alerting.model import org.opensearch.alerting.builder import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.ActionExecutionPolicy import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.parser import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomActionExecutionPolicy diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt new file mode 100644 index 000000000..b8d6de389 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.Email +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.EmailEntry +import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.model.destination.email.Recipient +import org.opensearch.alerting.randomUser +import org.opensearch.alerting.toJsonString +import org.opensearch.alerting.util.DestinationType +import org.opensearch.client.ResponseException +import org.opensearch.rest.RestStatus +import java.time.Instant +import java.util.UUID + +class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { + + fun `test migrateData`() { + if (isNotificationPluginInstalled()) { + // Create alerting config index + createRandomMonitor() + + val emailAccount = EmailAccount( + name = "test", + email = "test@email.com", + host = "smtp.com", + port = 25, + method = EmailAccount.MethodType.NONE, + username = null, + password = null + ) + val emailAccountDoc = "{\"email_account\" : ${emailAccount.toJsonString()}}" + val emailGroup = EmailGroup( + name = "test", + emails = listOf(EmailEntry("test@email.com")) + ) + val emailGroupDoc = "{\"email_group\" : ${emailGroup.toJsonString()}}" + val emailAccountId = UUID.randomUUID().toString() + val emailGroupId = UUID.randomUUID().toString() + indexDoc(SCHEDULED_JOBS_INDEX, emailAccountId, emailAccountDoc) + indexDoc(SCHEDULED_JOBS_INDEX, emailGroupId, emailGroupDoc) + + val recipient = Recipient(Recipient.RecipientType.EMAIL, null, "test@email.com") + val email = Email(emailAccountId, listOf(recipient)) + val emailDest = Destination( + id = UUID.randomUUID().toString(), + type = DestinationType.EMAIL, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = null, + slack = null, + customWebhook = null, + email = email + ) + val slackDestination = getSlackDestination().copy(id = UUID.randomUUID().toString()) + val chimeDestination = getChimeDestination().copy(id = UUID.randomUUID().toString()) + val customWebhookDestination = getCustomWebhookDestination().copy(id = UUID.randomUUID().toString()) + + val destinations = listOf(emailDest, slackDestination, chimeDestination, customWebhookDestination) + + val ids = mutableListOf(emailAccountId, emailGroupId) + for (destination in destinations) { + val dest = """ + { + "destination" : ${destination.toJsonString()} + } + """.trimIndent() + indexDoc(SCHEDULED_JOBS_INDEX, destination.id, dest) + ids.add(destination.id) + } + + // Create cluster change event and wait for migration service to complete migrating data over + client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb") + Thread.sleep(120000) + + for (id in ids) { + val response = client().makeRequest( + "GET", + "_plugins/_notifications/configs/$id" + ) + assertEquals(RestStatus.OK, response.restStatus()) + + try { + client().makeRequest( + "GET", + ".opendistro-alerting-config/_doc/$id" + ) + fail("Expecting ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + } + } +} diff --git a/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip new file mode 100644 index 000000000..3f05bb997 Binary files /dev/null and b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip differ diff --git a/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip b/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip new file mode 100644 index 000000000..a6a63b807 Binary files /dev/null and b/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip differ diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt index daae181e8..d778f31e6 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt @@ -16,8 +16,8 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEE import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_BACKOFF_RETRY_COUNT import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PAGE_SIZE import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PERIOD -import org.opensearch.alerting.elasticapi.firstFailureOrNull -import org.opensearch.alerting.elasticapi.retry +import org.opensearch.alerting.opensearchapi.firstFailureOrNull +import org.opensearch.alerting.opensearchapi.retry import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent import org.opensearch.cluster.ClusterStateListener diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt b/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt index 8b14fdfa0..93413d8f7 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt @@ -15,6 +15,7 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentFragment import org.opensearch.common.xcontent.XContentBuilder +import java.util.Locale /** * Scheduled job stat that will be generated by each node. @@ -66,7 +67,7 @@ class ScheduledJobStats : BaseNodeResponse, ToXContentFragment { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.field("name", node.name) builder.field("schedule_status", status) - builder.field("roles", node.roles.map { it.roleName().toUpperCase() }) + builder.field("roles", node.roles.map { it.roleName().uppercase(Locale.getDefault()) }) if (jobSweeperMetrics != null) { builder.startObject(RestScheduledJobStatsHandler.JOB_SCHEDULING_METRICS) jobSweeperMetrics!!.toXContent(builder, params) diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt index ada904ccc..7867dee07 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt @@ -25,6 +25,7 @@ import java.time.ZoneId import java.time.ZonedDateTime import java.time.temporal.ChronoUnit import java.time.zone.ZoneRulesException +import java.util.Locale sealed class Schedule : Writeable, ToXContentObject { enum class TYPE { CRON, INTERVAL } @@ -73,7 +74,7 @@ sealed class Schedule : Writeable, ToXContentObject { xcp.nextToken() when (cronFieldName) { INTERVAL_FIELD -> interval = xcp.intValue() - UNIT_FIELD -> unit = ChronoUnit.valueOf(xcp.text().toUpperCase()) + UNIT_FIELD -> unit = ChronoUnit.valueOf(xcp.text().uppercase(Locale.getDefault())) } } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt similarity index 80% rename from core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt rename to core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt index d6e89654a..a49181292 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.elasticapi +package org.opensearch.alerting.opensearchapi import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ThreadContextElement @@ -28,6 +28,7 @@ import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User +import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.RestStatus @@ -66,6 +67,36 @@ fun BackoffPolicy.retry(block: () -> T): T { } while (true) } +/** + * Backs off and retries a lambda that makes a request. This retries on any Exception unless it detects the + * Notification plugin is not installed. + * + * @param logger - logger used to log intermediate failures + * @param block - the block of code to retry. This should be a suspend function. + */ +suspend fun BackoffPolicy.retryForNotification( + logger: Logger, + block: suspend () -> T +): T { + val iter = iterator() + do { + try { + return block() + } catch (e: java.lang.Exception) { + val isMissingNotificationPlugin = e.message?.contains("failed to find action") ?: false + if (isMissingNotificationPlugin) { + throw OpenSearchException("Notification plugin is not installed. Please install the Notification plugin.", e) + } else if (iter.hasNext()) { + val backoff = iter.next() + logger.warn("Notification operation failed. Retrying in $backoff.", e) + delay(backoff.millis) + } else { + throw e + } + } + } while (true) +} + /** * Retries the given [block] of code as specified by the receiver [BackoffPolicy], if [block] throws an [OpenSearchException] * that is retriable (502, 503, 504). @@ -162,6 +193,20 @@ suspend fun C.suspendUntil(block: C.(ActionListener }) } +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + /** * Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread. * diff --git a/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt b/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt index 92b876112..610125469 100644 --- a/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt +++ b/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt @@ -8,7 +8,7 @@ package org.opensearch.alerting.core import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.model.XContentTestBase -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.xcontent.ToXContent import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder diff --git a/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt b/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt index a0be7a473..53699c36f 100644 --- a/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt +++ b/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.core.model -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.xcontent.ToXContent import java.time.Instant import java.time.ZoneId