Skip to content

Commit

Permalink
HBASE-23110 Backport HBASE-23054 "Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm" to branch-1 (#683)

Browse files Browse the repository at this point in the history

Additional fixes for reported findbugs and checkstyle warnings.
  • Loading branch information
apurtell committed Oct 7, 2019
1 parent 7cec975 commit 44a29d8
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.ImmutableMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -27,15 +29,10 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.Meter;
import org.apache.hadoop.hbase.metrics.Metric;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.LossyCounting;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;

/**
* A coprocessor that collects metrics from meta table.
* <p>
Expand All @@ -45,21 +42,20 @@
*
* @see MetaTableMetrics
*/

@InterfaceAudience.Private
public class MetaTableMetrics extends BaseRegionObserver {

private Map<String, Optional<Metric>> requestsMap;
private MetricRegistry registry;
private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false;
private Set<String> metrics = new HashSet<String>();

// Meta table operations tracked by this coprocessor; used as keys in opsNameMap to
// translate a request class (Put/Get/Delete) into the metric name component.
enum MetaTableOps {
GET, PUT, DELETE;
}

private ImmutableMap<Class, MetaTableOps> opsNameMap =
ImmutableMap.<Class, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
private ImmutableMap<Class<?>, MetaTableOps> opsNameMap =
ImmutableMap.<Class<?>, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
.put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();

@Override
Expand Down Expand Up @@ -91,66 +87,6 @@ private void registerAndMarkMetrics(ObserverContext<RegionCoprocessorEnvironment
opWithClientMetricRegisterAndMark(row);
}

/**
 * Marks (increments) the named meter, but only if it is already registered in requestsMap.
 * No-op for an empty meter name or an unregistered meter.
 *
 * @param requestMeter name of the meter to mark; may be empty
 */
private void markMeterIfPresent(String requestMeter) {
if (requestMeter.isEmpty()) {
return;
}

Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
if (optionalMetric != null && optionalMetric.isPresent()) {
// Only Meter instances are stored under these names (see registerMeterIfNotPresent),
// so the cast is expected to be safe — TODO confirm no other metric type shares names.
Meter metric = (Meter) optionalMetric.get();
metric.mark();
}
}

/**
 * Registers a meter under the given name if not already tracked, caching the registry's
 * handle in requestsMap so subsequent marks can find it. Empty names are ignored.
 *
 * @param requestMeter name of the meter to register; may be empty
 */
private void registerMeterIfNotPresent(String requestMeter) {
if (requestMeter.isEmpty()) {
return;
}
if (!requestsMap.containsKey(requestMeter)) {
registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter));
}
}

/**
 * Registers and counts lossyCount for Meters that kept by lossy counting.
 * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
 * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
 * also, all kept elements have frequency higher than e * N. (N is total count)
 *
 * @param requestMeter meter to be registered
 * @param lossyCounting lossyCounting object for one type of meters.
 */
private void registerLossyCountingMeterIfNotPresent(String requestMeter,
LossyCounting lossyCounting) {
if (requestMeter.isEmpty()) {
return;
}
// Serialize the count update and the follow-up map/registry bookkeeping for this counter;
// addByOne and the sweep result must be observed atomically by concurrent request threads.
synchronized (lossyCounting) {
Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);

boolean isNewMeter = !requestsMap.containsKey(requestMeter);
boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
if (isNewMeter) {
if (requestMeterRemoved) {
// if the new metric is swept off by lossyCounting then don't add in the map
metersToBeRemoved.remove(requestMeter);
} else {
// else register the new metric and add in the map
registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter));
}
}

for (String meter : metersToBeRemoved) {
//cleanup requestsMap according to the swept data from lossy count;
requestsMap.remove(meter);
registry.remove(meter);
}
}
}

/**
* Get table name from Ops such as: get, put, delete.
*
Expand Down Expand Up @@ -187,14 +123,14 @@ private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {

private void clientMetricRegisterAndMark() {
// Mark client metric
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
if (clientIP == null || clientIP.isEmpty()) {
return;
}

String clientRequestMeter = clientRequestMeterName(clientIP);
registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
markMeterIfPresent(clientRequestMeter);
clientMetricsLossyCounting.add(clientRequestMeter);
registerAndMarkMeter(clientRequestMeter);
}

private void tableMetricRegisterAndMark(Row op) {
Expand All @@ -204,7 +140,7 @@ private void tableMetricRegisterAndMark(Row op) {
return;
}
String tableRequestMeter = tableMeterName(tableName);
registerAndMarkMeterIfNotPresent(tableRequestMeter);
registerAndMarkMeter(tableRequestMeter);
}

private void regionMetricRegisterAndMark(Row op) {
Expand All @@ -214,8 +150,8 @@ private void regionMetricRegisterAndMark(Row op) {
return;
}
String regionRequestMeter = regionMeterName(regionId);
registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
markMeterIfPresent(regionRequestMeter);
regionMetricsLossyCounting.add(regionRequestMeter);
registerAndMarkMeter(regionRequestMeter);
}

private void opMetricRegisterAndMark(Row op) {
Expand All @@ -224,7 +160,7 @@ private void opMetricRegisterAndMark(Row op) {
if (opMeterName == null || opMeterName.isEmpty()) {
return;
}
registerAndMarkMeterIfNotPresent(opMeterName);
registerAndMarkMeter(opMeterName);
}

private void opWithClientMetricRegisterAndMark(Object op) {
Expand All @@ -233,13 +169,17 @@ private void opWithClientMetricRegisterAndMark(Object op) {
if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
return;
}
registerAndMarkMeterIfNotPresent(opWithClientMeterName);
registerAndMarkMeter(opWithClientMeterName);
}

// Helper function to register and mark meter if not present
private void registerAndMarkMeterIfNotPresent(String name) {
registerMeterIfNotPresent(name);
markMeterIfPresent(name);
/**
 * Registers (if new) and marks the named meter in the region server metric registry.
 * Names seen for the first time are also remembered in the local 'metrics' set so they
 * can be removed from the registry on stop() or when swept by lossy counting.
 *
 * @param requestMeter name of the meter to register and mark; empty names are ignored
 */
