From 9eb4d5e85066de23d22708043df91ffe68f10cf4 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Tue, 14 Feb 2017 20:40:19 -0800 Subject: [PATCH 1/2] Script: Fix value of `ctx._now` to be current epoch time in milliseconds In update scripts, `ctx._now` uses the same milliseconds value used by the rest of the system to caluclate deltas. However, that time is not actually epoch milliseconds, as it is derived from `System.nanoTime()`. This change reworks the estimated time thread in ThreadPool which this time is based on to make available both the relative time, as well as absolute milliseconds (epoch) which may be used with calendar system. It also renames the EstimatedTimeThread to a more apt CachedTimeThread. closes #23169 --- .../action/bulk/TransportShardBulkAction.java | 2 +- .../action/update/TransportUpdateAction.java | 2 +- .../index/engine/EngineConfig.java | 2 +- .../index/engine/InternalEngine.java | 10 +-- .../elasticsearch/search/SearchService.java | 4 +- .../elasticsearch/threadpool/ThreadPool.java | 76 ++++++++++++++----- 6 files changed, 67 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index fc580dd388028..1557377fd054a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -266,7 +266,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final UpdateHelper.Result translate; // translate update request try { - translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis); + translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis); } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 0235dd95a4b18..67d62113062a7 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -171,7 +171,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final ShardId shardId = request.getShardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); - final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis); + final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis); switch (result.getResponseResult()) { case CREATED: IndexRequest upsertRequest = result.action(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 4fe947660c142..60dddc4d40db1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -188,7 +188,7 @@ public Codec getCodec() { /** * Returns a thread-pool mainly used to get estimated time stamps from - * {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule + * {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule * async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool */ public ThreadPool getThreadPool() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bcfee5026ce9a..ebc602f6386f5 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -147,7 +147,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { EngineMergeScheduler scheduler = null; boolean success = false; try { - this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); + this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); @@ -446,7 +446,7 @@ private boolean checkVersionConflict(final Operation op, final long currentVersi private long checkDeletedAndGCed(VersionValue versionValue) { long currentVersion; - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); @@ -710,7 +710,7 @@ public DeleteResult delete(Delete delete) throws IOException { private void maybePruneDeletedTombstones() { // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it // every 1/4 of gcDeletesInMillis: - if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) { + if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) { pruneDeletedTombstones(); } } @@ -756,7 +756,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { deleteResult = new DeleteResult(updatedVersion, seqNo, found); versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis())); } if (!deleteResult.hasFailure()) { location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY @@ -1031,7 +1031,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti } private void pruneDeletedTombstones() { - long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); + long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); // TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock... diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 9044db37a33e8..3d093e5ae7282 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -561,7 +561,7 @@ private void contextProcessing(SearchContext context) { } private void contextProcessedSuccessfully(SearchContext context) { - context.accessed(threadPool.estimatedTimeInMillis()); + context.accessed(threadPool.relativeTimeInMillis()); } private void cleanContext(SearchContext context) { @@ -794,7 +794,7 @@ public int getActiveContexts() { class Reaper implements Runnable { @Override public void run() { - final long time = threadPool.estimatedTimeInMillis(); + final long time = threadPool.relativeTimeInMillis(); for (SearchContext context : activeContexts.values()) { // Use the same value for both checks since lastAccessTime can // be modified by another thread between checks! diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6817ae991a375..b68037b8dc6f2 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -142,7 +142,7 @@ public static ThreadPoolType fromType(String type) { private final ScheduledThreadPoolExecutor scheduler; - private final EstimatedTimeThread estimatedTimeThread; + private final CachedTimeThread cachedTimeThread; static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); @@ -213,16 +213,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui this.scheduler.setRemoveOnCancelPolicy(true); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); - this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); - this.estimatedTimeThread.start(); + this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); + this.cachedTimeThread.start(); } - public long estimatedTimeInMillis() { - return estimatedTimeThread.estimatedTimeInMillis(); + /** + * Returns a value of milliseconds that may be used for relative time calculations. + * + * This method should only be used for calculating time deltas. For an epoch based + * timestamp, see {@link #absoluteTimeInMillis()}. + */ + public long relativeTimeInMillis() { + return cachedTimeThread.relativeTimeInMillis(); + } + + /** + * Returns the value of milliseconds since UNIX epoch. + * + * This method should only be used for exact date/time formatting. For calculating + * time deltas that should not suffer from negative deltas, which are possible with + * this method, see {@link #relativeTimeInMillis()}. + */ + public long absoluteTimeInMillis() { + return cachedTimeThread.absoluteTimeInMillis(); } public Counter estimatedTimeInMillisCounter() { - return estimatedTimeThread.counter; + return cachedTimeThread.counter; } public ThreadPoolInfo info() { @@ -342,8 +359,8 @@ public ScheduledFuture schedule(TimeValue delay, String executor, Runnable co } public void shutdown() { - estimatedTimeThread.running = false; - estimatedTimeThread.interrupt(); + cachedTimeThread.running = false; + cachedTimeThread.interrupt(); scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -353,8 +370,8 @@ public void shutdown() { } public void shutdownNow() { - estimatedTimeThread.running = false; - estimatedTimeThread.interrupt(); + cachedTimeThread.running = false; + cachedTimeThread.interrupt(); scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -371,7 +388,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } } - estimatedTimeThread.join(unit.toMillis(timeout)); + cachedTimeThread.join(unit.toMillis(timeout)); return result; } @@ -471,29 +488,50 @@ public String toString() { } } - static class EstimatedTimeThread extends Thread { + /** + * A thread to cache millisecond time values from + * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. + * + * The values are updated at a specified interval. + */ + static class CachedTimeThread extends Thread { final long interval; final TimeCounter counter; volatile boolean running = true; - volatile long estimatedTimeInMillis; + volatile long relativeMillis; + volatile long absoluteMillis; - EstimatedTimeThread(String name, long interval) { + CachedTimeThread(String name, long interval) { super(name); this.interval = interval; - this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + this.absoluteMillis = System.currentTimeMillis(); this.counter = new TimeCounter(); setDaemon(true); } - public long estimatedTimeInMillis() { - return this.estimatedTimeInMillis; + /** + * Return the current time used for relative calculations. This is + * {@link System#nanoTime()} truncated to milliseconds. + */ + long relativeTimeInMillis() { + return relativeMillis; + } + + /** + * Return the current epoch time, used to find absolute time. This is + * a cached version of {@link System#currentTimeMillis()}. + */ + long absoluteTimeInMillis() { + return absoluteMillis; } @Override public void run() { while (running) { - estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime()); + relativeMillis = TimeValue.nsecToMSec(System.nanoTime()); + absoluteMillis = System.currentTimeMillis(); try { Thread.sleep(interval); } catch (InterruptedException e) { @@ -512,7 +550,7 @@ public long addAndGet(long delta) { @Override public long get() { - return estimatedTimeInMillis; + return relativeMillis; } } } From ff17b5d181f230c2057c8022c8f7a42550254f9b Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Wed, 22 Feb 2017 15:02:33 -0800 Subject: [PATCH 2/2] Add basic test for absolute time --- .../elasticsearch/threadpool/ThreadPoolTests.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index daad1a51a085b..b277de64a9d9f 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -46,4 +46,17 @@ public void testBoundedByBetweenMinAndMax() { assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value)); } + public void testAbsoluteTime() throws Exception { + TestThreadPool threadPool = new TestThreadPool("test"); + try { + long currentTime = System.currentTimeMillis(); + long gotTime = threadPool.absoluteTimeInMillis(); + long delta = Math.abs(gotTime - currentTime); + assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, + delta < 10000); // the delta can be large, we just care it is the same order of magnitude + } finally { + threadPool.shutdown(); + threadPool.close(); + } + } }