Skip to content

Commit

Permalink
Merge branch '2.x' into backport1000
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn authored Oct 13, 2023
2 parents b98d962 + ef71c70 commit 382e03d
Show file tree
Hide file tree
Showing 29 changed files with 1,646 additions and 61 deletions.
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea

### Building from the command line

1. `./gradlew build` builds and tests project.
1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
## Version 2.11.0.0 2023-10-11

Compatible with OpenSearch 2.10.0

### Maintenance
* Increment version to 2.11.0-SNAPSHOT. ([#922](https://github.com/opensearch-project/index-management/pull/922))

### Enhancements
* Provide unique id for each rollup job and add debug logs. ([#968](https://github.com/opensearch-project/index-management/pull/968))

### Bug fixes
* Fix auto managed index always have -2 seqNo bug. ([#924](https://github.com/opensearch-project/index-management/pull/924))

### Infrastructure
* Upload docker test cluster log. ([#964](https://github.com/opensearch-project/index-management/pull/964))
* Reduce test running time. ([#965](https://github.com/opensearch-project/index-management/pull/965))
* Parallel test run. ([#966](https://github.com/opensearch-project/index-management/pull/966))
* Security test filtered. ([#969](https://github.com/opensearch-project/index-management/pull/969))

### Documentation
* Added 2.11 release notes. ([#1004](https://github.com/opensearch-project/index-management/pull/1004))
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ data class ActionProperties(
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
val shrinkActionProperties: ShrinkActionProperties? = null,
val transformActionProperties: TransformActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -32,6 +33,7 @@ data class ActionProperties(
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
out.writeOptionalWriteable(transformActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -40,6 +42,7 @@ data class ActionProperties(
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params)
return builder
}

Expand All @@ -52,7 +55,8 @@ data class ActionProperties(
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
Expand All @@ -61,6 +65,7 @@ data class ActionProperties(
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null
var transformActionProperties: TransformActionProperties? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -75,10 +80,13 @@ data class ActionProperties(
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> {
transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

data class TransformActionProperties(
val transformId: String?
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(transformId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId)
return builder
}

companion object {
const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties"

fun fromStreamInput(sin: StreamInput): TransformActionProperties {
val transformId: String? = sin.readOptionalString()
return TransformActionProperties(transformId)
}

fun parse(xcp: XContentParser): TransformActionProperties {
var transformId: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Properties.TRANSFORM_ID.key -> transformId = xcp.text()
}
}

return TransformActionProperties(transformId)
}
}

enum class Properties(val key: String) {
TRANSFORM_ID("transform_id")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider
client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider, xContentRegistry
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand All @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() {
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
SnapshotActionParser(),
TransformActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
Expand Down Expand Up @@ -117,7 +118,8 @@ class ManagedIndexCoordinator(
indexManagementIndices: IndexManagementIndices,
private val metadataService: MetadataService,
private val templateService: ISMTemplateService,
private val indexMetadataProvider: IndexMetadataProvider
private val indexMetadataProvider: IndexMetadataProvider,
private val xContentRegistry: NamedXContentRegistry
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {
Expand Down Expand Up @@ -462,7 +464,7 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
parseFromSearchResponse(response = response, parse = Policy.Companion::parse)
parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (ex: IndexNotFoundException) {
emptyList()
} catch (ex: ClusterBlockException) {
Expand Down Expand Up @@ -713,7 +715,7 @@ class ManagedIndexCoordinator(
}
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformAction(
val ismTransform: ISMTransform,
index: Int
) : Action(name, index) {

companion object {
const val name = "transform"
const val ISM_TRANSFORM_FIELD = "ism_transform"
}

private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this)
private val waitForTransformCompletionStep = WaitForTransformCompletionStep()
private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// if stepMetaData is null, return first step
val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep

// if the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep
else -> attemptCreateTransformJobStep
}
}

return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep
else -> waitForTransformCompletionStep
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ISM_TRANSFORM_FIELD, ismTransform)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
ismTransform.writeTo(out)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val ismTransform = ISMTransform(sin)
val index = sin.readInt()
return TransformAction(ismTransform, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var ismTransform: ISMTransform? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.")
}
}

return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index)
}

override fun getActionType(): String {
return TransformAction.name
}
}
Loading

0 comments on commit 382e03d

Please sign in to comment.