Skip to content

Commit

Permalink
Non-inclusive nonmenclature update (opensearch-project#337)
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Apr 19, 2022
1 parent 39368bb commit b57346b
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import java.time.Instant
* and the [INDEX_MANAGEMENT_INDEX] for current managed index jobs. It will then compare these
* ManagedIndices to appropriately create or delete each [ManagedIndexConfig]. Each node that has
* the [IndexManagementPlugin] installed will have an instance of this class, but only the elected
* master node will set up the background sweep process and listen for [ClusterChangedEvent].
* cluster manager node will set up the background sweep process and listen for [ClusterChangedEvent].
*
* We do not allow updating to a new policy through Coordinator as this can have bad side effects. If
* a user wants to update an existing [ManagedIndexConfig] to a new policy (or updated version of policy)
Expand Down Expand Up @@ -138,8 +138,8 @@ class ManagedIndexCoordinator(
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)
@Volatile private var jobJitter = JITTER.get(settings)

@Volatile private var isMaster = false
@Volatile private var onMasterTimeStamp: Long = 0L
@Volatile private var isClusterManager = false
@Volatile private var onClusterManagerTimeStamp: Long = 0L

init {
clusterService.addListener(this)
Expand Down Expand Up @@ -177,20 +177,20 @@ class ManagedIndexCoordinator(
return ThreadPool.Names.MANAGEMENT
}

fun onMaster() {
onMasterTimeStamp = System.currentTimeMillis()
logger.info("Cache master node onMaster time: $onMasterTimeStamp")
fun onClusterManager() {
onClusterManagerTimeStamp = System.currentTimeMillis()
logger.info("Cache cluster manager node onClusterManager time: $onClusterManagerTimeStamp")

// Init background sweep when promoted to being master
// Init background sweep when promoted to being cluster manager
initBackgroundSweep()

initMoveMetadata()

initTemplateMigration(templateMigrationEnabledSetting)
}

fun offMaster() {
// Cancel background sweep when demoted from being master
fun offClusterManager() {
// Cancel background sweep when demoted from being cluster manager
scheduledFullSweep?.cancel()

scheduledMoveMetadata?.cancel()
Expand All @@ -199,15 +199,15 @@ class ManagedIndexCoordinator(
}

override fun clusterChanged(event: ClusterChangedEvent) {
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
if (this.isMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster()
if (this.isMaster) {
onMaster()
// Instead of using a LocalNodeMasterListener to track cluster manager changes, this service will
// track them here to avoid conditions where cluster manager listener events run after other
// listeners that depend on what happened in the cluster manager listener
if (this.isClusterManager != event.localNodeMaster()) {
this.isClusterManager = event.localNodeMaster()
if (this.isClusterManager) {
onClusterManager()
} else {
offMaster()
offClusterManager()
}
}

Expand Down Expand Up @@ -465,15 +465,15 @@ class ManagedIndexCoordinator(
/**
* Background sweep process that periodically sweeps for updates to ManagedIndices
*
* This background sweep will only be initialized if the local node is the elected master node.
* This background sweep will only be initialized if the local node is the elected cluster manager node.
* Creates a runnable that is executed as a coroutine in the shared pool of threads on JVM.
*/
@OpenForTesting
fun initBackgroundSweep() {
// If ISM is disabled return early
if (!isIndexStateManagementEnabled()) return

// Do not setup background sweep if we're not the elected master node
// Do not setup background sweep if we're not the elected cluster manager node
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return

// Cancel existing background sweep
Expand Down Expand Up @@ -555,10 +555,10 @@ class ManagedIndexCoordinator(

logger.info("Performing ISM template migration.")
if (enableSetting == 0L) {
if (onMasterTimeStamp != 0L)
templateService.doMigration(Instant.ofEpochMilli(onMasterTimeStamp))
if (onClusterManagerTimeStamp != 0L)
templateService.doMigration(Instant.ofEpochMilli(onClusterManagerTimeStamp))
else {
logger.error("No valid onMaster time cached, cancel ISM template migration job.")
logger.error("No valid onClusterManager time cached, cancel ISM template migration job.")
scheduledTemplateMigration?.cancel()
}
} else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ object ManagedIndexRunner :
@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
logger.debug("Run job for index ${managedIndexConfig.index}")
// doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls
// doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
if (clusterIsRed()) {
logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
return
Expand Down Expand Up @@ -816,7 +816,7 @@ object ManagedIndexRunner :

indexMetaData = response.state.metadata.indices.firstOrNull()?.value
} catch (e: Exception) {
logger.error("Failed to get IndexMetaData from master cluster state for index=$index", e)
logger.error("Failed to get IndexMetaData from cluster manager cluster state for index=$index", e)
}

return indexMetaData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class LegacyOpenDistroManagedIndexSettings {
Setting.Property.Dynamic
)

// 0: enabled, use onMaster time as ISM template last_updated_time
// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

// 0: enabled, use onMaster time as ISM template last_updated_time
// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ class ExplainRequest : ActionRequest {

val indices: List<String>
val local: Boolean
val masterTimeout: TimeValue
val clusterManagerTimeout: TimeValue
val searchParams: SearchParams
val showPolicy: Boolean
val indexType: String

constructor(
indices: List<String>,
local: Boolean,
masterTimeout: TimeValue,
clusterManagerTimeout: TimeValue,
searchParams: SearchParams,
showPolicy: Boolean,
indexType: String
) : super() {
this.indices = indices
this.local = local
this.masterTimeout = masterTimeout
this.clusterManagerTimeout = clusterManagerTimeout
this.searchParams = searchParams
this.showPolicy = showPolicy
this.indexType = indexType
Expand All @@ -44,7 +44,7 @@ class ExplainRequest : ActionRequest {
constructor(sin: StreamInput) : this(
indices = sin.readStringList(),
local = sin.readBoolean(),
masterTimeout = sin.readTimeValue(),
clusterManagerTimeout = sin.readTimeValue(),
searchParams = SearchParams(sin),
showPolicy = sin.readBoolean(),
indexType = sin.readString()
Expand All @@ -65,7 +65,7 @@ class ExplainRequest : ActionRequest {
override fun writeTo(out: StreamOutput) {
out.writeStringCollection(indices)
out.writeBoolean(local)
out.writeTimeValue(masterTimeout)
out.writeTimeValue(clusterManagerTimeout)
searchParams.writeTo(out)
out.writeBoolean(showPolicy)
out.writeString(indexType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class TransportExplainAction @Inject constructor(
.indices(*indexNames.toTypedArray())
.metadata(true)
.local(request.local)
.masterNodeTimeout(request.masterTimeout)
.masterNodeTimeout(request.clusterManagerTimeout)

client.admin().cluster().state(
clusterStateRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import java.io.IOException
class RetryFailedManagedIndexRequest(
val indices: List<String>,
val startState: String?,
val masterTimeout: TimeValue,
val clusterManagerTimeout: TimeValue,
val indexType: String
) : ActionRequest() {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indices = sin.readStringList(),
startState = sin.readOptionalString(),
masterTimeout = sin.readTimeValue(),
clusterManagerTimeout = sin.readTimeValue(),
indexType = sin.readString()
)

Expand All @@ -46,7 +46,7 @@ class RetryFailedManagedIndexRequest(
override fun writeTo(out: StreamOutput) {
out.writeStringCollection(indices)
out.writeOptionalString(startState)
out.writeTimeValue(masterTimeout)
out.writeTimeValue(clusterManagerTimeout)
out.writeString(indexType)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor(
.indices(*request.indices.toTypedArray())
.metadata(true)
.local(false)
.masterNodeTimeout(request.masterTimeout)
.masterNodeTimeout(request.clusterManagerTimeout)
.indicesOptions(strictExpandIndicesOptions)

client.threadPool().threadContext.stashContext().use {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class RollupMapperService(
private fun indexExists(index: String): Boolean = clusterService.state().routingTable.hasIndex(index)

// TODO: error handling - can RemoteTransportException happen here?
// TODO: The use of the master transport action UpdateRollupMappingAction will prevent
// TODO: The use of the cluster manager transport action UpdateRollupMappingAction will prevent
// overwriting an existing rollup job _meta by checking for the job id
// but there is still a race condition if two jobs are added at the same time for the
// same target index. There is a small time window after get mapping and put mappings
Expand Down
2 changes: 1 addition & 1 deletion src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
grant {
// needed to find the classloader to load whitelisted classes.
// needed to find the classloader to load allowlisted classes.
permission java.lang.RuntimePermission "createClassLoader";
permission java.lang.RuntimePermission "getClassLoader";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,20 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() {
Mockito.verify(cancellable).cancel()
}

fun `test on master`() {
coordinator.onMaster()
fun `test on cluster manager`() {
coordinator.onClusterManager()
Mockito.verify(threadPool, Mockito.times(3)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString())
}

fun `test off master`() {
fun `test off cluster manager`() {
val cancellable = Mockito.mock(Scheduler.Cancellable::class.java)

coordinator.offMaster()
coordinator.offClusterManager()
Mockito.verify(cancellable, Mockito.times(0)).cancel()

Mockito.`when`(threadPool.scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(cancellable)
coordinator.initBackgroundSweep()
coordinator.offMaster()
coordinator.offClusterManager()
Mockito.verify(cancellable).cancel()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class ExplainRequestTests : OpenSearchTestCase() {
fun `test explain request`() {
val indices = listOf("index1", "index2")
val local = true
val masterTimeout = TimeValue.timeValueSeconds(30)
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val params = SearchParams(0, 20, "sort-field", "asc", "*")
val showPolicy = false
val req = ExplainRequest(indices, local, masterTimeout, params, showPolicy, DEFAULT_INDEX_TYPE)
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, showPolicy, DEFAULT_INDEX_TYPE)

val out = BytesStreamOutput()
req.writeTo(out)
Expand All @@ -33,10 +33,10 @@ class ExplainRequestTests : OpenSearchTestCase() {
fun `test explain policy request with non default index type and multiple indices fails`() {
val indices = listOf("index1", "index2")
val local = true
val masterTimeout = TimeValue.timeValueSeconds(30)
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val params = SearchParams(0, 20, "sort-field", "asc", "*")
val showPolicy = false
val req = ExplainRequest(indices, local, masterTimeout, params, showPolicy, "non-existent-index-type")
val req = ExplainRequest(indices, local, clusterManagerTimeout, params, showPolicy, "non-existent-index-type")

val actualException: String? = req.validate()?.validationErrors()?.firstOrNull()
val expectedException: String = ExplainRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class RetryFailedManagedIndexRequestTests : OpenSearchTestCase() {
fun `test retry managed index request`() {
val indices = listOf("index1", "index2")
val startState = "state1"
val masterTimeout = TimeValue.timeValueSeconds(30)
val req = RetryFailedManagedIndexRequest(indices, startState, masterTimeout, DEFAULT_INDEX_TYPE)
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val req = RetryFailedManagedIndexRequest(indices, startState, clusterManagerTimeout, DEFAULT_INDEX_TYPE)

val out = BytesStreamOutput()
req.writeTo(out)
Expand All @@ -30,8 +30,8 @@ class RetryFailedManagedIndexRequestTests : OpenSearchTestCase() {
fun `test retry managed index request with non default index type and multiple indices fails`() {
val indices = listOf("index1", "index2")
val startState = "state1"
val masterTimeout = TimeValue.timeValueSeconds(30)
val req = RetryFailedManagedIndexRequest(indices, startState, masterTimeout, "non-existent-index-type")
val clusterManagerTimeout = TimeValue.timeValueSeconds(30)
val req = RetryFailedManagedIndexRequest(indices, startState, clusterManagerTimeout, "non-existent-index-type")

val actualException: String? = req.validate()?.validationErrors()?.firstOrNull()
val expectedException: String = RetryFailedManagedIndexRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR
Expand Down

0 comments on commit b57346b

Please sign in to comment.