Skip to content

Commit

Permalink
Support copy alias in rollover
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Aug 15, 2023
1 parent 2d87a75 commit 181c359
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ data class ManagedIndexMetaData(
val info: Map<String, Any>?,
val id: String = NO_ID,
val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
val rolledOverIndexName: String? = null,
) : Writeable, ToXContentFragment {

@Suppress("ComplexMethod")
Expand All @@ -51,6 +52,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
Expand All @@ -76,6 +78,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
Expand Down Expand Up @@ -110,6 +113,7 @@ data class ManagedIndexMetaData(
// Only show rolled_over if we have rolled over or we are in the rollover action
if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) {
builder.field(ROLLED_OVER, rolledOver)
if (rolledOverIndexName != null) builder.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)
Expand Down Expand Up @@ -142,6 +146,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalString(rolledOverIndexName)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

Expand Down Expand Up @@ -172,6 +177,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
Expand All @@ -185,6 +191,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val rolledOverIndexName: String? = si.readOptionalString()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

Expand All @@ -207,6 +214,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
Expand Down Expand Up @@ -234,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var rolledOverIndexName: String? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

Expand All @@ -256,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
Expand All @@ -277,23 +287,24 @@ data class ManagedIndexMetaData(
}

return ManagedIndexMetaData(
requireNotNull(index) { "$INDEX is null" },
requireNotNull(indexUuid) { "$INDEX_UUID is null" },
requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo,
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
step,
retryInfo,
info,
id,
seqNo,
primaryTerm
index = requireNotNull(index) { "$INDEX is null" },
indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" },
policyID = requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
stepMetaData = step,
policyRetryInfo = retryInfo,
info = info,
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
)
}

Expand Down Expand Up @@ -323,6 +334,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME],
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RolloverAction(
val minDocs: Long?,
val minAge: TimeValue?,
val minPrimaryShardSize: ByteSizeValue?,
val copyAlias: Boolean = false,
index: Int
) : Action(name, index) {

Expand Down Expand Up @@ -47,6 +48,7 @@ class RolloverAction(
if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs)
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
builder.field(COPY_ALIAS_FIELD, copyAlias)
builder.endObject()
}

Expand All @@ -55,6 +57,7 @@ class RolloverAction(
out.writeOptionalLong(minDocs)
out.writeOptionalTimeValue(minAge)
out.writeOptionalWriteable(minPrimaryShardSize)
out.writeBoolean(copyAlias)
out.writeInt(actionIndex)
}

Expand All @@ -64,5 +67,6 @@ class RolloverAction(
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
const val COPY_ALIAS_FIELD = "copy_alias"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ class RolloverActionParser : ActionParser() {
val minDocs = sin.readOptionalLong()
val minAge = sin.readOptionalTimeValue()
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
val copyAlias = sin.readBoolean()
val index = sin.readInt()

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var minSize: ByteSizeValue? = null
var minDocs: Long? = null
var minAge: TimeValue? = null
var minPrimaryShardSize: ByteSizeValue? = null
var copyAlias = false

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() {
RolloverAction
.MIN_PRIMARY_SHARD_SIZE_FIELD
)
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
}
}

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun getActionType(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
Expand All @@ -33,6 +38,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var newIndex: String? = null

@Suppress("ComplexMethod", "LongMethod")
override suspend fun execute(): Step {
Expand All @@ -53,6 +59,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget))

// If already rolled over, alias may not get copied over yet
copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata)
return this
}

Expand Down Expand Up @@ -114,6 +123,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
" numDocs=$numDocs, indexSize=${indexSize.bytes}, primaryShardSize=${largestPrimaryShardSize.bytes}]"
)
executeRollover(context, rolloverTarget, isDataStream, conditions)
copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions)
Expand Down Expand Up @@ -158,14 +168,14 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
val indexName = context.metadata.index
val metadata = context.clusterService.state().metadata
val indexAlias = metadata.index(indexName)?.aliases?.get(alias)
logger.debug("Index $indexName has aliases $indexAlias")
logger.debug("Index {} has aliases {}", indexName, indexAlias)
if (indexAlias == null) {
return false
}
val isWriteIndex = indexAlias.writeIndex() // this could be null
if (isWriteIndex != true) {
val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index }
logger.debug("Alias $alias contains indices $aliasIndices")
logger.debug("Alias {} contains indices {}", alias, aliasIndices)
if (aliasIndices != null && aliasIndices.size > 1) {
return false
}
Expand Down Expand Up @@ -225,6 +235,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
else -> getSuccessMessage(indexName)
}

