Skip to content

Commit

Permalink
Refactors shrink action steps and adds unit tests (#349)
Browse files Browse the repository at this point in the history
* Refactors logging

Signed-off-by: Clay Downs <[email protected]>

* Refactors shrink steps

Signed-off-by: Clay Downs <[email protected]>

* Cleans up cleanup

Signed-off-by: Clay Downs <[email protected]>

* Adds unit tests

Signed-off-by: Clay Downs <[email protected]>

* Fixed detekt issues

Signed-off-by: Clay Downs <[email protected]>

* Fixes unit test

Signed-off-by: Clay Downs <[email protected]>

* Refactors cleanup and fail

Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored May 17, 2022
1 parent e789214 commit 4f8e722
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 396 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ http
.project
.settings
src/test/resources/job-scheduler/
src/test/resources/bwc/
src/test/resources/bwc/
src/test/resources/notifications-core/
src/test/resources/notifications/

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.indexmanagement.indexstatemanagement.step.shrink

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse
import org.opensearch.action.admin.indices.shrink.ResizeRequest
Expand All @@ -16,94 +13,40 @@ import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.getSecurityFailureMessage
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.getNodeFreeMemoryAfterShrink
import org.opensearch.indexmanagement.indexstatemanagement.util.isIndexGreen
import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.getUpdatedShrinkActionProperties
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException
import java.lang.Exception

class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var shrinkActionProperties: ShrinkActionProperties? = null
class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, true, true, false) {

@Suppress("TooGenericExceptionCaught", "ComplexMethod", "ReturnCount")
override suspend fun execute(): AttemptShrinkStep {
val context = this.context ?: return this
@Suppress("ReturnCount")
override suspend fun wrappedExecute(context: StepContext): AttemptShrinkStep {
val indexName = context.metadata.index
val actionMetadata = context.metadata.actionMetaData
val localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
logger.error(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
cleanupAndFail(WaitForMoveShardsStep.METADATA_FAILURE_MESSAGE)
return this
}
val lock = renewShrinkLock(localShrinkActionProperties, context.lockService, logger)
if (lock == null) {
logger.error("Shrink action failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
cleanupAndFail("Failed to renew lock on node [${localShrinkActionProperties.nodeName}]")
return this
}
shrinkActionProperties = getUpdatedShrinkActionProperties(localShrinkActionProperties, lock)
try {
if (!isIndexGreen(context.client, indexName)) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
return this
}
if (!isNodeStillSuitable(localShrinkActionProperties.nodeName, indexName, context)) return this
// If the returned shrinkActionProperties are null, then the status has been set to failed, just return
val localShrinkActionProperties = updateAndGetShrinkActionProperties(context) ?: return this

// If the resize index api fails, the step will be set to failed and resizeIndex will return false
if (!resizeIndex(indexName, localShrinkActionProperties, context)) return this
info = mapOf("message" to getSuccessMessage(localShrinkActionProperties.targetIndexName))
stepStatus = StepStatus.COMPLETED
return this
} catch (e: OpenSearchSecurityException) {
cleanupAndFail(getSecurityFailureMessage(e.localizedMessage), e.message, e)
return this
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e)
cleanupAndFail(FAILURE_MESSAGE, cause = e.message, e = unwrappedException as Exception)
return this
} catch (e: Exception) {
cleanupAndFail(FAILURE_MESSAGE, e.message, e)
if (!isIndexGreen(context.client, indexName)) {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to INDEX_HEALTH_NOT_GREEN_MESSAGE)
return this
}
}
if (!isNodeStillSuitable(localShrinkActionProperties.nodeName, indexName, context)) return this

// Sets the action to failed, clears the readonly and allocation settings on the source index, and releases the shrink lock
private suspend fun cleanupAndFail(message: String, cause: String? = null, e: Exception? = null) {
e?.let { logger.error(message, e) }
info = if (cause == null) mapOf("message" to message) else mapOf("message" to message, "cause" to cause)
stepStatus = StepStatus.FAILED
// Non-null assertion !! is used to throw an exception on null which would just be caught and logged
try {
resetReadOnlyAndRouting(context!!.metadata.index, context!!.client, shrinkActionProperties!!.originalIndexSettings)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to clean up routing and readonly setting after a failure: $e")
}
try {
releaseShrinkLock(shrinkActionProperties!!, context!!.lockService, logger)
} catch (e: Exception) {
logger.error("Shrink action failed while trying to release the node lock after a failure: $e")
}
shrinkActionProperties = null
// If the resize index api fails, the step will be set to failed and resizeIndex will return false
if (!resizeIndex(indexName, localShrinkActionProperties, context)) return this
info = mapOf("message" to getSuccessMessage(localShrinkActionProperties.targetIndexName))
stepStatus = StepStatus.COMPLETED
return this
}

override fun getGenericFailureMessage(): String = FAILURE_MESSAGE

@Suppress("ReturnCount")
private suspend fun isNodeStillSuitable(nodeName: String, indexName: String, context: StepContext): Boolean {
// Get the size of the index
Expand All @@ -113,8 +56,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
}
val statsStore = statsResponse.total.store
if (statsStore == null) {
logger.error("Shrink action failed as indices stats request was missing store stats.")
cleanupAndFail(FAILURE_MESSAGE)
cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as indices stats request was missing store stats.")
return false
}
val indexSizeInBytes = statsStore.sizeInBytes
Expand All @@ -124,14 +66,12 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
// If the node has been replaced, this will fail
val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName }
if (node == null) {
logger.error("Shrink action failed as node stats were missing the previously selected node.")
cleanupAndFail(FAILURE_MESSAGE)
cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as node stats were missing the previously selected node.")
return false
}
val remainingMem = getNodeFreeMemoryAfterShrink(node, indexSizeInBytes, context.clusterService.clusterSettings)
if (remainingMem < 1L) {
logger.error("Shrink action failed as the previously selected node no longer has enough free space.")
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
cleanupAndFail(NOT_ENOUGH_SPACE_FAILURE_MESSAGE, NOT_ENOUGH_SPACE_FAILURE_MESSAGE)
return false
}
return true
Expand All @@ -149,8 +89,7 @@ class AttemptShrinkStep(private val action: ShrinkAction) : Step(name) {
action.aliases?.forEach { req.targetIndexRequest.alias(it) }
val resizeResponse: ResizeResponse = context.client.admin().indices().suspendUntil { resizeIndex(req, it) }
if (!resizeResponse.isAcknowledged) {
logger.error("Shrink action failed as the resize index request was not acknowledged.")
cleanupAndFail(FAILURE_MESSAGE)
cleanupAndFail(FAILURE_MESSAGE, "Shrink action failed as the resize index request was not acknowledged.")
return false
}
return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.shrink

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.getSecurityFailureMessage
import org.opensearch.indexmanagement.indexstatemanagement.util.getUpdatedShrinkActionProperties
import org.opensearch.indexmanagement.indexstatemanagement.util.releaseShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.renewShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.transport.RemoteTransportException

abstract class ShrinkStep(
name: String,
private val cleanupSettings: Boolean,
private val cleanupLock: Boolean,
private val cleanupTargetIndex: Boolean
) : Step(name) {
protected val logger: Logger = LogManager.getLogger(javaClass)
protected var stepStatus = StepStatus.STARTING
protected var info: Map<String, Any>? = null
protected var shrinkActionProperties: ShrinkActionProperties? = null

@Suppress("ReturnCount")
override suspend fun execute(): Step {
val context = this.context ?: return this
try {
wrappedExecute(context)
} catch (e: OpenSearchSecurityException) {
val securityFailureMessage = getSecurityFailureMessage(e.localizedMessage)
cleanupAndFail(securityFailureMessage, securityFailureMessage, e.message, e)
return this
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e)
cleanupAndFail(getGenericFailureMessage(), getGenericFailureMessage(), cause = e.message, e = unwrappedException as java.lang.Exception)
return this
} catch (e: Exception) {
cleanupAndFail(getGenericFailureMessage(), getGenericFailureMessage(), cause = e.message, e = e)
return this
}
return this
}

protected suspend fun cleanupAndFail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
cleanupResources(cleanupSettings, cleanupLock, cleanupTargetIndex)
fail(infoMessage, logMessage, cause, e)
}

