diff --git a/README.md b/README.md index 6695fef41..9c9cc8d53 100644 --- a/README.md +++ b/README.md @@ -57,15 +57,6 @@ The project in this package uses the [Gradle](https://docs.gradle.org/current/us However, to build the `index management` plugin project, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `index management` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on. -This project currently uses the Notification subproject from the [Alerting plugin](https://github.com/opendistro-for-elasticsearch/alerting). There is an [open PR](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) that introduces the maven publish task in Alerting for publishing the Notification jars. Until this PR is fully merged and jars published you will need to pull down the PR yourself and publish the jars to your local maven repository in order to build Index Management. - -1. Visit the PR [here](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) and pull down the Alerting plugin along with the PR changes -2. You may need to cherry-pick the changes into a separate branch if you require a specific version to be published -3. Build the Alerting plugin (w/ the changes in PR) and publish the artifacts to your local maven repository - 1. `./gradlew clean` - 2. `./gradlew build` or `./gradlew assemble` build will run the tests and build artifacts, assemble will only build the artifacts - 3. `./gradlew publishToMavenLocal` publishes artifacts to your local maven repository - ### Building from the command line 1. `./gradlew build` builds and tests project. diff --git a/build-tools/esplugin-coverage.gradle b/build-tools/esplugin-coverage.gradle index 56c0b6e93..9f25e8ebe 100644 --- a/build-tools/esplugin-coverage.gradle +++ b/build-tools/esplugin-coverage.gradle @@ -65,8 +65,8 @@ integTestCluster { jacocoTestReport { dependsOn integTest, test executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile - sourceDirectories = sourceSets.main.allSource - classDirectories = sourceSets.main.output + sourceDirectories.from = "src/main/kotlin" + classDirectories.from = sourceSets.main.output reports { html.enabled = true // human readable xml.enabled = true // for coverlay diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index d6532ae71..1f2099543 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -290,7 +290,7 @@ object ManagedIndexRunner : ScheduledJobRunner, if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) { // Step null check is done in getStartingManagedIndexMetaData - step.execute() + step.preExecute(logger).execute().postExecute(logger) var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) if (executedManagedIndexMetaData.isFailed) { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt index 576fea622..7be0288e3 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt @@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService import kotlinx.coroutines.delay import org.apache.logging.log4j.Logger import org.elasticsearch.ElasticsearchException +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.ActionListener import org.elasticsearch.action.bulk.BackoffPolicy +import org.elasticsearch.action.support.DefaultShardOperationFailedException import org.elasticsearch.client.ElasticsearchClient import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.bytes.BytesReference @@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentParserUtils import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.rest.RestStatus +import org.elasticsearch.transport.RemoteTransportException import java.time.Instant import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -202,9 +205,7 @@ fun IndexMetaData.getRolloverAlias(): String? { fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? { val index = this.index.name val uuid = this.index.uuid - val policyID = this.getPolicyID() - - if (policyID == null) return null + val policyID = this.getPolicyID() ?: return null return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID) } @@ -217,3 +218,13 @@ fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? { } return null } + +fun Throwable.findRemoteTransportException(): RemoteTransportException? { + if (this is RemoteTransportException) return this + return this.cause?.findRemoteTransportException() +} + +fun DefaultShardOperationFailedException.getUsefulCauseString(): String { + val rte = this.cause?.findRemoteTransportException() + return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString() +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt index 52feb7784..07761bd90 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt @@ -40,6 +40,7 @@ data class SweptManagedIndexConfig( ) { companion object { + @Suppress("ComplexMethod") @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt index 35cd30051..81ddc9da8 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData +import org.apache.logging.log4j.Logger import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput import org.elasticsearch.common.io.stream.Writeable @@ -25,7 +26,17 @@ import java.util.Locale abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) { - abstract suspend fun execute() + fun preExecute(logger: Logger): Step { + logger.info("Executing $name for ${managedIndexMetaData.index}") + return this + } + + abstract suspend fun execute(): Step + + fun postExecute(logger: Logger): Step { + logger.info("Finished executing $name for ${managedIndexMetaData.index}") + return this + } abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData @@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta */ abstract fun isIdempotent(): Boolean - fun getStartingStepMetaData(): StepMetaData { - return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) - } + fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) fun getStepStartTime(): Instant { if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) { @@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime) } + protected val indexName: String = managedIndexMetaData.index + enum class StepStatus(val status: String) : Writeable { STARTING("starting"), CONDITION_NOT_MET("condition_not_met"), diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index 7389d4970..ebe7bd71d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.admin.indices.close.CloseIndexResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.transport.RemoteTransportException class AttemptCloseStep( val clusterService: ClusterService, @@ -41,34 +43,52 @@ class AttemptCloseStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + override suspend fun execute(): AttemptCloseStep { try { - logger.info("Executing close on $index") val closeIndexRequest = CloseIndexRequest() - .indices(index) + .indices(indexName) val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) } - logger.info("Close index for $index was acknowledged=${response.isAcknowledged}") if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully closed index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to close index") + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(cause) + } else { + handleException(cause as Exception) } } catch (e: SnapshotInProgressException) { - logger.warn("Failed to close index [index=$index] with snapshot in progress") - stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Index had snapshot in progress, retrying closing") + handleSnapshotException(e) } catch (e: Exception) { - logger.error("Failed to set index to close [index=$index]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to close") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleSnapshotException(e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.warn(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -78,4 +98,10 @@ class AttemptCloseStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to close index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully closed index [index=$index]" + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying closing [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index a7ad4dba3..b7bd75a1f 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.transport.RemoteTransportException import java.lang.Exception class AttemptDeleteStep( @@ -42,30 +44,51 @@ class AttemptDeleteStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptDeleteStep { try { val response: AcknowledgedResponse = client.admin().indices() - .suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) } + .suspendUntil { delete(DeleteIndexRequest(indexName), it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Deleted index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to delete index") + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(cause) + } else { + handleException(cause as Exception) } } catch (e: SnapshotInProgressException) { - logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress") - stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Index had snapshot in progress, retrying deletion") + handleSnapshotException(e) } catch (e: Exception) { - logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to delete index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleSnapshotException(e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.warn(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -78,5 +101,8 @@ class AttemptDeleteStep( companion object { const val name = "attempt_delete" + fun getFailedMessage(index: String) = "Failed to delete index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully deleted index [index=$index]" + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying deletion [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index 66e4baae1..72544a395 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -15,18 +15,27 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionProperties import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse import org.elasticsearch.rest.RestStatus +import org.elasticsearch.transport.RemoteTransportException +import java.time.Instant class AttemptCallForceMergeStep( val clusterService: ClusterService, @@ -42,35 +51,61 @@ class AttemptCallForceMergeStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptCallForceMergeStep { try { - val indexName = managedIndexMetaData.index - logger.info("Attempting to force merge on [$indexName]") + val startTime = Instant.now().toEpochMilli() val request = ForceMergeRequest(indexName).maxNumSegments(config.maxNumSegments) - val response: ForceMergeResponse = client.admin().indices().suspendUntil { forceMerge(request, it) } + var response: ForceMergeResponse? = null + var throwable: Throwable? = null + GlobalScope.launch(Dispatchers.IO + CoroutineName("ISM-ForceMerge-$indexName")) { + try { + response = client.admin().indices().suspendUntil { forceMerge(request, it) } + if (response?.status == RestStatus.OK) { + logger.info(getSuccessMessage(indexName)) + } else { + logger.warn(getFailedMessage(indexName)) + } + } catch (t: Throwable) { + throwable = t + } + } + + while (response == null && (Instant.now().toEpochMilli() - startTime) < FIVE_MINUTES_IN_MILLIS) { + delay(FIVE_SECONDS_IN_MILLIS) + throwable?.let { throw it } + } - // If response is OK then the force merge operation has started - if (response.status == RestStatus.OK) { + val shadowedResponse = response + if (shadowedResponse?.let { it.status == RestStatus.OK } != false) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Started force merge") + info = mapOf("message" to if (shadowedResponse == null) getSuccessfulCallMessage(indexName) else getSuccessMessage(indexName)) } else { // Otherwise the request to force merge encountered some problem stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to start force merge", - "status" to response.status, - "shard_failures" to response.shardFailures.map { it.toString() } + "message" to getFailedMessage(indexName), + "status" to shadowedResponse.status, + "shard_failures" to shadowedResponse.shardFailures.map { it.getUsefulCauseString() } ) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to start force merge [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to start force merge") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -87,5 +122,10 @@ class AttemptCallForceMergeStep( companion object { const val name = "attempt_call_force_merge" + const val FIVE_MINUTES_IN_MILLIS = 1000 * 60 * 5 // how long to wait for the force merge request before moving on + const val FIVE_SECONDS_IN_MILLIS = 1000L * 5L // delay + fun getFailedMessage(index: String) = "Failed to start force merge [index=$index]" + fun getSuccessfulCallMessage(index: String) = "Successfully called force merge [index=$index]" + fun getSuccessMessage(index: String) = "Successfully completed force merge [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt index 5894d2fca..daf0fac5d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt @@ -21,12 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.F import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client -import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class AttemptSetReadOnlyStep( val clusterService: ClusterService, @@ -41,18 +43,17 @@ class AttemptSetReadOnlyStep( override fun isIdempotent() = true - override suspend fun execute() { - val indexName = managedIndexMetaData.index - - logger.info("Attempting to set [$indexName] to read-only for force_merge action") + override suspend fun execute(): AttemptSetReadOnlyStep { val indexSetToReadOnly = setIndexToReadOnly(indexName) // If setIndexToReadOnly returns false, updating settings failed and failed info was already updated, can return early - if (!indexSetToReadOnly) return + if (!indexSetToReadOnly) return this // Complete step since index is read-only stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to getSuccessMessage(indexName)) + + return this } @Suppress("TooGenericExceptionCaught") @@ -60,37 +61,44 @@ class AttemptSetReadOnlyStep( try { val updateSettingsRequest = UpdateSettingsRequest() .indices(indexName) - .settings( - Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true) - ) + .settings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { - logger.info("Successfully set [$indexName] to read-only for force_merge action") return true } // If response is not acknowledged, then add failed info - logger.error("Request to set [$indexName] to read-only was NOT acknowledged") + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-only") + info = mapOf("message" to message) + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-only [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-only") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } return false } + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData = currentMetaData.copy(stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), transitionTo = null, info = info) companion object { const val name = "attempt_set_read_only" + fun getFailedMessage(index: String) = "Failed to set index to read-only [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-only [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 04d3989f9..2a543174d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig @@ -43,26 +44,22 @@ class WaitForForceMergeStep( override fun isIdempotent() = true - @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val indexName = managedIndexMetaData.index - - logger.info("Checking if force merge is complete on [$indexName]") - + @Suppress("TooGenericExceptionCaught", "ReturnCount") + override suspend fun execute(): WaitForForceMergeStep { // Retrieve maxNumSegments value from ActionProperties. If ActionProperties is null, update failed info and return early. - val maxNumSegments = getMaxNumSegments() ?: return + val maxNumSegments = getMaxNumSegments() ?: return this // Get the number of shards with a segment count greater than maxNumSegments, meaning they are still merging val shardsStillMergingSegments = getShardsStillMergingSegments(indexName, maxNumSegments) // If shardsStillMergingSegments is null, failed info has already been updated and can return early - shardsStillMergingSegments ?: return + shardsStillMergingSegments ?: return this // If there are no longer shardsStillMergingSegments, then the force merge has completed if (shardsStillMergingSegments == 0) { - logger.info("Force merge completed on [$indexName]") - + val message = getSuccessMessage(indexName) + logger.info(message) stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Force merge completed") + info = mapOf("message" to message) } else { /* * If there are still shards with segments merging then no action is taken and the step will be reevaluated @@ -79,21 +76,21 @@ class WaitForForceMergeStep( val timeoutInSeconds: Long = config.configTimeout?.timeout?.seconds ?: FORCE_MERGE_TIMEOUT_IN_SECONDS if (timeWaitingForForceMerge.toSeconds() > timeoutInSeconds) { - logger.error( - "Force merge on [$indexName] timed out with [$shardsStillMergingSegments] shards containing unmerged segments" - ) + logger.error("Force merge on [$indexName] timed out with" + + " [$shardsStillMergingSegments] shards containing unmerged segments") stepStatus = StepStatus.FAILED - info = mapOf("message" to "Force merge timed out") + info = mapOf("message" to getFailedTimedOutMessage(indexName)) } else { - logger.debug( - "Force merge still running on [$indexName] with [$shardsStillMergingSegments] shards containing unmerged segments" - ) + logger.debug("Force merge still running on [$indexName] with" + + " [$shardsStillMergingSegments] shards containing unmerged segments") stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Waiting for force merge to complete") + info = mapOf("message" to getWaitingMessage(indexName)) } } + + return this } private fun getMaxNumSegments(): Int? { @@ -125,17 +122,18 @@ class WaitForForceMergeStep( } } - logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]") - + val message = getFailedSegmentCheckMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to check segments when waiting for force merge to complete", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) } catch (e: Exception) { - logger.error("Failed to check segments when waiting for force merge to complete [index=${managedIndexMetaData.index}]", e) + val message = getFailedSegmentCheckMessage(indexName) + logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to check segments when waiting for force merge to complete") + val mutableInfo = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage info = mutableInfo.toMap() @@ -169,7 +167,10 @@ class WaitForForceMergeStep( companion object { const val name = "wait_for_force_merge" - const val FORCE_MERGE_TIMEOUT_IN_SECONDS = 43200L // 12 hours + fun getFailedTimedOutMessage(index: String) = "Force merge timed out [index=$index]" + fun getFailedSegmentCheckMessage(index: String) = "Failed to check segments when waiting for force merge to complete [index=$index]" + fun getWaitingMessage(index: String) = "Waiting for force merge to complete [index=$index]" + fun getSuccessMessage(index: String) = "Successfully confirmed segments force merged [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 75b8c7bd7..39f8e5c63 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -44,24 +44,30 @@ class AttemptNotificationStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptNotificationStep { try { - logger.info("Executing $name on ${managedIndexMetaData.index}") withContext(Dispatchers.IO) { config.destination.publish(null, compileTemplate(config.messageTemplate, managedIndexMetaData)) } // publish internally throws an error for any invalid responses so its safe to assume if we reach this point it was successful stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully sent notification") + info = mapOf("message" to getSuccessMessage(indexName)) } catch (e: Exception) { - logger.error("Failed to send notification [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to send notification") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -77,4 +83,9 @@ class AttemptNotificationStep( .newInstance(template.params + mapOf("ctx" to managedIndexMetaData.convertToMap())) .execute() } + + companion object { + fun getFailedMessage(index: String) = "Failed to send notification [index=$index]" + fun getSuccessMessage(index: String) = "Successfully sent notification [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt index 950357ad1..38d378808 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt @@ -21,10 +21,12 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.O import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.open.OpenIndexRequest import org.elasticsearch.action.admin.indices.open.OpenIndexResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.transport.RemoteTransportException class AttemptOpenStep( val clusterService: ClusterService, @@ -40,28 +42,38 @@ class AttemptOpenStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptOpenStep { try { - logger.info("Executing open on ${managedIndexMetaData.index}") val openIndexRequest = OpenIndexRequest() - .indices(managedIndexMetaData.index) + .indices(indexName) val response: OpenIndexResponse = client.admin().indices().suspendUntil { open(openIndexRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully opened index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to open index: ${managedIndexMetaData.index}") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to open [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to open") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -71,4 +83,9 @@ class AttemptOpenStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to open index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully opened index [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt index 9d217515c..4642c6d6b 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class SetReadOnlyStep( val clusterService: ClusterService, @@ -41,31 +44,40 @@ class SetReadOnlyStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): SetReadOnlyStep { try { val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) - .settings( - Settings.builder().put("index.blocks.write", true) - ) + .indices(indexName) + .settings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-only") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-only [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-only") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -75,4 +87,9 @@ class SetReadOnlyStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to set index to read-only [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-only [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt index f38d6ce3f..0e77c8c1b 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class SetReadWriteStep( val clusterService: ClusterService, @@ -41,31 +44,42 @@ class SetReadWriteStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): SetReadWriteStep { try { val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) + .indices(indexName) .settings( - Settings.builder().put("index.blocks.write", false) + Settings.builder().put(SETTING_BLOCKS_WRITE, false) ) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-write") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-write") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-write [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-write") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -75,4 +89,9 @@ class SetReadWriteStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to set index to read-write [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-write [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt index e452736b0..adfb08964 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class AttemptSetReplicaCountStep( val clusterService: ClusterService, @@ -37,36 +40,45 @@ class AttemptSetReplicaCountStep( private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING private var info: Map? = null + private val numOfReplicas = config.numOfReplicas override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val numOfReplicas = config.numOfReplicas + override suspend fun execute(): AttemptSetReplicaCountStep { try { - logger.info("Executing $name on ${managedIndexMetaData.index}") val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) - .settings(Settings.builder().put("index.number_of_replicas", numOfReplicas)) + .indices(indexName) + .settings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, numOfReplicas)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { - logger.info("Successfully executed $name on ${managedIndexMetaData.index}") stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set number_of_replicas to $numOfReplicas") + info = mapOf("message" to getSuccessMessage(indexName, numOfReplicas)) } else { + val message = getFailedMessage(indexName, numOfReplicas) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set number_of_replicas to $numOfReplicas") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set number_of_replicas [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set number_of_replicas to $numOfReplicas") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName, numOfReplicas) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -76,4 +88,9 @@ class AttemptSetReplicaCountStep( info = info ) } + + companion object { + fun getFailedMessage(index: String, numOfReplicas: Int) = "Failed to set number_of_replicas to $numOfReplicas [index=$index]" + fun getSuccessMessage(index: String, numOfReplicas: Int) = "Successfully set number_of_replicas to $numOfReplicas [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 56410dfa9..13b5ed023 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getRolloverAlias +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig @@ -30,6 +31,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.rest.RestStatus import java.time.Instant @@ -48,43 +50,69 @@ class AttemptRolloverStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + override suspend fun execute(): AttemptRolloverStep { // If we have already rolled over this index then fail as we only allow an index to be rolled over once if (managedIndexMetaData.rolledOver == true) { - logger.warn("$index was already rolled over, cannot execute rollover step") + logger.warn("$indexName was already rolled over, cannot execute rollover step") stepStatus = StepStatus.FAILED - info = mapOf("message" to "This index has already been rolled over") - return + info = mapOf("message" to getFailedDuplicateRolloverMessage(indexName)) + return this } val alias = getAliasOrUpdateInfo() // If alias is null we already updated failed info from getAliasOrUpdateInfo and can return early - alias ?: return + alias ?: return this val statsResponse = getIndexStatsOrUpdateInfo() // If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early - statsResponse ?: return + statsResponse ?: return this - val indexCreationDate = clusterService.state().metaData().index(index).creationDate - val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) - if (indexCreationDate == -1L) { - logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + val indexCreationDate = clusterService.state().metaData().index(indexName).creationDate + val indexAgeTimeValue = if (indexCreationDate == -1L) { + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") + // since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0 + TimeValue.timeValueMillis(0) + } else { + TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate) } val numDocs = statsResponse.primaries.docs?.count ?: 0 val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) - - if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) { - logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + + val conditions = listOfNotNull( + config.minAge?.let { + RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexAgeTimeValue.toString(), + "creationDate" to indexCreationDate + ) + }, + config.minDocs?.let { + RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf( + "condition" to it, + "current" to numDocs + ) + }, + config.minSize?.let { + RolloverActionConfig.MIN_SIZE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexSize.toString() + ) + } + ).toMap() + + if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) { + logger.info("$indexName rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + " numDocs=$numDocs, indexSize=${indexSize.bytes}]") - executeRollover(alias) + executeRollover(alias, conditions) } else { stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Attempting to rollover") + info = mapOf("message" to getAttemptingMessage(indexName), "conditions" to conditions) } + + return this } - private suspend fun executeRollover(alias: String) { + @Suppress("ComplexMethod") + private suspend fun executeRollover(alias: String, conditions: Map>) { try { val request = RolloverRequest(alias, null) val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) } @@ -95,29 +123,34 @@ class AttemptRolloverStep( // If response isAcknowledged it means the index was created and alias was added to new index if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Rolled over index") + info = listOfNotNull( + "message" to getSuccessMessage(indexName), + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() } else { // If the alias update response is NOT acknowledged we will get back isAcknowledged=false // This means the new index was created but we failed to swap the alias + val message = getFailedAliasUpdateMessage(indexName, response.newIndex) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "New index created (${response.newIndex}), but failed to update alias") + info = listOfNotNull( + "message" to message, + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() } } catch (e: Exception) { - logger.error("Failed to rollover index [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to rollover index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo.put("cause", errorMessage) - info = mutableInfo.toMap() + handleException(e) } } private fun getAliasOrUpdateInfo(): String? { - val alias = clusterService.state().metaData().index(managedIndexMetaData.index).getRolloverAlias() + val alias = clusterService.state().metaData().index(indexName).getRolloverAlias() if (alias == null) { + val message = getFailedNoValidAliasMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "There is no valid rollover_alias=$alias set on ${managedIndexMetaData.index}") + info = mapOf("message" to message) } return alias @@ -126,26 +159,25 @@ class AttemptRolloverStep( private suspend fun getIndexStatsOrUpdateInfo(): IndicesStatsResponse? { try { val statsRequest = IndicesStatsRequest() - .indices(managedIndexMetaData.index).clear().docs(true) + .indices(indexName).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status == RestStatus.OK) { return statsResponse } - logger.debug( - "Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]" - ) - + val message = getFailedEvaluateMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to evaluate conditions for rollover", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) } catch (e: Exception) { - logger.error("Failed to evaluate conditions for rollover [index=${managedIndexMetaData.index}]", e) + val message = getFailedEvaluateMessage(indexName) + logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to evaluate conditions for rollover") + val mutableInfo = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage info = mutableInfo.toMap() @@ -154,6 +186,16 @@ class AttemptRolloverStep( return null } + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetaData.copy( stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), @@ -162,4 +204,15 @@ class AttemptRolloverStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]" + fun getFailedAliasUpdateMessage(index: String, newIndex: String) = + "New index created, but failed to update alias [index=$index, newIndex=$newIndex]" + fun getFailedNoValidAliasMessage(index: String) = "Missing rollover_alias index setting [index=$index]" + fun getFailedDuplicateRolloverMessage(index: String) = "Index has already been rolled over [index=$index]" + fun getFailedEvaluateMessage(index: String) = "Failed to evaluate conditions for rollover [index=$index]" + fun getAttemptingMessage(index: String) = "Attempting to rollover index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 17da79078..728fc7f87 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.TransitionsActionConfig @@ -23,14 +24,16 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.evaluateConditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasStatsConditions import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue import org.elasticsearch.rest.RestStatus -import java.lang.Exception +import org.elasticsearch.transport.RemoteTransportException import java.time.Instant +import kotlin.Exception /** * Attempt to transition to the next state @@ -53,21 +56,20 @@ class AttemptTransitionStep( override fun isIdempotent() = true - @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + @Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod") + override suspend fun execute(): AttemptTransitionStep { try { if (config.transitions.isEmpty()) { - logger.info("$index transitions are empty, completing policy") + logger.info("$indexName transitions are empty, completing policy") policyCompleted = true stepStatus = StepStatus.COMPLETED - return + return this } - val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDate = clusterService.state().metaData().index(indexName).creationDate val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) if (indexCreationDate == -1L) { - logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") } val stepStartTime = getStepStartTime() var numDocs: Long? = null @@ -75,47 +77,54 @@ class AttemptTransitionStep( if (config.transitions.any { it.hasStatsConditions() }) { val statsRequest = IndicesStatsRequest() - .indices(index).clear().docs(true) + .indices(indexName).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status != RestStatus.OK) { - logger.debug( - "Failed to get index stats for index: [$index], status response: [${statsResponse.status}]" - ) - + val message = getFailedStatsMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to evaluate conditions for transition", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) - return + return this } - - numDocs = statsResponse.primaries.docs?.count ?: 0 - indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) + numDocs = statsResponse.primaries.getDocs()?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName val message: String + val stateName = stateName // shadowed on purpose to prevent var from changing if (stateName != null) { - logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + + logger.info("$indexName transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + " numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]") stepStatus = StepStatus.COMPLETED - message = "Transitioning to $stateName" + message = getSuccessMessage(indexName, stateName) } else { stepStatus = StepStatus.CONDITION_NOT_MET - message = "Attempting to transition" + message = getEvaluatingMessage(indexName) } info = mapOf("message" to message) + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to transition index [index=$index]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to transition index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -126,4 +135,11 @@ class AttemptTransitionStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to transition index [index=$index]" + fun getFailedStatsMessage(index: String) = "Failed to get stats information for the index [index=$index]" + fun getEvaluatingMessage(index: String) = "Evaluating transition conditions [index=$index]" + fun getSuccessMessage(index: String, state: String) = "Transitioning to $state [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt index f203b035d..bca0a9cbc 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt @@ -46,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentFactory import org.elasticsearch.index.query.BoolQueryBuilder @@ -216,7 +217,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null @Suppress("ReturnCount") fun RolloverActionConfig.evaluateConditions( - indexCreationDate: Instant, + indexAgeTimeValue: TimeValue, numDocs: Long, indexSize: ByteSizeValue ): Boolean { @@ -232,11 +233,7 @@ fun RolloverActionConfig.evaluateConditions( } if (this.minAge != null) { - val indexCreationDateMilli = indexCreationDate.toEpochMilli() - if (indexCreationDateMilli != -1L) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli - if (this.minAge.millis <= elapsedTime) return true - } + if (this.minAge.millis <= indexAgeTimeValue.millis) return true } if (this.minSize != null) { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index 2cc45ff76..70edb25d7 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -330,22 +330,28 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } @Suppress("UNCHECKED_CAST") - protected fun getSegmentCount(index: String): Int { - val statsResponse: Map = getStats(index) - - // Assert that shard count of stats response is 1 since the stats request being used is at the index level - // (meaning the segment count in the response is aggregated) but segment count for force merge - // (which this method is primarily being used for) is going to be validated per shard - val shardsInfo = statsResponse["_shards"] as Map - assertEquals("Shard count higher than expected", 1, shardsInfo["successful"]) - - val indicesStats = statsResponse["indices"] as Map>>> - return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int + protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean { + if (min == null && max == null) throw IllegalArgumentException("Must provide at least a min or max") + val statsResponse: Map = getShardSegmentStats(index) + + val indicesStats = statsResponse["indices"] as Map>>>>> + return indicesStats[index]!!["shards"]!!.values.all { list -> + list.filter { it["routing"]!!["primary"] == true }.all { + if (it["routing"]!!["state"] != "STARTED") { + false + } else { + val count = it["segments"]!!["count"] as Int + if (min != null && count < min) return false + if (max != null && count > max) return false + return true + } + } + } } /** Get stats for [index] */ - private fun getStats(index: String): Map { - val response = client().makeRequest("GET", "/$index/_stats") + private fun getShardSegmentStats(index: String): Map { + val response = client().makeRequest("GET", "/$index/_stats/segments?level=shards") assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt index 815ff22ad..7a01b27b5 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ChangePo import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.StateFilter @@ -360,6 +361,11 @@ fun ManagedIndexConfig.toJsonString(): String { return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() } +fun ManagedIndexMetaData.toJsonString(): String { + val builder = XContentFactory.jsonBuilder().startObject() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).endObject().string() +} + /** * Wrapper for [RestClient.performRequest] which was deprecated in ES 6.5 and is used in tests. This provides * a single place to suppress deprecation warnings. This will probably need further work when the API is removed entirely diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt index c974e95c6..fb571cf46 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt @@ -21,6 +21,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import java.time.Instant import java.util.Locale @@ -42,7 +43,7 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { val indexName = "${testIndexName}_index_1" val policyID = "${testIndexName}_testPolicyName_1" createPolicyJson(testPolicy, policyID) - val expectedInfoString = mapOf("message" to "There is no valid rollover_alias=null set on $indexName").toString() + val expectedInfoString = mapOf("message" to AttemptRolloverStep.getFailedNoValidAliasMessage(indexName)).toString() createIndex(indexName, policyID) @@ -134,7 +135,7 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // even if we ran couple times we should have backed off and only retried once. waitFor { - val expectedInfoString = mapOf("message" to "There is no valid rollover_alias=null set on $indexName").toString() + val expectedInfoString = mapOf("message" to AttemptRolloverStep.getFailedNoValidAliasMessage(indexName)).toString() assertPredicatesOnMetaData( listOf( indexName to listOf( diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt index 8f94b5741..24b10d35b 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt @@ -19,7 +19,10 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateMana import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.open.AttemptOpenStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor +import org.hamcrest.collection.IsMapContaining import java.time.Instant import java.util.Locale @@ -54,13 +57,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) - - val expectedInfoString = mapOf("message" to "Attempting to rollover").toString() waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString())), - getExplainMap(indexName), - strict = false + assertThat( + "Should be attempting to rollover", + getExplainManagedIndexMetaData(indexName).info, + IsMapContaining.hasEntry("message", AttemptRolloverStep.getAttemptingMessage(indexName) as Any?) ) } @@ -107,7 +108,7 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) - val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString() + val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString() waitFor { assertPredicatesOnMetaData( listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())), @@ -122,13 +123,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover // but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail updateManagedIndexConfigStartTime(managedIndexConfig) - - val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString() waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())), - getExplainMap(indexName), - strict = false + assertThat( + "Should be attempting to rollover", + getExplainManagedIndexMetaData(indexName).info, + IsMapContaining.hasEntry("message", AttemptRolloverStep.getAttemptingMessage(indexName) as Any?) ) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt index e95f602c6..9623ea68c 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt @@ -20,6 +20,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.AttemptCallForceMergeStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.AttemptSetReadOnlyStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.WaitForForceMergeStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.settings.Settings @@ -55,7 +58,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -72,7 +75,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + // verify we set maxNumSegments in action properties when kicking off force merge waitFor { assertEquals( @@ -84,9 +87,8 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // verify we reset actionproperties at end of forcemerge waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } // index should still be readonly after force merge finishes @@ -117,7 +119,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } // Set index to read-only updateIndexSettings(indexName, Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true)) @@ -133,17 +135,18 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Second execution: Index was already read-only and should remain so for force_merge updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(AttemptSetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + waitFor { assertEquals(AttemptCallForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) + assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index ef4f74336..1131ea025 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.action.search.SearchResponse import java.time.Instant @@ -91,7 +92,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -157,7 +158,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -223,7 +224,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -313,7 +314,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory1.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory1, actualHistory1) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt index ab4c21f97..ea1338f8d 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt @@ -16,17 +16,16 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.action import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase -import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity import org.elasticsearch.common.unit.ByteSizeUnit import org.elasticsearch.common.unit.ByteSizeValue -import org.elasticsearch.rest.RestRequest +import org.elasticsearch.common.unit.TimeValue +import org.hamcrest.core.Is.isA import org.junit.Assert import java.time.Instant import java.time.temporal.ChronoUnit @@ -36,6 +35,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + @Suppress("UNCHECKED_CAST") fun `test rollover no condition`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" @@ -65,10 +65,15 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + assertNull("Should not have conditions if none specified", info["conditions"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + @Suppress("UNCHECKED_CAST") fun `test rollover multi condition byte size`() { val aliasName = "${testIndexName}_byte_alias" val indexNameBase = "${testIndexName}_index_byte" @@ -98,36 +103,48 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) } - - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/1111", - StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/2222", - StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/3333", - StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON) - ) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index rollover before it met the condition.", + AttemptRolloverStep.getAttemptingMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min size and min doc count conditions", + setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min size condition", "10b", minSize["condition"]) + assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min size and min doc count conditions", + setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min size condition", "10b", minSize["condition"]) + assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + @Suppress("UNCHECKED_CAST") fun `test rollover multi condition doc size`() { val aliasName = "${testIndexName}_doc_alias" val indexNameBase = "${testIndexName}_index_doc" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_doc_1" - val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0) + val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueHours(48), 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -151,27 +168,38 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) } - - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/1111", - StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/2222", - StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/3333", - StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON) - ) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index rollover before it met the condition.", + AttemptRolloverStep.getAttemptingMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min age and min doc count conditions", + setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min age and min doc count conditions", + setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt new file mode 100644 index 000000000..e6a35d5b6 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt @@ -0,0 +1,63 @@ +package com.amazon.opendistroforelasticsearch.indexstatemanagement.action + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class TransitionActionIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + fun `test doc count condition`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val secondStateName = "second" + val states = listOf( + State("first", listOf(), listOf(Transition(secondStateName, Conditions(docCount = 5L)))), + State(secondStateName, listOf(), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Initializing the policy/metadata + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Evaluating transition conditions for first time + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Should not have evaluated to true + waitFor { assertEquals(AttemptTransitionStep.getEvaluatingMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + + // Add 6 documents (>5) + insertSampleData(indexName, 6) + + // Evaluating transition conditions for second time + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Should have evaluated to true + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index bd25c24dc..349245cdb 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -28,6 +28,8 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.F import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.WaitForForceMergeStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.XContentType @@ -195,16 +197,16 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { updateClusterSetting(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.key, "true") // Confirm job was re-enabled - val enableddManagedIndexConfig: ManagedIndexConfig = waitFor { + val enabledManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getExistingManagedIndexConfig(indexName) assertEquals("ManagedIndexConfig was not re-enabled", true, config.enabled) config } // Speed up to next execution where the job should be rescheduled and the index rolled over - updateManagedIndexConfigStartTime(enableddManagedIndexConfig) + updateManagedIndexConfigStartTime(enabledManagedIndexConfig) - waitFor { assertEquals("Rolled over index", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptRolloverStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } fun `test not disabling ism on unsafe step`() { @@ -239,7 +241,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -258,7 +260,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) // Verify maxNumSegments is set in action properties when kicking off force merge waitFor { @@ -274,19 +275,23 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + + // Confirm we successfully executed the WaitForForceMergeStep + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Confirm job was not disabled assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled) // Validate segments were merged - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled updateManagedIndexConfigStartTime(managedIndexConfig) // Explain API info should still be that of the last executed Step - waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Confirm job was disabled val disabledManagedIndexConfig: ManagedIndexConfig = waitFor { @@ -299,7 +304,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(disabledManagedIndexConfig) waitFor { - val expectedInfoString = mapOf("message" to "Force merge completed").toString() + val expectedInfoString = mapOf("message" to WaitForForceMergeStep.getSuccessMessage(indexName)).toString() assertPredicatesOnMetaData( listOf( indexName to listOf( diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt index d26b450b7..e47dcb400 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt @@ -154,6 +154,27 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping ManagedIndexConfig doesn't work with id and version", configThree, parsedConfigThree) } + fun `test managed index metadata parsing`() { + val metadata = ManagedIndexMetaData( + index = randomAlphaOfLength(10), + indexUuid = randomAlphaOfLength(10), + policyID = randomAlphaOfLength(10), + policySeqNo = randomNonNegativeLong(), + policyPrimaryTerm = randomNonNegativeLong(), + policyCompleted = null, + rolledOver = null, + transitionTo = randomAlphaOfLength(10), + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = null, + info = null + ) + val metadataString = metadata.toJsonString() + val parsedMetaData = ManagedIndexMetaData.parse(parser(metadataString)) + assertEquals("Round tripping ManagedIndexMetaData doesn't work", metadata, parsedMetaData) + } + fun `test change policy parsing`() { val changePolicy = randomChangePolicy() diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index f4aa5af61..afd283d1a 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -34,6 +34,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReplicaC import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestChangePolicyAction.Companion.INDEX_NOT_MANAGED import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES @@ -528,7 +529,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // verify we are in rollover and have not completed it yet waitFor { assertEquals(ActionConfig.ActionType.ROLLOVER.type, getExplainManagedIndexMetaData(indexName).actionMetaData?.name) - assertEquals("Attempting to rollover", getExplainManagedIndexMetaData(indexName).info?.get("message")) + assertEquals(AttemptRolloverStep.getAttemptingMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } val newStateWithReadOnlyAction = randomState(name = stateWithReadOnlyAction.name, actions = listOf(actionConfig.copy(minDocs = 5))) @@ -561,7 +563,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // which should now actually rollover because 5 docs is less than 10 docs updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Rolled over index", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptRolloverStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } } fun `test changing failed init policy`() { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt index c5af7c889..94a6d0cd3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt @@ -254,7 +254,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { ) } - // speed up to execute first action, readonly + // speed up to execute set read only force merge step updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { @@ -271,7 +271,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { // close the index to cause next execution to fail closeIndex(indexName) - // speed up to execute first action and fail, call force merge + // speed up to execute attempt call force merge step updateManagedIndexConfigStartTime(managedIndexConfig) // verify failed and save the startTime diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 4c9ce902b..9aa106d9f 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -29,6 +29,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReadWrit import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomTransition import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readwrite.SetReadWriteStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule import java.time.Instant @@ -130,19 +133,19 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // speed up to first execution that should set index to read only updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Set index to read-only", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(SetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to second execution that should transition to second_state updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Transitioning to second_state", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to third execution that should set index back to read write updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Set index to read-write", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(SetReadWriteStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to fourth execution that should transition to first_state updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Transitioning to first_state", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, firstState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // remove read_only from the allowlist val allowedActions = ActionConfig.ActionType.values().toList() diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt index 2e49c9cfb..1d94d83d3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -32,6 +32,7 @@ import org.elasticsearch.client.IndicesAdminClient import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException import kotlin.IllegalArgumentException class AttemptCloseStepTests : ESTestCase() { @@ -76,7 +77,6 @@ class AttemptCloseStepTests : ESTestCase() { val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) attemptCloseStep.execute() val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) - logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } } @@ -95,6 +95,35 @@ class AttemptCloseStepTests : ESTestCase() { } } + fun `test close step remote transport snapshot in progress exception`() { + val exception = RemoteTransportException("rte", SnapshotInProgressException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step remote transport exception`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } private fun getIndicesAdminClient(closeIndexResponse: CloseIndexResponse?, exception: Exception?): IndicesAdminClient { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt new file mode 100644 index 000000000..05b105886 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.OpenActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.open.AttemptOpenStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptOpenStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test open step sets step status to failed when not acknowledged`() { + val openIndexResponse = OpenIndexResponse(false, false) + val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test open step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test open step remote transport exception`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(openIndexResponse: OpenIndexResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (openIndexResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (openIndexResponse != null) listener.onResponse(openIndexResponse) + else listener.onFailure(exception) + }.whenever(this.mock).open(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt new file mode 100644 index 000000000..87764bfbd --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.replicacount.AttemptSetReplicaCountStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptSetReplicaCountStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test replica step sets step status to failed when not acknowledged`() { + val replicaCountResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(replicaCountResponse, null))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test replica step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test replica step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(replicaResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (replicaResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (replicaResponse != null) listener.onResponse(replicaResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt new file mode 100644 index 000000000..28b1d7eb5 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -0,0 +1,118 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.TransitionsActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.stats.CommonStats +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.ClusterState +import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.cluster.metadata.MetaData +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.index.shard.DocsStats +import org.elasticsearch.rest.RestStatus +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptTransitionStepTests : ESTestCase() { + + private val indexMetadata: IndexMetaData = mock() + private val metadata: MetaData = mock { on { index(any()) } doReturn indexMetadata } + private val clusterState: ClusterState = mock { on { metaData() } doReturn metadata } + private val clusterService: ClusterService = mock { on { state() } doReturn clusterState } + + private val docsStats: DocsStats = mock() + private val primaries: CommonStats = mock { on { getDocs() } doReturn docsStats } + private val statsResponse: IndicesStatsResponse = mock { on { primaries } doReturn primaries } + + fun `test stats response not OK`() { + whenever(indexMetadata.creationDate).doReturn(5L) + whenever(statsResponse.status).doReturn(RestStatus.INTERNAL_SERVER_ERROR) + whenever(statsResponse.shardFailures).doReturn(IndicesStatsResponse.EMPTY) + whenever(docsStats.count).doReturn(6L) + whenever(docsStats.totalSizeInBytes).doReturn(2) + val client = getClient(getAdminClient(getIndicesAdminClient(statsResponse, null))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + } + } + + fun `test transitions fails on exception`() { + whenever(indexMetadata.creationDate).doReturn(5L) + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + fun `test transitions remote transport exception`() { + whenever(indexMetadata.creationDate).doReturn(5L) + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(statsResponse: IndicesStatsResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (statsResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (statsResponse != null) listener.onResponse(statsResponse) + else listener.onFailure(exception) + }.whenever(this.mock).stats(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt new file mode 100644 index 000000000..1856c6317 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadOnlyActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class SetReadOnlyStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test read only step sets step status to failed when not acknowledged`() { + val setReadOnlyResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read only step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read only step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(setReadOnlyResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (setReadOnlyResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (setReadOnlyResponse != null) listener.onResponse(setReadOnlyResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt new file mode 100644 index 000000000..f917494b9 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readwrite.SetReadWriteStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class SetReadWriteStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test read write step sets step status to failed when not acknowledged`() { + val setReadWriteResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(setReadWriteResponse, null))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read write step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read write step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(setReadWriteResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (setReadWriteResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (setReadWriteResponse != null) listener.onResponse(setReadWriteResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 2ae64bc00..0b253da55 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -182,49 +182,45 @@ class ManagedIndexUtilsTests : ESTestCase() { fun `test rollover action config evaluate conditions`() { val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue(0))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(100), numDocs = 5, indexSize = ByteSizeValue(5))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(-6000), numDocs = 5, indexSize = ByteSizeValue(5))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(6000), numDocs = 5, indexSize = ByteSizeValue(5))) val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) assertFalse("Less bytes should not pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Equal bytes should pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue(5))) assertTrue("More bytes should pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue(10))) val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0) assertFalse("Less docs should not pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Equal docs should pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 5, indexSize = ByteSizeValue.ZERO)) assertTrue("More docs should pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 10, indexSize = ByteSizeValue.ZERO)) val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse("Index age that is too young should not pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Index age that is older should pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) - assertFalse("Index age that is -1L should not pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(10000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse("No conditions met should not pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) - assertFalse("Multi condition, index age -1L should not pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, age should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(10000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, docs should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 2, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, size should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue(2))) } fun `test transition evaluate conditions`() {