From 623fd63827b2953c150597f24c7205737119bebe Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 16 Jun 2015 13:06:15 -0700 Subject: [PATCH] HBASE-13876 Improving performance of HeapMemoryManager --- .../regionserver/DefaultHeapMemoryTuner.java | 248 ++++++++++++++++-- .../hbase/regionserver/HRegionServer.java | 3 +- .../hbase/regionserver/HeapMemoryManager.java | 51 +++- .../hbase/util/RollingStatCalculator.java | 113 ++++++++ .../regionserver/TestHeapMemoryManager.java | 201 ++++++++++++-- 5 files changed, 556 insertions(+), 60 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java index 5e97b803052f..93a95b008353 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java @@ -24,63 +24,234 @@ import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY; import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; +import org.apache.hadoop.hbase.util.RollingStatCalculator; /** - * The default implementation for the HeapMemoryTuner. This will do simple checks to decide - * whether there should be changes in the heap size of memstore/block cache. 
When there is no block - * cache eviction at all but there are flushes because of global heap pressure, it will increase the - * memstore heap size and decrease block cache size. The step value for this heap size change can be - * specified using the config hbase.regionserver.heapmemory.autotuner.step. When there is no - * memstore flushes because of heap pressure but there is block cache evictions it will increase the - * block cache heap. + * The default implementation for the HeapMemoryTuner. This will do statistical checks on + * number of evictions, cache misses and flushes to decide whether there should be changes + * in the heap size of memstore/block cache. During each tuner operation tuner takes a step + * which can either be INCREASE_BLOCK_CACHE_SIZE (increase block cache size), + * INCREASE_MEMSTORE_SIZE (increase memstore size) and by default it is NEUTRAL (no change). + * We say block cache is sufficient when there is no block cache eviction at all or major amount of + * memory allocated to block cache is empty, similarly we say memory allocated for memstore is + * sufficient when there is no memstore flushes because of heap pressure or major amount of + * memory allocated to memstore is empty. If both are sufficient we do nothing, if exactly one of + * them is found to be sufficient we decrease its size by step and increase the other by + * same amount. If none of them is sufficient we do statistical analysis on number of cache misses + * and flushes to determine tuner direction. Based on these statistics we decide the tuner + * direction. If we are not confident about which step direction to take we do nothing and wait for + * next iteration. On expectation we will be tuning for at least 22% tuner calls. The number of + * past periods to consider for statistics calculation can be specified in config by + * hbase.regionserver.heapmemory.autotuner.lookup.periods. 
Also these many initial calls to + * tuner will be ignored (cache is warming up and we leave the system to reach steady state). + * After the tuner takes a step, in next call we ensure that last call was indeed helpful and did + * not do us any harm. If not then we revert the previous step. The step size is dynamic and it + * changes based on current and previous tuning direction. When last tuner step was NEUTRAL + * and current tuning step is not NEUTRAL then we assume we are restarting the tuning process and + * step size is changed to maximum allowed size which can be specified in config by + * hbase.regionserver.heapmemory.autotuner.step.max. If we are reverting the previous step + * then we decrease step size to half. This decrease is similar to binary search where we try to + * reach the most desired value. The minimum step size can be specified in config by + * hbase.regionserver.heapmemory.autotuner.step.min. In other cases we leave step size + * unchanged. */ @InterfaceAudience.Private class DefaultHeapMemoryTuner implements HeapMemoryTuner { - - public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step"; - public static final float DEFAULT_STEP_VALUE = 0.02f; // 2% - - private static final TunerResult TUNER_RESULT = new TunerResult(true); + public static final String MAX_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.max"; + public static final String MIN_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.min"; + public static final String SUFFICIENT_MEMORY_LEVEL_KEY = + "hbase.regionserver.heapmemory.autotuner.sufficient.memory.level"; + public static final String LOOKUP_PERIODS_KEY = + "hbase.regionserver.heapmemory.autotuner.lookup.periods"; + public static final String NUM_PERIODS_TO_IGNORE = + "hbase.regionserver.heapmemory.autotuner.ignored.periods"; + // Maximum step size that the tuner can take + public static final float DEFAULT_MAX_STEP_VALUE = 0.08f; // 8% + // Minimum step size that the tuner can take
+ public static final float DEFAULT_MIN_STEP_VALUE = 0.005f; // 0.5% + // If current block cache size or memstore size in use is below this level relative to memory + // provided to it then corresponding component will be considered to have sufficient memory + public static final float DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE = 0.5f; // 50% + // Number of tuner periods that will be considered while calculating mean and deviation + // If set to zero, all stats will be calculated from the start + public static final int DEFAULT_LOOKUP_PERIODS = 60; + public static final int DEFAULT_NUM_PERIODS_IGNORED = 60; private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false); + private Log LOG = LogFactory.getLog(DefaultHeapMemoryTuner.class); + private TunerResult TUNER_RESULT = new TunerResult(true); private Configuration conf; - private float step = DEFAULT_STEP_VALUE; + private float sufficientMemoryLevel = DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE; + private float maximumStepSize = DEFAULT_MAX_STEP_VALUE; + private float minimumStepSize = DEFAULT_MIN_STEP_VALUE; + private int tunerLookupPeriods = DEFAULT_LOOKUP_PERIODS; + private int numPeriodsToIgnore = DEFAULT_NUM_PERIODS_IGNORED; + // Counter to ignore few initial periods while cache is still warming up + // Memory tuner will do no operation for the first "numPeriodsToIgnore" + private int ignoreInitialPeriods = 0; private float globalMemStorePercentMinRange; private float globalMemStorePercentMaxRange; private float blockCachePercentMinRange; private float blockCachePercentMaxRange; + // Store statistics about the corresponding parameters for memory tuning + private RollingStatCalculator rollingStatsForCacheMisses; + private RollingStatCalculator rollingStatsForFlushes; + private RollingStatCalculator rollingStatsForEvictions; + // Set step size to max value for tuning, this step size will adjust dynamically while tuning + private float step = DEFAULT_MAX_STEP_VALUE; + private StepDirection
prevTuneDirection = StepDirection.NEUTRAL; @Override public TunerResult tune(TunerContext context) { long blockedFlushCount = context.getBlockedFlushCount(); long unblockedFlushCount = context.getUnblockedFlushCount(); long evictCount = context.getEvictCount(); - boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0; - boolean blockCacheSufficient = evictCount == 0; - if (memstoreSufficient && blockCacheSufficient) { + long cacheMissCount = context.getCacheMissCount(); + long totalFlushCount = blockedFlushCount+unblockedFlushCount; + rollingStatsForCacheMisses.insertDataValue(cacheMissCount); + rollingStatsForFlushes.insertDataValue(totalFlushCount); + rollingStatsForEvictions.insertDataValue(evictCount); + StepDirection newTuneDirection = StepDirection.NEUTRAL; + if (ignoreInitialPeriods < numPeriodsToIgnore) { + // Ignoring the first few tuner periods + ignoreInitialPeriods++; return NO_OP_TUNER_RESULT; } + String tunerLog = ""; + // We can consider memstore or block cache to be sufficient if + // we are using only a minor fraction of what have been already provided to it. 
+ boolean earlyMemstoreSufficientCheck = totalFlushCount == 0 + || context.getCurMemStoreUsed() < context.getCurMemStoreSize()*sufficientMemoryLevel; + boolean earlyBlockCacheSufficientCheck = evictCount == 0 || + context.getCurBlockCacheUsed() < context.getCurBlockCacheSize()*sufficientMemoryLevel; float newMemstoreSize; float newBlockCacheSize; - if (memstoreSufficient) { - // Increase the block cache size and corresponding decrease in memstore size - newBlockCacheSize = context.getCurBlockCacheSize() + step; - newMemstoreSize = context.getCurMemStoreSize() - step; - } else if (blockCacheSufficient) { - // Increase the memstore size and corresponding decrease in block cache size - newBlockCacheSize = context.getCurBlockCacheSize() - step; - newMemstoreSize = context.getCurMemStoreSize() + step; + if (earlyMemstoreSufficientCheck && earlyBlockCacheSufficientCheck) { + // Both memstore and block cache memory seems to be sufficient. No operation required. + newTuneDirection = StepDirection.NEUTRAL; + } else if (earlyMemstoreSufficientCheck) { + // Increase the block cache size and corresponding decrease in memstore size. + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + } else if (earlyBlockCacheSufficientCheck) { + // Increase the memstore size and corresponding decrease in block cache size. + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; } else { - return NO_OP_TUNER_RESULT; - // As of now not making any tuning in write/read heavy scenario. + // Early checks for sufficient memory failed. Tuning memory based on past statistics. + // Boolean indicator to show if we need to revert previous step or not. + boolean isReverting = false; + switch (prevTuneDirection) { + // Here we are using number of evictions rather than cache misses because it is more + // strong indicator for deficient cache size. Improving caching is what we + // would like to optimize for in steady state. 
+ case INCREASE_BLOCK_CACHE_SIZE: + if ((double)evictCount > rollingStatsForEvictions.getMean() || + (double)totalFlushCount > rollingStatsForFlushes.getMean() + + rollingStatsForFlushes.getDeviation()/2.00) { + // Reverting previous step as it was not useful. + // Tuning failed to decrease evictions or tuning resulted in large number of flushes. + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; + tunerLog += "Reverting previous tuning."; + if ((double)evictCount > rollingStatsForEvictions.getMean()) { + tunerLog += " As could not decrease evctions sufficiently."; + } else { + tunerLog += " As number of flushes rose significantly."; + } + isReverting = true; + } + break; + case INCREASE_MEMSTORE_SIZE: + if ((double)totalFlushCount > rollingStatsForFlushes.getMean() || + (double)evictCount > rollingStatsForEvictions.getMean() + + rollingStatsForEvictions.getDeviation()/2.00) { + // Reverting previous step as it was not useful. + // Tuning failed to decrease flushes or tuning resulted in large number of evictions. + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + tunerLog += "Reverting previous tuning."; + if ((double)totalFlushCount > rollingStatsForFlushes.getMean()) { + tunerLog += " As could not decrease flushes sufficiently."; + } else { + tunerLog += " As number of evictions rose significantly."; + } + isReverting = true; + } + break; + default: + // Last step was neutral, revert does not apply here. + break; + } + // If we are not reverting, we try to tune memory sizes by looking at cache misses / flushes. + if (!isReverting){ + // mean +- deviation/2 is considered to be normal + // below it is considered low and above it is considered high. + // We can safely assume that the number cache misses, flushes are normally distributed over + // past periods and hence on all the above mentioned classes (normal, high and low) + // are equally likely with 33% probability each.
Hence there is very good probability that + // we will not always fall in default step. + if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() - + rollingStatsForCacheMisses.getDeviation()/2.00 && + (double)totalFlushCount < rollingStatsForFlushes.getMean() - + rollingStatsForFlushes.getDeviation()/2.00) { + // Everything is fine no tuning required + newTuneDirection = StepDirection.NEUTRAL; + } else if ((double)cacheMissCount > rollingStatsForCacheMisses.getMean() + + rollingStatsForCacheMisses.getDeviation()/2.00 && + (double)totalFlushCount < rollingStatsForFlushes.getMean() - + rollingStatsForFlushes.getDeviation()/2.00) { + // more misses , increasing cache size + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + tunerLog += + "Increasing block cache size as observed increase in number of cache misses."; + } else if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() - + rollingStatsForCacheMisses.getDeviation()/2.00 && + (double)totalFlushCount > rollingStatsForFlushes.getMean() + + rollingStatsForFlushes.getDeviation()/2.00) { + // more flushes , increasing memstore size + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; + tunerLog += "Increasing memstore size as observed increase in number of flushes."; + } else { + // Default. Not enough facts to do tuning. + newTuneDirection = StepDirection.NEUTRAL; + } + } + } + // Adjusting step size for tuning to get to steady state. + // Even if the step size was 4% and 32 GB memory size, we will be shifting 1 GB back and forth + // per tuner operation and it can affect the performance of cluster + if (prevTuneDirection == StepDirection.NEUTRAL && newTuneDirection != StepDirection.NEUTRAL) { + // Restarting the tuning from steady state. + step = maximumStepSize; + } else if (prevTuneDirection != newTuneDirection) { + // Decrease the step size to reach the steady state. Similar procedure as binary search. 
+ step = step/2.00f; + if (step < minimumStepSize) { + // Ensure step size does not get too small. + step = minimumStepSize; + } } + // Increase / decrease the memstore / block cache sizes depending on new tuner step. + switch (newTuneDirection) { + case INCREASE_BLOCK_CACHE_SIZE: + newBlockCacheSize = context.getCurBlockCacheSize() + step; + newMemstoreSize = context.getCurMemStoreSize() - step; + break; + case INCREASE_MEMSTORE_SIZE: + newBlockCacheSize = context.getCurBlockCacheSize() - step; + newMemstoreSize = context.getCurMemStoreSize() + step; + break; + default: + prevTuneDirection = StepDirection.NEUTRAL; + return NO_OP_TUNER_RESULT; + } + // Check we are within max/min bounds. if (newMemstoreSize > globalMemStorePercentMaxRange) { newMemstoreSize = globalMemStorePercentMaxRange; } else if (newMemstoreSize < globalMemStorePercentMinRange) { @@ -93,6 +264,10 @@ public TunerResult tune(TunerContext context) { } TUNER_RESULT.setBlockCacheSize(newBlockCacheSize); TUNER_RESULT.setMemstoreSize(newMemstoreSize); + if (LOG.isDebugEnabled()) { + LOG.debug(tunerLog); + } + prevTuneDirection = newTuneDirection; return TUNER_RESULT; } @@ -104,7 +279,12 @@ public Configuration getConf() { @Override public void setConf(Configuration conf) { this.conf = conf; - this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE); + this.maximumStepSize = conf.getFloat(MAX_STEP_KEY, DEFAULT_MAX_STEP_VALUE); + this.minimumStepSize = conf.getFloat(MIN_STEP_KEY, DEFAULT_MIN_STEP_VALUE); + this.step = this.maximumStepSize; + this.sufficientMemoryLevel = conf.getFloat(SUFFICIENT_MEMORY_LEVEL_KEY, + DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE); + this.tunerLookupPeriods = conf.getInt(LOOKUP_PERIODS_KEY, DEFAULT_LOOKUP_PERIODS); this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)); this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, @@ -113,5 +293,19 @@
public void setConf(Configuration conf) { HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false)); this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false)); + // Default value of periods to ignore is number of lookup periods + this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods); + this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods); + this.rollingStatsForFlushes = new RollingStatCalculator(this.tunerLookupPeriods); + this.rollingStatsForEvictions = new RollingStatCalculator(this.tunerLookupPeriods); + } + + private enum StepDirection{ + // block cache size was increased + INCREASE_BLOCK_CACHE_SIZE, + // memstore size was increased + INCREASE_MEMSTORE_SIZE, + // no operation was performed + NEUTRAL } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a08877debde8..ae739b380a38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1380,7 +1380,8 @@ protected void handleReportForDutyResponse(final RegionServerStartupResponse c) } private void startHeapMemoryManager() { - this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this); + this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, + this, this.regionServerAccounting); if (this.hMemManager != null) { this.hMemManager.start(getChoreService()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 3deb2580c1cc..8f001a16621c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java 
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -76,6 +76,7 @@ public class HeapMemoryManager { private final ResizableBlockCache blockCache; private final FlushRequester memStoreFlusher; private final Server server; + private final RegionServerAccounting regionServerAccounting; private HeapMemoryTunerChore heapMemTunerChore = null; private final boolean tunerOn; @@ -85,21 +86,23 @@ public class HeapMemoryManager { private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, - Server server) { + Server server, RegionServerAccounting regionServerAccounting) { BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); if (blockCache instanceof ResizableBlockCache) { - return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server); + return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server, + regionServerAccounting); } return null; } @VisibleForTesting HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher, - Server server) { + Server server, RegionServerAccounting regionServerAccounting) { Configuration conf = server.getConfiguration(); this.blockCache = blockCache; this.memStoreFlusher = memStoreFlusher; this.server = server; + this.regionServerAccounting = regionServerAccounting; this.tunerOn = doInit(conf); this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD); @@ -217,6 +220,7 @@ private class HeapMemoryTunerChore extends ScheduledChore implements FlushReques private AtomicLong blockedFlushCount = new AtomicLong(); private AtomicLong unblockedFlushCount = new AtomicLong(); private long evictCount = 0L; + private long cacheMissCount = 0L; private TunerContext tunerContext = new TunerContext(); private boolean alarming = false; @@ -264,11 +268,21 @@ 
protected void chore() { } private void tune() { - long curEvictCount = blockCache.getStats().getEvictedCount(); + // TODO check if we can increase the memory boundaries + // while remaining in the limits + long curEvictCount; + long curCacheMisCount; + curEvictCount = blockCache.getStats().getEvictedCount(); tunerContext.setEvictCount(curEvictCount - evictCount); evictCount = curEvictCount; + curCacheMisCount = blockCache.getStats().getMissCachingCount(); + tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount); + cacheMissCount = curCacheMisCount; tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0)); tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0)); + tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize); + tunerContext.setCurMemStoreUsed( + (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize); tunerContext.setCurBlockCacheSize(blockCachePercent); tunerContext.setCurMemStoreSize(globalMemStorePercent); TunerResult result = null; @@ -321,6 +335,8 @@ private void tune() { globalMemStorePercent = memstoreSize; memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); } + } else if (LOG.isDebugEnabled()) { + LOG.debug("No changes made by HeapMemoryTuner."); } } @@ -349,6 +365,9 @@ public static final class TunerContext { private long blockedFlushCount; private long unblockedFlushCount; private long evictCount; + private long cacheMissCount; + private float curBlockCacheUsed; + private float curMemStoreUsed; private float curMemStoreSize; private float curBlockCacheSize; @@ -391,6 +410,30 @@ public float getCurBlockCacheSize() { public void setCurBlockCacheSize(float curBlockCacheSize) { this.curBlockCacheSize = curBlockCacheSize; } + + public long getCacheMissCount() { + return cacheMissCount; + } + + public void setCacheMissCount(long cacheMissCount) { + this.cacheMissCount = cacheMissCount; + } + + public float getCurBlockCacheUsed() { + return curBlockCacheUsed; + } + + public 
void setCurBlockCacheUsed(float curBlockCacheUsed) { + this.curBlockCacheUsed = curBlockCacheUsed; + } + + public float getCurMemStoreUsed() { + return curMemStoreUsed; + } + + public void setCurMemStoreUsed(float d) { + this.curMemStoreUsed = d; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java new file mode 100644 index 000000000000..554d6f51a719 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java @@ -0,0 +1,113 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +/** + * This class maintains mean and variation for any sequence of input provided to it. + * It is initialized with number of rolling periods which basically means the number of past + * inputs whose data will be considered to maintain mean and variation. + * It will use O(N) memory to maintain these statistics, where N is number of look up periods it + * was initialized with. + * If zero is passed during initialization then it will maintain mean and variance from the + * start. It will use O(1) memory only. 
But note that since it will maintain mean / variance + * from the start the statistics may behave like constants and may ignore short trends. + * All operations are O(1) except the initialization which is O(N). + */ +public class RollingStatCalculator { + private double currentSum; + private double currentSqrSum; + // Total number of data values whose statistic is currently present + private long numberOfDataValues; + private int rollingPeriod; + private int currentIndexPosition; + // to be used only if we have non-zero rolling period + private long [] dataValues; + + /** + * Creates a RollingStatCalculator with given number of rolling periods. + * @param rollingPeriod + */ + public RollingStatCalculator(int rollingPeriod) { + this.rollingPeriod = rollingPeriod; + this.dataValues = fillWithZeros(rollingPeriod); + this.currentSum = 0.0; + this.currentSqrSum = 0.0; + this.currentIndexPosition = 0; + this.numberOfDataValues = 0; + } + + /** + * Inserts given data value to array of data values to be considered for statistics calculation + * @param data + */ + public void insertDataValue(long data) { + // if current number of data points already equals rolling period and rolling period is + // non-zero then remove one data and update the statistics + if(numberOfDataValues >= rollingPeriod && rollingPeriod > 0) { + this.removeData(dataValues[currentIndexPosition]); + } + numberOfDataValues++; + currentSum = currentSum + (double)data; + currentSqrSum = currentSqrSum + ((double)data * data); + if (rollingPeriod >0) + { + dataValues[currentIndexPosition] = data; + currentIndexPosition = (currentIndexPosition + 1) % rollingPeriod; + } + } + + /** + * Update the statistics after removing the given data value + * @param data + */ + private void removeData(long data) { + currentSum = currentSum - (double)data; + currentSqrSum = currentSqrSum - ((double)data * data); + numberOfDataValues--; + } + + /** + * @return mean of the data values that are in the current list of data 
values + */ + public double getMean() { + return this.currentSum / (double)numberOfDataValues; + } + + /** + * @return deviation of the data values that are in the current list of data values + */ + public double getDeviation() { + double variance = (currentSqrSum - (currentSum*currentSum)/(double)(numberOfDataValues))/ + numberOfDataValues; + return Math.sqrt(variance); + } + + /** + * @param size + * @return an array of given size initialized with zeros + */ + private long [] fillWithZeros(int size) { + long [] zeros = new long [size]; + for (int i=0; i 0) { - assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace)); + assertTrue(expctedMinDelta*error <= (double)(newHeapSpace - oldHeapSpace)); + assertTrue(expctedMinDelta/error >= (double)(newHeapSpace - oldHeapSpace)); } else { - assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace)); + assertTrue(-expctedMinDelta*error <= (double)(oldHeapSpace - newHeapSpace)); + assertTrue(-expctedMinDelta/error >= (double)(oldHeapSpace - newHeapSpace)); } } private static class BlockCacheStub implements ResizableBlockCache { CacheStats stats = new CacheStats("test"); long maxSize = 0; - + private long testBlockSize = 0; + public BlockCacheStub(long size){ this.maxSize = size; } @@ -378,7 +508,7 @@ public long getFreeSize() { @Override public long getCurrentSize() { - return 0; + return this.testBlockSize; } @Override @@ -400,6 +530,10 @@ public Iterator iterator() { public BlockCache[] getBlockCaches() { return null; } + + public void setTestBlockSize(long testBlockSize) { + this.testBlockSize = testBlockSize; + } } private static class MemstoreFlusherStub implements FlushRequester { @@ -526,4 +660,15 @@ public TunerResult tune(TunerContext context) { return result; } } + + private static class RegionServerAccountingStub extends RegionServerAccounting { + private long testMemstoreSize = 0; + @Override + public long getGlobalMemstoreSize() { + return testMemstoreSize; + } + public void setTestMemstoreSize(long 
testMemstoreSize) { + this.testMemstoreSize = testMemstoreSize; + } + } }