Skip to content

Commit

Permalink
Adds snapshot management notification implementation (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#387)

* Adds notification integration for snapshot management

Signed-off-by: Clay Downs <[email protected]>

* Changes based on PR comments

Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored Jun 24, 2022
1 parent d64e100 commit a46f118
Show file tree
Hide file tree
Showing 26 changed files with 450 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.model.destination
package org.opensearch.indexmanagement.common.model.notification

import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.authuser.User
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.generateUserString
import java.io.IOException

data class Channel(val id: String) : ToXContent, Writeable {
Expand Down Expand Up @@ -59,4 +69,30 @@ data class Channel(val id: String) : ToXContent, Writeable {
return Channel(requireNotNull(id) { "Channel ID is null" })
}
}

/**
 * Publishes [message] to this channel through the Notification plugin.
 *
 * The current thread context is stashed for the duration of the call, and the user string generated
 * from [user] is stored in it as a transient value before the plugin is invoked. Once the plugin
 * responds, the response status is passed to validateResponseStatus together with the reference id
 * of the event source carried by the response.
 *
 * @param client client whose thread pool supplies the thread context; it is cast to [NodeClient]
 * when handed to the plugin
 * @param eventSource event source forwarded to the Notification plugin
 * @param message text wrapped in a [ChannelMessage] and sent to this channel
 * @param user user whose information is written into the thread context; may be null
 */
suspend fun sendNotification(
    client: Client,
    eventSource: EventSource,
    message: String,
    user: User?
) {
    // Captured up front: inside the suspendUntil lambda `this` is the lambda's receiver, not the channel
    val targetChannelIds = listOf(id)
    val threadContext = client.threadPool().threadContext
    threadContext.stashContext().use {
        // The notification plugin needs the user information in the thread context to resolve the user object
        threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user))
        val response: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { listener ->
            this.sendNotification(
                (client as NodeClient),
                eventSource,
                ChannelMessage(message, null, null),
                targetChannelIds,
                listener
            )
        }
        validateResponseStatus(response.getStatus(), response.notificationEvent.eventSource.referenceId)
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

@file:JvmName("NotificationUtils")
package org.opensearch.indexmanagement.common.model.notification

import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus

/**
 * Numeric REST status codes that [validateResponseStatus] accepts as a successful response.
 */
private val VALID_RESPONSE_STATUS = listOf(
    RestStatus.OK,
    RestStatus.CREATED,
    RestStatus.ACCEPTED,
    RestStatus.NON_AUTHORITATIVE_INFORMATION,
    RestStatus.NO_CONTENT,
    RestStatus.RESET_CONTENT,
    RestStatus.PARTIAL_CONTENT,
    RestStatus.MULTI_STATUS
).map { it.status }.toSet()

/**
 * Checks that the numeric code of [restStatus] is contained in VALID_RESPONSE_STATUS.
 *
 * @param restStatus status to check
 * @param responseContent text appended to the exception message when the check fails
 * @throws OpenSearchStatusException created with [restStatus] when the status is not in the valid set
 */
@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
    if (restStatus.status in VALID_RESPONSE_STATUS) return
    throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.common.model.notification.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.indexstatemanagement.step.notification.AttemptNotificationStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.CHANNEL_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.DESTINATION_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.MESSAGE_TEMPLATE_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.common.model.notification.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.common.model.notification.Channel
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination
import org.opensearch.script.Script
import java.io.IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@
@file:JvmName("NotificationUtils")
package org.opensearch.indexmanagement.indexstatemanagement.util

import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.commons.destination.message.LegacyBaseMessage
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest
import org.opensearch.commons.notifications.action.LegacyPublishNotificationResponse
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel
import org.opensearch.indexmanagement.common.model.notification.Channel
import org.opensearch.indexmanagement.common.model.notification.validateResponseStatus
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.rest.RestStatus
Expand All @@ -45,7 +42,8 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) {
}

/**
* Extension function for publishing a notification to a channel in the Notification plugin.
* Extension function for publishing a notification to a channel in the Notification plugin. Builds the event source directly
* from the managed index metadata.
*/
suspend fun Channel.sendNotification(
client: Client,
Expand All @@ -54,49 +52,10 @@ suspend fun Channel.sendNotification(
compiledMessage: String,
user: User?
) {
val channel = this
client.threadPool().threadContext.stashContext().use {
// We need to set the user context information in the thread context for notification plugin to correctly resolve the user object
client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user))
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
managedIndexMetaData.getEventSource(title),
ChannelMessage(compiledMessage, null, null),
listOf(channel.id),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}
}

private fun generateUserString(user: User?): String {
if (user == null) return ""
val backendRoles = user.backendRoles.joinToString(",")
val roles = user.roles.joinToString(",")
val requestedTenant = user.requestedTenant
val userName = user.name
return "$userName|$backendRoles|$roles|$requestedTenant"
val eventSource = managedIndexMetaData.getEventSource(title)
this.sendNotification(client, eventSource, compiledMessage, user)
}

