Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change Thread.sleep to waitUntil function under test files #1242

Merged
merged 7 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.net.URLEncoder
import java.time.Instant
import java.time.ZonedDateTime
Expand All @@ -55,6 +56,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.DAYS
import java.time.temporal.ChronoUnit.MILLIS
import java.time.temporal.ChronoUnit.MINUTES
import java.util.concurrent.TimeUnit

class MonitorRunnerServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -138,7 +140,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(firstRunAlert, monitor)
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// see lastNotificationTime change.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
val secondRunAlert = searchAlerts(monitor).single()
verifyAlert(secondRunAlert, monitor)
Expand Down Expand Up @@ -265,7 +269,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
Expand Down Expand Up @@ -765,7 +771,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))

Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
executeMonitor(monitor.id)
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single()
Expand Down Expand Up @@ -1398,7 +1406,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
Expand All @@ -1418,7 +1428,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil (searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).size == 2)
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
Expand Down Expand Up @@ -1940,7 +1952,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let Action executionTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResultsForBucketLevelMonitor(
monitorRunResult = monitorRunResultThrottled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getAlertIndices().size >= 3)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3)
}

Expand All @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getFindingIndices().size >= 2)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -69,7 +71,9 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
// the test execution by a lot (might have to wait for Job Scheduler plugin integration first)
// Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running
// on time
Thread.sleep(60000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of false, we would want to verifyMonitorStats. That way we get the benefit of the waitUtil function

}, 1, TimeUnit.MINUTES)
verifyMonitorStats("/_plugins/_alerting")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,10 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -842,7 +845,10 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -870,7 +876,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alerts = searchAlerts(monitor)
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -956,10 +966,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
// Enable Monitor jobs
enableScheduledJob()

var response = enableScheduledJob()
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
var alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
Expand Down Expand Up @@ -989,10 +1002,12 @@ class MonitorRestApiIT : AlertingRestTestCase() {
)

// Re-enable Monitor jobs
enableScheduledJob()
response = enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)

alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
Expand All @@ -1015,10 +1030,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats jobs`() {
// Enable the Monitor plugin.
enableScheduledJob()

var response = enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats()
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down Expand Up @@ -1048,10 +1066,12 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor specific metric`() {
// Enable the Monitor plugin.
enableScheduledJob()
var response = enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil (response == null)
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats("/jobs_info")
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
import java.util.UUID
import java.util.concurrent.TimeUnit

@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -1180,7 +1182,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)
OpenSearchTestCase.waitUntil({
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
return@waitUntil (findings.size == 1)
}, 80, TimeUnit.SECONDS)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType
import org.opensearch.client.ResponseException
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -79,8 +81,11 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {
}

// Create cluster change event and wait for migration service to complete migrating data over
client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
Thread.sleep(120000)
var request: Map<String, Any>? = null
request = client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
OpenSearchTestCase.waitUntil({
return@waitUntil request == null
}, 2, TimeUnit.MINUTES)

for (id in ids) {
val response = client().makeRequest(
Expand Down
Loading