private void registerAndMarkMeter(String requestMeter) {
if (requestMeter.isEmpty()) {
return;
}
if (!registry.get(requestMeter).isPresent()){
metrics.add(requestMeter);
}
// registry.meter() creates the meter on first use and returns the existing one afterwards.
registry.meter(requestMeter).mark();
}

private String opWithClientMeterName(Object op) {
Expand Down Expand Up @@ -311,9 +251,14 @@ public void start(CoprocessorEnvironment env) throws IOException {
.equals(TableName.META_TABLE_NAME)) {
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
requestsMap = new ConcurrentHashMap<>();
clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
@Override public void sweep(String key) {
registry.remove(key);
metrics.remove(key);
}
};
clientMetricsLossyCounting = new LossyCounting(listener);
regionMetricsLossyCounting = new LossyCounting(listener);
// only be active mode when this region holds meta table.
active = true;
}
Expand All @@ -322,10 +267,8 @@ public void start(CoprocessorEnvironment env) throws IOException {
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// since meta region can move around, clear stale metrics when stop.
if (requestsMap != null) {
for (String meterName : requestsMap.keySet()) {
registry.remove(meterName);
}
for (String metric:metrics){
registry.remove(metric);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@

package org.apache.hadoop.hbase.util;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* LossyCounting utility, bounded data structure that maintains approximate high frequency
Expand All @@ -44,69 +40,75 @@

@InterfaceAudience.Private
public class LossyCounting {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
private long bucketSize;
private long currentTerm;
private double errorRate;
private int currentTerm;
private Map<String, Integer> data;
private long totalDataCount;
private String name;
private LossyCountingListener listener;

/**
 * Callback notified when a low-frequency key is swept (removed) from the counter,
 * allowing the owner to release any per-key state (e.g. deregister a metric).
 */
public interface LossyCountingListener {
// Invoked once per swept key, from within the sweep.
void sweep(String key);
}

public LossyCounting(double errorRate, String name) {
this.errorRate = errorRate;
this.name = name;
/**
 * Creates a lossy counter with the given error rate.
 *
 * @param errorRate the lossy counting error rate "e"; must lie within [0, 1]. The bucket
 *   size is ceil(1 / e), which bounds how many elements are retained.
 * @param listener callback notified for each key removed during a sweep; may be null
 *   for no notification.
 * @throws IllegalArgumentException if errorRate is outside [0, 1]
 */
public LossyCounting(double errorRate, LossyCountingListener listener) {
if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
}
this.bucketSize = (long) Math.ceil(1 / errorRate);
this.currentTerm = 1;
this.totalDataCount = 0;
this.data = new ConcurrentHashMap<>();
this.listener = listener;
// Derive the term from the (zero) total count so the starting state is consistent.
calculateCurrentTerm();
}

public LossyCounting(String name) {
public LossyCounting(LossyCountingListener listener) {
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
name);
listener);
}

public Set<String> addByOne(String key) {
if (!data.containsKey(key)) {
data.put(key, 0);
private void addByOne(String key) {
//If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
//we create a new entry starting with currentTerm so that it will not be pruned immediately
Integer i = data.get(key);
if (i == null) {
i = currentTerm != 0 ? currentTerm - 1 : 0;
}
data.put(key, data.get(key) + 1);
data.put(key, i + 1);
//update totalDataCount and term
totalDataCount++;
calculateCurrentTerm();
Set<String> dataToBeSwept = new HashSet<>();
}

public void add(String key) {
addByOne(key);
if(totalDataCount % bucketSize == 0) {
dataToBeSwept = sweep();
//sweep the entries at bucket boundaries
sweep();
}
return dataToBeSwept;
}

/**
* sweep low frequency data
* @return Names of elements got swept
*/
private Set<String> sweep() {
Set<String> dataToBeSwept = new HashSet<>();
private void sweep() {
for(Map.Entry<String, Integer> entry : data.entrySet()) {
if(entry.getValue() + errorRate < currentTerm) {
dataToBeSwept.add(entry.getKey());
if(entry.getValue() < currentTerm) {
String metric = entry.getKey();
data.remove(metric);
if (listener != null) {
listener.sweep(metric);
}
}
}
for(String key : dataToBeSwept) {
data.remove(key);
}
LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
return dataToBeSwept;
}

/**
* Calculate and set current term
*/
private void calculateCurrentTerm() {
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
}

public long getBucketSize(){
Expand All @@ -121,6 +123,10 @@ public boolean contains(String key) {
return data.containsKey(key);
}

// Returns the currently tracked keys. Note this is a live view backed by the underlying
// map, so it reflects subsequent adds and sweeps.
public Set<String> getElements(){
return data.keySet();
}

// Returns the current term, i.e. the bucket index derived from the total data count.
public long getCurrentTerm() {
return currentTerm;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.coprocessor;
Expand Down
Loading

0 comments on commit 44a29d8

Please sign in to comment.