/**
 * Creates an [EventSource] for this managed index from [title], the index UUID and [SeverityType.INFO].
 *
 * @param title first argument passed to the [EventSource] constructor
 * @return the newly created event source
 */
fun ManagedIndexMetaData.getEventSource(title: String): EventSource =
    EventSource(title, indexUuid, SeverityType.INFO)

/**
 * Numeric REST status codes that [validateResponseStatus] accepts as a successful response.
 */
private val VALID_RESPONSE_STATUS = listOf(
    RestStatus.OK,
    RestStatus.CREATED,
    RestStatus.ACCEPTED,
    RestStatus.NON_AUTHORITATIVE_INFORMATION,
    RestStatus.NO_CONTENT,
    RestStatus.RESET_CONTENT,
    RestStatus.PARTIAL_CONTENT,
    RestStatus.MULTI_STATUS
).map { it.status }.toSet()

/**
 * Checks that the numeric code of [restStatus] is contained in VALID_RESPONSE_STATUS.
 *
 * @param restStatus status to check
 * @param responseContent text appended to the exception message when the check fails
 * @throws OpenSearchStatusException created with [restStatus] when the status is not in the valid set
 */
@Throws(OpenSearchStatusException::class)
fun validateResponseStatus(restStatus: RestStatus, responseContent: String) {
    if (restStatus.status in VALID_RESPONSE_STATUS) return
    throw OpenSearchStatusException("Failed: $responseContent", restStatus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,24 @@ fun timeLimitExceeded(
workflow: WorkflowType,
log: Logger,
): SMResult {
log.warn(getTimeLimitExceedMessage(timeLimit))
val message = getTimeLimitExceededMessage(timeLimit, workflow)
log.warn(message)
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED,
cause = SnapshotManagementException(message = getTimeLimitExceedMessage(timeLimit)),
cause = SnapshotManagementException(message = message),
endTime = now(),
)
return SMResult.Fail(metadataBuilder, workflow, forceReset = true)
}

/**
 * Builds the message reported when a time limit has been exceeded.
 *
 * @param timeLimit the limit interpolated into the message
 * @return the formatted message
 */
fun getTimeLimitExceedMessage(timeLimit: TimeValue): String {
    return "Time limit $timeLimit exceeded."
}
/**
 * Builds the message reported when a snapshot workflow step runs past its time limit.
 *
 * @param timeLimit the limit interpolated into the message
 * @param workflow the workflow that was running; rendered as "creation" or "deletion"
 * @return the formatted message
 */
fun getTimeLimitExceededMessage(timeLimit: TimeValue, workflow: WorkflowType): String {
    // Distinct local name so the `workflow` parameter is not shadowed.
    // The `when` is exhaustive over the enum, so no `else` branch is needed.
    val workflowName = when (workflow) {
        WorkflowType.CREATION -> "creation"
        WorkflowType.DELETION -> "deletion"
    }
    return "Time limit $timeLimit exceeded during snapshot $workflowName step"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indexmanagement.util.IndexManagementException
import org.opensearch.indexmanagement.util.SecurityUtils
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
Expand Down Expand Up @@ -51,6 +52,8 @@ abstract class BaseTransportAction<Request : ActionRequest, Response : ActionRes
client.threadPool().threadContext.stashContext().use { threadContext ->
listener.onResponse(executeRequest(request, user, threadContext))
}
} catch (ex: IndexManagementException) {
listener.onFailure(ex)
} catch (ex: VersionConflictEngineException) {
listener.onFailure(ex)
} catch (ex: OpenSearchStatusException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowT
import org.opensearch.indexmanagement.snapshotmanagement.indexMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.LatestExecution.Status.TIME_LIMIT_EXCEEDED
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.threadpool.ThreadPool
import java.time.Instant.now
Expand Down Expand Up @@ -107,6 +108,7 @@ class SMStateMachine(
// can still execute other lateral states if exists
}
is SMResult.Fail -> {
handleFailureNotification(result)
// any error causing Fail state is logged in place
if (result.forceReset == true) {
updateMetadata(result.metadataToSave.resetWorkflow().build())
Expand All @@ -118,6 +120,8 @@ class SMStateMachine(
}
} while (currentState.instance.continuous && result is SMResult.Next)
} catch (ex: Exception) {
val message = "There was an exception at ${now()} while executing Snapshot Management policy ${job.policyName}, please check logs."
job.notificationConfig?.sendFailureNotification(client, job.policyName, message, job.user, log)
if (ex is SnapshotManagementException &&
ex.exKey == ExceptionKey.METADATA_INDEXING_FAILURE
) {
Expand All @@ -129,6 +133,24 @@ class SMStateMachine(
return this
}

/**
 * Sends a notification for a failed state execution, if the latest execution recorded a message.
 *
 * Nothing is sent when the workflow metadata, its latest execution, or the execution message is
 * absent. When the latest execution status is TIME_LIMIT_EXCEEDED the time-limit notification is
 * sent; any other status results in the generic failure notification. Both are skipped when the
 * job has no notification config.
 *
 * @param result the failed result whose metadata builder supplies the message and status
 */
private suspend fun handleFailureNotification(result: SMResult.Fail) {
    // Without a recorded message there is nothing to report
    val failureMessage = result.metadataToSave.getWorkflowMetadata()?.latestExecution?.info?.message
        ?: return

    // Time limit exceeded is a special failure case which needs a different notification
    val timeLimitExceeded =
        result.metadataToSave.getWorkflowMetadata()?.latestExecution?.status == TIME_LIMIT_EXCEEDED
    if (!timeLimitExceeded) {
        job.notificationConfig?.sendFailureNotification(client, job.policyName, failureMessage, job.user, log)
        return
    }
    job.notificationConfig?.sendTimeLimitExceededNotification(
        client,
        job.policyName,
        failureMessage,
        job.user,
        log
    )
}

private fun handleRetry(result: SMResult.Fail, prevState: SMState): SMMetadata.Builder {
val metadataBuilder = result.metadataToSave.setCurrentState(prevState)
val metadata = result.metadataToSave.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ object CreatingState : State {
.source(addSMPolicyInSnapshotMetadata(job.snapshotConfig, job.policyName))
.waitForCompletion(false)
val res: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(req, it) }
// TODO SM notification that snapshot starts to be created

metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.IN_PROGRESS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ object CreationFinishedState : State {
val snapshot = getSnapshots.first()
when (snapshot.state()) {
SnapshotState.SUCCESS -> {
val creationMessage = getSnapshotCreationSucceedMessage(snapshotName)
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.SUCCESS,
message = getSnapshotCreationSucceedMessage(snapshotName),
message = creationMessage,
endTime = now(),
).setCreationStarted(null)
// TODO SM notification snapshot created
// If creation notifications are enabled, send one now
job.notificationConfig?.sendCreationNotification(client, job.policyName, creationMessage, job.user, log)
}
SnapshotState.IN_PROGRESS -> {
job.creation.timeLimit?.let { timeLimit ->
Expand All @@ -81,18 +83,19 @@ object CreationFinishedState : State {
}
else -> {
// FAILED, PARTIAL, INCOMPATIBLE
val currentState = snapshot.state()?.name
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.FAILED,
cause = SnapshotManagementException(message = "Snapshot $snapshotName creation end with state ${snapshot.state()}."),
message = "Snapshot $snapshotName creation failed as the snapshot is in the $currentState state.",
cause = snapshot.reason()?.let { SnapshotManagementException(message = it) },
endTime = now(),
).setCreationStarted(null)
// TODO SM notification snapshot creation has problem
return SMResult.Fail(metadataBuilder, WorkflowType.CREATION, true)
}
}

// TODO SM notification: if now is after next creation time, update nextCreationTime to the next
// and try notify user that we skip the execution because snapshot creation time
// is longer than execution period
// if now is after next creation time, update nextCreationTime to next execution schedule
// TODO may want to notify user that we skipped the execution because snapshot creation time is longer than execution schedule
val result = tryUpdatingNextExecutionTime(
metadataBuilder, metadata.creation.trigger.time, job.creation.schedule,
WorkflowType.CREATION, log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ object DeletionFinishedState : State {
val existingSnapshotsNameSet = getSnapshots.map { it.snapshotId().name }.toSet()
val remainingSnapshotsName = existingSnapshotsNameSet intersect snapshotsStartedDeletion.toSet()
if (remainingSnapshotsName.isEmpty()) {
// TODO SM notification snapshot deleted
val deletionMessage = "Snapshot(s) $snapshotsStartedDeletion deletion has finished."
job.notificationConfig?.sendDeletionNotification(client, job.policyName, deletionMessage, job.user, log)
metadataBuilder.setLatestExecution(
status = SMMetadata.LatestExecution.Status.SUCCESS,
message = "Snapshots ${metadata.deletion.started} deletion has finished.",
message = deletionMessage,
endTime = now(),
).setDeletionStarted(null)
} else {
Expand All @@ -64,14 +65,10 @@ object DeletionFinishedState : State {
}

log.info("Retention snapshots haven't been deleted: $remainingSnapshotsName.")
metadataBuilder.setDeletionStarted(
remainingSnapshotsName.toList(),
)
}

// TODO SM notification: if now is after next creation time, update nextCreationTime to the next
// and try notify user that we skip the execution because snapshot creation time
// is longer than execution period
// if now is after next deletion time, update next execution schedule
// TODO may want to notify user that we skipped the execution because snapshot deletion time is longer than execution schedule
job.deletion?.let {
val result = tryUpdatingNextExecutionTime(
metadataBuilder, metadata.deletion.trigger.time, job.deletion.schedule, WorkflowType.DELETION, log
Expand Down
Loading

0 comments on commit a46f118

Please sign in to comment.