// Save newIndex later to metadata to be reused in case of failures
newIndex = response.newIndex

stepStatus = StepStatus.COMPLETED
info = listOfNotNull(
"message" to message,
Expand All @@ -251,21 +264,89 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
}
}

/**
* This method should be called ASAP after rollover succeed
*
* Rollover currently only copy the alias being rolled over on to new index
* This method copy any remaining aliases to new index
*
* TODO This method can be deprecated once this issue finished
* https://github.com/opensearch-project/index-management/issues/849 finished
*/
private suspend fun copyAlias(
clusterService: ClusterService,
indexName: String,
client: Client,
rolloverTarget: String,
metadata: ManagedIndexMetaData
) {
if (!action.copyAlias) return
val rolledOverIndexName = newIndex ?: metadata.rolledOverIndexName
if (rolledOverIndexName == null) {
logger.warn("Cannot find rolled over index to copy aliases to.")
return
}

val aliasActions = mutableListOf<AliasActions>()
val aliases = clusterService.state().metadata().index(indexName).aliases
for (alias in aliases) {
val aliasName = alias.key
// Skip the alias that has been rolled over on, since it's already copied
if (aliasName == rolloverTarget) continue

val aliasMetadata = alias.value
val aliasAction = AliasActions(AliasActions.Type.ADD).index(rolledOverIndexName)
.alias(aliasMetadata.alias)
.filter(aliasMetadata.filter?.toString())
.searchRouting(aliasMetadata.searchRouting)
.indexRouting(aliasMetadata.indexRouting)
.isHidden(aliasMetadata.isHidden)
aliasActions.add(aliasAction)
}
val aliasReq = IndicesAliasesRequest()
aliasActions.forEach { aliasReq.addAliasAction(it) }

// Preserve the rolled over conditions
val conditions = info?.get("conditions") ?: context?.metadata?.info?.get("conditions")
try {
val aliasRes: AcknowledgedResponse = client.admin().indices().suspendUntil { aliases(aliasReq, it) }
if (aliasRes.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = listOfNotNull(
"message" to getSuccessCopyAliasMessage(indexName, rolledOverIndexName),
if (conditions != null) "conditions" to conditions else null
).toMap()
} else {
stepStatus = StepStatus.FAILED
info = listOfNotNull(
"message" to getFailedCopyAliasMessage(indexName, rolledOverIndexName),
if (conditions != null) "conditions" to conditions else null
).toMap()
}
} catch (e: Exception) {
handleException(indexName, e, getFailedCopyAliasMessage(indexName, rolledOverIndexName), conditions)
}
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
logger.info("attempt rollover step completed, updating metadata")
logger.info("current metadata: ${currentMetadata.rolledOverIndexName}, new metadata: $newIndex")
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
rolledOver = if (currentMetadata.rolledOver == true) true else stepStatus == StepStatus.COMPLETED,
rolledOverIndexName = if (currentMetadata.rolledOverIndexName != null) currentMetadata.rolledOverIndexName else newIndex,
transitionTo = null,
info = info
)
}

private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName)) {
private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName), conditions: Any? = null) {
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val mutableInfo: MutableMap<String, Any> = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
if (conditions != null) mutableInfo["conditions"] = conditions
info = mutableInfo.toMap()
}

Expand All @@ -288,5 +369,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]"
fun getAlreadyRolledOverMessage(index: String, alias: String) =
"This index has already been rolled over using this alias, treating as a success [index=$index, alias=$alias]"
fun getSuccessCopyAliasMessage(index: String, newIndex: String) =
"Successfully rolled over and copied alias from [index=$index] to [index=$newIndex]"
fun getFailedCopyAliasMessage(index: String, newIndex: String) =
"Successfully rolled over but failed to copied alias from [index=$index] to [index=$newIndex]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ fun ManagedIndexMetaData.getCompletedManagedIndexMetaData(
return this.copy(
policyCompleted = updatedStepMetaData.policyCompleted,
rolledOver = updatedStepMetaData.rolledOver,
rolledOverIndexName = updatedStepMetaData.rolledOverIndexName,
actionMetaData = updatedActionMetaData,
stepMetaData = updatedStepMetaData.stepMetaData,
transitionTo = updatedStepMetaData.transitionTo,
Expand Down
Loading

0 comments on commit 181c359

Please sign in to comment.