diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt similarity index 51% rename from src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt rename to src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt index 8f28d32b1..8b4c1cec4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt @@ -3,8 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.indexmanagement.indexstatemanagement.model.destination +package org.opensearch.indexmanagement.common.model.notification +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -12,6 +14,14 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.authuser.User +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.util.SecurityUtils.Companion.generateUserString import java.io.IOException data class Channel(val id: String) : ToXContent, Writeable { @@ -59,4 +69,30 @@ data class Channel(val id: String) : ToXContent, Writeable { return Channel(requireNotNull(id) { "Channel ID is null" }) } } + + /** + * Extension function for publishing a notification to a channel in the Notification plugin. + */ + suspend fun sendNotification( + client: Client, + eventSource: EventSource, + message: String, + user: User? + ) { + val channel = this + client.threadPool().threadContext.stashContext().use { + // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user)) + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + eventSource, + ChannelMessage(message, null, null), + listOf(channel.id), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) + } + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/NotificationUtils.kt new file mode 100644 index 000000000..bbadd25c4 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/NotificationUtils.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +@file:JvmName("NotificationUtils") +package org.opensearch.indexmanagement.common.model.notification + +import org.opensearch.OpenSearchStatusException +import org.opensearch.rest.RestStatus + +/** + * all valid response status + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, RestStatus.CREATED.status, RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt index 0fb483a98..b81aa95cc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt @@ -8,7 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.indexmanagement.indexstatemanagement.step.notification.AttemptNotificationStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt index 9f351f15c..97f98909e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt @@ -12,7 +12,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.CHANNEL_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.DESTINATION_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.MESSAGE_TEMPLATE_FIELD -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt index 589ff9b0d..f99e438c6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.script.Script import java.io.IOException diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt index b577579f8..ce8d531f0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt @@ -6,20 +6,17 @@ @file:JvmName("NotificationUtils") package org.opensearch.indexmanagement.indexstatemanagement.util -import org.opensearch.OpenSearchStatusException import org.opensearch.client.Client import org.opensearch.client.node.NodeClient -import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT import org.opensearch.commons.authuser.User import org.opensearch.commons.destination.message.LegacyBaseMessage import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest import org.opensearch.commons.notifications.action.LegacyPublishNotificationResponse -import org.opensearch.commons.notifications.action.SendNotificationResponse -import org.opensearch.commons.notifications.model.ChannelMessage import org.opensearch.commons.notifications.model.EventSource import org.opensearch.commons.notifications.model.SeverityType -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.common.model.notification.validateResponseStatus import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.rest.RestStatus @@ -45,7 +42,8 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) { } /** - * Extension function for publishing a notification to a channel in the Notification plugin. + * Extension function for publishing a notification to a channel in the Notification plugin. Builds the event source directly + * from the managed index metadata. */ suspend fun Channel.sendNotification( client: Client, @@ -54,49 +52,10 @@ suspend fun Channel.sendNotification( compiledMessage: String, user: User? ) { - val channel = this - client.threadPool().threadContext.stashContext().use { - // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object - client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user)) - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - managedIndexMetaData.getEventSource(title), - ChannelMessage(compiledMessage, null, null), - listOf(channel.id), - it - ) - } - validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) - } -} - -private fun generateUserString(user: User?): String { - if (user == null) return "" - val backendRoles = user.backendRoles.joinToString(",") - val roles = user.roles.joinToString(",") - val requestedTenant = user.requestedTenant - val userName = user.name - return "$userName|$backendRoles|$roles|$requestedTenant" + val eventSource = managedIndexMetaData.getEventSource(title) + this.sendNotification(client, eventSource, compiledMessage, user) } fun ManagedIndexMetaData.getEventSource(title: String): EventSource { return EventSource(title, indexUuid, SeverityType.INFO) } - -/** - * all valid response status - */ -private val VALID_RESPONSE_STATUS = setOf( - RestStatus.OK.status, RestStatus.CREATED.status, RestStatus.ACCEPTED.status, - RestStatus.NON_AUTHORITATIVE_INFORMATION.status, RestStatus.NO_CONTENT.status, - RestStatus.RESET_CONTENT.status, RestStatus.PARTIAL_CONTENT.status, - RestStatus.MULTI_STATUS.status -) - -@Throws(OpenSearchStatusException::class) -fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { - if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { - throw OpenSearchStatusException("Failed: $responseContent", restStatus) - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt index 2657739f5..50f4b78ab 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt @@ -331,13 +331,24 @@ fun timeLimitExceeded( workflow: WorkflowType, log: Logger, ): SMResult { - log.warn(getTimeLimitExceedMessage(timeLimit)) + val message = getTimeLimitExceededMessage(timeLimit, workflow) + log.warn(message) metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED, - cause = SnapshotManagementException(message = getTimeLimitExceedMessage(timeLimit)), + cause = SnapshotManagementException(message = message), endTime = now(), ) return SMResult.Fail(metadataBuilder, workflow, forceReset = true) } -fun getTimeLimitExceedMessage(timeLimit: TimeValue) = "Time limit $timeLimit exceeded." +fun getTimeLimitExceededMessage(timeLimit: TimeValue, workflow: WorkflowType): String { + val workflow = when (workflow) { + WorkflowType.CREATION -> { + "creation" + } + WorkflowType.DELETION -> { + "deletion" + } + } + return "Time limit $timeLimit exceeded during snapshot $workflow step" +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt index cca4c1391..db1e6d3e9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt @@ -21,6 +21,7 @@ import org.opensearch.common.util.concurrent.ThreadContext.StoredContext import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT import org.opensearch.commons.authuser.User import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task @@ -51,6 +52,8 @@ abstract class BaseTransportAction listener.onResponse(executeRequest(request, user, threadContext)) } + } catch (ex: IndexManagementException) { + listener.onFailure(ex) } catch (ex: VersionConflictEngineException) { listener.onFailure(ex) } catch (ex: OpenSearchStatusException) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index cea678572..ed56157e6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowT import org.opensearch.indexmanagement.snapshotmanagement.indexMetadata import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED import org.opensearch.indexmanagement.util.OpenForTesting import org.opensearch.threadpool.ThreadPool import java.time.Instant.now @@ -107,6 +108,7 @@ class SMStateMachine( // can still execute other lateral states if exists } is SMResult.Fail -> { + handleFailureNotification(result) // any error causing Fail state is logged in place if (result.forceReset == true) { updateMetadata(result.metadataToSave.resetWorkflow().build()) @@ -118,6 +120,8 @@ class SMStateMachine( } } while (currentState.instance.continuous && result is SMResult.Next) } catch (ex: Exception) { + val message = "There was an exception at ${now()} while executing Snapshot Management policy ${job.policyName}, please check logs." + job.notificationConfig?.sendFailureNotification(client, job.policyName, message, job.user, log) if (ex is SnapshotManagementException && ex.exKey == ExceptionKey.METADATA_INDEXING_FAILURE ) { @@ -129,6 +133,24 @@ class SMStateMachine( return this } + private suspend fun handleFailureNotification(result: SMResult.Fail) { + val message = result.metadataToSave.getWorkflowMetadata()?.latestExecution?.info?.message + if (message != null) { + // Time limit exceeded is a special failure case which needs a different notification + if (result.metadataToSave.getWorkflowMetadata()?.latestExecution?.status == TIME_LIMIT_EXCEEDED) { + job.notificationConfig?.sendTimeLimitExceededNotification( + client, + job.policyName, + message, + job.user, + log + ) + } else { + job.notificationConfig?.sendFailureNotification(client, job.policyName, message, job.user, log) + } + } + } + private fun handleRetry(result: SMResult.Fail, prevState: SMState): SMMetadata.Builder { val metadataBuilder = result.metadataToSave.setCurrentState(prevState) val metadata = result.metadataToSave.build() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt index 7a0ea530e..860646045 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt @@ -68,7 +68,6 @@ object CreatingState : State { .source(addSMPolicyInSnapshotMetadata(job.snapshotConfig, job.policyName)) .waitForCompletion(false) val res: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(req, it) } - // TODO SM notification that snapshot starts to be created metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.IN_PROGRESS, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt index 451b9af53..a48121e8b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt @@ -63,12 +63,14 @@ object CreationFinishedState : State { val snapshot = getSnapshots.first() when (snapshot.state()) { SnapshotState.SUCCESS -> { + val creationMessage = getSnapshotCreationSucceedMessage(snapshotName) metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.SUCCESS, - message = getSnapshotCreationSucceedMessage(snapshotName), + message = creationMessage, endTime = now(), ).setCreationStarted(null) - // TODO SM notification snapshot created + // If creation notifications are enabled, send one now + job.notificationConfig?.sendCreationNotification(client, job.policyName, creationMessage, job.user, log) } SnapshotState.IN_PROGRESS -> { job.creation.timeLimit?.let { timeLimit -> @@ -81,18 +83,19 @@ object CreationFinishedState : State { } else -> { // FAILED, PARTIAL, INCOMPATIBLE + val currentState = snapshot.state()?.name metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.FAILED, - cause = SnapshotManagementException(message = "Snapshot $snapshotName creation end with state ${snapshot.state()}."), + message = "Snapshot $snapshotName creation failed as the snapshot is in the $currentState state.", + cause = snapshot.reason()?.let { SnapshotManagementException(message = it) }, endTime = now(), ).setCreationStarted(null) - // TODO SM notification snapshot creation has problem + return SMResult.Fail(metadataBuilder, WorkflowType.CREATION, true) } } - // TODO SM notification: if now is after next creation time, update nextCreationTime to the next - // and try notify user that we skip the execution because snapshot creation time - // is longer than execution period + // if now is after next creation time, update nextCreationTime to next execution schedule + // TODO may want to notify user that we skipped the execution because snapshot creation time is longer than execution schedule val result = tryUpdatingNextExecutionTime( metadataBuilder, metadata.creation.trigger.time, job.creation.schedule, WorkflowType.CREATION, log diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt index 2e75c9cf6..31631b123 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt @@ -50,10 +50,11 @@ object DeletionFinishedState : State { val existingSnapshotsNameSet = getSnapshots.map { it.snapshotId().name }.toSet() val remainingSnapshotsName = existingSnapshotsNameSet intersect snapshotsStartedDeletion.toSet() if (remainingSnapshotsName.isEmpty()) { - // TODO SM notification snapshot deleted + val deletionMessage = "Snapshot(s) $snapshotsStartedDeletion deletion has finished." + job.notificationConfig?.sendDeletionNotification(client, job.policyName, deletionMessage, job.user, log) metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.SUCCESS, - message = "Snapshots ${metadata.deletion.started} deletion has finished.", + message = deletionMessage, endTime = now(), ).setDeletionStarted(null) } else { @@ -64,14 +65,10 @@ object DeletionFinishedState : State { } log.info("Retention snapshots haven't been deleted: $remainingSnapshotsName.") - metadataBuilder.setDeletionStarted( - remainingSnapshotsName.toList(), - ) } - // TODO SM notification: if now is after next creation time, update nextCreationTime to the next - // and try notify user that we skip the execution because snapshot creation time - // is longer than execution period + // if now is after next deletion time, update next execution schedule + // TODO may want to notify user that we skipped the execution because snapshot deletion time is longer than execution schedule job.deletion?.let { val result = tryUpdatingNextExecutionTime( metadataBuilder, metadata.deletion.trigger.time, job.deletion.schedule, WorkflowType.DELETION, log diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/NotificationConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/NotificationConfig.kt new file mode 100644 index 000000000..7f0d8e69f --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/NotificationConfig.kt @@ -0,0 +1,198 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.apache.logging.log4j.Logger +import org.opensearch.client.Client +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.indexmanagement.common.model.notification.Channel +import java.io.IOException + +/* + * The data model for the configuration of notifications within a Snapshot Management policy definition. + */ +data class NotificationConfig( + val channel: Channel, + val conditions: Conditions, +) : ToXContentObject, Writeable { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(CHANNEL_FIELD, channel) + .field(CONDITIONS_FIELD, conditions) + .endObject() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + Channel(sin), + Conditions(sin) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + channel.writeTo(out) + conditions.writeTo(out) + } + + suspend fun sendCreationNotification(client: Client, policyName: String, message: String, user: User?, log: Logger) { + if (this.conditions.creation) { + try { + val eventSource = EventSource(CREATION_TITLE, policyName, SeverityType.INFO) + channel.sendNotification(client, eventSource, message, user) + } catch (e: Exception) { + // If we succeeded to create the snapshot but failed to send the notification, just log + log.error("Failed to send snapshot management creation notification for [$policyName]", e) + } + } + } + + suspend fun sendDeletionNotification(client: Client, policyName: String, message: String, user: User?, log: Logger) { + if (this.conditions.deletion) { + try { + val eventSource = EventSource(DELETION_TITLE, policyName, SeverityType.INFO) + channel.sendNotification(client, eventSource, message, user) + } catch (e: Exception) { + // If we succeeded to delete the snapshot but failed to send the notification, just log + log.error("Failed to send snapshot management deletion notification for [$policyName]", e) + } + } + } + + suspend fun sendTimeLimitExceededNotification( + client: Client, + policyName: String, + message: String, + user: User?, + log: Logger + ) { + if (this.conditions.timeLimitExceeded) { + try { + val eventSource = EventSource(TIME_LIMIT_EXCEEDED_TITLE, policyName, SeverityType.INFO) + channel.sendNotification(client, eventSource, message, user) + } catch (e: Exception) { + log.error("Failed to send snapshot management time limit exceeded notification for [$policyName]", e) + } + } + } + + suspend fun sendFailureNotification(client: Client, policyName: String, message: String, user: User?, log: Logger) { + if (this.conditions.failure) { + try { + val eventSource = EventSource(FAILURE_TITLE, policyName, SeverityType.INFO) + channel.sendNotification(client, eventSource, message, user) + } catch (e: Exception) { + // We already failed, just log the notification error + log.error("Failed to send snapshot management failure notification for [$policyName]", e) + } + } + } + + companion object { + const val CHANNEL_FIELD = "channel" + const val CONDITIONS_FIELD = "conditions" + const val CREATION_TITLE = "Snapshot Management - Snapshot Creation Notification" + const val DELETION_TITLE = "Snapshot Management - Snapshot Deletion Notification" + const val FAILURE_TITLE = "Snapshot Management - Snapshot Failure Notification" + const val TIME_LIMIT_EXCEEDED_TITLE = "Snapshot Management - Snapshot Time Limit Notification" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): NotificationConfig { + var channel: Channel? = null + var conditions: Conditions? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + CHANNEL_FIELD -> channel = Channel.parse(xcp) + CONDITIONS_FIELD -> conditions = Conditions.parse(xcp) + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Snapshot Management notification config.") + } + } + + return NotificationConfig( + channel = requireNotNull(channel) { "Snapshot Management notification channel must not be null" }, + conditions = conditions ?: Conditions() + ) + } + } + + data class Conditions( + val creation: Boolean = true, + val deletion: Boolean = false, + val failure: Boolean = false, + val timeLimitExceeded: Boolean = false + ) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(CREATION_FIELD, creation) + .field(DELETION_FIELD, deletion) + .field(FAILURE_FIELD, failure) + .field(TIME_LIMIT_EXCEEDED_FIELD, timeLimitExceeded) + .endObject() + } + + companion object { + const val CREATION_FIELD = "creation" + const val DELETION_FIELD = "deletion" + const val FAILURE_FIELD = "failure" + const val TIME_LIMIT_EXCEEDED_FIELD = "time_limit_exceeded" + + fun parse(xcp: XContentParser): Conditions { + var creation = true + var deletion = false + var failure = false + var timeLimitExceeded = false + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + CREATION_FIELD -> creation = xcp.booleanValue() + DELETION_FIELD -> deletion = xcp.booleanValue() + FAILURE_FIELD -> failure = xcp.booleanValue() + TIME_LIMIT_EXCEEDED_FIELD -> timeLimitExceeded = xcp.booleanValue() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in conditions.") + } + } + + return Conditions(creation, deletion, failure, timeLimitExceeded) + } + } + + constructor(sin: StreamInput) : this( + creation = sin.readBoolean(), + deletion = sin.readBoolean(), + failure = sin.readBoolean(), + timeLimitExceeded = sin.readBoolean(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeBoolean(creation) + out.writeBoolean(deletion) + out.writeBoolean(failure) + out.writeBoolean(timeLimitExceeded) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt index 88f45360d..4680055ce 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt @@ -558,6 +558,32 @@ data class SMMetadata( return this } + fun getWorkflowMetadata(): WorkflowMetadata? { + return when (workflowType) { + WorkflowType.CREATION -> { + metadata.creation + } + WorkflowType.DELETION -> { + metadata.deletion + } + } + } + + fun getWorkflowType(): WorkflowType { + return workflowType + } + + fun getStartedSnapshots(): List? { + return when (workflowType) { + WorkflowType.CREATION -> { + metadata.creation.started + } + WorkflowType.DELETION -> { + metadata.deletion?.started + } + } + } + fun setRetry(count: Int): Builder { when (workflowType) { WorkflowType.CREATION -> { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index 2a0d5cb9e..607ddace5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -53,7 +53,8 @@ data class SMPolicy( val jobSchedule: Schedule, val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, - val user: User? = null + val notificationConfig: NotificationConfig? = null, + val user: User? = null, ) : ScheduledJobParameter, Writeable { init { @@ -93,6 +94,7 @@ data class SMPolicy( .field(ENABLED_FIELD, jobEnabled) .optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdateTime) .optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime) + .optionalField(NOTIFICATION_FIELD, notificationConfig) if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() return builder.endObject() @@ -114,6 +116,7 @@ data class SMPolicy( const val LAST_UPDATED_TIME_FIELD = "last_updated_time" const val ENABLED_TIME_FIELD = "enabled_time" const val SCHEDULE_FIELD = "schedule" + const val NOTIFICATION_FIELD = "notification" const val USER_FIELD = "user" // Used by sub models Creation and Deletion @@ -135,6 +138,7 @@ data class SMPolicy( var enabledTime: Instant? = null var schedule: Schedule? = null var enabled = true + var notificationConfig: NotificationConfig? = null var user: User? = null if (xcp.currentToken() == null) xcp.nextToken() @@ -154,6 +158,7 @@ data class SMPolicy( ENABLED_TIME_FIELD -> enabledTime = xcp.instant() SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp) ENABLED_FIELD -> enabled = xcp.booleanValue() + NOTIFICATION_FIELD -> notificationConfig = NotificationConfig.parse(xcp) USER_FIELD -> user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) } } @@ -201,6 +206,7 @@ data class SMPolicy( id = id, seqNo = seqNo, primaryTerm = primaryTerm, + notificationConfig = notificationConfig, user = user ) } @@ -219,6 +225,7 @@ data class SMPolicy( id = sin.readString(), seqNo = sin.readLong(), primaryTerm = sin.readLong(), + notificationConfig = sin.readOptionalWriteable { NotificationConfig(it) }, user = sin.readOptionalWriteable(::User) ) @@ -235,6 +242,7 @@ data class SMPolicy( out.writeString(id) out.writeLong(seqNo) out.writeLong(primaryTerm) + out.writeOptionalWriteable(notificationConfig) out.writeOptionalWriteable(user) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt index 5bf50f6a8..65a0903e6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/SecurityUtils.kt @@ -161,5 +161,18 @@ class SecurityUtils { ) queryBuilder.filter(filterQuery) } + + /** + * Generates a user string formed by the username, backend roles, roles and requested tenants separated by '|'. This is the user + * string format used internally in the OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT and may be parsed using User.parse(). + */ + fun generateUserString(user: User?): String { + if (user == null) return "" + val backendRoles = user.backendRoles.joinToString(",") + val roles = user.roles.joinToString(",") + val requestedTenant = user.requestedTenant + val userName = user.name + return "$userName|$backendRoles|$roles|$requestedTenant" + } } } diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 4efaf477c..6e5e2d0fc 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1414,6 +1414,33 @@ } } }, + "notification": { + "properties": { + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, + "conditions": { + "properties": { + "creation": { + "type": "boolean" + }, + "deletion": { + "type": "boolean" + }, + "failure": { + "type": "boolean" + }, + "time_limit_exceeded": { + "type": "boolean" + } + } + } + } + }, "user": { "properties": { "name": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index bab64d0b5..aad56c02b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -37,7 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.StateFilter import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Chime import org.opensearch.indexmanagement.indexstatemanagement.model.destination.CustomWebhook import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index 9d9aaf2f5..2d4c4a784 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -10,7 +10,7 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import org.opensearch.indexmanagement.indexstatemanagement.action.RollupAction -import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.common.model.notification.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.DestinationType import org.opensearch.indexmanagement.indexstatemanagement.nonNullRandomConditions import org.opensearch.indexmanagement.indexstatemanagement.randomAllocationActionConfig diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 79849c9d0..76662d599 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -161,9 +161,10 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun getRollupMetadata( metadataId: String, + refresh: Boolean = true, header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): RollupMetadata { - val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header) + val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?refresh=$refresh", null, header) assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus()) return parseRollupMetadata(response) } @@ -171,9 +172,10 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun getRollupMetadataWithRoutingId( routingId: String, metadataId: String, + refresh: Boolean = true, header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), ): RollupMetadata { - val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?routing=$routingId", null, header) + val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId?routing=$routingId&refresh=$refresh", null, header) assertEquals("Unable to get rollup metadata $metadataId", RestStatus.OK, response.restStatus()) return parseRollupMetadata(response) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt index d8b19267f..3e0b75a84 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt @@ -24,11 +24,13 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel import org.opensearch.indexmanagement.opensearchapi.toMap import org.opensearch.indexmanagement.randomCronSchedule import org.opensearch.indexmanagement.randomInstant import org.opensearch.indexmanagement.randomIntervalSchedule import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState +import org.opensearch.indexmanagement.snapshotmanagement.model.NotificationConfig import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.jobscheduler.spi.schedule.CronSchedule @@ -38,6 +40,7 @@ import org.opensearch.snapshots.Snapshot import org.opensearch.snapshots.SnapshotId import org.opensearch.snapshots.SnapshotInfo import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength +import org.opensearch.test.OpenSearchTestCase.randomBoolean import org.opensearch.test.OpenSearchTestCase.randomIntBetween import org.opensearch.test.OpenSearchTestCase.randomNonNegativeLong import org.opensearch.test.rest.OpenSearchRestTestCase @@ -110,6 +113,7 @@ fun randomSMPolicy( jobSchedule: IntervalSchedule = randomIntervalSchedule(), seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + notificationConfig: NotificationConfig? = null ): SMPolicy { if (dateFormat != null) { snapshotConfig["date_format"] = dateFormat @@ -136,6 +140,7 @@ fun randomSMPolicy( jobSchedule = jobSchedule, seqNo = seqNo, primaryTerm = primaryTerm, + notificationConfig = notificationConfig, ) } @@ -163,6 +168,10 @@ fun randomSnapshotName(): String = randomAlphaOfLength(10) fun randomSMState(): SMState = SMState.values()[randomIntBetween(0, SMState.values().size - 1)] +fun randomNotificationConfig(): NotificationConfig = NotificationConfig(randomChannel(), randomConditions()) + +fun randomConditions(): NotificationConfig.Conditions = NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) + fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string() fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map = this.toXContent(XContentFactory.jsonBuilder(), params).toMap() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt index 31c7421c5..27ef430e7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedStateTests.kt @@ -84,8 +84,8 @@ class CreationFinishedStateTests : MocksTestCase() { val context = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager) val result = SMState.CREATION_FINISHED.instance.execute(context) - assertTrue("Execution results should be Next.", result is SMResult.Next) - result as SMResult.Next + assertTrue("Execution results should be Fail.", result is SMResult.Fail) + result as SMResult.Fail val metadataToSave = result.metadataToSave.build() assertNull("Started creation should be reset to null.", metadataToSave.creation.started) assertEquals("Latest execution status is failed", SMMetadata.LatestExecution.Status.FAILED, metadataToSave.creation.latestExecution!!.status) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt index bc30654b6..fc9b9edf9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.model import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput +import org.opensearch.indexmanagement.snapshotmanagement.randomNotificationConfig import org.opensearch.indexmanagement.snapshotmanagement.randomSMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy import org.opensearch.test.OpenSearchTestCase @@ -14,7 +15,7 @@ import org.opensearch.test.OpenSearchTestCase class WriteableTests : OpenSearchTestCase() { fun `test sm policy as stream`() { - val smPolicy = randomSMPolicy() + val smPolicy = randomSMPolicy(notificationConfig = randomNotificationConfig()) val out = BytesStreamOutput().also { smPolicy.writeTo(it) } val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val streamedSMPolicy = SMPolicy(sin) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt index 1bff6efca..8af4a8e17 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.model import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.snapshotmanagement.parser +import org.opensearch.indexmanagement.snapshotmanagement.randomNotificationConfig import org.opensearch.indexmanagement.snapshotmanagement.randomSMMetadata import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy import org.opensearch.indexmanagement.snapshotmanagement.toJsonString @@ -16,14 +17,14 @@ import org.opensearch.test.OpenSearchTestCase class XContentTests : OpenSearchTestCase() { fun `test sm policy parsing`() { - val smPolicy = randomSMPolicy() + val smPolicy = randomSMPolicy(notificationConfig = randomNotificationConfig()) val smPolicyString = smPolicy.toJsonString() val parsedSMPolicy = smPolicyString.parser().parseWithType(smPolicy.id, smPolicy.seqNo, smPolicy.primaryTerm, SMPolicy.Companion::parse) assertEquals("Round tripping sm policy with type doesn't work", smPolicy, parsedSMPolicy) } fun `test sm policy parsing without type`() { - val smPolicy = randomSMPolicy() + val smPolicy = randomSMPolicy(notificationConfig = randomNotificationConfig()) val smPolicyString = smPolicy.toJsonString(XCONTENT_WITHOUT_TYPE) val parsedSMPolicy = SMPolicy.parse(smPolicyString.parser(), smPolicy.id, smPolicy.seqNo, smPolicy.primaryTerm) assertEquals("Round tripping sm policy without type doesn't work", smPolicy, parsedSMPolicy) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt index d6e09d386..7df78f43e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestExplainSnapshotManagementIT.kt @@ -22,6 +22,7 @@ import java.time.temporal.ChronoUnit @Suppress("UNCHECKED_CAST") class RestExplainSnapshotManagementIT : SnapshotManagementRestTestCase() { + fun `test explaining a snapshot management policy`() { val smPolicy = createSMPolicy( randomSMPolicy().copy( diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 4efaf477c..6e5e2d0fc 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1414,6 +1414,33 @@ } } }, + "notification": { + "properties": { + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, + "conditions": { + "properties": { + "creation": { + "type": "boolean" + }, + "deletion": { + "type": "boolean" + }, + "failure": { + "type": "boolean" + }, + "time_limit_exceeded": { + "type": "boolean" + } + } + } + } + }, "user": { "properties": { "name": {