diff --git a/build.gradle b/build.gradle index 2fbecd584..d6b24cdc3 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ buildscript { ext { - es_version = System.getProperty("es.version", "7.2.0") + es_version = System.getProperty("es.version", "7.3.2") kotlin_version = System.getProperty("kotlin.version", "1.3.31") } @@ -73,12 +73,12 @@ detekt { dependencies { compileOnly "org.elasticsearch:elasticsearch:${es_version}" - compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.2.0.0" + compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.3.0.0" compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1' compile "org.jetbrains:annotations:13.0" - compile "com.amazon.opendistroforelasticsearch:notification:1.2.0.0" + compile "com.amazon.opendistroforelasticsearch:notification:1.3.0.0" testCompile "org.elasticsearch.test:framework:${es_version}" testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/gradle.properties b/gradle.properties index dee6691b1..466663f8a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,4 +13,4 @@ # permissions and limitations under the License. # -version = 1.2.0 +version = 1.3.0 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 213210821..6fb1305a4 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementPlugin.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementPlugin.kt index f93cb968b..3c9e92c87 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementPlugin.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementPlugin.kt @@ -181,7 +181,7 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin, override fun getActions(): List> { return listOf( ActionPlugin.ActionHandler( - UpdateManagedIndexMetaDataAction, + UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java ) ) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt index f1b6202da..3ff04627d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt @@ -428,7 +428,7 @@ class ManagedIndexCoordinator( val request = UpdateManagedIndexMetaDataRequest(indicesToRemoveManagedIndexMetaDataFrom = indices) retryPolicy.retry(logger) { - val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction, request, it) } + val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } if (!response.isAcknowledged) logger.error("Failed to remove ManagedIndexMetaData for [indices=$indices]") } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index 823ffe2c0..05c70ce21 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -174,13 +174,13 @@ object ManagedIndexRunner : ScheduledJobRunner, launch { // Attempt to acquire lock - val lock: LockModel? = withContext(Dispatchers.IO) { context.lockService.acquireLock(job, context) } + val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) } if (lock == null) { logger.debug("Could not acquire lock for ${job.index}") } else { runManagedIndexConfig(job) // Release lock - val released = withContext(Dispatchers.IO) { context.lockService.release(lock) } + val released: Boolean = context.lockService.suspendUntil { release(lock, it) } if (!released) { logger.debug("Could not release lock for ${job.index}") } @@ -510,7 +510,7 @@ object ManagedIndexRunner : ScheduledJobRunner, ) ) updateMetaDataRetryPolicy.retry(logger) { - val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction, request, it) } + val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } if (response.isAcknowledged) { result = true } else { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt index 8fc53ac19..576fea622 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt @@ -20,6 +20,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService import kotlinx.coroutines.delay import org.apache.logging.log4j.Logger import org.elasticsearch.ElasticsearchException @@ -124,6 +125,20 @@ suspend fun C.suspendUntil(block: C.(ActionListener }) } +/** + * Converts [LockService] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the LockService API. + */ +suspend fun LockService.suspendUntil(block: LockService.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + /** * Compares current and previous IndexMetaData to determine if we should create [ManagedIndexConfig]. * diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt index 881d14000..35258695d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt @@ -166,7 +166,7 @@ class RestRetryFailedManagedIndexAction( val updateManagedIndexMetaDataRequest = UpdateManagedIndexMetaDataRequest(indicesToAddManagedIndexMetaDataTo = listOfIndexMetaData) client.execute( - UpdateManagedIndexMetaDataAction, + UpdateManagedIndexMetaDataAction.INSTANCE, updateManagedIndexMetaDataRequest, ActionListener.wrap(::onUpdateManagedIndexMetaDataActionResponse, ::onFailure) ) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt index a0d0712cf..2d3bf0a58 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt @@ -37,6 +37,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver import org.elasticsearch.cluster.metadata.MetaData import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.Writeable import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.transport.TransportService import java.util.function.Supplier @@ -53,7 +55,7 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction("cluster:admin/ism/update/managedindexmetadata") { - override fun newResponse(): AcknowledgedResponse { - return AcknowledgedResponse() +class UpdateManagedIndexMetaDataAction : ActionType(NAME, reader) { + + companion object { + const val NAME = "cluster:admin/ism/update/managedindexmetadata" + val INSTANCE = UpdateManagedIndexMetaDataAction() + + val reader = Writeable.Reader { AcknowledgedResponse(it) } } + + override fun getResponseReader(): Writeable.Reader = reader } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index a0e6b22e2..07c86a399 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -64,8 +64,7 @@ class ManagedIndexCoordinatorTests : ESAllocationTestCase() { settings = Settings.builder().build() - discoveryNode = DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), emptyMap(), - DiscoveryNode.Role.values().toSet(), Version.CURRENT) + discoveryNode = DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT) val settingSet = hashSetOf>() settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) diff --git a/src/test/resources/job-scheduler/opendistro-job-scheduler-1.2.0.0-SNAPSHOT.zip b/src/test/resources/job-scheduler/opendistro-job-scheduler-1.2.0.0-SNAPSHOT.zip deleted file mode 100644 index 0da726bde..000000000 Binary files a/src/test/resources/job-scheduler/opendistro-job-scheduler-1.2.0.0-SNAPSHOT.zip and /dev/null differ diff --git a/src/test/resources/job-scheduler/opendistro-job-scheduler-1.3.0.0-SNAPSHOT.zip b/src/test/resources/job-scheduler/opendistro-job-scheduler-1.3.0.0-SNAPSHOT.zip new file mode 100644 index 000000000..60bc7b143 Binary files /dev/null and b/src/test/resources/job-scheduler/opendistro-job-scheduler-1.3.0.0-SNAPSHOT.zip differ