abstract fun getGenericFailureMessage(): String

abstract suspend fun wrappedExecute(context: StepContext): Step

@Suppress("ReturnCount")
protected suspend fun updateAndGetShrinkActionProperties(context: StepContext): ShrinkActionProperties? {
val actionMetadata = context.metadata.actionMetaData
var localShrinkActionProperties = actionMetadata?.actionProperties?.shrinkActionProperties
shrinkActionProperties = localShrinkActionProperties
if (localShrinkActionProperties == null) {
cleanupAndFail(METADATA_FAILURE_MESSAGE, METADATA_FAILURE_MESSAGE)
return null
}
val lock = renewShrinkLock(localShrinkActionProperties, context.lockService, logger)
if (lock == null) {
cleanupAndFail(
"Failed to renew lock on node [${localShrinkActionProperties.nodeName}]",
"Shrink action failed to renew lock on node [${localShrinkActionProperties.nodeName}]"
)
return null
}
// After renewing the lock we need to update the primary term and sequence number
localShrinkActionProperties = getUpdatedShrinkActionProperties(localShrinkActionProperties, lock)
shrinkActionProperties = localShrinkActionProperties
return localShrinkActionProperties
}

protected fun fail(infoMessage: String, logMessage: String? = null, cause: String? = null, e: Exception? = null) {
if (logMessage != null) {
if (e != null) {
logger.error(logMessage, e)
} else {
logger.error(logMessage)
}
}
info = if (cause == null) mapOf("message" to infoMessage) else mapOf("message" to infoMessage, "cause" to cause)
stepStatus = StepStatus.FAILED
shrinkActionProperties = null
}

