From 3f080132b261ab3ee2bde2130750835365e7c75c Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 25 Jul 2022 23:34:52 +0530 Subject: [PATCH] Addressing review comment - adding tests to test concurrency Signed-off-by: Bharathwaj G --- .../org/opensearch/search/SearchService.java | 5 + .../search/CreatePitMultiNodeTests.java | 113 ++++++++++++++++++ .../search/DeletePitMultiNodeTests.java | 47 ++++++++ 3 files changed, 165 insertions(+) diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 3213ddde5def5..747b13b031824 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1078,6 +1078,11 @@ public DeletePitResponse freeReaderContextsIfFound(List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + Set createSet = new HashSet<>(); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering pit create --->"); + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + if (createSet.add(createPitResponse.getId())) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(CreatePitAction.INSTANCE, createPitRequest, listener); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + public void testConcurrentCreatesWithDeletes() throws InterruptedException, ExecutionException { + CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true); + createPitRequest.setIndices(new String[] { "index" }); + List pitIds = new ArrayList<>(); + String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId(); + pitIds.add(id); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + Set createSet = new HashSet<>(); + AtomicInteger numSuccess = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(CreatePitMultiNodeTests.class.getName()); + int concurrentRuns = randomIntBetween(20, 50); + + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1); + for (int i = 0; i < concurrentRuns; i++) { + int currentThreadIteration = i; + Runnable thread = () -> { + if (currentThreadIteration == randomDeleteThread) { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + if (createSet.add(createPitResponse.getId())) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(CreatePitAction.INSTANCE, createPitRequest, listener); + } else { + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numSuccess.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + } + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numSuccess.get()); + + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } } diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 79e4a86b3dba9..89607b9201cd9 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -11,6 +11,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; @@ -23,12 +25,16 @@ import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.containsString; @@ -283,4 +289,45 @@ public void testDeleteWhileSearch() throws Exception { thread.join(); } } + + public void testtConcurrentDeletes() throws InterruptedException, ExecutionException { + CreatePitResponse pitResponse = createPitOnIndex("index"); + ensureGreen(); + int concurrentRuns = randomIntBetween(20, 50); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); + AtomicInteger numDeleteAcknowledged = new AtomicInteger(); + TestThreadPool testThreadPool = null; + try { + testThreadPool = new TestThreadPool(DeletePitMultiNodeTests.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering pit delete --->"); + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) { + numDeleteAcknowledged.incrementAndGet(); + } + } + + @Override + public void onFailure(Exception e) {} + }, countDownLatch); + client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + assertEquals(concurrentRuns, numDeleteAcknowledged.get()); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + }