diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt index a0d0712cf..044938d77 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt @@ -28,17 +28,23 @@ import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.action.support.master.TransportMasterNodeAction import org.elasticsearch.client.Client -import org.elasticsearch.cluster.AckedClusterStateUpdateTask import org.elasticsearch.cluster.ClusterState +import org.elasticsearch.cluster.ClusterStateTaskConfig +import org.elasticsearch.cluster.ClusterStateTaskExecutor +import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult +import org.elasticsearch.cluster.ClusterStateTaskListener import org.elasticsearch.cluster.block.ClusterBlockException import org.elasticsearch.cluster.block.ClusterBlockLevel import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver import org.elasticsearch.cluster.metadata.MetaData import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.Priority import org.elasticsearch.common.inject.Inject +import org.elasticsearch.index.Index import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.transport.TransportService +import java.lang.Exception import java.util.function.Supplier class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction { @@ -68,6 +74,7 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction(request, listener) { - override fun execute(currentState: ClusterState): ClusterState { - // If there are no indices to make changes to, return early. - // Also doing this because when creating a metaDataBuilder and making no changes to it, for some - // reason the task does not complete, leading to indefinite suspension. - if (request.indicesToAddManagedIndexMetaDataTo.isEmpty() && - request.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() - ) { - return currentState - } - - val metaDataBuilder = MetaData.builder(currentState.metaData) - - for (pair in request.indicesToAddManagedIndexMetaDataTo) { - if (currentState.metaData.hasIndex(pair.first.name)) { - metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first)) - .putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap())) - } else { - log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData") - } - } - - for (index in request.indicesToRemoveManagedIndexMetaDataFrom) { - if (currentState.metaData.hasIndex(index.name)) { - val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index)) - indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA) - - metaDataBuilder.put(indexMetaDataBuilder) - } else { - log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData") - } - } - - return ClusterState.builder(currentState).metaData(metaDataBuilder).build() - } - - override fun newResponse(acknowledged: Boolean): AcknowledgedResponse { - return AcknowledgedResponse(acknowledged) - } + ManagedIndexMetaDataTask(request.indicesToAddManagedIndexMetaDataTo, request.indicesToRemoveManagedIndexMetaDataFrom), + ClusterStateTaskConfig.build(Priority.NORMAL), + executor, + object : ClusterStateTaskListener { + override fun onFailure(source: String, e: Exception) = listener.onFailure(e) + + override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) = + listener.onResponse(AcknowledgedResponse(true)) } ) @@ -142,4 +118,55 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction { + + override fun execute(currentState: ClusterState, tasks: List): ClusterTasksResult { + val newClusterState = getUpdatedClusterState(currentState, tasks) + return ClusterTasksResult.builder().successes(tasks).build(newClusterState) + } + } + + fun getUpdatedClusterState(currentState: ClusterState, tasks: List): ClusterState { + // If there are no indices to make changes to, return early. + // Also doing this because when creating a metaDataBuilder and making no changes to it, for some + // reason the task does not complete, leading to indefinite suspension. + if (tasks.all { it.indicesToAddManagedIndexMetaDataTo.isEmpty() && it.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() } + ) { + return currentState + } + log.trace("Start of building new cluster state") + val metaDataBuilder = MetaData.builder(currentState.metaData) + for (task in tasks) { + for (pair in task.indicesToAddManagedIndexMetaDataTo) { + if (currentState.metaData.hasIndex(pair.first.name)) { + metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first)) + .putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap())) + } else { + log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData") + } + } + + for (index in task.indicesToRemoveManagedIndexMetaDataFrom) { + if (currentState.metaData.hasIndex(index.name)) { + val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index)) + indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA) + + metaDataBuilder.put(indexMetaDataBuilder) + } else { + log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData") + } + } + } + log.trace("End of building new cluster state") + + return ClusterState.builder(currentState).metaData(metaDataBuilder).build() + } + + companion object { + data class ManagedIndexMetaDataTask( + val indicesToAddManagedIndexMetaDataTo: List>, + val indicesToRemoveManagedIndexMetaDataFrom: List + ) + } }