diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java index 203857deb1f0..a5a880bc964b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java @@ -13,10 +13,12 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.ImmutableMap; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.TableName; @@ -27,15 +29,10 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.metrics.Meter; -import org.apache.hadoop.hbase.metrics.Metric; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.LossyCounting; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; - /** * A coprocessor that collects metrics from meta table. *

@@ -45,21 +42,20 @@ * * @see MetaTableMetrics */ - @InterfaceAudience.Private public class MetaTableMetrics extends BaseRegionObserver { - private Map> requestsMap; private MetricRegistry registry; private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting; private boolean active = false; + private Set metrics = new HashSet(); enum MetaTableOps { GET, PUT, DELETE; } - private ImmutableMap opsNameMap = - ImmutableMap.builder().put(Put.class, MetaTableOps.PUT) + private ImmutableMap, MetaTableOps> opsNameMap = + ImmutableMap., MetaTableOps>builder().put(Put.class, MetaTableOps.PUT) .put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build(); @Override @@ -91,66 +87,6 @@ private void registerAndMarkMetrics(ObserverContext optionalMetric = requestsMap.get(requestMeter); - if (optionalMetric != null && optionalMetric.isPresent()) { - Meter metric = (Meter) optionalMetric.get(); - metric.mark(); - } - } - - private void registerMeterIfNotPresent(String requestMeter) { - if (requestMeter.isEmpty()) { - return; - } - if (!requestsMap.containsKey(requestMeter)) { - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); - } - } - - /** - * Registers and counts lossyCount for Meters that kept by lossy counting. - * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate) - * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept - * also, all kept elements have frequency higher than e * N. (N is total count) - * - * @param requestMeter meter to be registered - * @param lossyCounting lossyCounting object for one type of meters. - */ - private void registerLossyCountingMeterIfNotPresent(String requestMeter, - LossyCounting lossyCounting) { - if (requestMeter.isEmpty()) { - return; - } - synchronized (lossyCounting) { - Set metersToBeRemoved = lossyCounting.addByOne(requestMeter); - - boolean isNewMeter = !requestsMap.containsKey(requestMeter); - boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter); - if (isNewMeter) { - if (requestMeterRemoved) { - // if the new metric is swept off by lossyCounting then don't add in the map - metersToBeRemoved.remove(requestMeter); - } else { - // else register the new metric and add in the map - registry.meter(requestMeter); - requestsMap.put(requestMeter, registry.get(requestMeter)); - } - } - - for (String meter : metersToBeRemoved) { - //cleanup requestsMap according to the swept data from lossy count; - requestsMap.remove(meter); - registry.remove(meter); - } - } - } - /** * Get table name from Ops such as: get, put, delete. * @@ -187,14 +123,14 @@ private boolean isMetaTableOp(ObserverContext e) { private void clientMetricRegisterAndMark() { // Mark client metric - String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : ""; + String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null; if (clientIP == null || clientIP.isEmpty()) { return; } String clientRequestMeter = clientRequestMeterName(clientIP); - registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting); - markMeterIfPresent(clientRequestMeter); + clientMetricsLossyCounting.add(clientRequestMeter); + registerAndMarkMeter(clientRequestMeter); } private void tableMetricRegisterAndMark(Row op) { @@ -204,7 +140,7 @@ private void tableMetricRegisterAndMark(Row op) { return; } String tableRequestMeter = tableMeterName(tableName); - registerAndMarkMeterIfNotPresent(tableRequestMeter); + registerAndMarkMeter(tableRequestMeter); } private void regionMetricRegisterAndMark(Row op) { @@ -214,8 +150,8 @@ private void regionMetricRegisterAndMark(Row op) { return; } String regionRequestMeter = regionMeterName(regionId); - registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting); - markMeterIfPresent(regionRequestMeter); + regionMetricsLossyCounting.add(regionRequestMeter); + registerAndMarkMeter(regionRequestMeter); } private void opMetricRegisterAndMark(Row op) { @@ -224,7 +160,7 @@ private void opMetricRegisterAndMark(Row op) { if (opMeterName == null || opMeterName.isEmpty()) { return; } - registerAndMarkMeterIfNotPresent(opMeterName); + registerAndMarkMeter(opMeterName); } private void opWithClientMetricRegisterAndMark(Object op) { @@ -233,13 +169,17 @@ private void opWithClientMetricRegisterAndMark(Object op) { if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) { return; } - registerAndMarkMeterIfNotPresent(opWithClientMeterName); + registerAndMarkMeter(opWithClientMeterName); } - // Helper function to register and mark meter if not present - private void registerAndMarkMeterIfNotPresent(String name) { - registerMeterIfNotPresent(name); - markMeterIfPresent(name); + private void registerAndMarkMeter(String requestMeter) { + if (requestMeter.isEmpty()) { + return; + } + if (!registry.get(requestMeter).isPresent()){ + metrics.add(requestMeter); + } + registry.meter(requestMeter).mark(); } private String opWithClientMeterName(Object op) { @@ -311,9 +251,14 @@ public void start(CoprocessorEnvironment env) throws IOException { .equals(TableName.META_TABLE_NAME)) { RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env; registry = regionCoprocessorEnv.getMetricRegistryForRegionServer(); - requestsMap = new ConcurrentHashMap<>(); - clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics"); - regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics"); + LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){ + @Override public void sweep(String key) { + registry.remove(key); + metrics.remove(key); + } + }; + clientMetricsLossyCounting = new LossyCounting(listener); + regionMetricsLossyCounting = new LossyCounting(listener); // only be active mode when this region holds meta table. active = true; } @@ -322,10 +267,8 @@ public void start(CoprocessorEnvironment env) throws IOException { @Override public void stop(CoprocessorEnvironment env) throws IOException { // since meta region can move around, clear stale metrics when stop. - if (requestsMap != null) { - for (String meterName : requestsMap.keySet()) { - registry.remove(meterName); - } + for (String metric:metrics){ + registry.remove(metric); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java index d3a66bed7c94..255f7206de3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.util; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -27,9 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * LossyCounting utility, bounded data structure that maintains approximate high frequency @@ -44,17 +40,17 @@ @InterfaceAudience.Private public class LossyCounting { - private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class); private long bucketSize; - private long currentTerm; - private double errorRate; + private int currentTerm; private Map data; private long totalDataCount; - private String name; + private LossyCountingListener listener; + + public interface LossyCountingListener { + void sweep(String key); + } - public LossyCounting(double errorRate, String name) { - this.errorRate = errorRate; - this.name = name; + public LossyCounting(double errorRate, LossyCountingListener listener) { if (errorRate < 0.0 || errorRate > 1.0) { throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]"); } @@ -62,51 +58,57 @@ public LossyCounting(double errorRate, String name) { this.currentTerm = 1; this.totalDataCount = 0; this.data = new ConcurrentHashMap<>(); + this.listener = listener; calculateCurrentTerm(); } - public LossyCounting(String name) { + public LossyCounting(LossyCountingListener listener) { this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02), - name); + listener); } - public Set addByOne(String key) { - if (!data.containsKey(key)) { - data.put(key, 0); + private void addByOne(String key) { + //If entry exists, we update the entry by incrementing its frequency by one. Otherwise, + //we create a new entry starting with currentTerm so that it will not be pruned immediately + Integer i = data.get(key); + if (i == null) { + i = currentTerm != 0 ? currentTerm - 1 : 0; } - data.put(key, data.get(key) + 1); + data.put(key, i + 1); + //update totalDataCount and term totalDataCount++; calculateCurrentTerm(); - Set dataToBeSwept = new HashSet<>(); + } + + public void add(String key) { + addByOne(key); if(totalDataCount % bucketSize == 0) { - dataToBeSwept = sweep(); + //sweep the entries at bucket boundaries + sweep(); } - return dataToBeSwept; } /** * sweep low frequency data * @return Names of elements got swept */ - private Set sweep() { - Set dataToBeSwept = new HashSet<>(); + private void sweep() { for(Map.Entry entry : data.entrySet()) { - if(entry.getValue() + errorRate < currentTerm) { - dataToBeSwept.add(entry.getKey()); + if(entry.getValue() < currentTerm) { + String metric = entry.getKey(); + data.remove(metric); + if (listener != null) { + listener.sweep(metric); + } } } - for(String key : dataToBeSwept) { - data.remove(key); - } - LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size())); - return dataToBeSwept; } /** * Calculate and set current term */ private void calculateCurrentTerm() { - this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize); + this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize); } public long getBucketSize(){ @@ -121,6 +123,10 @@ public boolean contains(String key) { return data.containsKey(key); } + public Set getElements(){ + return data.keySet(); + } + public long getCurrentTerm() { return currentTerm; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java index 0f35d6082617..15987bc8c116 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java @@ -1,12 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.coprocessor; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java index 0d41717fd261..de6949c51a4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java @@ -33,18 +33,18 @@ public class TestLossyCounting { @Test public void testBucketSize() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize"); + LossyCounting lossyCounting = new LossyCounting(0.01, null); assertEquals(100L, lossyCounting.getBucketSize()); - LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2"); + LossyCounting lossyCounting2 = new LossyCounting(null); assertEquals(50L, lossyCounting2.getBucketSize()); } @Test public void testAddByOne() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne"); + LossyCounting lossyCounting = new LossyCounting(0.01, null); for(int i = 0; i < 100; i++){ String key = "" + i; - lossyCounting.addByOne(key); + lossyCounting.add(key); } assertEquals(100L, lossyCounting.getDataSize()); for(int i = 0; i < 100; i++){ @@ -55,26 +55,27 @@ public void testAddByOne() { @Test public void testSweep1() { - LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1"); + LossyCounting lossyCounting = new LossyCounting(0.01, null); for(int i = 0; i < 400; i++){ String key = "" + i; - lossyCounting.addByOne(key); + lossyCounting.add(key); } assertEquals(4L, lossyCounting.getCurrentTerm()); - assertEquals(0L, lossyCounting.getDataSize()); + //if total rows added are proportional to bucket size + assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize()); } @Test public void testSweep2() { - LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2"); + LossyCounting lossyCounting = new LossyCounting(0.1, null); for(int i = 0; i < 10; i++){ String key = "" + i; - lossyCounting.addByOne(key); + lossyCounting.add(key); } assertEquals(10L, lossyCounting.getDataSize()); for(int i = 0; i < 10; i++){ String key = "1"; - lossyCounting.addByOne(key); + lossyCounting.add(key); } assertEquals(1L, lossyCounting.getDataSize()); }