Skip to content

Commit

Permalink
Merge branch 'main' into drop-ccs-telemetry-flag
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Oct 9, 2024
2 parents 2909390 + b3448ed commit f8d3a1f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,13 @@ public void execute(BuildFinishedFlowAction.Parameters parameters) throws FileNo
// So, if you change this such that the artifact will have a slash/directory in it, you'll need to update the logic
// below as well
pb.directory(uploadFileDir);
pb.start().waitFor();
try {
// we are very generious here, as the upload can take
// a long time depending on its size
pb.start().waitFor(30, java.util.concurrent.TimeUnit.MINUTES);
} catch (InterruptedException e) {
System.out.println("Failed to upload buildkite artifact " + e.getMessage());
}

System.out.println("Generating buildscan link for artifact...");

Expand Down
30 changes: 6 additions & 24 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ tests:
- class: "org.elasticsearch.xpack.deprecation.DeprecationHttpIT"
issue: "https://github.com/elastic/elasticsearch/issues/108628"
method: "testDeprecatedSettingsReturnWarnings"
- class: "org.elasticsearch.xpack.test.rest.XPackRestIT"
issue: "https://github.com/elastic/elasticsearch/issues/109687"
method: "test {p0=sql/translate/Translate SQL}"
- class: org.elasticsearch.index.store.FsDirectoryFactoryTests
method: testStoreDirectory
issue: https://github.com/elastic/elasticsearch/issues/110210
Expand Down Expand Up @@ -95,9 +92,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.MlJobIT
method: testDeleteJobAfterMissingIndex
issue: https://github.com/elastic/elasticsearch/issues/112088
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/preview_transforms/Test preview transform latest}
issue: https://github.com/elastic/elasticsearch/issues/112144
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/112147
- class: org.elasticsearch.smoketest.WatcherYamlRestIT
Expand Down Expand Up @@ -127,9 +121,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.ManyShardsIT
method: testConcurrentQueries
issue: https://github.com/elastic/elasticsearch/issues/112424
- class: org.elasticsearch.xpack.inference.external.http.RequestBasedTaskRunnerTests
method: testLoopOneAtATime
issue: https://github.com/elastic/elasticsearch/issues/112471
- class: org.elasticsearch.ingest.geoip.IngestGeoIpClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/111497
- class: org.elasticsearch.smoketest.SmokeTestIngestWithAllDepsClientYamlTestSuiteIT
Expand All @@ -141,9 +132,6 @@ tests:
- class: org.elasticsearch.search.basic.SearchWhileRelocatingIT
method: testSearchAndRelocateConcurrentlyRandomReplicas
issue: https://github.com/elastic/elasticsearch/issues/112515
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=terms_enum/10_basic/Test search after on unconfigured constant keyword field}
issue: https://github.com/elastic/elasticsearch/issues/112624
- class: org.elasticsearch.xpack.esql.EsqlAsyncSecurityIT
method: testIndexPatternErrorMessageComparison_ESQL_SearchDSL
issue: https://github.com/elastic/elasticsearch/issues/112630
Expand Down Expand Up @@ -230,21 +218,9 @@ tests:
- class: org.elasticsearch.packaging.test.WindowsServiceTests
method: test81JavaOptsInJvmOptions
issue: https://github.com/elastic/elasticsearch/issues/113313
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=esql/50_index_patterns/disjoint_mappings}
issue: https://github.com/elastic/elasticsearch/issues/113315
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=wildcard/10_wildcard_basic/Query_string wildcard query}
issue: https://github.com/elastic/elasticsearch/issues/113316
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item}
issue: https://github.com/elastic/elasticsearch/issues/113325
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_force_delete/Test force deleting a running transform}
issue: https://github.com/elastic/elasticsearch/issues/113327
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=analytics/top_metrics/sort by scaled float field}
issue: https://github.com/elastic/elasticsearch/issues/113340
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/ccr/apis/follow/post-resume-follow/line_84}
issue: https://github.com/elastic/elasticsearch/issues/113343
Expand Down Expand Up @@ -378,6 +354,12 @@ tests:
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/60_synonym_rule_get/Synonym set not found}
issue: https://github.com/elastic/elasticsearch/issues/114432
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/60_synonym_rule_get/Get a synonym rule}
issue: https://github.com/elastic/elasticsearch/issues/114443
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/60_synonym_rule_get/Synonym rule not found}
issue: https://github.com/elastic/elasticsearch/issues/114444

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RequestBasedTaskRunner {
* Else, offload to a new thread so we do not block another threadpool's thread.
*/
public void requestNextRun() {
if (loopCount.getAndIncrement() == 0) {
if (isRunning.get() && loopCount.getAndIncrement() == 0) {
var currentThreadPool = EsExecutors.executorName(Thread.currentThread().getName());
if (executorServiceName.equalsIgnoreCase(currentThreadPool)) {
run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,150 +7,98 @@

package org.elasticsearch.xpack.inference.external.http;

import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityPool;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.spy;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class RequestBasedTaskRunnerTests extends ESTestCase {
private ThreadPool threadPool;

@Before
public void setUp() throws Exception {
super.setUp();
threadPool = spy(createThreadPool(inferenceUtilityPool()));
threadPool = mock();
when(threadPool.executor(UTILITY_THREAD_POOL_NAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@After
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testRequestWhileLoopingWillRerunCommand() {
var expectedTimesRerun = randomInt(5);
AtomicInteger counter = new AtomicInteger(0);

public void testLoopOneAtATime() throws Exception {
// count the number of times the runnable is called
var counter = new AtomicInteger(0);

// block the runnable and wait for the test thread to take an action
var lock = new ReentrantLock();
var condition = lock.newCondition();
Runnable block = () -> {
try {
try {
lock.lock();
condition.await();
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
fail(e, "did not unblock the thread in time, likely during threadpool terminate");
}
};
Runnable unblock = () -> {
try {
lock.lock();
condition.signalAll();
} finally {
lock.unlock();
var requestNextRun = new AtomicReference<Runnable>();
Runnable command = () -> {
if (counter.getAndIncrement() < expectedTimesRerun) {
requestNextRun.get().run();
}
};

var runner = new RequestBasedTaskRunner(() -> {
counter.incrementAndGet();
block.run();
}, threadPool, UTILITY_THREAD_POOL_NAME);

// given we have not called requestNextRun, then no thread should have started
assertThat(counter.get(), equalTo(0));
verify(threadPool, times(0)).executor(UTILITY_THREAD_POOL_NAME);

var runner = new RequestBasedTaskRunner(command, threadPool, UTILITY_THREAD_POOL_NAME);
requestNextRun.set(runner::requestNextRun);
runner.requestNextRun();

// given that we have called requestNextRun, then 1 thread should run once
assertBusy(() -> {
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
assertThat(counter.get(), equalTo(1));
});

// given that we have called requestNextRun while a thread was running, and the thread was blocked
runner.requestNextRun();
// then 1 thread should run once
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
assertThat(counter.get(), equalTo(1));
verify(threadPool, times(1)).executor(eq(UTILITY_THREAD_POOL_NAME));
verifyNoMoreInteractions(threadPool);
assertThat(counter.get(), equalTo(expectedTimesRerun + 1));
}

// given the thread is unblocked
unblock.run();
// then 1 thread should run twice
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
assertBusy(() -> assertThat(counter.get(), equalTo(2)));
public void testRequestWhileNotLoopingWillQueueCommand() {
AtomicInteger counter = new AtomicInteger(0);

// given the thread is unblocked again, but there were only two calls to requestNextRun
unblock.run();
// then 1 thread should run twice
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
assertBusy(() -> assertThat(counter.get(), equalTo(2)));
var runner = new RequestBasedTaskRunner(counter::incrementAndGet, threadPool, UTILITY_THREAD_POOL_NAME);

// given no thread is running, when we call requestNextRun
runner.requestNextRun();
// then a second thread should start for the third run
assertBusy(() -> {
verify(threadPool, times(2)).executor(UTILITY_THREAD_POOL_NAME);
assertThat(counter.get(), equalTo(3));
});

// given the thread is unblocked, then it should exit and rejoin the threadpool
unblock.run();
assertTrue("Test thread should unblock after all runs complete", terminate(threadPool));

// final check - we ran three times on two threads
verify(threadPool, times(2)).executor(UTILITY_THREAD_POOL_NAME);
assertThat(counter.get(), equalTo(3));
for (int i = 1; i < randomInt(10); i++) {
runner.requestNextRun();
verify(threadPool, times(i)).executor(eq(UTILITY_THREAD_POOL_NAME));
assertThat(counter.get(), equalTo(i));
}
;
}

public void testCancel() throws Exception {
// count the number of times the runnable is called
var counter = new AtomicInteger(0);
var latch = new CountDownLatch(1);
var runner = new RequestBasedTaskRunner(() -> {
counter.incrementAndGet();
try {
latch.await();
} catch (InterruptedException e) {
fail(e, "did not unblock the thread in time, likely during threadpool terminate");
}
}, threadPool, UTILITY_THREAD_POOL_NAME);
public void testCancelBeforeRunning() {
AtomicInteger counter = new AtomicInteger(0);

// given that we have called requestNextRun, then 1 thread should run once
var runner = new RequestBasedTaskRunner(counter::incrementAndGet, threadPool, UTILITY_THREAD_POOL_NAME);
runner.cancel();
runner.requestNextRun();
assertBusy(() -> {
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
assertThat(counter.get(), equalTo(1));
});

// given that a thread is running, three more calls will be queued
runner.requestNextRun();
runner.requestNextRun();
verifyNoInteractions(threadPool);
assertThat(counter.get(), equalTo(0));
}

public void testCancelWhileRunning() {
var expectedTimesRerun = randomInt(5);
AtomicInteger counter = new AtomicInteger(0);

var runnerRef = new AtomicReference<RequestBasedTaskRunner>();
Runnable command = () -> {
if (counter.getAndIncrement() < expectedTimesRerun) {
runnerRef.get().requestNextRun();
}
runnerRef.get().cancel();
};
var runner = new RequestBasedTaskRunner(command, threadPool, UTILITY_THREAD_POOL_NAME);
runnerRef.set(runner);
runner.requestNextRun();

// when we cancel the thread, then the thread should immediately exit and rejoin
runner.cancel();
latch.countDown();
assertTrue("Test thread should unblock after all runs complete", terminate(threadPool));
verify(threadPool, times(1)).executor(eq(UTILITY_THREAD_POOL_NAME));
verifyNoMoreInteractions(threadPool);
assertThat(counter.get(), equalTo(1));

// given that we called cancel, when we call requestNextRun then no thread should start
runner.requestNextRun();
verify(threadPool, times(1)).executor(UTILITY_THREAD_POOL_NAME);
verifyNoMoreInteractions(threadPool);
assertThat(counter.get(), equalTo(1));
}

Expand Down

0 comments on commit f8d3a1f

Please sign in to comment.