protected suspend fun cleanupResources(resetSettings: Boolean, releaseLock: Boolean, deleteTargetIndex: Boolean) {
val localShrinkActionProperties = shrinkActionProperties
if (localShrinkActionProperties != null) {
if (resetSettings) resetIndexSettings(localShrinkActionProperties)
if (deleteTargetIndex) deleteTargetIndex(localShrinkActionProperties)
if (releaseLock) releaseLock(localShrinkActionProperties)
} else {
logger.error("Shrink action failed to clean up resources due to null shrink action properties.")
}
}

private suspend fun resetIndexSettings(shrinkActionProperties: ShrinkActionProperties) {
val originalIndexSettings = shrinkActionProperties.originalIndexSettings
val indexName = context?.metadata?.index
val client = context?.client
try {
if (indexName != null && client != null) {
val reset = resetReadOnlyAndRouting(indexName, client, originalIndexSettings)
if (!reset) logger.error("Shrink action failed to reset index settings on [$indexName]")
} else {
logger.error("Shrink action failed to reset index settings on [$indexName] due to uninitialized metadata values.")
}
} catch (e: Exception) {
logger.error("Shrink action failed while trying to clean up routing and readonly setting on [$indexName] due to failure: $e")
}
}

@Suppress("NestedBlockDepth")
private suspend fun deleteTargetIndex(shrinkActionProperties: ShrinkActionProperties) {
val client = context?.client
val targetIndexName = shrinkActionProperties.targetIndexName
try {
if (client != null) {
// Use plugin level permissions when deleting the failed target shrink index after a failure
client.threadPool().threadContext.stashContext().use {
val deleteRequest = DeleteIndexRequest(targetIndexName)
val response: AcknowledgedResponse =
client.admin().indices().suspendUntil { delete(deleteRequest, it) }
if (!response.isAcknowledged) {
logger.error("Shrink action failed to delete target index [$targetIndexName] during cleanup after a failure")
}
}
} else {
logger.error(
"Shrink action failed to delete target index [$targetIndexName] after a failure due to a null client in the step context"
)
}
} catch (e: Exception) {
logger.error("Shrink action failed while trying to delete the target index [$targetIndexName] after a failure: $e")
}
}

private suspend fun releaseLock(shrinkActionProperties: ShrinkActionProperties) {
val lockService = context?.lockService
try {
if (lockService != null) {
val released = releaseShrinkLock(shrinkActionProperties, lockService)
if (!released) logger.error("Failed to release Shrink action lock on node [${shrinkActionProperties.nodeName}]")
} else {
logger.error(
"Shrink action failed to release lock on node [${shrinkActionProperties.nodeName}] due to uninitialized metadata values."
)
}
} catch (e: Exception) {
logger.error("Failed to release Shrink action lock on node [${shrinkActionProperties.nodeName}]: $e")
}
}

companion object {
const val METADATA_FAILURE_MESSAGE = "Shrink action properties are null, metadata was not properly populated"
}
}
Loading

0 comments on commit 4f8e722

Please sign in to comment.