From f4d287652e09da627f51779bec1c3ca7f1d46e70 Mon Sep 17 00:00:00 2001 From: Stephen Connolly Date: Wed, 20 Apr 2016 17:16:15 +0100 Subject: [PATCH] [FIXED JENKINS-34213] Ensure that the unexporter cleans up whatever it can each sweep (#81) * [FIXED JENKINS-34213] Ensure that the unexporter cleans up whatever it can each sweep * [JENKINS-34213] Collect and report meaningful stats, batch the reference collection for better stability - We want to report on these things only if they are an issue. Logging of the actual stats should be below the radar of the users using a default logger level of `INFO` provided that the Unexporter is not doing much - When the Unexporter is busy (i.e. the m1 rate is > 100/sec) then we should start reporting at `INFO` - In the event that there is sustained high levels of work, we should alert the user and recommend turning off the stack traces to reduce GC pressure - My stress testing revealed that under very heavy load it is better to batch the removal and then batch the clean-up even if this batching means that the sweeps are not as frequent. 
* [JENKINS-34213] Oops typo in the stats measurement * Ignore when the channel is already closed due to a race between testing for close and close Spotted in https://jenkins.ci.cloudbees.com/job/libraries/job/remoting/177/console * [JENKINS-34213] Copy and paste error The three most common programming errors are: - Off by one errors - Copy and paste errors * [JENKINS-34213] Just when you were least expecting it, findbugs finds an actual real bug --- .../remoting/RemoteInvocationHandler.java | 336 ++++++++++++++++-- 1 file changed, 306 insertions(+), 30 deletions(-) diff --git a/src/main/java/hudson/remoting/RemoteInvocationHandler.java b/src/main/java/hudson/remoting/RemoteInvocationHandler.java index ec3fd9bfa..3766a49ab 100644 --- a/src/main/java/hudson/remoting/RemoteInvocationHandler.java +++ b/src/main/java/hudson/remoting/RemoteInvocationHandler.java @@ -23,9 +23,6 @@ */ package hudson.remoting; -import org.jenkinsci.remoting.RoleChecker; - -import javax.annotation.CheckForNull; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -51,6 +48,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckForNull; +import org.jenkinsci.remoting.RoleChecker; /** * Sits behind a proxy object and implements the proxy logic. @@ -342,7 +341,7 @@ private PhantomReferenceImpl(RemoteInvocationHandler referent, ReferenceQueue referenceQueue) { super(referent, referenceQueue); this.oid = referent.oid; - this.origin = referent.origin; + this.origin = Unexporter.retainOrigin ? 
referent.origin : null; this.channel = referent.channel; } @@ -373,6 +372,65 @@ private void cleanup() throws IOException { */ private static class Unexporter implements Runnable { + /** + * Constant to help conversion + */ + private static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1L); + /** + * Constant to help conversion + */ + private static final double NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1L); + /** + * If you have a high throughput of remoting requests and you do not need the call-site traceability + * you can reduce GC pressure by discarding the origin call-site stack traces: set this system + * property to {@code false}. + * @since FIXME after merge + */ + private static final boolean retainOrigin = + Boolean.parseBoolean(System.getProperty(Unexporter.class.getName() + ".retainOrigin", "true")); + /** + * How often to sweep out references from {@link Channel} instances that are closed (and therefore have a no-op + * {@link PhantomReferenceImpl#cleanup()}). + * @since FIXME after merge + */ + private static final long sweepInterval = + secSysPropAsNanos(Unexporter.class.getName() + ".sweepInterval", 0.1, 0.2, 5.0); + /** + * How often to measure the quantity of work being done by the {@link Unexporter}. + * + * @since FIXME after merge + */ + private static final long measureInterval = + secSysPropAsNanos(Unexporter.class.getName() + ".measureInterval", 5.0, 5.0, 60.0); + /** + * How often to report the statistics for the work being done by the {@link Unexporter}. + * + * @since FIXME after merge + */ + private static final long reportInterval = + secSysPropAsNanos(Unexporter.class.getName() + ".reportInterval", 30.0, 60.0, 3600.0); + /** + * Stress testing has revealed that it is better to batch removing references and processing their clean-up + * even if this means that the sweep gets delayed. This constant allows for tuning of the batch size, + * if the current recommendation proves insufficient in real world scenarios. 
+ */ + private static final int batchSize = Math.max(10, Math.min(10000, Integer.getInteger( + Unexporter.class.getName() + ".batchSize", 256))); + /** + * The decay factor for a rolling average that expects to be updated every {@link #measureInterval} and + * should have a lifetime of approx 1 minute. + */ + private static final double m1Alpha = 1.0 - Math.exp( -measureInterval * 1.0 / TimeUnit.MINUTES.toNanos(1)); + /** + * The decay factor for a rolling average that expects to be updated every {@link #measureInterval} and + * should have a lifetime of approx 5 minutes. + */ + private static final double m5Alpha = 1.0 - Math.exp( -measureInterval * 1.0 / TimeUnit.MINUTES.toNanos(5)); + /** + * The decay factor for a rolling average that expects to be updated every {@link #measureInterval} and + * should have a lifetime of approx 15 minutes. + */ + private static final double m15Alpha = 1.0 - Math.exp( -measureInterval * 1.0 / TimeUnit.MINUTES.toNanos(15)); /** * Our executor service, we use at most one thread for all {@link Channel} instances in the current classloader. */ @@ -397,6 +455,85 @@ private static class Unexporter implements Runnable { */ private final ConcurrentMap> referenceLists = new ConcurrentHashMap>(); + /** + * The 1 minute rolling average. + */ + private double m1Avg = 0.0; + /** + * The 1 minute rolling variance. + */ + private double m1Var = 0.0; + /** + * The 5 minute rolling average. + */ + private double m5Avg = 0.0; + /** + * The 5 minute rolling variance. + */ + private double m5Var = 0.0; + /** + * The 15 minute rolling average. + */ + private double m15Avg = 0.0; + /** + * The 15 minute rolling variance. + */ + private double m15Var = 0.0; + /** + * The cumulative all-time average. + */ + private double tAvg = 0.0; + /** + * The cumulative number of measurements. + */ + private long tCount = 0; + /** + * The cumulative all-time variance multiplied by the {@link #tCount}. 
It needs to be this way to minimize + * the iterative updating cost while maximizing the precision. + */ + private double tVarTimesCount = 0.0; + /** + * When we started the current measurement. + */ + private long countStart = System.nanoTime(); + /** + * The current measurement. + */ + private long count = 0; + /** + * When we would like to complete the current measurement. + */ + private long nextMeasure = countStart + measureInterval; + /** + * When we would like to report the statistics. + */ + private long nextReport = countStart + reportInterval; + + /** + * Gets the named system property value which is treated as a number of seconds and converted into nanoseconds. + * The value can be specified using decimals which allows the user to specify meaningful times in a consistent + * unit rather than force them to jump between different units. + * + * @param name the name of the system property. + * @param min the minimum (in seconds) to return. + * @param def the default (in seconds) to return. + * @param max the maximum (in seconds) to return. + * @return the value of the system property after converting into nanoseconds. + */ + private static long secSysPropAsNanos(String name, double min, double def, double max) { + String value = System.getProperty(name); + double seconds; + try { + seconds = value == null || value.isEmpty() ? 
def : Double.parseDouble(value); + } catch (NumberFormatException e) { + logger.log(Level.WARNING, + String.format("The system property '%s'='%s' could not be parsed", name, value), + e + ); + seconds = def; + } + return (long) (1.0e9 * Math.max(min, Math.min(max, seconds))); + } /** * {@inheritDoc} @@ -409,42 +546,66 @@ public void run() { } inQueue.set(false); // we have started execution and running is true, so queued can be reset try { - long nextSweep = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(200); + long nextSweep = System.nanoTime() + sweepInterval; + PhantomReferenceImpl[] batch = new PhantomReferenceImpl[batchSize]; while (!referenceLists.isEmpty()) { + if (System.nanoTime() - nextMeasure > 0) { + updateStats(); + } + if (System.nanoTime() - nextReport > 0) { + reportStats(); + } try { - Reference ref = queue.remove(100); - if (ref instanceof PhantomReferenceImpl) { - PhantomReferenceImpl r = (PhantomReferenceImpl) ref; - final Channel.Ref channelRef = r.channel; - try { - r.cleanup(); - } catch (IOException e) { - logger.log(Level.WARNING, - String.format("Couldn't clean up oid=%d from %s", r.oid, r.origin), e); - } catch (Error e) { - logger.log(Level.SEVERE, - String.format("Couldn't clean up oid=%d from %s", r.oid, r.origin), e); - throw e; // pass on as there is nothing we can do with an error - } catch (Throwable e) { - logger.log(Level.WARNING, - String.format("Couldn't clean up oid=%d from %s", r.oid, r.origin), e); - } finally { - if (channelRef != null) { - final List referenceList = referenceLists.get(channelRef); - if (referenceList != null) { - referenceList.remove(r); - if (channelRef.channel() == null) { - cleanList(referenceList); + long remaining; + int batchIndex = 0; + while (nextSweep - System.nanoTime() > 0) { + while (batchIndex < batch.length + && (remaining = (nextSweep - System.nanoTime())/ NANOSECONDS_PER_MILLISECOND) > 0) { + Reference ref = queue.remove(remaining); + if (ref == null) { + break; + } + if (ref instanceof 
PhantomReferenceImpl) { + batch[batchIndex++] = (PhantomReferenceImpl) ref; + } + } + for (int index = 0; index < batchIndex; index++) { + count++; + final Channel.Ref channelRef = batch[index].channel; + try { + batch[index].cleanup(); + } catch (ChannelClosedException e) { + // ignore, the cleanup is a no-op + } catch (IOException e) { + logger.log(Level.WARNING, String.format("Couldn't clean up oid=%d from %s", + batch[index].oid, batch[index].origin), e); + } catch (Error e) { + logger.log(Level.SEVERE, String.format("Couldn't clean up oid=%d from %s", + batch[index].oid, batch[index].origin), e); + throw e; // pass on as there is nothing we can do with an error + } catch (Throwable e) { + logger.log(Level.WARNING, String.format("Couldn't clean up oid=%d from %s", + batch[index].oid, batch[index].origin), e); + } finally { + if (channelRef != null) { + final List referenceList = referenceLists.get(channelRef); + if (referenceList != null) { + if (channelRef.channel() == null) { + cleanList(referenceList); + } else { + referenceList.remove(batch[index]); + } } } + batch[index] = null; // clear out the reference from the array as we reuse the array } } } } catch (InterruptedException e) { logger.log(Level.FINE, "Interrupted", e); } - if (System.nanoTime() > nextSweep) { - nextSweep = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(200); + if (System.nanoTime() - nextSweep > 0) { + nextSweep = System.nanoTime() + sweepInterval; // purge any dead channels, it does not matter if we spend a long time here as we are // removing future potential work for us from ever entering the queue and freeing garbage for (Iterator>> @@ -465,6 +626,121 @@ public void run() { } } + private void updateStats() { + long measureDuration = System.nanoTime() - countStart; + double instantRate = count * NANOSECONDS_PER_SECOND / measureDuration; + countStart = System.nanoTime(); + nextMeasure = countStart + measureInterval; + count = 0; + if (tCount == 0) { + m1Avg = m5Avg = m15Avg = tAvg 
= instantRate; + tCount = 1; + } else { + // compute the exponentially weighted moving average and variance + // see http://www-uxsup.csx.cam.ac.uk/~fanf2/hermes/doc/antiforgery/stats.pdf + double diff = instantRate - m1Avg; + double incr = m1Alpha * diff; + m1Avg = m1Avg + incr; + m1Var = (1 - m1Alpha) * (m1Var + diff * incr); + diff = instantRate - m5Avg; + incr = m5Alpha * diff; + m5Avg = m5Avg + incr; + m5Var = (1 - m5Alpha) * (m5Var + diff * incr); + diff = instantRate - m15Avg; + incr = m15Alpha * diff; + m15Avg = m15Avg + incr; + m15Var = (1 - m15Alpha) * (m15Var + diff * incr); + diff = instantRate - tAvg; + tAvg = (tAvg * tCount + instantRate) / (++tCount); + tVarTimesCount = tVarTimesCount + diff * (instantRate - tAvg); + } + } + + private void reportStats() { + nextReport = System.nanoTime() + reportInterval; + double m1Std = m1Var <= 0 ? 0 : Math.sqrt(m1Var); + double m5Std = m5Var <= 0 ? 0 : Math.sqrt(m5Var); + double m15Std = m15Var <= 0 ? 0 : Math.sqrt(m15Var); + double tStd = tCount <= 0 || tVarTimesCount < 0 ? 0 : Math.sqrt(tVarTimesCount / tCount); + Level targetLevel = m15Avg > 100 ? Level.INFO : m15Avg > 50 ? Level.FINE : Level.FINER; + if (logger.isLoggable(targetLevel)) { + logger.log(targetLevel, "rate(1min) = {0,number,0.0}±{1,number,0.0}/sec; " + + "rate(5min) = {2,number,0.0}±{3,number,0.0}/sec; " + + "rate(15min) = {4,number,0.0}±{5,number,0.0}/sec; " + + "rate(total) = {6,number,0.0}±{7,number,0.0}/sec; N = {8,number}", + new Object[]{ + m1Avg, m1Std, m5Avg, m5Std, m15Avg, m15Std, tAvg, tStd, tCount + }); + } + if (tCount < 10L) { + // less than 10 reports is too soon to start alerting the user + return; + } + // now test if the average is above 100/sec + if (m15Std > 1 && 100 < m15Avg - 2 * m15Std) { + if (tStd > 1 && 100 < tAvg - 2 * tStd) { + logger.log(Level.SEVERE, + retainOrigin ? + "The all time average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "The 15 minute average rate is {2,number,0.0}±{3,number,0.0}/sec. 
" + + "At the 95% confidence level both are above 100.0/sec. " + + "If this message is repeated often in the logs then PLEASE " + + "seriously consider setting system property ''hudson.remoting" + + ".RemoteInvocationHandler.Unexporter.retainOrigin'' to " + + "''false'' to trade debug diagnostics for reduced memory " + + "pressure." + : "The all time average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "The 15 minute average rate is {2,number,0.0}±{3,number,0.0}/sec. " + + "At the 95% confidence level both are above 100.0/sec. ", + new Object[]{tAvg, tStd, m15Avg, m15Std}); + return; + } + logger.log(Level.WARNING, + retainOrigin ? + "The 15 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. " + + "If this message is repeated often in the logs then very " + + "seriously consider setting system property ''hudson.remoting" + + ".RemoteInvocationHandler.Unexporter.retainOrigin'' to " + + "''false'' to trade debug diagnostics for reduced memory " + + "pressure." + : "The 15 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. ", + new Object[]{m15Avg, m15Std}); + return; + } + if (m5Std > 1 && 100 < m5Avg - 2 * m5Std) { + logger.log(Level.WARNING, + retainOrigin ? + "The 5 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. " + + "If this message is repeated often in the logs then " + + "seriously consider setting system property ''hudson.remoting" + + ".RemoteInvocationHandler.Unexporter.retainOrigin'' to " + + "''false'' to trade debug diagnostics for reduced memory " + + "pressure." + : "The 5 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. ", + new Object[]{m5Avg, m5Std}); + return; + } + if (m1Std > 1 && 100 < m1Avg - 2 * m1Std) { + logger.log(Level.INFO, + retainOrigin ? 
+ "The 1 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. " + + "If this message is repeated often in the logs then " + + "consider setting system property ''hudson.remoting" + + ".RemoteInvocationHandler.Unexporter.retainOrigin'' to " + + "''false'' to trade debug diagnostics for reduced memory " + + "pressure." + : "The 1 minute average rate is {0,number,0.0}±{1,number,0.0}/sec. " + + "At the 95% confidence level this is above 100.0/sec. ", + new Object[]{m1Avg, m1Std}); + return; + } + } + /** * Cleans a {@link List} of {@link PhantomReferenceImpl} as the {@link Channel} has closed. *