diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 46bd825d0c768..efc01ab45f818 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -225,7 +225,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final UpdateHelper.Result translate; // translate update request try { - translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis); + translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis); } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 0235dd95a4b18..67d62113062a7 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -171,7 +171,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final ShardId shardId = request.getShardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); - final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis); + final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis); switch (result.getResponseResult()) { case CREATED: IndexRequest upsertRequest = result.action(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 4fe947660c142..60dddc4d40db1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -188,7 +188,7 @@ public Codec getCodec() { /** * Returns a thread-pool mainly used to get estimated time stamps from - * {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule + * {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule * async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool */ public ThreadPool getThreadPool() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a1ae385237ab2..0fa6855ce0867 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -147,7 +147,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { EngineMergeScheduler scheduler = null; boolean success = false; try { - this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); + this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); @@ -446,7 +446,7 @@ private boolean checkVersionConflict(final Operation op, final long currentVersi private long checkDeletedAndGCed(VersionValue versionValue) { long currentVersion; - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); @@ -726,7 +726,7 @@ public DeleteResult delete(Delete delete) throws IOException { private void maybePruneDeletedTombstones() { // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it // every 1/4 of gcDeletesInMillis: - if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) { + if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) { pruneDeletedTombstones(); } } @@ -772,7 +772,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { deleteResult = new DeleteResult(updatedVersion, seqNo, found); versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis())); } if (!deleteResult.hasFailure()) { location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY @@ -1047,7 +1047,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti } private void pruneDeletedTombstones() { - long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); + long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock... diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 9044db37a33e8..3d093e5ae7282 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -561,7 +561,7 @@ private void contextProcessing(SearchContext context) { } private void contextProcessedSuccessfully(SearchContext context) { - context.accessed(threadPool.estimatedTimeInMillis()); + context.accessed(threadPool.relativeTimeInMillis()); } private void cleanContext(SearchContext context) { @@ -794,7 +794,7 @@ public int getActiveContexts() { class Reaper implements Runnable { @Override public void run() { - final long time = threadPool.estimatedTimeInMillis(); + final long time = threadPool.relativeTimeInMillis(); for (SearchContext context : activeContexts.values()) { // Use the same value for both checks since lastAccessTime can // be modified by another thread between checks! diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6817ae991a375..b68037b8dc6f2 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -142,7 +142,7 @@ public static ThreadPoolType fromType(String type) { private final ScheduledThreadPoolExecutor scheduler; - private final EstimatedTimeThread estimatedTimeThread; + private final CachedTimeThread cachedTimeThread; static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); @@ -213,16 +213,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui this.scheduler.setRemoveOnCancelPolicy(true); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); - this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); - this.estimatedTimeThread.start(); + this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); + this.cachedTimeThread.start(); } - public long estimatedTimeInMillis() { - return estimatedTimeThread.estimatedTimeInMillis(); + /** + * Returns a value of milliseconds that may be used for relative time calculations. + * + * This method should only be used for calculating time deltas. For an epoch based + * timestamp, see {@link #absoluteTimeInMillis()}. + */ + public long relativeTimeInMillis() { + return cachedTimeThread.relativeTimeInMillis(); + } + + /** + * Returns the value of milliseconds since UNIX epoch. + * + * This method should only be used for exact date/time formatting. For calculating + * time deltas that should not suffer from negative deltas, which are possible with + * this method, see {@link #relativeTimeInMillis()}. + */ + public long absoluteTimeInMillis() { + return cachedTimeThread.absoluteTimeInMillis(); } public Counter estimatedTimeInMillisCounter() { - return estimatedTimeThread.counter; + return cachedTimeThread.counter; } public ThreadPoolInfo info() { @@ -342,8 +359,8 @@ public ScheduledFuture schedule(TimeValue delay, String executor, Runnable co } public void shutdown() { - estimatedTimeThread.running = false; - estimatedTimeThread.interrupt(); + cachedTimeThread.running = false; + cachedTimeThread.interrupt(); scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -353,8 +370,8 @@ public void shutdown() { } public void shutdownNow() { - estimatedTimeThread.running = false; - estimatedTimeThread.interrupt(); + cachedTimeThread.running = false; + cachedTimeThread.interrupt(); scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -371,7 +388,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } } - estimatedTimeThread.join(unit.toMillis(timeout)); + cachedTimeThread.join(unit.toMillis(timeout)); return result; } @@ -471,29 +488,50 @@ public String toString() { } } - static class EstimatedTimeThread extends Thread { + /** + * A thread to cache millisecond time values from + * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. + * + * The values are updated at a specified interval. + */ + static class CachedTimeThread extends Thread { final long interval; final TimeCounter counter; volatile boolean running = true; - volatile long estimatedTimeInMillis; + volatile long relativeMillis; + volatile long absoluteMillis; - EstimatedTimeThread(String name, long interval) { + CachedTimeThread(String name, long interval) { super(name); this.interval = interval; - this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.absoluteMillis = System.currentTimeMillis(); this.counter = new TimeCounter(); setDaemon(true); } - public long estimatedTimeInMillis() { - return this.estimatedTimeInMillis; + /** + * Return the current time used for relative calculations. This is + * {@link System#nanoTime()} truncated to milliseconds. + */ + long relativeTimeInMillis() { + return relativeMillis; + } + + /** + * Return the current epoch time, used to find absolute time. This is + * a cached version of {@link System#currentTimeMillis()}. + */ + long absoluteTimeInMillis() { + return absoluteMillis; } @Override public void run() { while (running) { - estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime()); + relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + absoluteMillis = System.currentTimeMillis(); try { Thread.sleep(interval); } catch (InterruptedException e) { @@ -512,7 +550,7 @@ public long addAndGet(long delta) { @Override public long get() { - return estimatedTimeInMillis; + return relativeMillis; } } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index daad1a51a085b..b277de64a9d9f 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -46,4 +46,17 @@ public void testBoundedByBetweenMinAndMax() { assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value)); } + public void testAbsoluteTime() throws Exception { + TestThreadPool threadPool = new TestThreadPool("test"); + try { + long currentTime = System.currentTimeMillis(); + long gotTime = threadPool.absoluteTimeInMillis(); + long delta = Math.abs(gotTime - currentTime); + assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, + delta < 10000); // the delta can be large, we just care it is the same order of magnitude + } finally { + threadPool.shutdown(); + threadPool.close(); + } + } }