diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index aeca7caf2d446..ec8840739a924 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -248,16 +248,16 @@ public void start() { void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) { // lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems // like a bug? - onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse); + onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse); } /** * Process a scroll response. - * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay. + * @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay. * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay. * @param asyncResponse the response to process from ScrollableHitSource */ - void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) { + void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) { ScrollableHitSource.Response response = asyncResponse.response(); logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size()); if (task.isCancelled()) { @@ -285,7 +285,7 @@ protected void doRun() throws Exception { * It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time * waiting on the scroll doesn't count against this batch in the throttle. */ - prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse); + prepareBulkRequest(System.nanoTime(), asyncResponse); } @Override @@ -294,7 +294,7 @@ public void onFailure(Exception e) { } }; prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable); - worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable); + worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable); } /** @@ -302,7 +302,7 @@ public void onFailure(Exception e) { * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the * thread may be blocked by the user script. */ - void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) { + void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) { ScrollableHitSource.Response response = asyncResponse.response(); logger.debug("[{}]: preparing bulk request", task.getId()); if (task.isCancelled()) { @@ -328,12 +328,12 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncR /* * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation. */ - notifyDone(thisBatchStartTime, asyncResponse, 0); + notifyDone(thisBatchStartTimeNS, asyncResponse, 0); return; } request.timeout(mainRequest.getTimeout()); request.waitForActiveShards(mainRequest.getWaitForActiveShards()); - sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size())); + sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size())); } /** @@ -419,14 +419,14 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) { } } - void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) { + void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) { if (task.isCancelled()) { logger.debug("[{}]: finishing early because the task was cancelled", task.getId()); finishHim(null); return; } this.lastBatchSize = batchSize; - asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize)); + asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize)); } private void recordFailure(Failure failure, List failures) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 1e9aacb5afa62..ae451364a289a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -112,7 +112,6 @@ import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; -import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; @@ -256,7 +255,7 @@ public void testScrollResponseSetsTotal() { long total = randomIntBetween(0, Integer.MAX_VALUE); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response); assertEquals(total, testTask.getStatus().getTotal()); } @@ -269,7 +268,7 @@ public void testScrollResponseBatchingBehavior() throws Exception { Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); + simulateScrollResponse(action, System.nanoTime(), 0, response); // Use assert busy because the update happens on another thread final int expectedBatches = batches; @@ -355,7 +354,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n } }); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class)); assertThat(e.getCause(), hasToString(containsString("test"))); @@ -373,7 +372,7 @@ public void testShardFailuresAbortRequest() throws Exception { SearchFailure shardFailure = new SearchFailure(new RuntimeException("test")); ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), contains(shardFailure)); @@ -387,7 +386,7 @@ public void testShardFailuresAbortRequest() throws Exception { */ public void testSearchTimeoutsAbortRequest() throws Exception { ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null); - simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse); + simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse); BulkByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), empty()); assertThat(response.getSearchFailures(), empty()); @@ -424,7 +423,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper buildRequest(Hit doc ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0); hit.setSource(new BytesArray("{}"), XContentType.JSON); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response); + simulateScrollResponse(action, System.nanoTime(), 0, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); assertThat(e.getCause(), instanceOf(RuntimeException.class)); assertThat(e.getCause().getMessage(), equalTo("surprise")); @@ -620,7 +619,7 @@ public void testCancelBeforeInitialSearch() throws Exception { } public void testCancelBeforeScrollResponse() throws Exception { - cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1, + cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1, new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null))); } @@ -635,7 +634,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception { } public void testCancelBeforeStartNextScroll() throws Exception { - TimeValue now = timeValueNanos(System.nanoTime()); + long now = System.nanoTime(); cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0)); } @@ -684,7 +683,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task worker.rethrottle(1); - simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response); + simulateScrollResponse(action, System.nanoTime(), 1000, response); // Now that we've got our cancel we'll just verify that it all came through all right assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled()); @@ -713,7 +712,7 @@ private void cancelTaskCase(Consumer testMe) throw /** * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method. */ - private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize, + private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize, ScrollableHitSource.Response response) { action.setScroll(scrollId()); action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java index ae2a6a552cba4..5d50cf7791419 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java @@ -182,11 +182,11 @@ TimeValue throttledUntil() { * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be * rescheduled over and over again. */ - public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize, + public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize, AbstractRunnable prepareBulkRequestRunnable) { // Synchronize so we are less likely to schedule the same request twice. synchronized (delayedPrepareBulkRequestReference) { - TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize); + TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize); logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay); try { delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), @@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt } } - public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) { - long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize); + public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) { + long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize); long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime())); return timeValueNanos(waitTime); } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index a76fced9772f5..770fc65621055 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.closeTo; @@ -152,7 +151,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n } }; try { - workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, + workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay, new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -225,7 +224,7 @@ public boolean isCancelled() { }; try { // Have the task use the thread pool to delay a task that does nothing - workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { + workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() { @Override protected void doRun() throws Exception { }