Skip to content

Commit

Permalink
Implement OpenAction using new interface (#230)
Browse files Browse the repository at this point in the history
* Support open action

Signed-off-by: Annie Lee <[email protected]>

* Update AttemptOpenStep.kt

Signed-off-by: Annie Lee <[email protected]>

* Add close action test

Signed-off-by: Annie Lee <[email protected]>

* Add open action related tests

Signed-off-by: Annie Lee <[email protected]>

* Add open action test round trip

Signed-off-by: Annie Lee <[email protected]>

* Fix open action xcontent test

Signed-off-by: Annie Lee <[email protected]>

* Modify XContentTests for better comparison

Signed-off-by: Annie Lee <[email protected]>

* Update XContentTests.kt

Signed-off-by: Annie Lee <[email protected]>
  • Loading branch information
annie3431 authored Dec 21, 2021
1 parent 1a038ca commit e9b4212
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 29 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ integTest {
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexPriorityActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/OpenActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/ReplicaCountActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.class'
exclude 'org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.class'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.OpenActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
Expand All @@ -27,6 +28,7 @@ class ISMActionsParser private constructor() {
val parsers = mutableListOf<ActionParser>(
CloseActionParser(),
DeleteActionParser(),
OpenActionParser(),
ReadOnlyActionParser(),
ReadWriteActionParser()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
Expand All @@ -16,12 +17,12 @@ class OpenAction(
companion object {
const val name = "open"
}
private val attemptOpenStep = AttemptOpenStep()
private val steps = listOf(attemptOpenStep)

override fun getStepToExecute(context: StepContext): Step {
TODO("Not yet implemented")
return attemptOpenStep
}

override fun getSteps(): List<Step> {
TODO("Not yet implemented")
}
override fun getSteps(): List<Step> = steps
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@ package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class OpenActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
TODO("Not yet implemented")
val index = sin.readInt()
return OpenAction(index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
TODO("Not yet implemented")
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)

return OpenAction(index)
}

override fun getActionType(): String {
TODO("Not yet implemented")
return OpenAction.name
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,69 @@

package org.opensearch.indexmanagement.indexstatemanagement.step.open

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.open.OpenIndexRequest
import org.opensearch.action.admin.indices.open.OpenIndexResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException

class AttemptOpenStep : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
TODO("Not yet implemented")
val context = this.context ?: return this
val indexName = context.metadata.index
try {
val openIndexRequest = OpenIndexRequest()
.indices(indexName)

val response: OpenIndexResponse = context.client.admin().indices().suspendUntil { open(openIndexRequest, it) }
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: Exception) {
handleException(indexName, e)
}

return this
}
private fun handleException(indexName: String, e: Exception) {
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
TODO("Not yet implemented")
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

override fun isIdempotent(): Boolean {
TODO("Not yet implemented")
}
override fun isIdempotent() = true

companion object {
const val name = "attempt_open"
// TODO: fixme
fun getSuccessMessage(indexName: String) = ""
fun getFailedMessage(indexName: String) = "Failed to open index [index=$indexName]"
fun getSuccessMessage(indexName: String) = "Successfully opened index [index=$indexName]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction
import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction
import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyAction
import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteAction
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction
Expand Down Expand Up @@ -165,6 +167,14 @@ fun randomRollupActionConfig(): RollupAction {
return RollupAction(ismRollup = randomISMRollup(), index = 0)
}

fun randomCloseActionConfig(): CloseAction {
return CloseAction(index = 0)
}

fun randomOpenActionConfig(): OpenAction {
return OpenAction(index = 0)
}

fun randomDestination(type: DestinationType = randomDestinationType()): Destination {
return Destination(
type = type,
Expand Down Expand Up @@ -415,6 +425,16 @@ fun RollupAction.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun CloseAction.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun OpenAction.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ISMTemplate.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
import org.opensearch.indexmanagement.indexstatemanagement.randomAllocationActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomCloseActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomForceMergeActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomIndexPriorityActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomNotificationActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomOpenActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomReadWriteActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomReplicaCountActionConfig
Expand Down Expand Up @@ -119,6 +121,14 @@ class ActionTests : OpenSearchTestCase() {
roundTripAction(randomAllocationActionConfig(require = mapOf("box_type" to "hot")))
}

fun `test close action round trip`() {
roundTripAction(randomCloseActionConfig())
}

fun `test open action round trip`() {
roundTripAction(randomOpenActionConfig())
}

fun `test action timeout and retry round trip`() {
val builder = XContentFactory.jsonBuilder()
.startObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Des
import org.opensearch.indexmanagement.indexstatemanagement.nonNullRandomConditions
import org.opensearch.indexmanagement.indexstatemanagement.randomAllocationActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.randomCloseActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomDeleteActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomDestination
import org.opensearch.indexmanagement.indexstatemanagement.randomForceMergeActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomIndexPriorityActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomNotificationActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomOpenActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy
import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomReadWriteActionConfig
Expand Down Expand Up @@ -208,6 +210,22 @@ class XContentTests : OpenSearchTestCase() {
assertEquals("Round tripping RollupActionConfig doesn't work", rollupActionConfig.ismRollup, parsedRollupActionConfig.ismRollup)
}

fun `test close action parsing`() {
val closeAction = randomCloseActionConfig()
val closeActionString = closeAction.toJsonString()
val parsedCloseAction = ISMActionsParser.instance.parse(parser(closeActionString), 0)

assertEquals("Round tripping CloseAction doesn't work", closeAction.convertToMap(), parsedCloseAction.convertToMap())
}

fun `test open action parsing`() {
val openAction = randomOpenActionConfig()
val openActionString = openAction.toJsonString()
val parsedOpenAction = ISMActionsParser.instance.parse(parser(openActionString), 0)

assertEquals("Round tripping OpenAction doesn't work", openAction.convertToMap(), parsedOpenAction.convertToMap())
}

fun `test managed index metadata parsing`() {
val metadata = ManagedIndexMetaData(
index = randomAlphaOfLength(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,39 @@

package org.opensearch.indexmanagement.indexstatemanagement.step

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.open.OpenIndexResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException

class AttemptOpenStepTests : OpenSearchTestCase() {

/*private val clusterService: ClusterService = mock()
private val clusterService: ClusterService = mock()

fun `test open step sets step status to failed when not acknowledged`() {
val openIndexResponse = OpenIndexResponse(false, false)
val client = getClient(getAdminClient(getIndicesAdminClient(openIndexResponse, null)))

runBlocking {
val openActionConfig = OpenActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData)
attemptOpenStep.execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptOpenStep = AttemptOpenStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptOpenStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -30,11 +47,11 @@ class AttemptOpenStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val openActionConfig = OpenActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData)
attemptOpenStep.execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptOpenStep = AttemptOpenStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptOpenStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}
}
Expand All @@ -44,11 +61,11 @@ class AttemptOpenStepTests : OpenSearchTestCase() {
val client = getClient(getAdminClient(getIndicesAdminClient(null, exception)))

runBlocking {
val openActionConfig = OpenActionConfig(0)
val managedIndexMetaData = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, null, null, null)
val attemptOpenStep = AttemptOpenStep(clusterService, client, openActionConfig, managedIndexMetaData)
attemptOpenStep.execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetaData(managedIndexMetaData)
val attemptOpenStep = AttemptOpenStep()
val context = StepContext(managedIndexMetaData, clusterService, client, null, null)
attemptOpenStep.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = attemptOpenStep.getUpdatedManagedIndexMetadata(managedIndexMetaData)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get cause from nested exception", "nested", updatedManagedIndexMetaData.info!!["cause"])
}
Expand All @@ -65,5 +82,5 @@ class AttemptOpenStepTests : OpenSearchTestCase() {
else listener.onFailure(exception)
}.whenever(this.mock).open(any(), any())
}
}*/
}
}

0 comments on commit e9b4212

Please sign in to comment.