Skip to content

Commit

Permalink
Merge branch 'development-extension' into read-write
Browse files Browse the repository at this point in the history
  • Loading branch information
downsrob authored Dec 17, 2021
2 parents 1b735fc + 715ae2b commit 73ff8cf
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 40 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ integTest {
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/AllocationActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/CloseActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexPriorityActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.class'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand All @@ -23,6 +24,7 @@ class ISMActionsParser private constructor() {

// TODO: Add other action parsers as they are implemented
val parsers = mutableListOf<ActionParser>(
CloseActionParser(),
DeleteActionParser(),
ReadWriteActionParser()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.indexmanagement.indexstatemanagement.step.close.AttemptCloseStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
Expand All @@ -16,12 +17,13 @@ class CloseAction(
companion object {
const val name = "close"
}
private val attemptCloseStep = AttemptCloseStep()

private val steps = listOf(attemptCloseStep)

override fun getStepToExecute(context: StepContext): Step {
TODO("Not yet implemented")
return attemptCloseStep
}

override fun getSteps(): List<Step> {
TODO("Not yet implemented")
}
override fun getSteps(): List<Step> = steps
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@ package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class CloseActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
TODO("Not yet implemented")
val index = sin.readInt()
return CloseAction(index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
TODO("Not yet implemented")
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)

return CloseAction(index)
}

override fun getActionType(): String {
TODO("Not yet implemented")
return CloseAction.name
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,89 @@

package org.opensearch.indexmanagement.indexstatemanagement.step.close

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.close.CloseIndexRequest
import org.opensearch.action.admin.indices.close.CloseIndexResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
import org.opensearch.transport.RemoteTransportException

class AttemptCloseStep : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
TODO("Not yet implemented")
val context = this.context ?: return this
val indexName = context.metadata.index
try {
val closeIndexRequest = CloseIndexRequest()
.indices(indexName)

val response: CloseIndexResponse = context.client.admin().indices()
.suspendUntil { close(closeIndexRequest, it) }

if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(indexName, cause as SnapshotInProgressException)
} else {
handleException(indexName, cause as Exception)
}
} catch (e: SnapshotInProgressException) {
handleSnapshotException(indexName, e)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
TODO("Not yet implemented")
private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) {
val message = getSnapshotMessage(indexName)
logger.warn(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to message)
}

override fun isIdempotent(): Boolean {
TODO("Not yet implemented")
private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

override fun isIdempotent() = true

companion object {
const val name = "attempt_close"
fun getFailedMessage(index: String) = "Failed to close index [index=$index]"
fun getSuccessMessage(index: String) = "Successfully closed index [index=$index]"
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying closing [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class CloseActionIT : IndexStateManagementRestTestCase() {
waitFor { assertEquals("close", getIndexState(indexName)) }
}

fun `test transitioning a closed index`() {
// TODO: Remove "private" once transition action is implemented
private fun `test transitioning a closed index`() {
val indexName = "${testIndexName}_index_3"
val policyID = "${testIndexName}_testPolicyName_3"
val actionConfig = CloseAction(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,41 @@

package org.opensearch.indexmanagement.indexstatemanagement.step

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.close.CloseIndexResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.step.close.AttemptCloseStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.snapshots.SnapshotInProgressException
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException
import kotlin.IllegalArgumentException

class AttemptCloseStepTests : OpenSearchTestCase() {

/*private val clusterService: ClusterService = mock()
private val clusterService: ClusterService = mock()

fun `test close step sets step status to completed when successful`() {
val closeIndexResponse = CloseIndexResponse(true, true, listOf())
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -30,11 +49,11 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(closeIndexResponse, null)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -44,11 +63,11 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -58,11 +77,11 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -72,11 +91,11 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -86,11 +105,11 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val closeActionConfig = CloseActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptCloseStep = AttemptCloseStep(clusterService, client, closeActionConfig, managedIndexMetaData)
attemptCloseStep.execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptCloseStep = AttemptCloseStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptCloseStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptCloseStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"])
}
Expand All @@ -107,5 +126,5 @@ class AttemptCloseStepTests : OpenSearchTestCase() {
else listener.onFailure(exception)
}.whenever(this.mock).close(any(), any())
}
}*/
}
}

0 comments on commit 73ff8cf

Please sign in to comment.