diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 0546d0733..6c943cef8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,10 +9,15 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException +import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthAction +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse @@ -297,10 +302,30 @@ class TransportIndexMonitorAction @Inject constructor( if (!scheduledJobIndices.scheduledJobIndexExists()) { scheduledJobIndices.initScheduledJobIndex(object : ActionListener { override fun onResponse(response: CreateIndexResponse) { - onCreateMappingsResponse(response) + onCreateMappingsResponse(response.isAcknowledged) } override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + // https://github.com/opensearch-project/alerting/issues/646 + if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) { + scope.launch { + // Wait for the yellow status + val request = ClusterHealthRequest() + .indices(SCHEDULED_JOBS_INDEX) + .waitForYellowStatus() + val response: ClusterHealthResponse = client.suspendUntil { + execute(ClusterHealthAction.INSTANCE, request, it) + } + if (response.isTimedOut) { + actionListener.onFailure( + OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") + ) + } + // Retry mapping of monitor + onCreateMappingsResponse(true) + } + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } } }) } else if (!IndexUtils.scheduledJobIndexUpdated) { @@ -346,6 +371,7 @@ class TransportIndexMonitorAction @Inject constructor( val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE)) val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout) val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) + client.search( searchRequest, object : ActionListener { @@ -401,8 +427,8 @@ class TransportIndexMonitorAction @Inject constructor( } } - private fun onCreateMappingsResponse(response: CreateIndexResponse) { - if (response.isAcknowledged) { + private fun onCreateMappingsResponse(isAcknowledged: Boolean) { + if (isAcknowledged) { log.info("Created $SCHEDULED_JOBS_INDEX with mappings.") prepareMonitorIndexing() IndexUtils.scheduledJobIndexUpdated()