Skip to content

Commit

Permalink
Script: Fix value of ctx._now to be current epoch time in milliseco…
Browse files Browse the repository at this point in the history
…nds (#23175)

In update scripts, `ctx._now` uses the same milliseconds value used by the
rest of the system to calculate deltas. However, that time is not
actually epoch milliseconds, as it is derived from `System.nanoTime()`.
This change reworks the estimated time thread in ThreadPool which this
time is based on to make available both the relative time, as well as
absolute milliseconds (epoch) which may be used with calendar system. It
also renames the EstimatedTimeThread to a more apt CachedTimeThread.

closes #23169
  • Loading branch information
rjernst authored Feb 22, 2017
1 parent 175bda6 commit 18f57c0
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Codec getCodec() {

/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
* {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
* async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool
*/
public ThreadPool getThreadPool() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
Expand Down Expand Up @@ -446,7 +446,7 @@ private boolean checkVersionConflict(final Operation op, final long currentVersi

private long checkDeletedAndGCed(VersionValue versionValue) {
long currentVersion;
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
Expand Down Expand Up @@ -726,7 +726,7 @@ public DeleteResult delete(Delete delete) throws IOException {
private void maybePruneDeletedTombstones() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
Expand Down Expand Up @@ -772,7 +772,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
deleteResult = new DeleteResult(updatedVersion, seqNo, found);

versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis()));
}
if (!deleteResult.hasFailure()) {
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
Expand Down Expand Up @@ -1047,7 +1047,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ private void contextProcessing(SearchContext context) {
}

private void contextProcessedSuccessfully(SearchContext context) {
context.accessed(threadPool.estimatedTimeInMillis());
context.accessed(threadPool.relativeTimeInMillis());
}

private void cleanContext(SearchContext context) {
Expand Down Expand Up @@ -794,7 +794,7 @@ public int getActiveContexts() {
class Reaper implements Runnable {
@Override
public void run() {
final long time = threadPool.estimatedTimeInMillis();
final long time = threadPool.relativeTimeInMillis();
for (SearchContext context : activeContexts.values()) {
// Use the same value for both checks since lastAccessTime can
// be modified by another thread between checks!
Expand Down
76 changes: 57 additions & 19 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static ThreadPoolType fromType(String type) {

private final ScheduledThreadPoolExecutor scheduler;

private final EstimatedTimeThread estimatedTimeThread;
private final CachedTimeThread cachedTimeThread;

static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();

Expand Down Expand Up @@ -213,16 +213,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
this.scheduler.setRemoveOnCancelPolicy(true);

TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}

public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
/**
* Returns a value of milliseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long relativeTimeInMillis() {
return cachedTimeThread.relativeTimeInMillis();
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
* This method should only be used for exact date/time formatting. For calculating
* time deltas that should not suffer from negative deltas, which are possible with
* this method, see {@link #relativeTimeInMillis()}.
*/
public long absoluteTimeInMillis() {
return cachedTimeThread.absoluteTimeInMillis();
}

public Counter estimatedTimeInMillisCounter() {
return estimatedTimeThread.counter;
return cachedTimeThread.counter;
}

public ThreadPoolInfo info() {
Expand Down Expand Up @@ -342,8 +359,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
}

public void shutdown() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdown();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
Expand All @@ -353,8 +370,8 @@ public void shutdown() {
}

public void shutdownNow() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdownNow();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
Expand All @@ -371,7 +388,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
}
}

estimatedTimeThread.join(unit.toMillis(timeout));
cachedTimeThread.join(unit.toMillis(timeout));
return result;
}

Expand Down Expand Up @@ -471,29 +488,50 @@ public String toString() {
}
}

static class EstimatedTimeThread extends Thread {
/**
* A thread to cache millisecond time values from
* {@link System#nanoTime()} and {@link System#currentTimeMillis()}.
*
* The values are updated at a specified interval.
*/
static class CachedTimeThread extends Thread {

final long interval;
final TimeCounter counter;
volatile boolean running = true;
volatile long estimatedTimeInMillis;
volatile long relativeMillis;
volatile long absoluteMillis;

EstimatedTimeThread(String name, long interval) {
CachedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
this.absoluteMillis = System.currentTimeMillis();
this.counter = new TimeCounter();
setDaemon(true);
}

public long estimatedTimeInMillis() {
return this.estimatedTimeInMillis;
/**
* Return the current time used for relative calculations. This is
* {@link System#nanoTime()} truncated to milliseconds.
*/
long relativeTimeInMillis() {
return relativeMillis;
}

/**
* Return the current epoch time, used to find absolute time. This is
* a cached version of {@link System#currentTimeMillis()}.
*/
long absoluteTimeInMillis() {
return absoluteMillis;
}

@Override
public void run() {
while (running) {
estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
absoluteMillis = System.currentTimeMillis();
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Expand All @@ -512,7 +550,7 @@ public long addAndGet(long delta) {

@Override
public long get() {
return estimatedTimeInMillis;
return relativeMillis;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ public void testBoundedByBetweenMinAndMax() {
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
}

public void testAbsoluteTime() throws Exception {
TestThreadPool threadPool = new TestThreadPool("test");
try {
long currentTime = System.currentTimeMillis();
long gotTime = threadPool.absoluteTimeInMillis();
long delta = Math.abs(gotTime - currentTime);
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime,
delta < 10000); // the delta can be large, we just care it is the same order of magnitude
} finally {
threadPool.shutdown();
threadPool.close();
}
}
}

0 comments on commit 18f57c0

Please sign in to comment.