Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support copy alias in rollover #892

Merged
merged 5 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 36 additions & 40 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -613,32 +613,44 @@ task integTestRemote(type: RestIntegTestTask) {

// === Set up BWC tests ===

String bwcMinVersion = "1.13.2.0"
String bwcJobSchedulerVersion = "1.13.0.0"
String bwcMinVersion = "2.6.0.0"
String bwcBundleVersion = "1.3.2.0"
Boolean bwcBundleTest = (project.findProperty('customDistributionDownloadType') != null &&
project.properties['customDistributionDownloadType'] == "bundle");
String bwcVersion = bwcBundleTest ? bwcBundleVersion : bwcMinVersion
String bwcCurrentVersion = opensearch_version.replace("-SNAPSHOT", "")
String baseName = "indexmanagementBwcCluster"
String bwcFilePath = "src/test/resources/bwc/"
String bwc_js_resource_location = bwcFilePath + "job-scheduler/" + bwcJobSchedulerVersion
String bwc_im_resource_location = bwcFilePath + "indexmanagement/" + bwcVersion

// Downloads the bwc job scheduler version
String bwc_js_download_url = "https://github.com/opendistro-for-elasticsearch/job-scheduler/releases/download/v" +
bwcJobSchedulerVersion + "/job-scheduler-artifacts.zip"

// Downloads the bwc index management version
String bwc_im_download_url = "https://github.com/opendistro-for-elasticsearch/index-management/releases/download/v" +
bwcMinVersion + "/index-management-artifacts.zip"
getPluginResource(bwc_im_resource_location, bwc_im_download_url)

configurations {
bwcZip
}
dependencies {
bwcZip "org.opensearch.plugin:opensearch-job-scheduler:${bwcMinVersion}-SNAPSHOT@zip"
bwcZip "org.opensearch.plugin:opensearch-index-management:${bwcMinVersion}-SNAPSHOT@zip"
}
ext.resolvebwcZipFile = { pluginId ->
return new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.bwcZip.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f ->
f.name.startsWith(pluginId)
}
.file
}
}
}
}
}
Integer bwcNumNodes = 3
2.times {i ->
testClusters {
"${baseName}$i"{
testDistribution = "ARCHIVE"
numberOfNodes = 3
numberOfNodes = bwcNumNodes
if (bwcBundleTest) {
versions = [
"1.3.2", bwcCurrentVersion
Expand Down Expand Up @@ -686,31 +698,10 @@ getPluginResource(bwc_im_resource_location, bwc_im_download_url)
}
} else {
versions = [
"7.10.2", opensearch_version
"2.6.0-SNAPSHOT", opensearch_version
]
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_js_resource_location, bwc_js_download_url)
}
}
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_im_resource_location, bwc_im_download_url)
}
}
}
}))
plugin(provider(resolvebwcZipFile("opensearch-job-scheduler")))
plugin(provider(resolvebwcZipFile("opensearch-index-management")))
}

setting 'path.repo',
Expand Down Expand Up @@ -748,6 +739,7 @@ task prepareBwcTests {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}
}

Expand All @@ -772,6 +764,7 @@ task "${baseName}#oneThirdsUpgradeCluster"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
Expand All @@ -792,6 +785,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
Expand All @@ -812,6 +806,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
Expand All @@ -830,14 +825,15 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// A bwc test suite which runs all the bwc tasks combined
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
// TODO refactor bwc test #677
// dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#fullRestartClusterTask")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ data class ManagedIndexMetaData(
val info: Map<String, Any>?,
val id: String = NO_ID,
val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
val rolledOverIndexName: String? = null,
) : Writeable, ToXContentFragment {

@Suppress("ComplexMethod")
Expand All @@ -51,6 +52,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
Expand All @@ -76,6 +78,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
Expand Down Expand Up @@ -110,6 +113,7 @@ data class ManagedIndexMetaData(
// Only show rolled_over if we have rolled over or we are in the rollover action
if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) {
builder.field(ROLLED_OVER, rolledOver)
if (rolledOverIndexName != null) builder.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)
Expand Down Expand Up @@ -142,6 +146,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalString(rolledOverIndexName)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

Expand Down Expand Up @@ -172,6 +177,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
Expand All @@ -185,6 +191,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val rolledOverIndexName: String? = si.readOptionalString()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

Expand All @@ -207,6 +214,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
Expand Down Expand Up @@ -234,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var rolledOverIndexName: String? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

Expand All @@ -256,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
Expand All @@ -277,23 +287,24 @@ data class ManagedIndexMetaData(
}

return ManagedIndexMetaData(
requireNotNull(index) { "$INDEX is null" },
requireNotNull(indexUuid) { "$INDEX_UUID is null" },
requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo,
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
step,
retryInfo,
info,
id,
seqNo,
primaryTerm
index = requireNotNull(index) { "$INDEX is null" },
indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" },
policyID = requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
stepMetaData = step,
policyRetryInfo = retryInfo,
info = info,
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
)
}

Expand Down Expand Up @@ -323,6 +334,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME],
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RolloverAction(
val minDocs: Long?,
val minAge: TimeValue?,
val minPrimaryShardSize: ByteSizeValue?,
val copyAlias: Boolean = false,
index: Int
) : Action(name, index) {

Expand Down Expand Up @@ -47,6 +48,7 @@ class RolloverAction(
if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs)
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
builder.field(COPY_ALIAS_FIELD, copyAlias)
builder.endObject()
}

Expand All @@ -55,6 +57,7 @@ class RolloverAction(
out.writeOptionalLong(minDocs)
out.writeOptionalTimeValue(minAge)
out.writeOptionalWriteable(minPrimaryShardSize)
out.writeBoolean(copyAlias)
out.writeInt(actionIndex)
}

Expand All @@ -64,5 +67,6 @@ class RolloverAction(
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
const val COPY_ALIAS_FIELD = "copy_alias"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ class RolloverActionParser : ActionParser() {
val minDocs = sin.readOptionalLong()
val minAge = sin.readOptionalTimeValue()
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
val copyAlias = sin.readBoolean()
val index = sin.readInt()

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var minSize: ByteSizeValue? = null
var minDocs: Long? = null
var minAge: TimeValue? = null
var minPrimaryShardSize: ByteSizeValue? = null
var copyAlias = false

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() {
RolloverAction
.MIN_PRIMARY_SHARD_SIZE_FIELD
)
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
}
}

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun getActionType(): String {
Expand Down
Loading