diff --git a/build-tools/esplugin-coverage.gradle b/build-tools/esplugin-coverage.gradle index b2380db29..a9ebc5814 100644 --- a/build-tools/esplugin-coverage.gradle +++ b/build-tools/esplugin-coverage.gradle @@ -69,8 +69,8 @@ integTestCluster { jacocoTestReport { dependsOn integTest, test executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile - sourceDirectories = sourceSets.main.allSource - classDirectories = sourceSets.main.output + sourceDirectories.from = "src/main/kotlin" + classDirectories.from = sourceSets.main.output reports { html.enabled = true // human readable xml.enabled = true // for coverlay diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index 8014519ce..e2f3dcbfd 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -291,7 +291,7 @@ object ManagedIndexRunner : ScheduledJobRunner, if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) { // Step null check is done in getStartingManagedIndexMetaData - step.execute() + step.preExecute(logger).execute().postExecute(logger) var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) if (executedManagedIndexMetaData.isFailed) { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt index 576fea622..7be0288e3 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt @@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService import kotlinx.coroutines.delay import org.apache.logging.log4j.Logger import org.elasticsearch.ElasticsearchException +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.ActionListener import org.elasticsearch.action.bulk.BackoffPolicy +import org.elasticsearch.action.support.DefaultShardOperationFailedException import org.elasticsearch.client.ElasticsearchClient import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.bytes.BytesReference @@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentParserUtils import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.rest.RestStatus +import org.elasticsearch.transport.RemoteTransportException import java.time.Instant import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -202,9 +205,7 @@ fun IndexMetaData.getRolloverAlias(): String? { fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? { val index = this.index.name val uuid = this.index.uuid - val policyID = this.getPolicyID() - - if (policyID == null) return null + val policyID = this.getPolicyID() ?: return null return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID) } @@ -217,3 +218,13 @@ fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? { } return null } + +fun Throwable.findRemoteTransportException(): RemoteTransportException? { + if (this is RemoteTransportException) return this + return this.cause?.findRemoteTransportException() +} + +fun DefaultShardOperationFailedException.getUsefulCauseString(): String { + val rte = this.cause?.findRemoteTransportException() + return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString() +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt index 52feb7784..07761bd90 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt @@ -40,6 +40,7 @@ data class SweptManagedIndexConfig( ) { companion object { + @Suppress("ComplexMethod") @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt index 35cd30051..81ddc9da8 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData +import org.apache.logging.log4j.Logger import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput import org.elasticsearch.common.io.stream.Writeable @@ -25,7 +26,17 @@ import java.util.Locale abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) { - abstract suspend fun execute() + fun preExecute(logger: Logger): Step { + logger.info("Executing $name for ${managedIndexMetaData.index}") + return this + } + + abstract suspend fun execute(): Step + + fun postExecute(logger: Logger): Step { + logger.info("Finished executing $name for ${managedIndexMetaData.index}") + return this + } abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData @@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta */ abstract fun isIdempotent(): Boolean - fun getStartingStepMetaData(): StepMetaData { - return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) - } + fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) fun getStepStartTime(): Instant { if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) { @@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime) } + protected val indexName: String = managedIndexMetaData.index + enum class StepStatus(val status: String) : Writeable { STARTING("starting"), CONDITION_NOT_MET("condition_not_met"), diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index d7192f122..8d8782ec6 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.close.CloseIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.transport.RemoteTransportException class AttemptCloseStep( val clusterService: ClusterService, @@ -41,34 +43,52 @@ class AttemptCloseStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + override suspend fun execute(): AttemptCloseStep { try { - logger.info("Executing close on $index") val closeIndexRequest = CloseIndexRequest() - .indices(index) + .indices(indexName) val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) } - logger.info("Close index for $index was acknowledged=${response.isAcknowledged}") if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully closed index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to close index") + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(cause) + } else { + handleException(cause as Exception) } } catch (e: SnapshotInProgressException) { - logger.warn("Failed to close index [index=$index] with snapshot in progress") - stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Index had snapshot in progress, retrying closing") + handleSnapshotException(e) } catch (e: Exception) { - logger.error("Failed to set index to close [index=$index]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to close") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleSnapshotException(e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.warn(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -78,4 +98,10 @@ class AttemptCloseStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to close index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully closed index [index=$index]" + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying closing [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index a7ad4dba3..b7bd75a1f 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException +import org.elasticsearch.transport.RemoteTransportException import java.lang.Exception class AttemptDeleteStep( @@ -42,30 +44,51 @@ class AttemptDeleteStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptDeleteStep { try { val response: AcknowledgedResponse = client.admin().indices() - .suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) } + .suspendUntil { delete(DeleteIndexRequest(indexName), it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Deleted index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to delete index") + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(cause) + } else { + handleException(cause as Exception) } } catch (e: SnapshotInProgressException) { - logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress") - stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Index had snapshot in progress, retrying deletion") + handleSnapshotException(e) } catch (e: Exception) { - logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to delete index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleSnapshotException(e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.warn(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -78,5 +101,8 @@ class AttemptDeleteStep( companion object { const val name = "attempt_delete" + fun getFailedMessage(index: String) = "Failed to delete index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully deleted index [index=$index]" + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying deletion [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index 66e4baae1..72544a395 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -15,18 +15,27 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionProperties import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse import org.elasticsearch.rest.RestStatus +import org.elasticsearch.transport.RemoteTransportException +import java.time.Instant class AttemptCallForceMergeStep( val clusterService: ClusterService, @@ -42,35 +51,61 @@ class AttemptCallForceMergeStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptCallForceMergeStep { try { - val indexName = managedIndexMetaData.index - logger.info("Attempting to force merge on [$indexName]") + val startTime = Instant.now().toEpochMilli() val request = ForceMergeRequest(indexName).maxNumSegments(config.maxNumSegments) - val response: ForceMergeResponse = client.admin().indices().suspendUntil { forceMerge(request, it) } + var response: ForceMergeResponse? = null + var throwable: Throwable? = null + GlobalScope.launch(Dispatchers.IO + CoroutineName("ISM-ForceMerge-$indexName")) { + try { + response = client.admin().indices().suspendUntil { forceMerge(request, it) } + if (response?.status == RestStatus.OK) { + logger.info(getSuccessMessage(indexName)) + } else { + logger.warn(getFailedMessage(indexName)) + } + } catch (t: Throwable) { + throwable = t + } + } + + while (response == null && (Instant.now().toEpochMilli() - startTime) < FIVE_MINUTES_IN_MILLIS) { + delay(FIVE_SECONDS_IN_MILLIS) + throwable?.let { throw it } + } - // If response is OK then the force merge operation has started - if (response.status == RestStatus.OK) { + val shadowedResponse = response + if (shadowedResponse?.let { it.status == RestStatus.OK } != false) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Started force merge") + info = mapOf("message" to if (shadowedResponse == null) getSuccessfulCallMessage(indexName) else getSuccessMessage(indexName)) } else { // Otherwise the request to force merge encountered some problem stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to start force merge", - "status" to response.status, - "shard_failures" to response.shardFailures.map { it.toString() } + "message" to getFailedMessage(indexName), + "status" to shadowedResponse.status, + "shard_failures" to shadowedResponse.shardFailures.map { it.getUsefulCauseString() } ) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to start force merge [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to start force merge") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -87,5 +122,10 @@ class AttemptCallForceMergeStep( companion object { const val name = "attempt_call_force_merge" + const val FIVE_MINUTES_IN_MILLIS = 1000 * 60 * 5 // how long to wait for the force merge request before moving on + const val FIVE_SECONDS_IN_MILLIS = 1000L * 5L // delay + fun getFailedMessage(index: String) = "Failed to start force merge [index=$index]" + fun getSuccessfulCallMessage(index: String) = "Successfully called force merge [index=$index]" + fun getSuccessMessage(index: String) = "Successfully completed force merge [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt index 5894d2fca..daf0fac5d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt @@ -21,12 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.F import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client -import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class AttemptSetReadOnlyStep( val clusterService: ClusterService, @@ -41,18 +43,17 @@ class AttemptSetReadOnlyStep( override fun isIdempotent() = true - override suspend fun execute() { - val indexName = managedIndexMetaData.index - - logger.info("Attempting to set [$indexName] to read-only for force_merge action") + override suspend fun execute(): AttemptSetReadOnlyStep { val indexSetToReadOnly = setIndexToReadOnly(indexName) // If setIndexToReadOnly returns false, updating settings failed and failed info was already updated, can return early - if (!indexSetToReadOnly) return + if (!indexSetToReadOnly) return this // Complete step since index is read-only stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to getSuccessMessage(indexName)) + + return this } @Suppress("TooGenericExceptionCaught") @@ -60,37 +61,44 @@ class AttemptSetReadOnlyStep( try { val updateSettingsRequest = UpdateSettingsRequest() .indices(indexName) - .settings( - Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true) - ) + .settings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { - logger.info("Successfully set [$indexName] to read-only for force_merge action") return true } // If response is not acknowledged, then add failed info - logger.error("Request to set [$indexName] to read-only was NOT acknowledged") + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-only") + info = mapOf("message" to message) + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-only [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-only") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } return false } + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData = currentMetaData.copy(stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), transitionTo = null, info = info) companion object { const val name = "attempt_set_read_only" + fun getFailedMessage(index: String) = "Failed to set index to read-only [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-only [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 04d3989f9..2a543174d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig @@ -43,26 +44,22 @@ class WaitForForceMergeStep( override fun isIdempotent() = true - @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val indexName = managedIndexMetaData.index - - logger.info("Checking if force merge is complete on [$indexName]") - + @Suppress("TooGenericExceptionCaught", "ReturnCount") + override suspend fun execute(): WaitForForceMergeStep { // Retrieve maxNumSegments value from ActionProperties. If ActionProperties is null, update failed info and return early. - val maxNumSegments = getMaxNumSegments() ?: return + val maxNumSegments = getMaxNumSegments() ?: return this // Get the number of shards with a segment count greater than maxNumSegments, meaning they are still merging val shardsStillMergingSegments = getShardsStillMergingSegments(indexName, maxNumSegments) // If shardsStillMergingSegments is null, failed info has already been updated and can return early - shardsStillMergingSegments ?: return + shardsStillMergingSegments ?: return this // If there are no longer shardsStillMergingSegments, then the force merge has completed if (shardsStillMergingSegments == 0) { - logger.info("Force merge completed on [$indexName]") - + val message = getSuccessMessage(indexName) + logger.info(message) stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Force merge completed") + info = mapOf("message" to message) } else { /* * If there are still shards with segments merging then no action is taken and the step will be reevaluated @@ -79,21 +76,21 @@ class WaitForForceMergeStep( val timeoutInSeconds: Long = config.configTimeout?.timeout?.seconds ?: FORCE_MERGE_TIMEOUT_IN_SECONDS if (timeWaitingForForceMerge.toSeconds() > timeoutInSeconds) { - logger.error( - "Force merge on [$indexName] timed out with [$shardsStillMergingSegments] shards containing unmerged segments" - ) + logger.error("Force merge on [$indexName] timed out with" + + " [$shardsStillMergingSegments] shards containing unmerged segments") stepStatus = StepStatus.FAILED - info = mapOf("message" to "Force merge timed out") + info = mapOf("message" to getFailedTimedOutMessage(indexName)) } else { - logger.debug( - "Force merge still running on [$indexName] with [$shardsStillMergingSegments] shards containing unmerged segments" - ) + logger.debug("Force merge still running on [$indexName] with" + + " [$shardsStillMergingSegments] shards containing unmerged segments") stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Waiting for force merge to complete") + info = mapOf("message" to getWaitingMessage(indexName)) } } + + return this } private fun getMaxNumSegments(): Int? { @@ -125,17 +122,18 @@ class WaitForForceMergeStep( } } - logger.debug("Failed to get index stats for index: [$indexName], status response: [${statsResponse.status}]") - + val message = getFailedSegmentCheckMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to check segments when waiting for force merge to complete", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) } catch (e: Exception) { - logger.error("Failed to check segments when waiting for force merge to complete [index=${managedIndexMetaData.index}]", e) + val message = getFailedSegmentCheckMessage(indexName) + logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to check segments when waiting for force merge to complete") + val mutableInfo = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage info = mutableInfo.toMap() @@ -169,7 +167,10 @@ class WaitForForceMergeStep( companion object { const val name = "wait_for_force_merge" - const val FORCE_MERGE_TIMEOUT_IN_SECONDS = 43200L // 12 hours + fun getFailedTimedOutMessage(index: String) = "Force merge timed out [index=$index]" + fun getFailedSegmentCheckMessage(index: String) = "Failed to check segments when waiting for force merge to complete [index=$index]" + fun getWaitingMessage(index: String) = "Waiting for force merge to complete [index=$index]" + fun getSuccessMessage(index: String) = "Successfully confirmed segments force merged [index=$index]" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 75b8c7bd7..39f8e5c63 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -44,24 +44,30 @@ class AttemptNotificationStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptNotificationStep { try { - logger.info("Executing $name on ${managedIndexMetaData.index}") withContext(Dispatchers.IO) { config.destination.publish(null, compileTemplate(config.messageTemplate, managedIndexMetaData)) } // publish internally throws an error for any invalid responses so its safe to assume if we reach this point it was successful stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully sent notification") + info = mapOf("message" to getSuccessMessage(indexName)) } catch (e: Exception) { - logger.error("Failed to send notification [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to send notification") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -77,4 +83,9 @@ class AttemptNotificationStep( .newInstance(template.params + mapOf("ctx" to managedIndexMetaData.convertToMap())) .execute() } + + companion object { + fun getFailedMessage(index: String) = "Failed to send notification [index=$index]" + fun getSuccessMessage(index: String) = "Successfully sent notification [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt index 950357ad1..38d378808 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt @@ -21,10 +21,12 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.O import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.open.OpenIndexRequest import org.elasticsearch.action.admin.indices.open.OpenIndexResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.transport.RemoteTransportException class AttemptOpenStep( val clusterService: ClusterService, @@ -40,28 +42,38 @@ class AttemptOpenStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): AttemptOpenStep { try { - logger.info("Executing open on ${managedIndexMetaData.index}") val openIndexRequest = OpenIndexRequest() - .indices(managedIndexMetaData.index) + .indices(indexName) val response: OpenIndexResponse = client.admin().indices().suspendUntil { open(openIndexRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Successfully opened index") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to open index: ${managedIndexMetaData.index}") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to open [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to open") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -71,4 +83,9 @@ class AttemptOpenStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to open index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully opened index [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt index 9d217515c..4642c6d6b 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class SetReadOnlyStep( val clusterService: ClusterService, @@ -41,31 +44,40 @@ class SetReadOnlyStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): SetReadOnlyStep { try { val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) - .settings( - Settings.builder().put("index.blocks.write", true) - ) + .indices(indexName) + .settings(Settings.builder().put(SETTING_BLOCKS_WRITE, true)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-only") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-only [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-only") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -75,4 +87,9 @@ class SetReadOnlyStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to set index to read-only [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-only [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt index f38d6ce3f..0e77c8c1b 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class SetReadWriteStep( val clusterService: ClusterService, @@ -41,31 +44,42 @@ class SetReadWriteStep( override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { + override suspend fun execute(): SetReadWriteStep { try { val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) + .indices(indexName) .settings( - Settings.builder().put("index.blocks.write", false) + Settings.builder().put(SETTING_BLOCKS_WRITE, false) ) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set index to read-write") + info = mapOf("message" to getSuccessMessage(indexName)) } else { + val message = getFailedMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set index to read-write") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set index to read-write [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set index to read-write") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -75,4 +89,9 @@ class SetReadWriteStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to set index to read-write [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-write [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt index e452736b0..adfb08964 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt @@ -21,11 +21,14 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client +import org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings +import org.elasticsearch.transport.RemoteTransportException class AttemptSetReplicaCountStep( val clusterService: ClusterService, @@ -37,36 +40,45 @@ class AttemptSetReplicaCountStep( private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING private var info: Map? = null + private val numOfReplicas = config.numOfReplicas override fun isIdempotent() = true @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val numOfReplicas = config.numOfReplicas + override suspend fun execute(): AttemptSetReplicaCountStep { try { - logger.info("Executing $name on ${managedIndexMetaData.index}") val updateSettingsRequest = UpdateSettingsRequest() - .indices(managedIndexMetaData.index) - .settings(Settings.builder().put("index.number_of_replicas", numOfReplicas)) + .indices(indexName) + .settings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, numOfReplicas)) val response: AcknowledgedResponse = client.admin().indices() .suspendUntil { updateSettings(updateSettingsRequest, it) } if (response.isAcknowledged) { - logger.info("Successfully executed $name on ${managedIndexMetaData.index}") stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Set number_of_replicas to $numOfReplicas") + info = mapOf("message" to getSuccessMessage(indexName, numOfReplicas)) } else { + val message = getFailedMessage(indexName, numOfReplicas) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "Failed to set number_of_replicas to $numOfReplicas") + info = mapOf("message" to message) } + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to set number_of_replicas [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to set number_of_replicas to $numOfReplicas") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName, numOfReplicas) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -76,4 +88,9 @@ class AttemptSetReplicaCountStep( info = info ) } + + companion object { + fun getFailedMessage(index: String, numOfReplicas: Int) = "Failed to set number_of_replicas to $numOfReplicas [index=$index]" + fun getSuccessMessage(index: String, numOfReplicas: Int) = "Successfully set number_of_replicas to $numOfReplicas [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 56410dfa9..13b5ed023 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getRolloverAlias +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig @@ -30,6 +31,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.rest.RestStatus import java.time.Instant @@ -48,43 +50,69 @@ class AttemptRolloverStep( override fun isIdempotent() = false @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + override suspend fun execute(): AttemptRolloverStep { // If we have already rolled over this index then fail as we only allow an index to be rolled over once if (managedIndexMetaData.rolledOver == true) { - logger.warn("$index was already rolled over, cannot execute rollover step") + logger.warn("$indexName was already rolled over, cannot execute rollover step") stepStatus = StepStatus.FAILED - info = mapOf("message" to "This index has already been rolled over") - return + info = mapOf("message" to getFailedDuplicateRolloverMessage(indexName)) + return this } val alias = getAliasOrUpdateInfo() // If alias is null we already updated failed info from getAliasOrUpdateInfo and can return early - alias ?: return + alias ?: return this val statsResponse = getIndexStatsOrUpdateInfo() // If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early - statsResponse ?: return + statsResponse ?: return this - val indexCreationDate = clusterService.state().metaData().index(index).creationDate - val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) - if (indexCreationDate == -1L) { - logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + val indexCreationDate = clusterService.state().metaData().index(indexName).creationDate + val indexAgeTimeValue = if (indexCreationDate == -1L) { + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") + // since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0 + TimeValue.timeValueMillis(0) + } else { + TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate) } val numDocs = statsResponse.primaries.docs?.count ?: 0 val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) - - if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) { - logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + + val conditions = listOfNotNull( + config.minAge?.let { + RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexAgeTimeValue.toString(), + "creationDate" to indexCreationDate + ) + }, + config.minDocs?.let { + RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf( + "condition" to it, + "current" to numDocs + ) + }, + config.minSize?.let { + RolloverActionConfig.MIN_SIZE_FIELD to mapOf( + "condition" to it.toString(), + "current" to indexSize.toString() + ) + } + ).toMap() + + if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) { + logger.info("$indexName rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," + " numDocs=$numDocs, indexSize=${indexSize.bytes}]") - executeRollover(alias) + executeRollover(alias, conditions) } else { stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to "Attempting to rollover") + info = mapOf("message" to getAttemptingMessage(indexName), "conditions" to conditions) } + + return this } - private suspend fun executeRollover(alias: String) { + @Suppress("ComplexMethod") + private suspend fun executeRollover(alias: String, conditions: Map>) { try { val request = RolloverRequest(alias, null) val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) } @@ -95,29 +123,34 @@ class AttemptRolloverStep( // If response isAcknowledged it means the index was created and alias was added to new index if (response.isAcknowledged) { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to "Rolled over index") + info = listOfNotNull( + "message" to getSuccessMessage(indexName), + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() } else { // If the alias update response is NOT acknowledged we will get back isAcknowledged=false // This means the new index was created but we failed to swap the alias + val message = getFailedAliasUpdateMessage(indexName, response.newIndex) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "New index created (${response.newIndex}), but failed to update alias") + info = listOfNotNull( + "message" to message, + if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified + ).toMap() } } catch (e: Exception) { - logger.error("Failed to rollover index [index=${managedIndexMetaData.index}]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to rollover index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo.put("cause", errorMessage) - info = mutableInfo.toMap() + handleException(e) } } private fun getAliasOrUpdateInfo(): String? { - val alias = clusterService.state().metaData().index(managedIndexMetaData.index).getRolloverAlias() + val alias = clusterService.state().metaData().index(indexName).getRolloverAlias() if (alias == null) { + val message = getFailedNoValidAliasMessage(indexName) + logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to "There is no valid rollover_alias=$alias set on ${managedIndexMetaData.index}") + info = mapOf("message" to message) } return alias @@ -126,26 +159,25 @@ class AttemptRolloverStep( private suspend fun getIndexStatsOrUpdateInfo(): IndicesStatsResponse? { try { val statsRequest = IndicesStatsRequest() - .indices(managedIndexMetaData.index).clear().docs(true) + .indices(indexName).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status == RestStatus.OK) { return statsResponse } - logger.debug( - "Failed to get index stats for index: [${managedIndexMetaData.index}], status response: [${statsResponse.status}]" - ) - + val message = getFailedEvaluateMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to evaluate conditions for rollover", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) } catch (e: Exception) { - logger.error("Failed to evaluate conditions for rollover [index=${managedIndexMetaData.index}]", e) + val message = getFailedEvaluateMessage(indexName) + logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to evaluate conditions for rollover") + val mutableInfo = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage info = mutableInfo.toMap() @@ -154,6 +186,16 @@ class AttemptRolloverStep( return null } + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetaData.copy( stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), @@ -162,4 +204,15 @@ class AttemptRolloverStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]" + fun getFailedAliasUpdateMessage(index: String, newIndex: String) = + "New index created, but failed to update alias [index=$index, newIndex=$newIndex]" + fun getFailedNoValidAliasMessage(index: String) = "Missing rollover_alias index setting [index=$index]" + fun getFailedDuplicateRolloverMessage(index: String) = "Index has already been rolled over [index=$index]" + fun getFailedEvaluateMessage(index: String) = "Failed to evaluate conditions for rollover [index=$index]" + fun getAttemptingMessage(index: String) = "Attempting to rollover index [index=$index]" + fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 17da79078..728fc7f87 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getUsefulCauseString import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.TransitionsActionConfig @@ -23,14 +24,16 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.evaluateConditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasStatsConditions import org.apache.logging.log4j.LogManager +import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue import org.elasticsearch.rest.RestStatus -import java.lang.Exception +import org.elasticsearch.transport.RemoteTransportException import java.time.Instant +import kotlin.Exception /** * Attempt to transition to the next state @@ -53,21 +56,20 @@ class AttemptTransitionStep( override fun isIdempotent() = true - @Suppress("TooGenericExceptionCaught") - override suspend fun execute() { - val index = managedIndexMetaData.index + @Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod") + override suspend fun execute(): AttemptTransitionStep { try { if (config.transitions.isEmpty()) { - logger.info("$index transitions are empty, completing policy") + logger.info("$indexName transitions are empty, completing policy") policyCompleted = true stepStatus = StepStatus.COMPLETED - return + return this } - val indexCreationDate = clusterService.state().metaData().index(index).creationDate + val indexCreationDate = clusterService.state().metaData().index(indexName).creationDate val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate) if (indexCreationDate == -1L) { - logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison") + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") } val stepStartTime = getStepStartTime() var numDocs: Long? = null @@ -75,47 +77,54 @@ class AttemptTransitionStep( if (config.transitions.any { it.hasStatsConditions() }) { val statsRequest = IndicesStatsRequest() - .indices(index).clear().docs(true) + .indices(indexName).clear().docs(true) val statsResponse: IndicesStatsResponse = client.admin().indices().suspendUntil { stats(statsRequest, it) } if (statsResponse.status != RestStatus.OK) { - logger.debug( - "Failed to get index stats for index: [$index], status response: [${statsResponse.status}]" - ) - + val message = getFailedStatsMessage(indexName) + logger.warn("$message - ${statsResponse.status}") stepStatus = StepStatus.FAILED info = mapOf( - "message" to "Failed to evaluate conditions for transition", - "shard_failures" to statsResponse.shardFailures.map { it.toString() } + "message" to message, + "shard_failures" to statsResponse.shardFailures.map { it.getUsefulCauseString() } ) - return + return this } - - numDocs = statsResponse.primaries.docs?.count ?: 0 - indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0) + numDocs = statsResponse.primaries.getDocs()?.count ?: 0 + indexSize = ByteSizeValue(statsResponse.primaries.getDocs()?.totalSizeInBytes ?: 0) } // Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName val message: String + val stateName = stateName // shadowed on purpose to prevent var from changing if (stateName != null) { - logger.info("$index transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + + logger.info("$indexName transition conditions evaluated to true [indexCreationDate=$indexCreationDate," + " numDocs=$numDocs, indexSize=${indexSize?.bytes},stepStartTime=${stepStartTime.toEpochMilli()}]") stepStatus = StepStatus.COMPLETED - message = "Transitioning to $stateName" + message = getSuccessMessage(indexName, stateName) } else { stepStatus = StepStatus.CONDITION_NOT_MET - message = "Attempting to transition" + message = getEvaluatingMessage(indexName) } info = mapOf("message" to message) + } catch (e: RemoteTransportException) { + handleException(ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { - logger.error("Failed to transition index [index=$index]", e) - stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to "Failed to transition index") - val errorMessage = e.message - if (errorMessage != null) mutableInfo["cause"] = errorMessage - info = mutableInfo.toMap() + handleException(e) } + + return this + } + + private fun handleException(e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { @@ -126,4 +135,11 @@ class AttemptTransitionStep( info = info ) } + + companion object { + fun getFailedMessage(index: String) = "Failed to transition index [index=$index]" + fun getFailedStatsMessage(index: String) = "Failed to get stats information for the index [index=$index]" + fun getEvaluatingMessage(index: String) = "Evaluating transition conditions [index=$index]" + fun getSuccessMessage(index: String, state: String) = "Transitioning to $state [index=$index]" + } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt index 64216b499..a40ba71de 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt @@ -46,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.unit.ByteSizeValue +import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentFactory import org.elasticsearch.index.query.BoolQueryBuilder @@ -218,7 +219,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null @Suppress("ReturnCount") fun RolloverActionConfig.evaluateConditions( - indexCreationDate: Instant, + indexAgeTimeValue: TimeValue, numDocs: Long, indexSize: ByteSizeValue ): Boolean { @@ -234,11 +235,7 @@ fun RolloverActionConfig.evaluateConditions( } if (this.minAge != null) { - val indexCreationDateMilli = indexCreationDate.toEpochMilli() - if (indexCreationDateMilli != -1L) { - val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli - if (this.minAge.millis <= elapsedTime) return true - } + if (this.minAge.millis <= indexAgeTimeValue.millis) return true } if (this.minSize != null) { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index a10da5106..bfac8a52e 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -334,22 +334,28 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } @Suppress("UNCHECKED_CAST") - protected fun getSegmentCount(index: String): Int { - val statsResponse: Map = getStats(index) - - // Assert that shard count of stats response is 1 since the stats request being used is at the index level - // (meaning the segment count in the response is aggregated) but segment count for force merge - // (which this method is primarily being used for) is going to be validated per shard - val shardsInfo = statsResponse["_shards"] as Map - assertEquals("Shard count higher than expected", 1, shardsInfo["successful"]) - - val indicesStats = statsResponse["indices"] as Map>>> - return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int + protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean { + if (min == null && max == null) throw IllegalArgumentException("Must provide at least a min or max") + val statsResponse: Map = getShardSegmentStats(index) + + val indicesStats = statsResponse["indices"] as Map>>>>> + return indicesStats[index]!!["shards"]!!.values.all { list -> + list.filter { it["routing"]!!["primary"] == true }.all { + if (it["routing"]!!["state"] != "STARTED") { + false + } else { + val count = it["segments"]!!["count"] as Int + if (min != null && count < min) return false + if (max != null && count > max) return false + return true + } + } + } } /** Get stats for [index] */ - private fun getStats(index: String): Map { - val response = client().makeRequest("GET", "/$index/_stats") + private fun getShardSegmentStats(index: String): Map { + val response = client().makeRequest("GET", "/$index/_stats/segments?level=shards") assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt index 815ff22ad..7a01b27b5 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ChangePo import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.StateFilter @@ -360,6 +361,11 @@ fun ManagedIndexConfig.toJsonString(): String { return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() } +fun ManagedIndexMetaData.toJsonString(): String { + val builder = XContentFactory.jsonBuilder().startObject() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).endObject().string() +} + /** * Wrapper for [RestClient.performRequest] which was deprecated in ES 6.5 and is used in tests. This provides * a single place to suppress deprecation warnings. This will probably need further work when the API is removed entirely diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt index c974e95c6..fb571cf46 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt @@ -21,6 +21,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import java.time.Instant import java.util.Locale @@ -42,7 +43,7 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { val indexName = "${testIndexName}_index_1" val policyID = "${testIndexName}_testPolicyName_1" createPolicyJson(testPolicy, policyID) - val expectedInfoString = mapOf("message" to "There is no valid rollover_alias=null set on $indexName").toString() + val expectedInfoString = mapOf("message" to AttemptRolloverStep.getFailedNoValidAliasMessage(indexName)).toString() createIndex(indexName, policyID) @@ -134,7 +135,7 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // even if we ran couple times we should have backed off and only retried once. waitFor { - val expectedInfoString = mapOf("message" to "There is no valid rollover_alias=null set on $indexName").toString() + val expectedInfoString = mapOf("message" to AttemptRolloverStep.getFailedNoValidAliasMessage(indexName)).toString() assertPredicatesOnMetaData( listOf( indexName to listOf( diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt index 8f94b5741..24b10d35b 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt @@ -19,7 +19,10 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateMana import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.open.AttemptOpenStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor +import org.hamcrest.collection.IsMapContaining import java.time.Instant import java.util.Locale @@ -54,13 +57,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) - - val expectedInfoString = mapOf("message" to "Attempting to rollover").toString() waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString())), - getExplainMap(indexName), - strict = false + assertThat( + "Should be attempting to rollover", + getExplainManagedIndexMetaData(indexName).info, + IsMapContaining.hasEntry("message", AttemptRolloverStep.getAttemptingMessage(indexName) as Any?) ) } @@ -107,7 +108,7 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) - val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString() + val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString() waitFor { assertPredicatesOnMetaData( listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())), @@ -122,13 +123,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover // but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail updateManagedIndexConfigStartTime(managedIndexConfig) - - val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString() waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())), - getExplainMap(indexName), - strict = false + assertThat( + "Should be attempting to rollover", + getExplainManagedIndexMetaData(indexName).info, + IsMapContaining.hasEntry("message", AttemptRolloverStep.getAttemptingMessage(indexName) as Any?) ) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt index e95f602c6..9623ea68c 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt @@ -20,6 +20,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.AttemptCallForceMergeStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.AttemptSetReadOnlyStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.WaitForForceMergeStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.settings.Settings @@ -55,7 +58,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -72,7 +75,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + // verify we set maxNumSegments in action properties when kicking off force merge waitFor { assertEquals( @@ -84,9 +87,8 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // verify we reset actionproperties at end of forcemerge waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } // index should still be readonly after force merge finishes @@ -117,7 +119,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } // Set index to read-only updateIndexSettings(indexName, Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true)) @@ -133,17 +135,18 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Second execution: Index was already read-only and should remain so for force_merge updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(AttemptSetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + waitFor { assertEquals(AttemptCallForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) + assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index d72dcfc3c..050e9fe42 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.action.search.SearchResponse import java.time.Instant @@ -91,7 +92,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -157,7 +158,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -223,7 +224,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory, actualHistory) @@ -313,7 +314,7 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { actionMetaData = ActionMetaData(ActionConfig.ActionType.READ_ONLY.toString(), actualHistory1.actionMetaData!!.startTime, 0, false, 0, 0, null), stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Set index to read-only") + info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) assertEquals(expectedHistory1, actualHistory1) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt index ab4c21f97..af0ecdd39 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt @@ -16,17 +16,16 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.action import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase -import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity import org.elasticsearch.common.unit.ByteSizeUnit import org.elasticsearch.common.unit.ByteSizeValue -import org.elasticsearch.rest.RestRequest +import org.elasticsearch.common.unit.TimeValue +import org.hamcrest.core.Is.isA import org.junit.Assert import java.time.Instant import java.time.temporal.ChronoUnit @@ -36,6 +35,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + @Suppress("UNCHECKED_CAST") fun `test rollover no condition`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" @@ -65,10 +65,15 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + assertNull("Should not have conditions if none specified", info["conditions"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + @Suppress("UNCHECKED_CAST") fun `test rollover multi condition byte size`() { val aliasName = "${testIndexName}_byte_alias" val indexNameBase = "${testIndexName}_index_byte" @@ -98,36 +103,48 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) } - - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/1111", - StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/2222", - StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/3333", - StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON) - ) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index rollover before it met the condition.", + AttemptRolloverStep.getAttemptingMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min size and min doc count conditions", + setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min size condition", "10b", minSize["condition"]) + assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min size and min doc count conditions", + setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min size condition", "10b", minSize["condition"]) + assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + @Suppress("UNCHECKED_CAST") fun `test rollover multi condition doc size`() { val aliasName = "${testIndexName}_doc_alias" val indexNameBase = "${testIndexName}_index_doc" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_doc_1" - val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0) + val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueHours(48), 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -151,27 +168,38 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) } - - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/1111", - StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/2222", - StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON) - ) - client().makeRequest( - RestRequest.Method.PUT.toString(), - "$firstIndex/_doc/3333", - StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON) - ) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index rollover before it met the condition.", + AttemptRolloverStep.getAttemptingMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min age and min doc count conditions", + setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) // Need to speed up to second execution where it will trigger the first execution of the action updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) } + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals("Did not have exclusively min age and min doc count conditions", + setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys) + val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + } Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt new file mode 100644 index 000000000..ff684f682 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/TransitionActionIT.kt @@ -0,0 +1,78 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.action + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class TransitionActionIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + fun `test doc count condition`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val secondStateName = "second" + val states = listOf( + State("first", listOf(), listOf(Transition(secondStateName, Conditions(docCount = 5L)))), + State(secondStateName, listOf(), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Initializing the policy/metadata + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Evaluating transition conditions for first time + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Should not have evaluated to true + waitFor { assertEquals(AttemptTransitionStep.getEvaluatingMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + + // Add 6 documents (>5) + insertSampleData(indexName, 6) + + // Evaluating transition conditions for second time + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Should have evaluated to true + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index bd25c24dc..349245cdb 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -28,6 +28,8 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.F import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.forcemerge.WaitForForceMergeStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.XContentType @@ -195,16 +197,16 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { updateClusterSetting(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.key, "true") // Confirm job was re-enabled - val enableddManagedIndexConfig: ManagedIndexConfig = waitFor { + val enabledManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getExistingManagedIndexConfig(indexName) assertEquals("ManagedIndexConfig was not re-enabled", true, config.enabled) config } // Speed up to next execution where the job should be rescheduled and the index rolled over - updateManagedIndexConfigStartTime(enableddManagedIndexConfig) + updateManagedIndexConfigStartTime(enabledManagedIndexConfig) - waitFor { assertEquals("Rolled over index", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptRolloverStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } fun `test not disabling ism on unsafe step`() { @@ -239,7 +241,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -258,7 +260,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) // Verify maxNumSegments is set in action properties when kicking off force merge waitFor { @@ -274,19 +275,23 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + + // Confirm we successfully executed the WaitForForceMergeStep + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Confirm job was not disabled assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled) // Validate segments were merged - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled updateManagedIndexConfigStartTime(managedIndexConfig) // Explain API info should still be that of the last executed Step - waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Confirm job was disabled val disabledManagedIndexConfig: ManagedIndexConfig = waitFor { @@ -299,7 +304,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(disabledManagedIndexConfig) waitFor { - val expectedInfoString = mapOf("message" to "Force merge completed").toString() + val expectedInfoString = mapOf("message" to WaitForForceMergeStep.getSuccessMessage(indexName)).toString() assertPredicatesOnMetaData( listOf( indexName to listOf( diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt index d26b450b7..e47dcb400 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt @@ -154,6 +154,27 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping ManagedIndexConfig doesn't work with id and version", configThree, parsedConfigThree) } + fun `test managed index metadata parsing`() { + val metadata = ManagedIndexMetaData( + index = randomAlphaOfLength(10), + indexUuid = randomAlphaOfLength(10), + policyID = randomAlphaOfLength(10), + policySeqNo = randomNonNegativeLong(), + policyPrimaryTerm = randomNonNegativeLong(), + policyCompleted = null, + rolledOver = null, + transitionTo = randomAlphaOfLength(10), + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = null, + info = null + ) + val metadataString = metadata.toJsonString() + val parsedMetaData = ManagedIndexMetaData.parse(parser(metadataString)) + assertEquals("Round tripping ManagedIndexMetaData doesn't work", metadata, parsedMetaData) + } + fun `test change policy parsing`() { val changePolicy = randomChangePolicy() diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index dd4e6c22b..3df3876e1 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -34,6 +34,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReplicaC import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestChangePolicyAction.Companion.INDEX_NOT_MANAGED import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.rollover.AttemptRolloverStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES @@ -531,7 +532,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // verify we are in rollover and have not completed it yet waitFor { assertEquals(ActionConfig.ActionType.ROLLOVER.type, getExplainManagedIndexMetaData(indexName).actionMetaData?.name) - assertEquals("Attempting to rollover", getExplainManagedIndexMetaData(indexName).info?.get("message")) + assertEquals(AttemptRolloverStep.getAttemptingMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } val newStateWithReadOnlyAction = randomState(name = stateWithReadOnlyAction.name, actions = listOf(actionConfig.copy(minDocs = 5))) @@ -564,7 +566,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // which should now actually rollover because 5 docs is less than 10 docs updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Rolled over index", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptRolloverStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message")) } } fun `test changing failed init policy`() { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt index c5af7c889..94a6d0cd3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt @@ -254,7 +254,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { ) } - // speed up to execute first action, readonly + // speed up to execute set read only force merge step updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { @@ -271,7 +271,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { // close the index to cause next execution to fail closeIndex(indexName) - // speed up to execute first action and fail, call force merge + // speed up to execute attempt call force merge step updateManagedIndexConfigStartTime(managedIndexConfig) // verify failed and save the startTime diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 4c9ce902b..9aa106d9f 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -29,6 +29,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReadWrit import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomTransition import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readwrite.SetReadWriteStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule import java.time.Instant @@ -130,19 +133,19 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // speed up to first execution that should set index to read only updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Set index to read-only", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(SetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to second execution that should transition to second_state updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Transitioning to second_state", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to third execution that should set index back to read write updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Set index to read-write", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(SetReadWriteStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // speed up to fourth execution that should transition to first_state updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("Transitioning to first_state", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, firstState.name), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // remove read_only from the allowlist val allowedActions = ActionConfig.ActionType.values().toList() diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt index d6886cdbe..3f6927f4e 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptCloseStepTests.kt @@ -32,6 +32,7 @@ import org.elasticsearch.client.IndicesAdminClient import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.snapshots.SnapshotInProgressException import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException import kotlin.IllegalArgumentException class AttemptCloseStepTests : ESTestCase() { @@ -76,7 +77,6 @@ class AttemptCloseStepTests : ESTestCase() { val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) attemptCloseStep.execute() val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) - logger.info(updatedManagedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } } @@ -95,6 +95,35 @@ class AttemptCloseStepTests : ESTestCase() { } } + fun `test close step remote transport snapshot in progress exception`() { + val exception = RemoteTransportException("rte", SnapshotInProgressException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test close step remote transport exception`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val closeActionConfig = CloseActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData) + attemptCloseStep.execute() + val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } private fun getIndicesAdminClient(closeIndexResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt new file mode 100644 index 000000000..05b105886 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptOpenStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.OpenActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.open.AttemptOpenStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptOpenStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test open step sets step status to failed when not acknowledged`() { + val openIndexResponse = OpenIndexResponse(false, false) + val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test open step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test open step remote transport exception`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val openActionConfig = OpenActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData) + attemptOpenStep.execute() + val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(openIndexResponse: OpenIndexResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (openIndexResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (openIndexResponse != null) listener.onResponse(openIndexResponse) + else listener.onFailure(exception) + }.whenever(this.mock).open(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt new file mode 100644 index 000000000..87764bfbd --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptSetReplicaCountStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.replicacount.AttemptSetReplicaCountStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptSetReplicaCountStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test replica step sets step status to failed when not acknowledged`() { + val replicaCountResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(replicaCountResponse, null))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test replica step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test replica step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val replicaCountActionConfig = ReplicaCountActionConfig(2, 0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val replicaCountStep = AttemptSetReplicaCountStep(clusterService, client, replicaCountActionConfig, managedIndexMetaData) + replicaCountStep.execute() + val updatedManagedIndexMetaData = replicaCountStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(replicaResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (replicaResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (replicaResponse != null) listener.onResponse(replicaResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt new file mode 100644 index 000000000..28b1d7eb5 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/AttemptTransitionStepTests.kt @@ -0,0 +1,118 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Conditions +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.TransitionsActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.transition.AttemptTransitionStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.stats.CommonStats +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.ClusterState +import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.cluster.metadata.MetaData +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.index.shard.DocsStats +import org.elasticsearch.rest.RestStatus +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class AttemptTransitionStepTests : ESTestCase() { + + private val indexMetadata: IndexMetaData = mock() + private val metadata: MetaData = mock { on { index(any()) } doReturn indexMetadata } + private val clusterState: ClusterState = mock { on { metaData() } doReturn metadata } + private val clusterService: ClusterService = mock { on { state() } doReturn clusterState } + + private val docsStats: DocsStats = mock() + private val primaries: CommonStats = mock { on { getDocs() } doReturn docsStats } + private val statsResponse: IndicesStatsResponse = mock { on { primaries } doReturn primaries } + + fun `test stats response not OK`() { + whenever(indexMetadata.creationDate).doReturn(5L) + whenever(statsResponse.status).doReturn(RestStatus.INTERNAL_SERVER_ERROR) + whenever(statsResponse.shardFailures).doReturn(IndicesStatsResponse.EMPTY) + whenever(docsStats.count).doReturn(6L) + whenever(docsStats.totalSizeInBytes).doReturn(2) + val client = getClient(getAdminClient(getIndicesAdminClient(statsResponse, null))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get correct failed message", AttemptTransitionStep.getFailedStatsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + } + } + + fun `test transitions fails on exception`() { + whenever(indexMetadata.creationDate).doReturn(5L) + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "example", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + fun `test transitions remote transport exception`() { + whenever(indexMetadata.creationDate).doReturn(5L) + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val config = TransitionsActionConfig(listOf(Transition("some_state", Conditions(docCount = 5L)))) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val step = AttemptTransitionStep(clusterService, client, config, managedIndexMetaData) + step.execute() + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(statsResponse: IndicesStatsResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (statsResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (statsResponse != null) listener.onResponse(statsResponse) + else listener.onFailure(exception) + }.whenever(this.mock).stats(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt new file mode 100644 index 000000000..1856c6317 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadOnlyStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadOnlyActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readonly.SetReadOnlyStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class SetReadOnlyStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test read only step sets step status to failed when not acknowledged`() { + val setReadOnlyResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(setReadOnlyResponse, null))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read only step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read only step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readOnlyActionConfig = ReadOnlyActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadOnlyStep = SetReadOnlyStep(clusterService, client, readOnlyActionConfig, managedIndexMetaData) + setReadOnlyStep.execute() + val updatedManagedIndexMetaData = setReadOnlyStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(setReadOnlyResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (setReadOnlyResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (setReadOnlyResponse != null) listener.onResponse(setReadOnlyResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt new file mode 100644 index 000000000..f917494b9 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/SetReadWriteStepTests.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.readwrite.SetReadWriteStep +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.AdminClient +import org.elasticsearch.client.Client +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.test.ESTestCase +import org.elasticsearch.transport.RemoteTransportException + +class SetReadWriteStepTests : ESTestCase() { + + private val clusterService: ClusterService = mock() + + fun `test read write step sets step status to failed when not acknowledged`() { + val setReadWriteResponse = AcknowledgedResponse(false) + val client = getClient(getAdminClient(getIndicesAdminClient(setReadWriteResponse, null))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read write step sets step status to failed when error thrown`() { + val exception = IllegalArgumentException("example") + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + } + } + + fun `test read write step sets step status to failed when remote transport error thrown`() { + val exception = RemoteTransportException("rte", IllegalArgumentException("nested")) + val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) + + runBlocking { + val readWriteActionConfig = ReadWriteActionConfig(0) + val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) + val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) + setReadWriteStep.execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient(setReadWriteResponse: AcknowledgedResponse?, exception: Exception?): IndicesAdminClient { + assertTrue("Must provide one and only one response or exception", (setReadWriteResponse != null).xor(exception != null)) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (setReadWriteResponse != null) listener.onResponse(setReadWriteResponse) + else listener.onFailure(exception) + }.whenever(this.mock).updateSettings(any(), any()) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 2ae64bc00..0b253da55 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -182,49 +182,45 @@ class ManagedIndexUtilsTests : ESTestCase() { fun `test rollover action config evaluate conditions`() { val noConditionsConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = null, index = 0) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue(0))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue(0))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(100), numDocs = 5, indexSize = ByteSizeValue(5))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(-6000), numDocs = 5, indexSize = ByteSizeValue(5))) assertTrue("No conditions should always pass", noConditionsConfig - .evaluateConditions(indexCreationDate = Instant.now().plusSeconds(600), numDocs = 5, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(6000), numDocs = 5, indexSize = ByteSizeValue(5))) val minSizeConfig = RolloverActionConfig(minSize = ByteSizeValue(5), minDocs = null, minAge = null, index = 0) assertFalse("Less bytes should not pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Equal bytes should pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(5))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue(5))) assertTrue("More bytes should pass", minSizeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(10))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue(10))) val minDocsConfig = RolloverActionConfig(minSize = null, minDocs = 5, minAge = null, index = 0) assertFalse("Less docs should not pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Equal docs should pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 5, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 5, indexSize = ByteSizeValue.ZERO)) assertTrue("More docs should pass", minDocsConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 10, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 10, indexSize = ByteSizeValue.ZERO)) val minAgeConfig = RolloverActionConfig(minSize = null, minDocs = null, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse("Index age that is too young should not pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(1000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Index age that is older should pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) - assertFalse("Index age that is -1L should not pass", minAgeConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(10000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) val multiConfig = RolloverActionConfig(minSize = ByteSizeValue(1), minDocs = 1, minAge = TimeValue.timeValueSeconds(5), index = 0) assertFalse("No conditions met should not pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue.ZERO)) - assertFalse("Multi condition, index age -1L should not pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.ofEpochMilli(-1L), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, age should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now().minusSeconds(10), numDocs = 0, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(10000), numDocs = 0, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, docs should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 2, indexSize = ByteSizeValue.ZERO)) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 2, indexSize = ByteSizeValue.ZERO)) assertTrue("Multi condition, size should pass", multiConfig - .evaluateConditions(indexCreationDate = Instant.now(), numDocs = 0, indexSize = ByteSizeValue(2))) + .evaluateConditions(indexAgeTimeValue = TimeValue.timeValueMillis(0), numDocs = 0, indexSize = ByteSizeValue(2))) } fun `test transition evaluate conditions`() {