Skip to content

Commit

Permalink
Adds support for ES 7.3.2 (opendistro-for-elasticsearch#109)
Browse files Browse the repository at this point in the history
* Updates usage of LockService to new async API (opendistro-for-elasticsearch#107)

* Support ES 7.3.2

* Remove unused code
  • Loading branch information
dbbaughe authored Nov 21, 2019
1 parent 6349454 commit 4eea94f
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 19 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

buildscript {
ext {
es_version = System.getProperty("es.version", "7.2.0")
es_version = System.getProperty("es.version", "7.3.2")
kotlin_version = System.getProperty("kotlin.version", "1.3.31")
}

Expand Down Expand Up @@ -73,12 +73,12 @@ detekt {

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.2.0.0"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.3.0.0"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.2.0.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.3.0.0"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# permissions and limitations under the License.
#

version = 1.2.0
version = 1.3.0
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(
ActionPlugin.ActionHandler(
UpdateManagedIndexMetaDataAction,
UpdateManagedIndexMetaDataAction.INSTANCE,
TransportUpdateManagedIndexMetaDataAction::class.java
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ class ManagedIndexCoordinator(
val request = UpdateManagedIndexMetaDataRequest(indicesToRemoveManagedIndexMetaDataFrom = indices)

retryPolicy.retry(logger) {
val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction, request, it) }
val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) }

if (!response.isAcknowledged) logger.error("Failed to remove ManagedIndexMetaData for [indices=$indices]")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ object ManagedIndexRunner : ScheduledJobRunner,

launch {
// Attempt to acquire lock
val lock: LockModel? = withContext(Dispatchers.IO) { context.lockService.acquireLock(job, context) }
val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
if (lock == null) {
logger.debug("Could not acquire lock for ${job.index}")
} else {
runManagedIndexConfig(job)
// Release lock
val released = withContext(Dispatchers.IO) { context.lockService.release(lock) }
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
logger.debug("Could not release lock for ${job.index}")
}
Expand Down Expand Up @@ -510,7 +510,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
)
)
updateMetaDataRetryPolicy.retry(logger) {
val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction, request, it) }
val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) }
if (response.isAcknowledged) {
result = true
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
Expand Down Expand Up @@ -124,6 +125,20 @@ suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener
})
}

/**
* Converts [LockService] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the LockService API.
*/
suspend fun <T> LockService.suspendUntil(block: LockService.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* Compares current and previous IndexMetaData to determine if we should create [ManagedIndexConfig].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class RestRetryFailedManagedIndexAction(
val updateManagedIndexMetaDataRequest =
UpdateManagedIndexMetaDataRequest(indicesToAddManagedIndexMetaDataTo = listOfIndexMetaData)
client.execute(
UpdateManagedIndexMetaDataAction,
UpdateManagedIndexMetaDataAction.INSTANCE,
updateManagedIndexMetaDataRequest,
ActionListener.wrap(::onUpdateManagedIndexMetaDataActionResponse, ::onFailure)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.metadata.MetaData
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService
import java.util.function.Supplier
Expand All @@ -53,7 +55,7 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
indexNameExpressionResolver: IndexNameExpressionResolver,
indexStateManagementHistory: IndexStateManagementHistory
) : super(
UpdateManagedIndexMetaDataAction.name(),
UpdateManagedIndexMetaDataAction.INSTANCE.name(),
transportService,
clusterService,
threadPool,
Expand Down Expand Up @@ -135,8 +137,12 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
}
}

override fun read(si: StreamInput): AcknowledgedResponse {
return AcknowledgedResponse(si)
}

override fun newResponse(): AcknowledgedResponse {
return AcknowledgedResponse()
throw UnsupportedOperationException("usage of Streamable is to be replaced by Writeable")
}

override fun executor(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@

package com.amazon.opendistroforelasticsearch.indexstatemanagement.transport.action.updateindexmetadata

import org.elasticsearch.action.Action
import org.elasticsearch.action.ActionType
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.common.io.stream.Writeable

object UpdateManagedIndexMetaDataAction : Action<AcknowledgedResponse>("cluster:admin/ism/update/managedindexmetadata") {
override fun newResponse(): AcknowledgedResponse {
return AcknowledgedResponse()
class UpdateManagedIndexMetaDataAction : ActionType<AcknowledgedResponse>(NAME, reader) {

companion object {
const val NAME = "cluster:admin/ism/update/managedindexmetadata"
val INSTANCE = UpdateManagedIndexMetaDataAction()

val reader = Writeable.Reader { AcknowledgedResponse(it) }
}

override fun getResponseReader(): Writeable.Reader<AcknowledgedResponse> = reader
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class ManagedIndexCoordinatorTests : ESAllocationTestCase() {

settings = Settings.builder().build()

discoveryNode = DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), emptyMap<String, String>(),
DiscoveryNode.Role.values().toSet(), Version.CURRENT)
discoveryNode = DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT)

val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down
Binary file not shown.
Binary file not shown.

0 comments on commit 4eea94f

Please sign in to comment.