Skip to content

Commit

Permalink
Support copy alias in rollover (#907)
Browse files Browse the repository at this point in the history
* Support copy alias in rollover

Signed-off-by: bowenlan-amzn <[email protected]>

* 2.10

Signed-off-by: bowenlan-amzn <[email protected]>

---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Sep 1, 2023
1 parent c2ccf7e commit 686c2d6
Show file tree
Hide file tree
Showing 17 changed files with 770 additions and 98 deletions.
73 changes: 36 additions & 37 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -613,49 +613,43 @@ task integTestRemote(type: RestIntegTestTask) {

// === Set up BWC tests ===

String bwcVersionShort = "2.5.0"
String bwcVersionShort = "2.10.0"
String bwcVersion = bwcVersionShort + ".0"
String baseName = "indexmanagementBwcCluster"
String bwcFilePath = "src/test/resources/bwc/"
String bwc_js_resource_location = bwcFilePath + "job-scheduler/" + bwcVersion
String bwc_im_resource_location = bwcFilePath + "indexmanagement/" + bwcVersion

// Downloads the bwc job scheduler version
String bwc_js_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-job-scheduler-' + bwcVersion + '.zip'

// Downloads the bwc index management version
String bwc_im_download_url = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-index-management-' + bwcVersion + '.zip'

2.times { i ->
configurations {
bwcZip
}
dependencies {
bwcZip "org.opensearch.plugin:opensearch-job-scheduler:${bwcVersion}-SNAPSHOT@zip"
bwcZip "org.opensearch.plugin:opensearch-index-management:${bwcVersion}-SNAPSHOT@zip"
}
ext.resolvebwcZipFile = { pluginId ->
return new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.bwcZip.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f ->
f.name.startsWith(pluginId)
}
.file
}
}
}
}
}
Integer bwcNumNodes = 3
2.times {i ->
testClusters {
"${baseName}$i" {
testDistribution = "ARCHIVE"
versions = [bwcVersionShort, opensearch_version]
numberOfNodes = 3
plugin(provider(new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_js_resource_location, bwc_js_download_url)
}
}
}
}))
plugin(provider(new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_im_resource_location, bwc_im_download_url)
}
}
}
}))
numberOfNodes = bwcNumNodes
plugin(provider(resolvebwcZipFile("opensearch-job-scheduler")))
plugin(provider(resolvebwcZipFile("opensearch-index-management")))
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
setting 'http.content_type.required', 'true'
}
Expand Down Expand Up @@ -690,6 +684,7 @@ task prepareBwcTests {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}
}

Expand All @@ -714,6 +709,7 @@ task "${baseName}#oneThirdsUpgradeCluster"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
Expand All @@ -734,6 +730,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
Expand All @@ -754,6 +751,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
Expand All @@ -772,14 +770,15 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// A bwc test suite which runs all the bwc tasks combined
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
// TODO refactor bwc test #677
// dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#fullRestartClusterTask")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ data class ManagedIndexMetaData(
val info: Map<String, Any>?,
val id: String = NO_ID,
val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
val rolledOverIndexName: String? = null,
) : Writeable, ToXContentFragment {

@Suppress("ComplexMethod")
Expand All @@ -50,6 +51,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
Expand All @@ -75,6 +77,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
Expand Down Expand Up @@ -109,6 +112,7 @@ data class ManagedIndexMetaData(
// Only show rolled_over if we have rolled over or we are in the rollover action
if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) {
builder.field(ROLLED_OVER, rolledOver)
if (rolledOverIndexName != null) builder.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)
Expand Down Expand Up @@ -141,6 +145,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalString(rolledOverIndexName)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

Expand Down Expand Up @@ -171,6 +176,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
Expand All @@ -184,6 +190,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val rolledOverIndexName: String? = si.readOptionalString()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

Expand All @@ -206,6 +213,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
Expand Down Expand Up @@ -233,6 +241,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var rolledOverIndexName: String? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

Expand All @@ -255,6 +264,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
Expand All @@ -276,23 +286,24 @@ data class ManagedIndexMetaData(
}

return ManagedIndexMetaData(
requireNotNull(index) { "$INDEX is null" },
requireNotNull(indexUuid) { "$INDEX_UUID is null" },
requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo,
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
step,
retryInfo,
info,
id,
seqNo,
primaryTerm
index = requireNotNull(index) { "$INDEX is null" },
indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" },
policyID = requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
stepMetaData = step,
policyRetryInfo = retryInfo,
info = info,
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
)
}

Expand Down Expand Up @@ -322,6 +333,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME],
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RolloverAction(
val minDocs: Long?,
val minAge: TimeValue?,
val minPrimaryShardSize: ByteSizeValue?,
val copyAlias: Boolean = false,
index: Int
) : Action(name, index) {

Expand Down Expand Up @@ -47,6 +48,7 @@ class RolloverAction(
if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs)
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
builder.field(COPY_ALIAS_FIELD, copyAlias)
builder.endObject()
}

Expand All @@ -55,6 +57,7 @@ class RolloverAction(
out.writeOptionalLong(minDocs)
out.writeOptionalTimeValue(minAge)
out.writeOptionalWriteable(minPrimaryShardSize)
out.writeBoolean(copyAlias)
out.writeInt(actionIndex)
}

Expand All @@ -64,5 +67,6 @@ class RolloverAction(
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
const val COPY_ALIAS_FIELD = "copy_alias"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ class RolloverActionParser : ActionParser() {
val minDocs = sin.readOptionalLong()
val minAge = sin.readOptionalTimeValue()
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
val copyAlias = sin.readBoolean()
val index = sin.readInt()

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var minSize: ByteSizeValue? = null
var minDocs: Long? = null
var minAge: TimeValue? = null
var minPrimaryShardSize: ByteSizeValue? = null
var copyAlias = false

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() {
RolloverAction
.MIN_PRIMARY_SHARD_SIZE_FIELD
)
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
}
}

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun getActionType(): String {
Expand Down
Loading

0 comments on commit 686c2d6

Please sign in to comment.