From 2684f67c038b282f3df3e0847657ac80fd30b5e4 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 7 Nov 2022 12:57:32 -0800 Subject: [PATCH] =?UTF-8?q?Added=20exception=20check=20once=20the=20.opend?= =?UTF-8?q?istro-alerting-config=20index=20is=20b=E2=80=A6=20(#650)=20(#65?= =?UTF-8?q?7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added exception check once the .opendistro-alerting-config index is being created During .opendistro-alerting-config index creation, if ResourceAlreadyExists exception is being raised, the flow will check first if the index is in yellow state and then it will re-try to index monitor Signed-off-by: Stevan Buzejic * Formating of the file fixed Signed-off-by: Stevan Buzejic Signed-off-by: Stevan Buzejic (cherry picked from commit ceff6093dc32aa45ecb75acef6388f4b477e1177) Co-authored-by: Stevan Buzejic <30922513+stevanbz@users.noreply.github.com> --- .../transport/TransportIndexMonitorAction.kt | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 0546d0733..6c943cef8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,10 +9,15 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException +import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthAction +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse @@ -297,10 +302,30 @@ class TransportIndexMonitorAction @Inject constructor( if (!scheduledJobIndices.scheduledJobIndexExists()) { scheduledJobIndices.initScheduledJobIndex(object : ActionListener { override fun onResponse(response: CreateIndexResponse) { - onCreateMappingsResponse(response) + onCreateMappingsResponse(response.isAcknowledged) } override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + // https://github.com/opensearch-project/alerting/issues/646 + if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) { + scope.launch { + // Wait for the yellow status + val request = ClusterHealthRequest() + .indices(SCHEDULED_JOBS_INDEX) + .waitForYellowStatus() + val response: ClusterHealthResponse = client.suspendUntil { + execute(ClusterHealthAction.INSTANCE, request, it) + } + if (response.isTimedOut) { + actionListener.onFailure( + OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") + ) + } + // Retry mapping of monitor + onCreateMappingsResponse(true) + } + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } } }) } else if (!IndexUtils.scheduledJobIndexUpdated) { @@ -346,6 +371,7 @@ class TransportIndexMonitorAction @Inject constructor( val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE)) val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout) val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) + client.search( searchRequest, object : ActionListener { @@ -401,8 +427,8 @@ class TransportIndexMonitorAction @Inject constructor( } } - private fun onCreateMappingsResponse(response: CreateIndexResponse) { - if (response.isAcknowledged) { + private fun onCreateMappingsResponse(isAcknowledged: Boolean) { + if (isAcknowledged) { log.info("Created $SCHEDULED_JOBS_INDEX with mappings.") prepareMonitorIndexing() IndexUtils.scheduledJobIndexUpdated()