Skip to content

Commit

Permalink
HBASE-22804 Provide an API to get list of successful regions and tota…
Browse files Browse the repository at this point in the history
…l expected regions in Canary

(cherry picked from commit ac07609)

Change-Id: Ife60a36aae1f9872e4e55b72900d5bb0e4460e35
  • Loading branch information
Caroline Zhou authored and Jenkins committed Sep 16, 2019
1 parent 0ffb363 commit 354b651
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 1 deletion.
139 changes: 138 additions & 1 deletion hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
Expand Down Expand Up @@ -133,14 +134,20 @@ public interface Sink {
long incWriteFailureCount();
Map<String,String> getWriteFailures();
void updateWriteFailures(String regionName, String serverName);
long getReadSuccessCount();
long incReadSuccessCount();
long getWriteSuccessCount();
long incWriteSuccessCount();
}

/**
* Simple implementation of canary sink that allows plotting to a file or standard output.
*/
public static class StdOutSink implements Sink {
private AtomicLong readFailureCount = new AtomicLong(0),
writeFailureCount = new AtomicLong(0);
writeFailureCount = new AtomicLong(0),
readSuccessCount = new AtomicLong(0),
writeSuccessCount = new AtomicLong(0);
private Map<String, String> readFailures = new ConcurrentHashMap<>();
private Map<String, String> writeFailures = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -183,6 +190,26 @@ public Map<String, String> getWriteFailures() {
public void updateWriteFailures(String regionName, String serverName) {
writeFailures.put(regionName, serverName);
}

/**
 * Reports how many successful reads have been counted so far.
 *
 * @return the current value of the read-success counter
 */
@Override
public long getReadSuccessCount() {
  final long successfulReads = this.readSuccessCount.get();
  return successfulReads;
}

/**
 * Atomically adds one to the read-success counter.
 *
 * @return the counter value after the increment
 */
@Override
public long incReadSuccessCount() {
  final long updatedReads = this.readSuccessCount.incrementAndGet();
  return updatedReads;
}

/**
 * Reports how many successful writes have been counted so far.
 *
 * @return the current value of the write-success counter
 */
@Override
public long getWriteSuccessCount() {
  final long successfulWrites = this.writeSuccessCount.get();
  return successfulWrites;
}

/**
 * Atomically adds one to the write-success counter.
 *
 * @return the counter value after the increment
 */
@Override
public long incWriteSuccessCount() {
  final long updatedWrites = this.writeSuccessCount.incrementAndGet();
  return updatedWrites;
}
}

/**
Expand Down Expand Up @@ -219,6 +246,7 @@ public void publishReadTiming(String znode, String server, long msTime) {
public static class RegionStdOutSink extends StdOutSink {
private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
private LongAdder writeLatency = new LongAdder();
private final Map<String, RegionTaskResult> regionMap = new ConcurrentHashMap<>();

public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
incReadFailureCount();
Expand All @@ -234,6 +262,10 @@ public void publishReadFailure(ServerName serverName, RegionInfo region,

/**
 * Records a successful read of a region: bumps the read-success counter, marks the region's
 * {@link RegionTaskResult} (if one is registered in the region map) as read-successful with the
 * observed latency, and logs the timing.
 *
 * @param serverName server the region was read from
 * @param region region that was read
 * @param column column family that was read
 * @param msTime read latency in milliseconds
 */
public void publishReadTiming(ServerName serverName, RegionInfo region,
    ColumnFamilyDescriptor column, long msTime) {
  incReadSuccessCount();
  RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
  // The map lookup returns null when no result object was registered for this region;
  // skip the per-region bookkeeping in that case instead of failing a successful read.
  if (res != null) {
    res.setReadSuccess();
    res.setReadLatency(msTime);
  } else {
    LOG.debug("No RegionTaskResult registered for region {}; per-region read result not recorded",
      region.getRegionNameAsString());
  }
  LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
    column.getNameAsString(), msTime);
}
Expand All @@ -252,6 +284,10 @@ public void publishWriteFailure(ServerName serverName, RegionInfo region,

/**
 * Records a successful write to a region: bumps the write-success counter, marks the region's
 * {@link RegionTaskResult} (if one is registered in the region map) as write-successful with
 * the observed latency, and logs the timing.
 *
 * @param serverName server the region was written on
 * @param region region that was written to
 * @param column column family that was written
 * @param msTime write latency in milliseconds
 */
public void publishWriteTiming(ServerName serverName, RegionInfo region,
    ColumnFamilyDescriptor column, long msTime) {
  incWriteSuccessCount();
  RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
  // The map lookup returns null when no result object was registered for this region;
  // skip the per-region bookkeeping in that case instead of failing a successful write.
  if (res != null) {
    res.setWriteSuccess();
    res.setWriteLatency(msTime);
  } else {
    LOG.debug("No RegionTaskResult registered for region {}; per-region write result not recorded",
      region.getRegionNameAsString());
  }
  LOG.info("Write to {} on {} {} in {}ms",
    region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
}
Expand All @@ -273,6 +309,14 @@ public void initializeWriteLatency() {
/**
 * Exposes the write-latency accumulator held by this sink.
 *
 * @return the sink's own {@link LongAdder} instance (not a copy)
 */
public LongAdder getWriteLatency() {
  return writeLatency;
}

/**
 * Exposes the per-region result map, keyed by region name.
 *
 * @return the sink's own map instance (not a copy), so entries added through the returned
 *         reference are visible to the sink
 */
public Map<String, RegionTaskResult> getRegionMap() {
  return regionMap;
}

/**
 * Reports the number of regions currently registered in the region map.
 *
 * @return the size of the region map
 */
public int getTotalExpectedRegions() {
  final int registeredRegions = regionMap.size();
  return registeredRegions;
}
}

/**
Expand Down Expand Up @@ -931,6 +975,96 @@ Sink getSink(Configuration configuration, Class clazz) {
clazz, Sink.class));
}

/**
 * Canary region mode-specific data structure which stores information about each region
 * to be scanned.
 *
 * <p>The region, table and server identity are fixed at construction. The read/write latency
 * and success flags start out as "not recorded" ({@code -1} and {@code false}) and are filled
 * in through the setters. Latencies are kept in {@link AtomicLong}s created up front and the
 * flags are {@code volatile}, so a value set on one thread is visible to a reader on another
 * without relying on a racy lazy initialisation.
 */
public static class RegionTaskResult {
  /** Value reported by the latency getters until a latency has been set. */
  private static final long LATENCY_NOT_RECORDED = -1L;

  private final RegionInfo region;
  private final TableName tableName;
  private final ServerName serverName;
  // Seeded with the sentinel so the getters need no null check and the setters no
  // check-then-create step.
  private final AtomicLong readLatency = new AtomicLong(LATENCY_NOT_RECORDED);
  private final AtomicLong writeLatency = new AtomicLong(LATENCY_NOT_RECORDED);
  private volatile boolean readSuccess = false;
  private volatile boolean writeSuccess = false;

  /**
   * @param region region this result describes
   * @param tableName table the region belongs to
   * @param serverName server hosting the region
   */
  public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) {
    this.region = region;
    this.tableName = tableName;
    this.serverName = serverName;
  }

  /** @return the region this result describes */
  public RegionInfo getRegionInfo() {
    return this.region;
  }

  /** @return the region's name as a string */
  public String getRegionNameAsString() {
    return this.region.getRegionNameAsString();
  }

  /** @return the table the region belongs to */
  public TableName getTableName() {
    return this.tableName;
  }

  /** @return the table's name as a string */
  public String getTableNameAsString() {
    return this.tableName.getNameAsString();
  }

  /** @return the server hosting the region */
  public ServerName getServerName() {
    return this.serverName;
  }

  /** @return the server's name as a string */
  public String getServerNameAsString() {
    return this.serverName.getServerName();
  }

  /** @return the last read latency that was set, or {@code -1} if none has been set */
  public long getReadLatency() {
    return this.readLatency.get();
  }

  /** @param readLatency read latency to record */
  public void setReadLatency(long readLatency) {
    this.readLatency.set(readLatency);
  }

  /** @return the last write latency that was set, or {@code -1} if none has been set */
  public long getWriteLatency() {
    return this.writeLatency.get();
  }

  /** @param writeLatency write latency to record */
  public void setWriteLatency(long writeLatency) {
    this.writeLatency.set(writeLatency);
  }

  /** @return {@code true} once {@link #setReadSuccess()} has been called */
  public boolean isReadSuccess() {
    return this.readSuccess;
  }

  /** Marks the read of this region as successful. */
  public void setReadSuccess() {
    this.readSuccess = true;
  }

  /** @return {@code true} once {@link #setWriteSuccess()} has been called */
  public boolean isWriteSuccess() {
    return this.writeSuccess;
  }

  /** Marks the write to this region as successful. */
  public void setWriteSuccess() {
    this.writeSuccess = true;
  }
}

/**
* A Factory method for {@link Monitor}.
* Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
Expand Down Expand Up @@ -1346,6 +1480,9 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
RegionInfo region = location.getRegion();
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
taskType, rawScanEnabled, rwLatency));
Map<String, RegionTaskResult> regionMap = ((RegionStdOutSink) sink).getRegionMap();
regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
region.getTable(), rs));
}
return executor.invokeAll(tasks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
Expand All @@ -29,6 +32,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -126,6 +130,55 @@ public void testBasicCanaryWorks() throws Exception {
isA(ColumnFamilyDescriptor.class), anyLong());
}

/**
 * Runs the canary with write sniffing against a freshly populated table and checks that the
 * sink's success counters, expected-region count and per-region {@code RegionTaskResult}
 * entries are all filled in.
 */
@Test
public void testCanaryRegionTaskResult() throws Exception {
  TableName tableName = TableName.valueOf("testCanaryRegionTaskResult");
  // Close the table handle once the rows are loaded; the canary opens its own connections.
  try (Table table = testingUtility.createTable(tableName, new byte[][] { FAMILY })) {
    // insert some test rows
    for (int i = 0; i < 1000; i++) {
      byte[] iBytes = Bytes.toBytes(i);
      Put p = new Put(iBytes);
      p.addColumn(FAMILY, COLUMN, iBytes);
      table.put(p);
    }
  }
  ExecutorService executor = new ScheduledThreadPoolExecutor(1);
  Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
  Canary canary = new Canary(executor, sink);
  String[] args = { "-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult" };
  assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));

  assertTrue("verify read success count > 0", sink.getReadSuccessCount() > 0);
  assertTrue("verify write success count > 0", sink.getWriteSuccessCount() > 0);
  verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(RegionInfo.class),
    isA(ColumnFamilyDescriptor.class), anyLong());
  verify(sink, atLeastOnce()).publishWriteTiming(isA(ServerName.class), isA(RegionInfo.class),
    isA(ColumnFamilyDescriptor.class), anyLong());

  assertTrue("canary should expect to scan at least 1 region",
    sink.getTotalExpectedRegions() > 0);
  Map<String, Canary.RegionTaskResult> regionMap = sink.getRegionMap();
  assertFalse("verify region map has size > 0", regionMap.isEmpty());

  for (Map.Entry<String, Canary.RegionTaskResult> entry : regionMap.entrySet()) {
    String regionName = entry.getKey();
    Canary.RegionTaskResult res = entry.getValue();
    assertNotNull("verify each expected region has a RegionTaskResult object in the map", res);
    // Exercise the getter itself rather than the map key.
    assertNotNull("verify getRegionNameAsString()", res.getRegionNameAsString());
    assertNotNull("verify getRegionInfo()", res.getRegionInfo());
    assertNotNull("verify getTableName()", res.getTableName());
    assertNotNull("verify getTableNameAsString()", res.getTableNameAsString());
    assertNotNull("verify getServerName()", res.getServerName());
    assertNotNull("verify getServerNameAsString()", res.getServerNameAsString());

    // Regions of the canary's write table are checked for writes, all others for reads.
    if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
      assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess());
      assertTrue("write took some time", res.getWriteLatency() > -1);
    } else {
      assertTrue("read from region " + regionName + " succeeded", res.isReadSuccess());
      assertTrue("read took some time", res.getReadLatency() > -1);
    }
  }
}

@Test
@Ignore("Intermittent argument matching failures, see HBASE-18813")
public void testReadTableTimeouts() throws Exception {
Expand Down

0 comments on commit 354b651

Please sign in to comment.