Skip to content

Commit

Permalink
Adds custom action parsing (#273)
Browse files Browse the repository at this point in the history
* Adds custom action parsing logic

Signed-off-by: Robert Downs <[email protected]>
  • Loading branch information
downsrob authored Feb 24, 2022
1 parent 10d724b commit 6044aa1
Show file tree
Hide file tree
Showing 16 changed files with 416 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ abstract class Action(

var configTimeout: ActionTimeout? = null
var configRetry: ActionRetry? = ActionRetry(DEFAULT_RETRIES)
var customAction: Boolean = false

final override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
configTimeout?.toXContent(builder, params)
configRetry?.toXContent(builder, params)
// TODO: We should add "custom" object wrapper based on the params
// Include a "custom" object wrapper for custom actions to allow extensions to put arbitrary action configs in the config
// index. The EXCLUDE_CUSTOM_FIELD_PARAM is used to not include this wrapper in api responses
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.startObject(CUSTOM_ACTION_FIELD)
populateAction(builder, params)
if (customAction && !params.paramAsBoolean(EXCLUDE_CUSTOM_FIELD_PARAM, false)) builder.endObject()
return builder.endObject()
}

Expand Down Expand Up @@ -70,5 +74,7 @@ abstract class Action(

companion object {
const val DEFAULT_RETRIES = 3L
const val CUSTOM_ACTION_FIELD = "custom"
const val EXCLUDE_CUSTOM_FIELD_PARAM = "exclude_custom"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class ISMActionsParser private constructor() {
val instance = ISMActionsParser()
}

// TODO: Add other action parsers as they are implemented
val parsers = mutableListOf<ActionParser>(
AllocationActionParser(),
CloseActionParser(),
Expand All @@ -50,6 +49,9 @@ class ISMActionsParser private constructor() {
)

fun addParser(parser: ActionParser) {
if (parsers.map { it.getActionType() }.contains(parser.getActionType())) {
throw IllegalArgumentException(getDuplicateActionTypesMessage(parser.getActionType()))
}
parsers.add(parser)
}

Expand Down Expand Up @@ -77,13 +79,17 @@ class ISMActionsParser private constructor() {
when (type) {
ActionTimeout.TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp)
ActionRetry.RETRY_FIELD -> retry = ActionRetry.parse(xcp)
Action.CUSTOM_ACTION_FIELD -> {
// The "custom" wrapper allows extensions to create arbitrary actions without updating the config mappings
// We consume the full custom wrapper and parse the action in this step
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val customActionType = xcp.currentName()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
action = parseAction(xcp, totalActions, customActionType)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
}
else -> {
val parser = parsers.firstOrNull { it.getActionType() == type }
if (parser != null) {
action = parser.fromXContent(xcp, totalActions)
} else {
throw IllegalArgumentException("Invalid field: [$type] found in Actions.")
}
action = parseAction(xcp, totalActions, type)
}
}
}
Expand All @@ -96,7 +102,21 @@ class ISMActionsParser private constructor() {
return action
}

private fun parseAction(xcp: XContentParser, totalActions: Int, type: String): Action {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
val action: Action?
val parser = parsers.firstOrNull { it.getActionType() == type }
if (parser != null) {
action = parser.fromXContent(xcp, totalActions)
action.customAction = parser.customAction
} else {
throw IllegalArgumentException("Invalid field: [$type] found in Actions.")
}
return action
}

companion object {
val instance: ISMActionsParser by lazy { HOLDER.instance }
fun getDuplicateActionTypesMessage(actionType: String) = "Multiple action parsers attempted to register the same action type [$actionType]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ data class Policy(
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(ERROR_NOTIFICATION_FIELD, errorNotification)
.field(DEFAULT_STATE_FIELD, defaultState)
.field(STATES_FIELD, states.toTypedArray())
.startArray(STATES_FIELD)
.also { states.forEach { state -> state.toXContent(it, params) } }
.endArray()
.optionalISMTemplateField(ISM_TEMPLATE, ismTemplate)
if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user)
if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ data class State(
builder
.startObject()
.field(NAME_FIELD, name)
.field(ACTIONS_FIELD, actions.toTypedArray())
.startArray(ACTIONS_FIELD)
.also { actions.forEach { action -> action.toXContent(it, params) } }
.endArray()
.field(TRANSITIONS_FIELD, transitions.toTypedArray())
.endObject()
return builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
Expand Down Expand Up @@ -43,6 +45,7 @@ class GetPoliciesResponse : ActionResponse, ToXContentObject {
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val policyParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false", Action.EXCLUDE_CUSTOM_FIELD_PARAM to "true"))
return builder.startObject()
.startArray("policies")
.apply {
Expand All @@ -51,7 +54,7 @@ class GetPoliciesResponse : ActionResponse, ToXContentObject {
.field(_ID, policy.id)
.field(_SEQ_NO, policy.seqNo)
.field(_PRIMARY_TERM, policy.primaryTerm)
.field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE_AND_USER)
.field(Policy.POLICY_TYPE, policy, policyParams)
.endObject()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
Expand Down Expand Up @@ -65,7 +67,8 @@ class GetPolicyResponse : ActionResponse, ToXContentObject {
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (policy != null) {
builder.field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE_AND_USER)
val policyParams = ToXContent.MapParams(mapOf(WITH_TYPE to "false", WITH_USER to "false", Action.EXCLUDE_CUSTOM_FIELD_PARAM to "true"))
builder.field(Policy.POLICY_TYPE, policy, policyParams)
}

return builder.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_USER
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action.Companion.EXCLUDE_CUSTOM_FIELD_PARAM
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
Expand Down Expand Up @@ -66,12 +67,13 @@ class IndexPolicyResponse : ActionResponse, ToXContentObject {
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val policyParams = ToXContent.MapParams(mapOf(WITH_USER to "false", EXCLUDE_CUSTOM_FIELD_PARAM to "true"))
return builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_PRIMARY_TERM, primaryTerm)
.field(_SEQ_NO, seqNo)
.field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_USER)
.field(Policy.POLICY_TYPE, policy, policyParams)
.endObject()
}
}
4 changes: 4 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@
}
}
}
},
"custom": {
"enabled": false,
"type": "object"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.extension

import org.junit.After
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.test.OpenSearchTestCase
import kotlin.test.assertFailsWith

class ISMActionsParserTests : OpenSearchTestCase() {

/*
* If any tests added the custom action parser, it should be removed from the static instance to not impact other tests
*/
@After
@Suppress("UnusedPrivateMember")
private fun removeCustomActionParser() {
ISMActionsParser.instance.parsers.removeIf { it.getActionType() == SampleCustomActionParser.SampleCustomAction.name }
}

fun `test duplicate action names fail`() {
val customActionParser = SampleCustomActionParser()
customActionParser.customAction = true
// Duplicate custom parser names should fail
ISMActionsParser.instance.addParser(customActionParser)
assertFailsWith<IllegalArgumentException>("Expected IllegalArgumentException for duplicate action names") {
ISMActionsParser.instance.addParser(customActionParser)
}
// Adding any duplicate parser should fail
assertFailsWith<IllegalArgumentException>("Expected IllegalArgumentException for duplicate action names") {
val randomExistingParser = ISMActionsParser.instance.parsers.random()
ISMActionsParser.instance.addParser(randomExistingParser)
}
}

fun `test custom action parsing`() {
val customActionParser = SampleCustomActionParser()
customActionParser.customAction = true
ISMActionsParser.instance.addParser(customActionParser)
val customAction = SampleCustomActionParser.SampleCustomAction(randomInt(), 0)
val builder = XContentFactory.jsonBuilder()

val customActionString = customAction.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, customActionString)
parser.nextToken()
val parsedCustomAction = ISMActionsParser.instance.parse(parser, 1)
assertTrue("Action was not set to be custom after parsing", parsedCustomAction.customAction)
customAction.customAction = true
assertEquals("Round tripping custom action doesn't work", customAction.convertToMap(), parsedCustomAction.convertToMap())
}

fun `test parsing custom action without custom flag`() {
val customActionParser = SampleCustomActionParser()
customActionParser.customAction = true
ISMActionsParser.instance.addParser(customActionParser)
val customAction = SampleCustomActionParser.SampleCustomAction(randomInt(), 0)
customAction.customAction = true

val customActionString = "{\"retry\":{\"count\":3,\"backoff\":\"exponential\",\"delay\":\"1m\"},\"some_custom_action\":{\"some_int_field\":${customAction.someInt}}}"
val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, customActionString)
parser.nextToken()
val parsedCustomAction = ISMActionsParser.instance.parse(parser, 1)
assertTrue("Action was not set to be custom after parsing", parsedCustomAction.customAction)
assertEquals("Round tripping custom action doesn't work", customAction.convertToMap(), parsedCustomAction.convertToMap())
assertTrue("Custom action did not have custom keyword after parsing", parsedCustomAction.convertToMap().containsKey("custom"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.extension

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData

class SampleCustomActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val someInt = sin.readInt()
val index = sin.readInt()
return SampleCustomAction(someInt, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var someInt: Int? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
SampleCustomAction.SOME_INT_FIELD -> someInt = xcp.intValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in SampleCustomAction.")
}
}
return SampleCustomAction(someInt = requireNotNull(someInt) { "SomeInt field must be specified" }, index)
}

override fun getActionType(): String {
return SampleCustomAction.name
}
class SampleCustomAction(val someInt: Int, index: Int) : Action(name, index) {

private val sampleCustomStep = SampleCustomStep()
private val steps = listOf(sampleCustomStep)

override fun getStepToExecute(context: StepContext): Step = sampleCustomStep

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(SOME_INT_FIELD, someInt)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeInt(someInt)
out.writeInt(actionIndex)
}

companion object {
const val name = "some_custom_action"
const val SOME_INT_FIELD = "some_int_field"
}
}
class SampleCustomStep : Step(name) {
private var stepStatus = StepStatus.STARTING

override suspend fun execute(): Step {
stepStatus = StepStatus.COMPLETED
return this
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = null
)
}

override fun isIdempotent(): Boolean = true

companion object {
const val name = "some_custom_step"
}
}
}
Loading

0 comments on commit 6044aa1

Please sign in to comment.