Skip to content

Commit

Permalink
Added minimum for search.cancel_after_time_interval setting for rollu…
Browse files Browse the repository at this point in the history
…ps (#1026) (#1041)

* Added minimum for search.cancel_after_time_interval setting for rollups



* Added constant for cancel_after_time_interval for rollup search



* Handled case of default value for cancel interval



* Added comment explanation for default rollup cancel after time interval



* Fixed github workflow checks



---------



(cherry picked from commit 9b7b3bd)

Signed-off-by: Joshua Au <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: bowenlan-amzn <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2023
1 parent 369dc52 commit 6c61935
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.SearchPhaseExecutionException
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.opensearchapi.retry
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_MILLIS
import org.opensearch.indexmanagement.rollup.util.getRollupSearchRequest
Expand All @@ -44,10 +47,16 @@ class RollupSearchService(
@Volatile private var retrySearchPolicy =
BackoffPolicy.constantBackoff(ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), ROLLUP_SEARCH_BACKOFF_COUNT.get(settings))

@Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) { millis, count ->
retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count)
}

clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
cancelAfterTimeInterval = it
}
}

// TODO: Failed shouldn't process? How to recover from failed -> how does a user retry a failed rollup
Expand Down Expand Up @@ -103,7 +112,12 @@ class RollupSearchService(
"Composite search failed for rollup, retrying [#${retryCount - 1}] -" +
" reducing page size of composite aggregation from ${job.pageSize} to $pageSize"
)
search(job.copy(pageSize = pageSize).getRollupSearchRequest(metadata), listener)

val searchRequest = job.copy(pageSize = pageSize).getRollupSearchRequest(metadata)
val cancelTimeoutTimeValue = TimeValue.timeValueMinutes(getCancelAfterTimeInterval(cancelAfterTimeInterval.minutes))
searchRequest.cancelAfterTimeInterval = cancelTimeoutTimeValue

search(searchRequest, listener)
}
}
)
Expand Down Expand Up @@ -132,6 +146,16 @@ class RollupSearchService(
RollupSearchResult.Failure(cause = e)
}
}

private fun getCancelAfterTimeInterval(givenInterval: Long): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
if (givenInterval == -1L) {
return givenInterval
}

return max(cancelAfterTimeInterval.minutes(), MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES)
}
}

sealed class RollupSearchResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RollupSettings {
const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L
const val DEFAULT_CLIENT_REQUEST_RETRY_COUNT = 3
const val DEFAULT_CLIENT_REQUEST_RETRY_DELAY = 1000L
const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_MINUTES = 10L

val ROLLUP_ENABLED: Setting<Boolean> = Setting.boolSetting(
"plugins.rollup.enabled",
Expand Down

0 comments on commit 6c61935

Please sign in to comment.