Skip to content

Commit

Permalink
Reindex negative TimeValue fix (#54057)
Browse files Browse the repository at this point in the history
Reindex would use timeValueNanos(System.nanoTime()). The intended use
for TimeValue is as a duration, not as absolute time. In particular,
this could result in negative TimeValue's, being unsupported in #53913.
Modified to use the bare long nano-second value.
  • Loading branch information
henningandersen committed Mar 24, 2020
1 parent fc498f6 commit 7ce7aff
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,16 @@ public void start() {
void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
// like a bug?
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
}

/**
* Process a scroll response.
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param asyncResponse the response to process from ScrollableHitSource
*/
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
if (task.isCancelled()) {
Expand Down Expand Up @@ -285,7 +285,7 @@ protected void doRun() throws Exception {
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
prepareBulkRequest(System.nanoTime(), asyncResponse);
}

@Override
Expand All @@ -294,15 +294,15 @@ public void onFailure(Exception e) {
}
};
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
}

/**
* Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done one the SearchResponse and any
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script.
*/
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: preparing bulk request", task.getId());
if (task.isCancelled()) {
Expand All @@ -328,12 +328,12 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncR
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
notifyDone(thisBatchStartTime, asyncResponse, 0);
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
}

/**
Expand Down Expand Up @@ -419,14 +419,14 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
}
}

void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
this.lastBatchSize = batchSize;
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
}

private void recordFailure(Failure failure, List<Failure> failures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -256,7 +255,7 @@ public void testScrollResponseSetsTotal() {

long total = randomIntBetween(0, Integer.MAX_VALUE);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
assertEquals(total, testTask.getStatus().getTotal());
}

Expand All @@ -269,7 +268,7 @@ public void testScrollResponseBatchingBehavior() throws Exception {
Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);

// Use assert busy because the update happens on another thread
final int expectedBatches = batches;
Expand Down Expand Up @@ -355,7 +354,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
});
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
Expand All @@ -373,7 +372,7 @@ public void testShardFailuresAbortRequest() throws Exception {
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), contains(shardFailure));
Expand All @@ -387,7 +386,7 @@ public void testShardFailuresAbortRequest() throws Exception {
*/
public void testSearchTimeoutsAbortRequest() throws Exception {
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), empty());
Expand Down Expand Up @@ -424,7 +423,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
hit.setSource(new BytesArray("{}"), XContentType.JSON);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
Expand Down Expand Up @@ -620,7 +619,7 @@ public void testCancelBeforeInitialSearch() throws Exception {
}

public void testCancelBeforeScrollResponse() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
}

Expand All @@ -635,7 +634,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception {
}

public void testCancelBeforeStartNextScroll() throws Exception {
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
}

Expand Down Expand Up @@ -684,7 +683,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
worker.rethrottle(1);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
simulateScrollResponse(action, System.nanoTime(), 1000, response);

// Now that we've got our cancel we'll just verify that it all came through all right
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
Expand Down Expand Up @@ -713,7 +712,7 @@ private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throw
/**
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
*/
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
ScrollableHitSource.Response response) {
action.setScroll(scrollId());
action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ TimeValue throttledUntil() {
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
* rescheduled over and over again.
*/
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
// Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
try {
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
Expand All @@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
}
}

public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
return timeValueNanos(waitTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.closeTo;
Expand Down Expand Up @@ -152,7 +151,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
};
try {
workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay,
workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay,
new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
Expand Down Expand Up @@ -225,7 +224,7 @@ public boolean isCancelled() {
};
try {
// Have the task use the thread pool to delay a task that does nothing
workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() {
workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
}
Expand Down

0 comments on commit 7ce7aff

Please sign in to comment.