Skip to content

Commit

Permalink
Add primary first calls for different monitor types (#1205) (#1214)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2197d46)

Signed-off-by: Ashish Agrawal <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 0951c26 commit 11ba463
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
Expand Down Expand Up @@ -642,6 +643,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down Expand Up @@ -674,7 +676,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type())
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -99,7 +100,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down Expand Up @@ -191,7 +194,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down

0 comments on commit 11ba463

Please sign in to comment.