Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Refactors files/classes to prepare for multiple features under the In… #285

Merged
merged 3 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ check.dependsOn jacocoTestReport

esplugin {
name 'opendistro_index_management'
description 'Open Distro Index State Management Plugin'
classname 'com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin'
description 'Open Distro Index Management Plugin'
classname 'com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin'
extendedPlugins = ['opendistro-job-scheduler']
}

allOpen {
annotation("com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting")
annotation("com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting")
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.action.ActionListener
Expand All @@ -36,17 +36,17 @@ import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.XContentType

@OpenForTesting
class IndexStateManagementIndices(
class IndexManagementIndices(
private val client: IndicesAdminClient,
private val clusterService: ClusterService
) {

private val logger = LogManager.getLogger(javaClass)

fun checkAndUpdateISMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!indexStateManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.mapping(_DOC, indexStateManagementMappings, XContentType.JSON)
fun checkAndUpdateIMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!indexManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_MANAGEMENT_INDEX)
.mapping(_DOC, indexManagementMappings, XContentType.JSON)
.settings(Settings.builder().put("index.hidden", true).build())
client.create(indexRequest, object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
Expand All @@ -62,26 +62,26 @@ class IndexStateManagementIndices(
}
}

fun indexStateManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_STATE_MANAGEMENT_INDEX)
fun indexManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_MANAGEMENT_INDEX)

/**
* Attempt to create [INDEX_STATE_MANAGEMENT_INDEX] and return whether it exists
* Attempt to create [INDEX_MANAGEMENT_INDEX] and return whether it exists
*/
@Suppress("ReturnCount")
suspend fun attemptInitStateManagementIndex(client: Client): Boolean {
if (indexStateManagementIndexExists()) return true
if (indexManagementIndexExists()) return true

return try {
val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateISMConfigIndex(it) }
val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateIMConfigIndex(it) }
if (response.isAcknowledged) {
return true
}
logger.error("Creating $INDEX_STATE_MANAGEMENT_INDEX with mappings NOT acknowledged")
logger.error("Creating $INDEX_MANAGEMENT_INDEX with mappings NOT acknowledged")
return false
} catch (e: ResourceAlreadyExistsException) {
true
} catch (e: Exception) {
logger.error("Error trying to create $INDEX_STATE_MANAGEMENT_INDEX", e)
logger.error("Error trying to create $INDEX_MANAGEMENT_INDEX", e)
false
}
}
Expand Down Expand Up @@ -142,9 +142,9 @@ class IndexStateManagementIndices(
const val HISTORY_INDEX_PATTERN = "<$HISTORY_INDEX_BASE-{now/d{yyyy.MM.dd}}-1>"
const val HISTORY_ALL = "$HISTORY_INDEX_BASE*"

val indexStateManagementMappings = IndexStateManagementIndices::class.java.classLoader
val indexManagementMappings = IndexManagementIndices::class.java.classLoader
.getResource("mappings/opendistro-ism-config.json").readText()
val indexStateManagementHistoryMappings = IndexStateManagementIndices::class.java.classLoader
val indexStateManagementHistoryMappings = IndexManagementIndices::class.java.classLoader
.getResource("mappings/opendistro-ism-history.json").readText()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestDeletePolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestExplainAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestDeletePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -59,32 +62,27 @@ import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.watcher.ResourceWatcherService
import java.util.function.Supplier

internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin, Plugin() {
internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plugin() {

private val logger = LogManager.getLogger(javaClass)
lateinit var indexStateManagementIndices: IndexStateManagementIndices
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService

companion object {
const val PLUGIN_NAME = "opendistro-ism"
const val ISM_BASE_URI = "/_opendistro/_ism"
const val PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val INDEX_STATE_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_STATE_MANAGEMENT_JOB_TYPE = "opendistro-managed-index"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a note: Job Scheduler only lets us specify one job type which is why we changed this to be more generic for the whole plugin. At the same time it does not actually use this so it's not that important.

const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"
}

override fun getJobIndex(): String {
return INDEX_STATE_MANAGEMENT_INDEX
}
override fun getJobIndex(): String = INDEX_MANAGEMENT_INDEX

override fun getJobType(): String {
return INDEX_STATE_MANAGEMENT_JOB_TYPE
}
override fun getJobType(): String = INDEX_MANAGEMENT_JOB_TYPE

override fun getJobRunner(): ScheduledJobRunner {
return ManagedIndexRunner
}
override fun getJobRunner(): ScheduledJobRunner = ManagedIndexRunner

override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
Expand All @@ -101,7 +99,7 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
return@ScheduledJobParser null
}
else -> {
logger.info("Unsupported document was indexed in $INDEX_STATE_MANAGEMENT_INDEX with type: $fieldName")
logger.info("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
}
}
}
Expand All @@ -119,7 +117,7 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
RestIndexPolicyAction(settings, clusterService, indexStateManagementIndices),
RestIndexPolicyAction(settings, clusterService, indexManagementIndices),
RestGetPolicyAction(),
RestDeletePolicyAction(),
RestExplainAction(),
Expand Down Expand Up @@ -153,30 +151,36 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService

indexStateManagementIndices = IndexStateManagementIndices(client.admin().indices(), clusterService)
indexManagementIndices = IndexManagementIndices(client.admin().indices(), clusterService)
val indexStateManagementHistory =
IndexStateManagementHistory(settings, client, threadPool, clusterService, indexStateManagementIndices)
IndexStateManagementHistory(
settings,
client,
threadPool,
clusterService,
indexManagementIndices
)

val managedIndexCoordinator = ManagedIndexCoordinator(environment.settings(),
client, clusterService, threadPool, indexStateManagementIndices)
client, clusterService, threadPool, indexManagementIndices)

return listOf(managedIndexRunner, indexStateManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
return listOf(managedIndexRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
}

override fun getSettings(): List<Setting<*>> {
return listOf(
ManagedIndexSettings.HISTORY_ENABLED,
ManagedIndexSettings.HISTORY_INDEX_MAX_AGE,
ManagedIndexSettings.HISTORY_MAX_DOCS,
ManagedIndexSettings.HISTORY_RETENTION_PERIOD,
ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD,
ManagedIndexSettings.POLICY_ID,
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.HISTORY_ENABLED,
ManagedIndexSettings.HISTORY_MAX_DOCS,
ManagedIndexSettings.HISTORY_INDEX_MAX_AGE,
ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD,
ManagedIndexSettings.HISTORY_RETENTION_PERIOD,
ManagedIndexSettings.ALLOW_LIST
)
}
Expand Down
Loading