diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index bb88c23e65ed4..57e6d65bc0945 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -161,8 +163,27 @@ protected void doSaveState(IndexerState indexerState, Map positi @Override protected void onFinish(ActionListener listener) { - logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]"); - listener.onResponse(null); + final RollupJobConfig jobConfig = job.getConfig(); + final ActionListener refreshResponseActionListener = new ActionListener<>() { + + @Override + public void onResponse(RefreshResponse refreshResponse) { + logger.trace("refreshing rollup index {} successful for job {}", jobConfig.getRollupIndex(), jobConfig.getId()); + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + "refreshing rollup index {} failed for job {} with exception {}", + jobConfig.getRollupIndex(), + jobConfig.getId(), + e + ); + listener.onResponse(null); + } + }; + client.admin().indices().refresh(new RefreshRequest(jobConfig.getRollupIndex()), refreshResponseActionListener); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 5c44d181284d0..8e2714f8bb9e1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -706,7 +706,7 @@ private void executeTestCase( String dateHistoField = config.getGroupConfig().getDateHistogram().getField(); final ThreadPool threadPool = new TestThreadPool(getTestName()); - try { + try (dir; reader) { RollupJob job = new RollupJob(config, Collections.emptyMap()); final SyncRollupIndexer action = new SyncRollupIndexer( threadPool, @@ -718,8 +718,6 @@ private void executeTestCase( rollupConsumer.accept(action.triggerAndWaitForCompletion(now)); } finally { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - reader.close(); - dir.close(); } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 5b0e5cc952334..e6ed75c29b7b4 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -435,7 +435,6 @@ public void testAbortAfterCompletion() throws Exception { // Don't use the indexer's latch because we completely change doNextSearch() final CountDownLatch doNextSearchLatch = new CountDownLatch(1); - try { DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, job, state, null) { @Override @@ -840,7 +839,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer( threadPool, job, @@ -897,7 +895,6 @@ public void testSearchShardFailure() throws Exception { final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer( threadPool, job, diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 3aae8e719dafc..259698b618660 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -10,6 +10,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; @@ -49,6 +52,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -584,6 +588,11 @@ public void testTriggerWithoutHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); Client client = mock(Client.class); + doAnswer(invocationOnMock -> { + RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList()); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(), any()); when(client.settings()).thenReturn(Settings.EMPTY); AtomicBoolean started = new AtomicBoolean(false); @@ -609,7 +618,7 @@ public void testTriggerWithoutHeaders() throws Exception { ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); return null; - }).when(client).execute(any(), any(), any()); + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); TaskId taskId = new TaskId("node", 123); @@ -686,7 +695,11 @@ public void testTriggerWithHeaders() throws Exception { headers.put("_xpack_security_authentication", "bar"); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers); Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); + doAnswer(invocationOnMock -> { + RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList()); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(), any()); AtomicBoolean started = new AtomicBoolean(false); AtomicBoolean finished = new AtomicBoolean(false); @@ -714,7 +727,7 @@ public void testTriggerWithHeaders() throws Exception { ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); return null; - }).when(client).execute(any(), any(), any()); + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); TaskId taskId = new TaskId("node", 123); @@ -780,7 +793,7 @@ public void onFailure(Exception e) { latch.countDown(); // Wait for the final persistent status to finish - assertBusy(() -> assertTrue(finished.get())); + assertBusy(() -> assertTrue(finished.get()), 30, TimeUnit.SECONDS); } @SuppressWarnings("unchecked") @@ -791,6 +804,11 @@ public void testSaveStateChangesIDScheme() throws Exception { headers.put("_xpack_security_authentication", "bar"); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), headers); Client client = mock(Client.class); + doAnswer(invocationOnMock -> { + RefreshResponse r = new RefreshResponse(2, 2, 0, Collections.emptyList()); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(), any()); when(client.settings()).thenReturn(Settings.EMPTY); AtomicBoolean started = new AtomicBoolean(false); @@ -819,7 +837,7 @@ public void testSaveStateChangesIDScheme() throws Exception { ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(r); return null; - }).when(client).execute(any(), any(), any()); + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);