Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Switches UpdateManagedIndexMetaData to batch tasks using custom execu… #211

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.action.support.master.TransportMasterNodeAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.AckedClusterStateUpdateTask
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.ClusterStateTaskConfig
import org.elasticsearch.cluster.ClusterStateTaskExecutor
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult
import org.elasticsearch.cluster.ClusterStateTaskListener
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.block.ClusterBlockLevel
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.metadata.MetaData
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Priority
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.index.Index
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService
import java.lang.Exception

class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<UpdateManagedIndexMetaDataRequest, AcknowledgedResponse> {

Expand Down Expand Up @@ -69,6 +75,7 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
private val log = LogManager.getLogger(javaClass)
private val client: Client
private val indexStateManagementHistory: IndexStateManagementHistory
private val executor = ManagedIndexMetaDataExecutor()

override fun checkBlock(request: UpdateManagedIndexMetaDataRequest, state: ClusterState): ClusterBlockException? {
// https://github.com/elastic/elasticsearch/commit/ae14b4e6f96b554ca8f4aaf4039b468f52df0123
Expand All @@ -87,45 +94,14 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
) {
clusterService.submitStateUpdateTask(
IndexStateManagementPlugin.PLUGIN_NAME,
object : AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
override fun execute(currentState: ClusterState): ClusterState {
// If there are no indices to make changes to, return early.
// Also doing this because when creating a metaDataBuilder and making no changes to it, for some
// reason the task does not complete, leading to indefinite suspension.
if (request.indicesToAddManagedIndexMetaDataTo.isEmpty() &&
request.indicesToRemoveManagedIndexMetaDataFrom.isEmpty()
) {
return currentState
}

val metaDataBuilder = MetaData.builder(currentState.metaData)

for (pair in request.indicesToAddManagedIndexMetaDataTo) {
if (currentState.metaData.hasIndex(pair.first.name)) {
metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first))
.putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap()))
} else {
log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData")
}
}

for (index in request.indicesToRemoveManagedIndexMetaDataFrom) {
if (currentState.metaData.hasIndex(index.name)) {
val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index))
indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA)

metaDataBuilder.put(indexMetaDataBuilder)
} else {
log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData")
}
}

return ClusterState.builder(currentState).metaData(metaDataBuilder).build()
}

override fun newResponse(acknowledged: Boolean): AcknowledgedResponse {
return AcknowledgedResponse(acknowledged)
}
ManagedIndexMetaDataTask(request.indicesToAddManagedIndexMetaDataTo, request.indicesToRemoveManagedIndexMetaDataFrom),
ClusterStateTaskConfig.build(Priority.NORMAL),
executor,
object : ClusterStateTaskListener {
override fun onFailure(source: String, e: Exception) = listener.onFailure(e)

override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) =
listener.onResponse(AcknowledgedResponse(true))
}
)

Expand All @@ -143,4 +119,55 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
override fun executor(): String {
return ThreadPool.Names.SAME
}

inner class ManagedIndexMetaDataExecutor : ClusterStateTaskExecutor<ManagedIndexMetaDataTask> {

override fun execute(currentState: ClusterState, tasks: List<ManagedIndexMetaDataTask>): ClusterTasksResult<ManagedIndexMetaDataTask> {
val newClusterState = getUpdatedClusterState(currentState, tasks)
return ClusterTasksResult.builder<ManagedIndexMetaDataTask>().successes(tasks).build(newClusterState)
}
}

fun getUpdatedClusterState(currentState: ClusterState, tasks: List<ManagedIndexMetaDataTask>): ClusterState {
// If there are no indices to make changes to, return early.
// Also doing this because when creating a metaDataBuilder and making no changes to it, for some
// reason the task does not complete, leading to indefinite suspension.
if (tasks.all { it.indicesToAddManagedIndexMetaDataTo.isEmpty() && it.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() }
) {
return currentState
}
log.trace("Start of building new cluster state")
val metaDataBuilder = MetaData.builder(currentState.metaData)
for (task in tasks) {
for (pair in task.indicesToAddManagedIndexMetaDataTo) {
if (currentState.metaData.hasIndex(pair.first.name)) {
metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first))
.putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap()))
} else {
log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData")
}
}

for (index in task.indicesToRemoveManagedIndexMetaDataFrom) {
if (currentState.metaData.hasIndex(index.name)) {
val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index))
indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA)

metaDataBuilder.put(indexMetaDataBuilder)
} else {
log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData")
}
}
}
log.trace("End of building new cluster state")

return ClusterState.builder(currentState).metaData(metaDataBuilder).build()
}

companion object {
data class ManagedIndexMetaDataTask(
val indicesToAddManagedIndexMetaDataTo: List<Pair<Index, ManagedIndexMetaData>>,
val indicesToRemoveManagedIndexMetaDataFrom: List<Index>
)
}
}