diff --git a/build.gradle b/build.gradle index 7028c4431..698d2be56 100644 --- a/build.gradle +++ b/build.gradle @@ -307,7 +307,6 @@ integTest { exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/OpenActionIT.class' - exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ReplicaCountActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.class' exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.class' diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index 2463de695..8d5be5f7a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -11,6 +11,7 @@ import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry @@ -26,7 +27,8 @@ class ISMActionsParser private constructor() { val parsers = mutableListOf( CloseActionParser(), DeleteActionParser(), - ReadOnlyActionParser() + ReadOnlyActionParser(), + ReadWriteActionParser() ) fun addParser(parser: ActionParser) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteAction.kt index b9b2fdff9..3da520302 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteAction.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action +import org.opensearch.indexmanagement.indexstatemanagement.step.readwrite.SetReadWriteStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext @@ -17,11 +18,12 @@ class ReadWriteAction( const val name = "read_write" } + private val setReadWriteStep = SetReadWriteStep() + private val steps = listOf(setReadWriteStep) + override fun getStepToExecute(context: StepContext): Step { - TODO("Not yet implemented") + return setReadWriteStep } - override fun getSteps(): List { - TODO("Not yet implemented") - } + override fun getSteps(): List = steps } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteActionParser.kt index d40d4fe09..1093c028c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ReadWriteActionParser.kt @@ -7,19 +7,24 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser class ReadWriteActionParser : ActionParser() { override fun fromStreamInput(sin: StreamInput): Action { - TODO("Not yet implemented") + val index = sin.readInt() + return ReadWriteAction(index) } override fun fromXContent(xcp: XContentParser, index: Int): Action { - TODO("Not yet implemented") + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + + return ReadWriteAction(index) } override fun getActionType(): String { - TODO("Not yet implemented") + return ReadWriteAction.name } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/readwrite/SetReadWriteStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/readwrite/SetReadWriteStep.kt index 170a3f1ee..91e4d1a54 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/readwrite/SetReadWriteStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/readwrite/SetReadWriteStep.kt @@ -5,24 +5,77 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.readwrite +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.transport.RemoteTransportException class SetReadWriteStep : Step(name) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + override suspend fun execute(): Step { - TODO("Not yet implemented") + val context = this.context ?: return this + val indexName = context.metadata.index + try { + val updateSettingsRequest = UpdateSettingsRequest() + .indices(indexName) + .settings( + Settings.builder().put(SETTING_BLOCKS_WRITE, false) + ) + val response: AcknowledgedResponse = context.client.admin().indices() + .suspendUntil { updateSettings(updateSettingsRequest, it) } + + if (response.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(indexName)) + } else { + val message = getFailedMessage(indexName) + logger.warn(message) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: Exception) { + handleException(indexName, e) + } + + return this } - override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { - TODO("Not yet implemented") + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() } - override fun isIdempotent(): Boolean { - TODO("Not yet implemented") + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) } + override fun isIdempotent(): Boolean = true + companion object { const val name = "set_read_write" + fun getFailedMessage(index: String) = "Failed to set index to read-write [index=$index]" + fun getSuccessMessage(index: String) = "Successfully set index to read-write [index=$index]" } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt index 1681051e3..fa13fe3b4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.randomForceMergeActio import org.opensearch.indexmanagement.indexstatemanagement.randomIndexPriorityActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomNotificationActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomReadWriteActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomReplicaCountActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomRolloverActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig @@ -75,6 +76,10 @@ class ActionTests : OpenSearchTestCase() { } } + fun `test set read write action round trip`() { + roundTripAction(randomReadWriteActionConfig()) + } + fun `test set read only action round trip`() { roundTripAction(randomReadOnlyActionConfig()) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index 331fff582..0495ae27a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -103,12 +103,12 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping ReadOnlyAction doesn't work", readOnlyAction.convertToMap(), parsedReadOnlyAction.convertToMap()) } - private fun `test read_write action config parsing`() { - val readWriteActionConfig = randomReadWriteActionConfig() + fun `test read_write action config parsing`() { + val readWriteAction = randomReadWriteActionConfig() - val readWriteActionConfigString = readWriteActionConfig.toJsonString() - val parsedReadWriteActionConfig = ISMActionsParser.instance.parse(parser(readWriteActionConfigString), 0) - assertEquals("Round tripping ReadWriteActionConfig doesn't work", readWriteActionConfig, parsedReadWriteActionConfig) + val readWriteActionString = readWriteAction.toJsonString() + val parsedReadWriteAction = ISMActionsParser.instance.parse(parser(readWriteActionString), 0) + assertEquals("Round tripping ReadWriteAction doesn't work", readWriteAction.convertToMap(), parsedReadWriteAction.convertToMap()) } private fun `test replica_count action config parsing`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt index f6763e652..15da3fb9a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/SetReadWriteStepTests.kt @@ -5,22 +5,39 @@ package org.opensearch.indexmanagement.indexstatemanagement.step +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.opensearch.action.ActionListener +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.IndicesAdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.indexmanagement.indexstatemanagement.step.readwrite.SetReadWriteStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.test.OpenSearchTestCase +import org.opensearch.transport.RemoteTransportException class SetReadWriteStepTests : OpenSearchTestCase() { - /*private val clusterService: ClusterService = mock() + private val clusterService: ClusterService = mock() fun `test read write step sets step status to failed when not acknowledged`() { val setReadWriteResponse = AcknowledgedResponse(false) val client = getClient(getAdminClient(getIndicesAdminClient(setReadWriteResponse, null))) runBlocking { - val readWriteActionConfig = ReadWriteActionConfig(0) val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) - setReadWriteStep.execute() - val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + val setReadWriteStep = SetReadWriteStep() + val context = StepContext(managedIndexMetaData, clusterService, client, null, null) + setReadWriteStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } } @@ -30,11 +47,11 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val readWriteActionConfig = ReadWriteActionConfig(0) val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) - setReadWriteStep.execute() - val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + val setReadWriteStep = SetReadWriteStep() + val context = StepContext(managedIndexMetaData, clusterService, client, null, null) + setReadWriteStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) } } @@ -44,11 +61,11 @@ class SetReadWriteStepTests : OpenSearchTestCase() { val client = getClient(getAdminClient(getIndicesAdminClient(null, exception))) runBlocking { - val readWriteActionConfig = ReadWriteActionConfig(0) val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null) - val setReadWriteStep = SetReadWriteStep(clusterService, client, readWriteActionConfig, managedIndexMetaData) - setReadWriteStep.execute() - val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetaData(managedIndexMetaData) + val setReadWriteStep = SetReadWriteStep() + val context = StepContext(managedIndexMetaData, clusterService, client, null, null) + setReadWriteStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = setReadWriteStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"]) } @@ -65,5 +82,5 @@ class SetReadWriteStepTests : OpenSearchTestCase() { else listener.onFailure(exception) }.whenever(this.mock).updateSettings(any(), any()) } - }*/ + } }